Mercurial > public > mercurial-scm > hg
comparison mercurial/httppeer.py @ 37651:950294e28136
httppeer: implement command executor for version 2 peer
Now that we have a new API for issuing commands which is compatible
with wire protocol version 2, we can start using it with wire protocol
version 2.
This commit replaces our hacky implementation of _call() with something
a bit more robust based on the new command executor interface.
We now have proper support for issuing multiple commands per HTTP
request. Each HTTP request maintains its own client reactor.
The implementation is similar to the one in the legacy wire protocol.
We use a ThreadPoolExecutor for spinning up a thread to read the HTTP
response in the background. This allows responses to resolve in any
order. While not implemented on the server yet, a client could use
concurrent.futures.as_completed() with a collection of futures and
handle responses as they arrive from the server.
The return value from issued commands is still a simple list of raw
or decoded CBOR data. This is still super hacky. We will want a rich
data type for representing command responses. But at least this
commit gets us one step closer to a proper peer implementation.
Differential Revision: https://phab.mercurial-scm.org/D3297
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Fri, 13 Apr 2018 12:30:04 -0700 |
parents | 77c9ee77687c |
children | 8cea0d57bf37 |
comparison
equal
deleted
inserted
replaced
37650:62ebfda864de | 37651:950294e28136 |
---|---|
11 import errno | 11 import errno |
12 import io | 12 import io |
13 import os | 13 import os |
14 import socket | 14 import socket |
15 import struct | 15 import struct |
16 import sys | |
16 import tempfile | 17 import tempfile |
18 import weakref | |
17 | 19 |
18 from .i18n import _ | 20 from .i18n import _ |
19 from .thirdparty import ( | 21 from .thirdparty import ( |
20 cbor, | 22 cbor, |
21 ) | 23 ) |
29 pycompat, | 31 pycompat, |
30 repository, | 32 repository, |
31 statichttprepo, | 33 statichttprepo, |
32 url as urlmod, | 34 url as urlmod, |
33 util, | 35 util, |
34 wireproto, | |
35 wireprotoframing, | 36 wireprotoframing, |
36 wireprototypes, | 37 wireprototypes, |
37 wireprotov1peer, | 38 wireprotov1peer, |
38 wireprotov2server, | 39 wireprotov2server, |
39 ) | 40 ) |
515 return self._callstream(cmd, _compressible=True, **args) | 516 return self._callstream(cmd, _compressible=True, **args) |
516 | 517 |
517 def _abort(self, exception): | 518 def _abort(self, exception): |
518 raise exception | 519 raise exception |
519 | 520 |
def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
    """Send one or more wire protocol version 2 commands in one HTTP request.

    ``requests`` is a sequence of ``(command, args, future)`` tuples. Every
    command is registered with a fresh client reactor, the buffered frames
    are joined into a single request body and the request is issued through
    ``opener``.

    Returns a ``(reactor, requestmap, response)`` tuple, where ``requestmap``
    maps each reactor-assigned request ID to its ``(request, future)`` pair
    and ``response`` is whatever ``opener.open()`` returned.

    Raises ``error.Abort`` on an HTTP 401 response and ``IOError`` when the
    HTTP library reports a protocol-level failure. Other HTTP errors are
    propagated unchanged.
    """
    reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                             buffersends=True)

    # A single command is addressed by name; several commands share the
    # dedicated multirequest endpoint.
    url = '%s/%s' % (apiurl, permission)
    url += '/multirequest' if len(requests) > 1 else '/%s' % requests[0][0]

    # Request ID to (request, future)
    requestmap = {}

    for name, cmdargs, future in requests:
        request, action, meta = reactor.callcommand(name, cmdargs)
        # Sends are buffered, so queueing a command emits nothing yet.
        assert action == 'noop'

        requestmap[request.requestid] = (request, future)

    action, meta = reactor.flushcommands()
    assert action == 'sendframes'

    # TODO modify user-agent to reflect v2
    framingtype = wireprotov2server.FRAMINGTYPE
    headers = {
        r'Accept': framingtype,
        r'Content-Type': framingtype,
    }

    # TODO stream this.
    body = b''.join(map(bytes, meta['framegen']))

    req = requestbuilder(pycompat.strurl(url), body, headers)
    req.add_unredirected_header(r'Content-Length', r'%d' % len(body))

    try:
        response = opener.open(req)
    except urlerr.httperror as e:
        # Only authorization failures get a friendlier message.
        if e.code != 401:
            raise

        raise error.Abort(_('authorization failed'))
    except httplib.HTTPException as e:
        ui.traceback()
        raise IOError(None, e)

    return reactor, requestmap, response
568 | |
class queuedcommandfuture(pycompat.futures.Future):
    """Future whose result() submits the queued commands before waiting.

    Instances expect a ``_peerexecutor`` attribute referring to the executor
    that queued the command.
    """

    def result(self, timeout=None):
        """Return the command result, sending queued commands if needed."""
        if not self.done():
            # Nothing can resolve this future until the commands have
            # actually been sent, so trigger the send first.
            self._peerexecutor.sendcommands()

            # sendcommands() will restore the original __class__ and
            # self.result will resolve to Future.result.
            return self.result(timeout)

        return pycompat.futures.Future.result(self, timeout)
581 | |
@zi.implementer(repository.ipeercommandexecutor)
class httpv2executor(object):
    """Command executor for wire protocol version 2 over HTTP.

    Commands are queued with ``callcommand()``, which returns one future per
    command. ``sendcommands()`` sends everything queued so far in a single
    HTTP request; it is triggered explicitly, by calling ``result()`` on a
    queued future, or by ``close()``. The response is read on a background
    thread which resolves the futures.

    Instances are context managers: leaving the ``with`` block calls
    ``close()``.
    """

    def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
        self._ui = ui
        self._opener = opener
        self._requestbuilder = requestbuilder
        self._apiurl = apiurl
        # API descriptor; its 'commands' entry is used to validate calls.
        self._descriptor = descriptor
        self._sent = False
        self._closed = False
        # Union of the permissions required by every queued command.
        self._neededpermissions = set()
        # Queued (command, args, future) tuples; None once sent.
        self._calls = []
        # Weak references so the executor does not keep futures alive.
        self._futures = weakref.WeakSet()
        # Thread pool and future for the background response reader.
        self._responseexecutor = None
        self._responsef = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, exctb):
        self.close()

    def callcommand(self, command, args):
        """Queue ``command`` with ``args`` and return a future for its result.

        Raises ``error.ProgrammingError`` if commands were already sent, if
        the executor is closed, if the command is not advertised by the API
        descriptor or if an argument is not advertised for the command.
        """
        if self._sent:
            raise error.ProgrammingError('callcommand() cannot be used after '
                                         'commands are sent')

        if self._closed:
            raise error.ProgrammingError('callcommand() cannot be used after '
                                         'close()')

        # The service advertises which commands are available. So if we attempt
        # to call an unknown command or pass an unknown argument, we can screen
        # for this.
        if command not in self._descriptor['commands']:
            raise error.ProgrammingError(
                'wire protocol command %s is not available' % command)

        cmdinfo = self._descriptor['commands'][command]
        unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))

        if unknownargs:
            raise error.ProgrammingError(
                'wire protocol command %s does not accept argument: %s' % (
                    command, ', '.join(sorted(unknownargs))))

        self._neededpermissions |= set(cmdinfo['permissions'])

        # TODO we /could/ also validate types here, since the API descriptor
        # includes types...

        f = pycompat.futures.Future()

        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
        # could deadlock.
        f.__class__ = queuedcommandfuture
        f._peerexecutor = self

        self._futures.add(f)
        self._calls.append((command, args, f))

        return f

    def sendcommands(self):
        """Send all queued commands and start reading the response.

        Does nothing if commands were already sent or none are queued.

        Raises ``error.RepoError`` if the queued commands need more than one
        permission (other than 'pull' alongside 'push').
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        # Unhack any future types so caller sees a clean type and so we
        # break reference cycle.
        for f in self._futures:
            if isinstance(f, queuedcommandfuture):
                f.__class__ = pycompat.futures.Future
                f._peerexecutor = None

        # Mark the future as running and filter out cancelled futures.
        calls = [(command, args, f)
                 for command, args, f in self._calls
                 if f.set_running_or_notify_cancel()]

        # Clear out references, prevent improper object usage.
        self._calls = None

        if not calls:
            return

        permissions = set(self._neededpermissions)

        # When both are needed only 'push' is requested.
        if 'push' in permissions and 'pull' in permissions:
            permissions.remove('pull')

        if len(permissions) > 1:
            raise error.RepoError(_('cannot make request requiring multiple '
                                    'permissions: %s') %
                                  _(', ').join(sorted(permissions)))

        # Map the permission name to the URL path component.
        permission = {
            'push': 'rw',
            'pull': 'ro',
        }[permissions.pop()]

        reactor, requests, resp = sendv2request(
            self._ui, self._opener, self._requestbuilder, self._apiurl,
            permission, calls)

        # TODO we probably want to validate the HTTP code, media type, etc.

        # Read the response on a single background thread so the caller is
        # not blocked while frames arrive.
        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
        self._responsef = self._responseexecutor.submit(self._handleresponse,
                                                        reactor,
                                                        requests,
                                                        resp)

    def close(self):
        """Send any queued commands and wait for the response to be read.

        Any exception raised by the response reader is re-raised here.
        Futures that are still unresolved afterwards are failed with
        ``error.ResponseError``. Calling close() again is a no-op.
        """
        if self._closed:
            return

        self.sendcommands()

        self._closed = True

        if not self._responsef:
            return

        try:
            self._responsef.result()
        finally:
            self._responseexecutor.shutdown(wait=True)
            self._responsef = None
            self._responseexecutor = None

            # If any of our futures are still in progress, mark them as
            # errored, otherwise a result() could wait indefinitely.
            for f in self._futures:
                if not f.done():
                    f.set_exception(error.ResponseError(
                        _('unfulfilled command response')))

            self._futures = None

    def _decodecbor(self, f, data, result):
        """Decode the CBOR values in ``data``, appending them to ``result``.

        If a value cannot be decoded, ``f`` is failed with the decoding
        exception and the rest of ``data`` is discarded.
        """
        payload = util.bytesio(data)
        decoder = cbor.CBORDecoder(payload)

        # NOTE(review): this condition leaves a final single byte undecoded;
        # it is preserved from the original code - confirm it is intended.
        while payload.tell() + 1 < len(data):
            try:
                result.append(decoder.decode())
            except Exception:
                # set_exception_info() keeps the traceback but is only
                # provided by the Python 2 futures backport; the standard
                # library Future only has set_exception().
                setexcinfo = getattr(f, 'set_exception_info', None)
                if setexcinfo is not None:
                    setexcinfo(*sys.exc_info()[1:])
                else:
                    f.set_exception(sys.exc_info()[1])

                # The stream position is unreliable after a failed decode
                # and the future is resolved, so stop here instead of
                # retrying on the same payload.
                return

    def _handleresponse(self, reactor, requests, resp):
        """Read frames from ``resp`` and resolve the matching futures.

        ``requests`` maps request IDs to ``(request, future)`` pairs. Each
        future is resolved with the list of raw or CBOR-decoded payloads
        received for its request once the end of its stream is seen.
        """
        # Called in a thread to read the response.

        results = {k: [] for k in requests}

        while True:
            frame = wireprotoframing.readframe(resp)
            if frame is None:
                break

            self._ui.note(_('received %r\n') % frame)

            # Guard against receiving a frame with a request ID that we
            # didn't issue. This should never happen.
            request, f = requests.get(frame.requestid, [None, None])

            action, meta = reactor.onframerecv(frame)

            if action == 'responsedata':
                assert request.requestid == meta['request'].requestid

                result = results[request.requestid]

                if f.done():
                    # The future already failed on an earlier payload;
                    # further data for it cannot be delivered.
                    pass
                elif meta['cbor']:
                    self._decodecbor(f, meta['data'], result)
                else:
                    result.append(meta['data'])

                if meta['eos']:
                    # Do not overwrite a decoding failure with a partial
                    # result.
                    if not f.done():
                        f.set_result(result)
                    del results[request.requestid]

            else:
                e = error.ProgrammingError('unhandled action: %s' % action)

                if f:
                    # A future can only be resolved once.
                    if not f.done():
                        f.set_exception(e)
                else:
                    raise e
773 | |
520 # TODO implement interface for version 2 peers | 774 # TODO implement interface for version 2 peers |
521 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities) | 775 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
776 repository.ipeerrequests) | |
522 class httpv2peer(object): | 777 class httpv2peer(object): |
523 def __init__(self, ui, repourl, apipath, opener, requestbuilder, | 778 def __init__(self, ui, repourl, apipath, opener, requestbuilder, |
524 apidescriptor): | 779 apidescriptor): |
525 self.ui = ui | 780 self.ui = ui |
526 | 781 |
527 if repourl.endswith('/'): | 782 if repourl.endswith('/'): |
528 repourl = repourl[:-1] | 783 repourl = repourl[:-1] |
529 | 784 |
530 self._url = repourl | 785 self._url = repourl |
531 self._apipath = apipath | 786 self._apipath = apipath |
787 self._apiurl = '%s/%s' % (repourl, apipath) | |
532 self._opener = opener | 788 self._opener = opener |
533 self._requestbuilder = requestbuilder | 789 self._requestbuilder = requestbuilder |
534 self._descriptor = apidescriptor | 790 self._descriptor = apidescriptor |
535 | 791 |
536 # Start of ipeerconnection. | 792 # Start of ipeerconnection. |
578 _('cannot %s; client or remote repository does not support the %r ' | 834 _('cannot %s; client or remote repository does not support the %r ' |
579 'capability') % (purpose, name)) | 835 'capability') % (purpose, name)) |
580 | 836 |
581 # End of ipeercapabilities. | 837 # End of ipeercapabilities. |
582 | 838 |
583 # TODO require to be part of a batched primitive, use futures. | |
584 def _call(self, name, **args): | 839 def _call(self, name, **args): |
585 """Call a wire protocol command with arguments.""" | 840 with self.commandexecutor() as e: |
586 | 841 return e.callcommand(name, args).result() |
587 # Having this early has a side-effect of importing wireprotov2server, | 842 |
588 # which has the side-effect of ensuring commands are registered. | 843 def commandexecutor(self): |
589 | 844 return httpv2executor(self.ui, self._opener, self._requestbuilder, |
590 # TODO modify user-agent to reflect v2. | 845 self._apiurl, self._descriptor) |
591 headers = { | |
592 r'Accept': wireprotov2server.FRAMINGTYPE, | |
593 r'Content-Type': wireprotov2server.FRAMINGTYPE, | |
594 } | |
595 | |
596 # TODO permissions should come from capabilities results. | |
597 permission = wireproto.commandsv2[name].permission | |
598 if permission not in ('push', 'pull'): | |
599 raise error.ProgrammingError('unknown permission type: %s' % | |
600 permission) | |
601 | |
602 permission = { | |
603 'push': 'rw', | |
604 'pull': 'ro', | |
605 }[permission] | |
606 | |
607 url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name) | |
608 | |
609 # TODO this should be part of a generic peer for the frame-based | |
610 # protocol. | |
611 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |
612 buffersends=True) | |
613 | |
614 request, action, meta = reactor.callcommand(name, args) | |
615 assert action == 'noop' | |
616 | |
617 action, meta = reactor.flushcommands() | |
618 assert action == 'sendframes' | |
619 | |
620 body = b''.join(map(bytes, meta['framegen'])) | |
621 req = self._requestbuilder(pycompat.strurl(url), body, headers) | |
622 req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) | |
623 | |
624 # TODO unify this code with httppeer. | |
625 try: | |
626 res = self._opener.open(req) | |
627 except urlerr.httperror as e: | |
628 if e.code == 401: | |
629 raise error.Abort(_('authorization failed')) | |
630 | |
631 raise | |
632 except httplib.HTTPException as e: | |
633 self.ui.traceback() | |
634 raise IOError(None, e) | |
635 | |
636 # TODO validate response type, wrap response to handle I/O errors. | |
637 # TODO more robust frame receiver. | |
638 results = [] | |
639 | |
640 while True: | |
641 frame = wireprotoframing.readframe(res) | |
642 if frame is None: | |
643 break | |
644 | |
645 self.ui.note(_('received %r\n') % frame) | |
646 | |
647 action, meta = reactor.onframerecv(frame) | |
648 | |
649 if action == 'responsedata': | |
650 if meta['cbor']: | |
651 payload = util.bytesio(meta['data']) | |
652 | |
653 decoder = cbor.CBORDecoder(payload) | |
654 while payload.tell() + 1 < len(meta['data']): | |
655 results.append(decoder.decode()) | |
656 else: | |
657 results.append(meta['data']) | |
658 else: | |
659 error.ProgrammingError('unhandled action: %s' % action) | |
660 | |
661 return results | |
662 | 846 |
663 # Registry of API service names to metadata about peers that handle it. | 847 # Registry of API service names to metadata about peers that handle it. |
664 # | 848 # |
665 # The following keys are meaningful: | 849 # The following keys are meaningful: |
666 # | 850 # |