515 return self._callstream(cmd, _compressible=True, **args) |
516 return self._callstream(cmd, _compressible=True, **args) |
516 |
517 |
517 def _abort(self, exception): |
518 def _abort(self, exception): |
518 raise exception |
519 raise exception |
519 |
520 |
|
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): |
|
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
|
523 buffersends=True) |
|
524 |
|
525 url = '%s/%s' % (apiurl, permission) |
|
526 |
|
527 if len(requests) > 1: |
|
528 url += '/multirequest' |
|
529 else: |
|
530 url += '/%s' % requests[0][0] |
|
531 |
|
532 # Request ID to (request, future) |
|
533 requestmap = {} |
|
534 |
|
535 for command, args, f in requests: |
|
536 request, action, meta = reactor.callcommand(command, args) |
|
537 assert action == 'noop' |
|
538 |
|
539 requestmap[request.requestid] = (request, f) |
|
540 |
|
541 action, meta = reactor.flushcommands() |
|
542 assert action == 'sendframes' |
|
543 |
|
544 # TODO stream this. |
|
545 body = b''.join(map(bytes, meta['framegen'])) |
|
546 |
|
547 # TODO modify user-agent to reflect v2 |
|
548 headers = { |
|
549 r'Accept': wireprotov2server.FRAMINGTYPE, |
|
550 r'Content-Type': wireprotov2server.FRAMINGTYPE, |
|
551 } |
|
552 |
|
553 req = requestbuilder(pycompat.strurl(url), body, headers) |
|
554 req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) |
|
555 |
|
556 try: |
|
557 res = opener.open(req) |
|
558 except urlerr.httperror as e: |
|
559 if e.code == 401: |
|
560 raise error.Abort(_('authorization failed')) |
|
561 |
|
562 raise |
|
563 except httplib.HTTPException as e: |
|
564 ui.traceback() |
|
565 raise IOError(None, e) |
|
566 |
|
567 return reactor, requestmap, res |
|
568 |
|
569 class queuedcommandfuture(pycompat.futures.Future): |
|
570 """Wraps result() on command futures to trigger submission on call.""" |
|
571 |
|
572 def result(self, timeout=None): |
|
573 if self.done(): |
|
574 return pycompat.futures.Future.result(self, timeout) |
|
575 |
|
576 self._peerexecutor.sendcommands() |
|
577 |
|
578 # sendcommands() will restore the original __class__ and self.result |
|
579 # will resolve to Future.result. |
|
580 return self.result(timeout) |
|
581 |
|
582 @zi.implementer(repository.ipeercommandexecutor) |
|
583 class httpv2executor(object): |
|
584 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor): |
|
585 self._ui = ui |
|
586 self._opener = opener |
|
587 self._requestbuilder = requestbuilder |
|
588 self._apiurl = apiurl |
|
589 self._descriptor = descriptor |
|
590 self._sent = False |
|
591 self._closed = False |
|
592 self._neededpermissions = set() |
|
593 self._calls = [] |
|
594 self._futures = weakref.WeakSet() |
|
595 self._responseexecutor = None |
|
596 self._responsef = None |
|
597 |
|
598 def __enter__(self): |
|
599 return self |
|
600 |
|
601 def __exit__(self, exctype, excvalue, exctb): |
|
602 self.close() |
|
603 |
|
604 def callcommand(self, command, args): |
|
605 if self._sent: |
|
606 raise error.ProgrammingError('callcommand() cannot be used after ' |
|
607 'commands are sent') |
|
608 |
|
609 if self._closed: |
|
610 raise error.ProgrammingError('callcommand() cannot be used after ' |
|
611 'close()') |
|
612 |
|
613 # The service advertises which commands are available. So if we attempt |
|
614 # to call an unknown command or pass an unknown argument, we can screen |
|
615 # for this. |
|
616 if command not in self._descriptor['commands']: |
|
617 raise error.ProgrammingError( |
|
618 'wire protocol command %s is not available' % command) |
|
619 |
|
620 cmdinfo = self._descriptor['commands'][command] |
|
621 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {})) |
|
622 |
|
623 if unknownargs: |
|
624 raise error.ProgrammingError( |
|
625 'wire protocol command %s does not accept argument: %s' % ( |
|
626 command, ', '.join(sorted(unknownargs)))) |
|
627 |
|
628 self._neededpermissions |= set(cmdinfo['permissions']) |
|
629 |
|
630 # TODO we /could/ also validate types here, since the API descriptor |
|
631 # includes types... |
|
632 |
|
633 f = pycompat.futures.Future() |
|
634 |
|
635 # Monkeypatch it so result() triggers sendcommands(), otherwise result() |
|
636 # could deadlock. |
|
637 f.__class__ = queuedcommandfuture |
|
638 f._peerexecutor = self |
|
639 |
|
640 self._futures.add(f) |
|
641 self._calls.append((command, args, f)) |
|
642 |
|
643 return f |
|
644 |
|
645 def sendcommands(self): |
|
646 if self._sent: |
|
647 return |
|
648 |
|
649 if not self._calls: |
|
650 return |
|
651 |
|
652 self._sent = True |
|
653 |
|
654 # Unhack any future types so caller sees a clean type and so we |
|
655 # break reference cycle. |
|
656 for f in self._futures: |
|
657 if isinstance(f, queuedcommandfuture): |
|
658 f.__class__ = pycompat.futures.Future |
|
659 f._peerexecutor = None |
|
660 |
|
661 # Mark the future as running and filter out cancelled futures. |
|
662 calls = [(command, args, f) |
|
663 for command, args, f in self._calls |
|
664 if f.set_running_or_notify_cancel()] |
|
665 |
|
666 # Clear out references, prevent improper object usage. |
|
667 self._calls = None |
|
668 |
|
669 if not calls: |
|
670 return |
|
671 |
|
672 permissions = set(self._neededpermissions) |
|
673 |
|
674 if 'push' in permissions and 'pull' in permissions: |
|
675 permissions.remove('pull') |
|
676 |
|
677 if len(permissions) > 1: |
|
678 raise error.RepoError(_('cannot make request requiring multiple ' |
|
679 'permissions: %s') % |
|
680 _(', ').join(sorted(permissions))) |
|
681 |
|
682 permission = { |
|
683 'push': 'rw', |
|
684 'pull': 'ro', |
|
685 }[permissions.pop()] |
|
686 |
|
687 reactor, requests, resp = sendv2request( |
|
688 self._ui, self._opener, self._requestbuilder, self._apiurl, |
|
689 permission, calls) |
|
690 |
|
691 # TODO we probably want to validate the HTTP code, media type, etc. |
|
692 |
|
693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) |
|
694 self._responsef = self._responseexecutor.submit(self._handleresponse, |
|
695 reactor, |
|
696 requests, |
|
697 resp) |
|
698 |
|
699 def close(self): |
|
700 if self._closed: |
|
701 return |
|
702 |
|
703 self.sendcommands() |
|
704 |
|
705 self._closed = True |
|
706 |
|
707 if not self._responsef: |
|
708 return |
|
709 |
|
710 try: |
|
711 self._responsef.result() |
|
712 finally: |
|
713 self._responseexecutor.shutdown(wait=True) |
|
714 self._responsef = None |
|
715 self._responseexecutor = None |
|
716 |
|
717 # If any of our futures are still in progress, mark them as |
|
718 # errored, otherwise a result() could wait indefinitely. |
|
719 for f in self._futures: |
|
720 if not f.done(): |
|
721 f.set_exception(error.ResponseError( |
|
722 _('unfulfilled command response'))) |
|
723 |
|
724 self._futures = None |
|
725 |
|
726 def _handleresponse(self, reactor, requests, resp): |
|
727 # Called in a thread to read the response. |
|
728 |
|
729 results = {k: [] for k in requests} |
|
730 |
|
731 while True: |
|
732 frame = wireprotoframing.readframe(resp) |
|
733 if frame is None: |
|
734 break |
|
735 |
|
736 self._ui.note(_('received %r\n') % frame) |
|
737 |
|
738 # Guard against receiving a frame with a request ID that we |
|
739 # didn't issue. This should never happen. |
|
740 request, f = requests.get(frame.requestid, [None, None]) |
|
741 |
|
742 action, meta = reactor.onframerecv(frame) |
|
743 |
|
744 if action == 'responsedata': |
|
745 assert request.requestid == meta['request'].requestid |
|
746 |
|
747 result = results[request.requestid] |
|
748 |
|
749 if meta['cbor']: |
|
750 payload = util.bytesio(meta['data']) |
|
751 |
|
752 decoder = cbor.CBORDecoder(payload) |
|
753 while payload.tell() + 1 < len(meta['data']): |
|
754 try: |
|
755 result.append(decoder.decode()) |
|
756 except Exception: |
|
757 f.set_exception_info(*sys.exc_info()[1:]) |
|
758 continue |
|
759 else: |
|
760 result.append(meta['data']) |
|
761 |
|
762 if meta['eos']: |
|
763 f.set_result(result) |
|
764 del results[request.requestid] |
|
765 |
|
766 else: |
|
767 e = error.ProgrammingError('unhandled action: %s' % action) |
|
768 |
|
769 if f: |
|
770 f.set_exception(e) |
|
771 else: |
|
772 raise e |
|
773 |
520 # TODO implement interface for version 2 peers |
774 # TODO implement interface for version 2 peers |
521 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities) |
775 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
|
776 repository.ipeerrequests) |
522 class httpv2peer(object): |
777 class httpv2peer(object): |
523 def __init__(self, ui, repourl, apipath, opener, requestbuilder, |
778 def __init__(self, ui, repourl, apipath, opener, requestbuilder, |
524 apidescriptor): |
779 apidescriptor): |
525 self.ui = ui |
780 self.ui = ui |
526 |
781 |
527 if repourl.endswith('/'): |
782 if repourl.endswith('/'): |
528 repourl = repourl[:-1] |
783 repourl = repourl[:-1] |
529 |
784 |
530 self._url = repourl |
785 self._url = repourl |
531 self._apipath = apipath |
786 self._apipath = apipath |
|
787 self._apiurl = '%s/%s' % (repourl, apipath) |
532 self._opener = opener |
788 self._opener = opener |
533 self._requestbuilder = requestbuilder |
789 self._requestbuilder = requestbuilder |
534 self._descriptor = apidescriptor |
790 self._descriptor = apidescriptor |
535 |
791 |
536 # Start of ipeerconnection. |
792 # Start of ipeerconnection. |
578 _('cannot %s; client or remote repository does not support the %r ' |
834 _('cannot %s; client or remote repository does not support the %r ' |
579 'capability') % (purpose, name)) |
835 'capability') % (purpose, name)) |
580 |
836 |
581 # End of ipeercapabilities. |
837 # End of ipeercapabilities. |
582 |
838 |
583 # TODO require to be part of a batched primitive, use futures. |
|
584 def _call(self, name, **args): |
839 def _call(self, name, **args): |
585 """Call a wire protocol command with arguments.""" |
840 with self.commandexecutor() as e: |
586 |
841 return e.callcommand(name, args).result() |
587 # Having this early has a side-effect of importing wireprotov2server, |
842 |
588 # which has the side-effect of ensuring commands are registered. |
843 def commandexecutor(self): |
589 |
844 return httpv2executor(self.ui, self._opener, self._requestbuilder, |
590 # TODO modify user-agent to reflect v2. |
845 self._apiurl, self._descriptor) |
591 headers = { |
|
592 r'Accept': wireprotov2server.FRAMINGTYPE, |
|
593 r'Content-Type': wireprotov2server.FRAMINGTYPE, |
|
594 } |
|
595 |
|
596 # TODO permissions should come from capabilities results. |
|
597 permission = wireproto.commandsv2[name].permission |
|
598 if permission not in ('push', 'pull'): |
|
599 raise error.ProgrammingError('unknown permission type: %s' % |
|
600 permission) |
|
601 |
|
602 permission = { |
|
603 'push': 'rw', |
|
604 'pull': 'ro', |
|
605 }[permission] |
|
606 |
|
607 url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name) |
|
608 |
|
609 # TODO this should be part of a generic peer for the frame-based |
|
610 # protocol. |
|
611 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
|
612 buffersends=True) |
|
613 |
|
614 request, action, meta = reactor.callcommand(name, args) |
|
615 assert action == 'noop' |
|
616 |
|
617 action, meta = reactor.flushcommands() |
|
618 assert action == 'sendframes' |
|
619 |
|
620 body = b''.join(map(bytes, meta['framegen'])) |
|
621 req = self._requestbuilder(pycompat.strurl(url), body, headers) |
|
622 req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) |
|
623 |
|
624 # TODO unify this code with httppeer. |
|
625 try: |
|
626 res = self._opener.open(req) |
|
627 except urlerr.httperror as e: |
|
628 if e.code == 401: |
|
629 raise error.Abort(_('authorization failed')) |
|
630 |
|
631 raise |
|
632 except httplib.HTTPException as e: |
|
633 self.ui.traceback() |
|
634 raise IOError(None, e) |
|
635 |
|
636 # TODO validate response type, wrap response to handle I/O errors. |
|
637 # TODO more robust frame receiver. |
|
638 results = [] |
|
639 |
|
640 while True: |
|
641 frame = wireprotoframing.readframe(res) |
|
642 if frame is None: |
|
643 break |
|
644 |
|
645 self.ui.note(_('received %r\n') % frame) |
|
646 |
|
647 action, meta = reactor.onframerecv(frame) |
|
648 |
|
649 if action == 'responsedata': |
|
650 if meta['cbor']: |
|
651 payload = util.bytesio(meta['data']) |
|
652 |
|
653 decoder = cbor.CBORDecoder(payload) |
|
654 while payload.tell() + 1 < len(meta['data']): |
|
655 results.append(decoder.decode()) |
|
656 else: |
|
657 results.append(meta['data']) |
|
658 else: |
|
659 error.ProgrammingError('unhandled action: %s' % action) |
|
660 |
|
661 return results |
|
662 |
846 |
663 # Registry of API service names to metadata about peers that handle it. |
847 # Registry of API service names to metadata about peers that handle it. |
664 # |
848 # |
665 # The following keys are meaningful: |
849 # The following keys are meaningful: |
666 # |
850 # |