Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/httppeer.py @ 48561:04688c51f81f
exchangev2: remove it
As discussed on the mailing list, this is incomplete and unused with little
hope of revival.
Differential Revision: https://phab.mercurial-scm.org/D11954
author | Rapha?l Gom?s <rgomes@octobus.net> |
---|---|
date | Tue, 07 Dec 2021 16:44:22 +0100 |
parents | ffd3e823a7e5 |
children | a0da5075bca3 |
comparison
equal
deleted
inserted
replaced
48560:d6c53b40b078 | 48561:04688c51f81f |
---|---|
11 import errno | 11 import errno |
12 import io | 12 import io |
13 import os | 13 import os |
14 import socket | 14 import socket |
15 import struct | 15 import struct |
16 import weakref | |
17 | 16 |
18 from .i18n import _ | 17 from .i18n import _ |
19 from .pycompat import getattr | 18 from .pycompat import getattr |
20 from . import ( | 19 from . import ( |
21 bundle2, | 20 bundle2, |
23 httpconnection, | 22 httpconnection, |
24 pycompat, | 23 pycompat, |
25 statichttprepo, | 24 statichttprepo, |
26 url as urlmod, | 25 url as urlmod, |
27 util, | 26 util, |
28 wireprotoframing, | |
29 wireprototypes, | |
30 wireprotov1peer, | 27 wireprotov1peer, |
31 wireprotov2peer, | |
32 wireprotov2server, | |
33 ) | 28 ) |
34 from .interfaces import ( | 29 from .utils import urlutil |
35 repository, | |
36 util as interfaceutil, | |
37 ) | |
38 from .utils import ( | |
39 cborutil, | |
40 stringutil, | |
41 urlutil, | |
42 ) | |
43 | 30 |
44 httplib = util.httplib | 31 httplib = util.httplib |
45 urlerr = util.urlerr | 32 urlerr = util.urlerr |
46 urlreq = util.urlreq | 33 urlreq = util.urlreq |
47 | 34 |
329 def __init__(self, msg, respurl): | 316 def __init__(self, msg, respurl): |
330 super(RedirectedRepoError, self).__init__(msg) | 317 super(RedirectedRepoError, self).__init__(msg) |
331 self.respurl = respurl | 318 self.respurl = respurl |
332 | 319 |
333 | 320 |
334 def parsev1commandresponse( | 321 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible): |
335 ui, baseurl, requrl, qs, resp, compressible, allowcbor=False | |
336 ): | |
337 # record the url we got redirected to | 322 # record the url we got redirected to |
338 redirected = False | 323 redirected = False |
339 respurl = pycompat.bytesurl(resp.geturl()) | 324 respurl = pycompat.bytesurl(resp.geturl()) |
340 if respurl.endswith(qs): | 325 if respurl.endswith(qs): |
341 respurl = respurl[: -len(qs)] | 326 respurl = respurl[: -len(qs)] |
373 else: | 358 else: |
374 raise error.RepoError(msg) | 359 raise error.RepoError(msg) |
375 | 360 |
376 try: | 361 try: |
377 subtype = proto.split(b'-', 1)[1] | 362 subtype = proto.split(b'-', 1)[1] |
378 | |
379 # Unless we end up supporting CBOR in the legacy wire protocol, | |
380 # this should ONLY be encountered for the initial capabilities | |
381 # request during handshake. | |
382 if subtype == b'cbor': | |
383 if allowcbor: | |
384 return respurl, proto, resp | |
385 else: | |
386 raise error.RepoError( | |
387 _(b'unexpected CBOR response from server') | |
388 ) | |
389 | 363 |
390 version_info = tuple([int(n) for n in subtype.split(b'.')]) | 364 version_info = tuple([int(n) for n in subtype.split(b'.')]) |
391 except ValueError: | 365 except ValueError: |
392 raise error.RepoError( | 366 raise error.RepoError( |
393 _(b"'%s' sent a broken Content-Type header (%s)") % (safeurl, proto) | 367 _(b"'%s' sent a broken Content-Type header (%s)") % (safeurl, proto) |
562 | 536 |
563 def _abort(self, exception): | 537 def _abort(self, exception): |
564 raise exception | 538 raise exception |
565 | 539 |
566 | 540 |
567 def sendv2request( | |
568 ui, opener, requestbuilder, apiurl, permission, requests, redirect | |
569 ): | |
570 wireprotoframing.populatestreamencoders() | |
571 | |
572 uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order') | |
573 | |
574 if uiencoders: | |
575 encoders = [] | |
576 | |
577 for encoder in uiencoders: | |
578 if encoder not in wireprotoframing.STREAM_ENCODERS: | |
579 ui.warn( | |
580 _( | |
581 b'wire protocol version 2 encoder referenced in ' | |
582 b'config (%s) is not known; ignoring\n' | |
583 ) | |
584 % encoder | |
585 ) | |
586 else: | |
587 encoders.append(encoder) | |
588 | |
589 else: | |
590 encoders = wireprotoframing.STREAM_ENCODERS_ORDER | |
591 | |
592 reactor = wireprotoframing.clientreactor( | |
593 ui, | |
594 hasmultiplesend=False, | |
595 buffersends=True, | |
596 clientcontentencoders=encoders, | |
597 ) | |
598 | |
599 handler = wireprotov2peer.clienthandler( | |
600 ui, reactor, opener=opener, requestbuilder=requestbuilder | |
601 ) | |
602 | |
603 url = b'%s/%s' % (apiurl, permission) | |
604 | |
605 if len(requests) > 1: | |
606 url += b'/multirequest' | |
607 else: | |
608 url += b'/%s' % requests[0][0] | |
609 | |
610 ui.debug(b'sending %d commands\n' % len(requests)) | |
611 for command, args, f in requests: | |
612 ui.debug( | |
613 b'sending command %s: %s\n' | |
614 % (command, stringutil.pprint(args, indent=2)) | |
615 ) | |
616 assert not list( | |
617 handler.callcommand(command, args, f, redirect=redirect) | |
618 ) | |
619 | |
620 # TODO stream this. | |
621 body = b''.join(map(bytes, handler.flushcommands())) | |
622 | |
623 # TODO modify user-agent to reflect v2 | |
624 headers = { | |
625 'Accept': wireprotov2server.FRAMINGTYPE, | |
626 'Content-Type': wireprotov2server.FRAMINGTYPE, | |
627 } | |
628 | |
629 req = requestbuilder(pycompat.strurl(url), body, headers) | |
630 req.add_unredirected_header('Content-Length', '%d' % len(body)) | |
631 | |
632 try: | |
633 res = opener.open(req) | |
634 except urlerr.httperror as e: | |
635 if e.code == 401: | |
636 raise error.Abort(_(b'authorization failed')) | |
637 | |
638 raise | |
639 except httplib.HTTPException as e: | |
640 ui.traceback() | |
641 raise IOError(None, e) | |
642 | |
643 return handler, res | |
644 | |
645 | |
646 class queuedcommandfuture(pycompat.futures.Future): | 541 class queuedcommandfuture(pycompat.futures.Future): |
647 """Wraps result() on command futures to trigger submission on call.""" | 542 """Wraps result() on command futures to trigger submission on call.""" |
648 | 543 |
649 def result(self, timeout=None): | 544 def result(self, timeout=None): |
650 if self.done(): | 545 if self.done(): |
655 # sendcommands() will restore the original __class__ and self.result | 550 # sendcommands() will restore the original __class__ and self.result |
656 # will resolve to Future.result. | 551 # will resolve to Future.result. |
657 return self.result(timeout) | 552 return self.result(timeout) |
658 | 553 |
659 | 554 |
660 @interfaceutil.implementer(repository.ipeercommandexecutor) | |
661 class httpv2executor(object): | |
662 def __init__( | |
663 self, ui, opener, requestbuilder, apiurl, descriptor, redirect | |
664 ): | |
665 self._ui = ui | |
666 self._opener = opener | |
667 self._requestbuilder = requestbuilder | |
668 self._apiurl = apiurl | |
669 self._descriptor = descriptor | |
670 self._redirect = redirect | |
671 self._sent = False | |
672 self._closed = False | |
673 self._neededpermissions = set() | |
674 self._calls = [] | |
675 self._futures = weakref.WeakSet() | |
676 self._responseexecutor = None | |
677 self._responsef = None | |
678 | |
679 def __enter__(self): | |
680 return self | |
681 | |
682 def __exit__(self, exctype, excvalue, exctb): | |
683 self.close() | |
684 | |
685 def callcommand(self, command, args): | |
686 if self._sent: | |
687 raise error.ProgrammingError( | |
688 b'callcommand() cannot be used after commands are sent' | |
689 ) | |
690 | |
691 if self._closed: | |
692 raise error.ProgrammingError( | |
693 b'callcommand() cannot be used after close()' | |
694 ) | |
695 | |
696 # The service advertises which commands are available. So if we attempt | |
697 # to call an unknown command or pass an unknown argument, we can screen | |
698 # for this. | |
699 if command not in self._descriptor[b'commands']: | |
700 raise error.ProgrammingError( | |
701 b'wire protocol command %s is not available' % command | |
702 ) | |
703 | |
704 cmdinfo = self._descriptor[b'commands'][command] | |
705 unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {})) | |
706 | |
707 if unknownargs: | |
708 raise error.ProgrammingError( | |
709 b'wire protocol command %s does not accept argument: %s' | |
710 % (command, b', '.join(sorted(unknownargs))) | |
711 ) | |
712 | |
713 self._neededpermissions |= set(cmdinfo[b'permissions']) | |
714 | |
715 # TODO we /could/ also validate types here, since the API descriptor | |
716 # includes types... | |
717 | |
718 f = pycompat.futures.Future() | |
719 | |
720 # Monkeypatch it so result() triggers sendcommands(), otherwise result() | |
721 # could deadlock. | |
722 f.__class__ = queuedcommandfuture | |
723 f._peerexecutor = self | |
724 | |
725 self._futures.add(f) | |
726 self._calls.append((command, args, f)) | |
727 | |
728 return f | |
729 | |
730 def sendcommands(self): | |
731 if self._sent: | |
732 return | |
733 | |
734 if not self._calls: | |
735 return | |
736 | |
737 self._sent = True | |
738 | |
739 # Unhack any future types so caller sees a clean type and so we | |
740 # break reference cycle. | |
741 for f in self._futures: | |
742 if isinstance(f, queuedcommandfuture): | |
743 f.__class__ = pycompat.futures.Future | |
744 f._peerexecutor = None | |
745 | |
746 # Mark the future as running and filter out cancelled futures. | |
747 calls = [ | |
748 (command, args, f) | |
749 for command, args, f in self._calls | |
750 if f.set_running_or_notify_cancel() | |
751 ] | |
752 | |
753 # Clear out references, prevent improper object usage. | |
754 self._calls = None | |
755 | |
756 if not calls: | |
757 return | |
758 | |
759 permissions = set(self._neededpermissions) | |
760 | |
761 if b'push' in permissions and b'pull' in permissions: | |
762 permissions.remove(b'pull') | |
763 | |
764 if len(permissions) > 1: | |
765 raise error.RepoError( | |
766 _(b'cannot make request requiring multiple permissions: %s') | |
767 % _(b', ').join(sorted(permissions)) | |
768 ) | |
769 | |
770 permission = { | |
771 b'push': b'rw', | |
772 b'pull': b'ro', | |
773 }[permissions.pop()] | |
774 | |
775 handler, resp = sendv2request( | |
776 self._ui, | |
777 self._opener, | |
778 self._requestbuilder, | |
779 self._apiurl, | |
780 permission, | |
781 calls, | |
782 self._redirect, | |
783 ) | |
784 | |
785 # TODO we probably want to validate the HTTP code, media type, etc. | |
786 | |
787 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |
788 self._responsef = self._responseexecutor.submit( | |
789 self._handleresponse, handler, resp | |
790 ) | |
791 | |
792 def close(self): | |
793 if self._closed: | |
794 return | |
795 | |
796 self.sendcommands() | |
797 | |
798 self._closed = True | |
799 | |
800 if not self._responsef: | |
801 return | |
802 | |
803 # TODO ^C here may not result in immediate program termination. | |
804 | |
805 try: | |
806 self._responsef.result() | |
807 finally: | |
808 self._responseexecutor.shutdown(wait=True) | |
809 self._responsef = None | |
810 self._responseexecutor = None | |
811 | |
812 # If any of our futures are still in progress, mark them as | |
813 # errored, otherwise a result() could wait indefinitely. | |
814 for f in self._futures: | |
815 if not f.done(): | |
816 f.set_exception( | |
817 error.ResponseError(_(b'unfulfilled command response')) | |
818 ) | |
819 | |
820 self._futures = None | |
821 | |
822 def _handleresponse(self, handler, resp): | |
823 # Called in a thread to read the response. | |
824 | |
825 while handler.readdata(resp): | |
826 pass | |
827 | |
828 | |
829 @interfaceutil.implementer(repository.ipeerv2) | |
830 class httpv2peer(object): | |
831 | |
832 limitedarguments = False | |
833 | |
834 def __init__( | |
835 self, ui, repourl, apipath, opener, requestbuilder, apidescriptor | |
836 ): | |
837 self.ui = ui | |
838 self.apidescriptor = apidescriptor | |
839 | |
840 if repourl.endswith(b'/'): | |
841 repourl = repourl[:-1] | |
842 | |
843 self._url = repourl | |
844 self._apipath = apipath | |
845 self._apiurl = b'%s/%s' % (repourl, apipath) | |
846 self._opener = opener | |
847 self._requestbuilder = requestbuilder | |
848 | |
849 self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor) | |
850 | |
851 # Start of ipeerconnection. | |
852 | |
853 def url(self): | |
854 return self._url | |
855 | |
856 def local(self): | |
857 return None | |
858 | |
859 def peer(self): | |
860 return self | |
861 | |
862 def canpush(self): | |
863 # TODO change once implemented. | |
864 return False | |
865 | |
866 def close(self): | |
867 self.ui.note( | |
868 _( | |
869 b'(sent %d HTTP requests and %d bytes; ' | |
870 b'received %d bytes in responses)\n' | |
871 ) | |
872 % ( | |
873 self._opener.requestscount, | |
874 self._opener.sentbytescount, | |
875 self._opener.receivedbytescount, | |
876 ) | |
877 ) | |
878 | |
879 # End of ipeerconnection. | |
880 | |
881 # Start of ipeercapabilities. | |
882 | |
883 def capable(self, name): | |
884 # The capabilities used internally historically map to capabilities | |
885 # advertised from the "capabilities" wire protocol command. However, | |
886 # version 2 of that command works differently. | |
887 | |
888 # Maps to commands that are available. | |
889 if name in ( | |
890 b'branchmap', | |
891 b'getbundle', | |
892 b'known', | |
893 b'lookup', | |
894 b'pushkey', | |
895 ): | |
896 return True | |
897 | |
898 # Other concepts. | |
899 if name in (b'bundle2',): | |
900 return True | |
901 | |
902 # Alias command-* to presence of command of that name. | |
903 if name.startswith(b'command-'): | |
904 return name[len(b'command-') :] in self.apidescriptor[b'commands'] | |
905 | |
906 return False | |
907 | |
908 def requirecap(self, name, purpose): | |
909 if self.capable(name): | |
910 return | |
911 | |
912 raise error.CapabilityError( | |
913 _( | |
914 b'cannot %s; client or remote repository does not support the ' | |
915 b'\'%s\' capability' | |
916 ) | |
917 % (purpose, name) | |
918 ) | |
919 | |
920 # End of ipeercapabilities. | |
921 | |
922 def _call(self, name, **args): | |
923 with self.commandexecutor() as e: | |
924 return e.callcommand(name, args).result() | |
925 | |
926 def commandexecutor(self): | |
927 return httpv2executor( | |
928 self.ui, | |
929 self._opener, | |
930 self._requestbuilder, | |
931 self._apiurl, | |
932 self.apidescriptor, | |
933 self._redirect, | |
934 ) | |
935 | |
936 | |
937 # Registry of API service names to metadata about peers that handle it. | |
938 # | |
939 # The following keys are meaningful: | |
940 # | |
941 # init | |
942 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder, | |
943 # apidescriptor) to create a peer. | |
944 # | |
945 # priority | |
946 # Integer priority for the service. If we could choose from multiple | |
947 # services, we choose the one with the highest priority. | |
948 API_PEERS = { | |
949 wireprototypes.HTTP_WIREPROTO_V2: { | |
950 b'init': httpv2peer, | |
951 b'priority': 50, | |
952 }, | |
953 } | |
954 | |
955 | |
956 def performhandshake(ui, url, opener, requestbuilder): | 555 def performhandshake(ui, url, opener, requestbuilder): |
957 # The handshake is a request to the capabilities command. | 556 # The handshake is a request to the capabilities command. |
958 | 557 |
959 caps = None | 558 caps = None |
960 | 559 |
961 def capable(x): | 560 def capable(x): |
962 raise error.ProgrammingError(b'should not be called') | 561 raise error.ProgrammingError(b'should not be called') |
963 | 562 |
964 args = {} | 563 args = {} |
965 | |
966 # The client advertises support for newer protocols by adding an | |
967 # X-HgUpgrade-* header with a list of supported APIs and an | |
968 # X-HgProto-* header advertising which serializing formats it supports. | |
969 # We only support the HTTP version 2 transport and CBOR responses for | |
970 # now. | |
971 advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2') | |
972 | |
973 if advertisev2: | |
974 args[b'headers'] = { | |
975 'X-HgProto-1': 'cbor', | |
976 } | |
977 | |
978 args[b'headers'].update( | |
979 encodevalueinheaders( | |
980 b' '.join(sorted(API_PEERS)), | |
981 b'X-HgUpgrade', | |
982 # We don't know the header limit this early. | |
983 # So make it small. | |
984 1024, | |
985 ) | |
986 ) | |
987 | 564 |
988 req, requrl, qs = makev1commandrequest( | 565 req, requrl, qs = makev1commandrequest( |
989 ui, requestbuilder, caps, capable, url, b'capabilities', args | 566 ui, requestbuilder, caps, capable, url, b'capabilities', args |
990 ) | 567 ) |
991 resp = sendrequest(ui, opener, req) | 568 resp = sendrequest(ui, opener, req) |
1002 # issue without behavior degradation. And according to issue 5860, it may | 579 # issue without behavior degradation. And according to issue 5860, it may |
1003 # be a longstanding bug in some server implementations. So we allow a | 580 # be a longstanding bug in some server implementations. So we allow a |
1004 # redirect that drops the query string to "just work." | 581 # redirect that drops the query string to "just work." |
1005 try: | 582 try: |
1006 respurl, ct, resp = parsev1commandresponse( | 583 respurl, ct, resp = parsev1commandresponse( |
1007 ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 | 584 ui, url, requrl, qs, resp, compressible=False |
1008 ) | 585 ) |
1009 except RedirectedRepoError as e: | 586 except RedirectedRepoError as e: |
1010 req, requrl, qs = makev1commandrequest( | 587 req, requrl, qs = makev1commandrequest( |
1011 ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args | 588 ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args |
1012 ) | 589 ) |
1013 resp = sendrequest(ui, opener, req) | 590 resp = sendrequest(ui, opener, req) |
1014 respurl, ct, resp = parsev1commandresponse( | 591 respurl, ct, resp = parsev1commandresponse( |
1015 ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 | 592 ui, url, requrl, qs, resp, compressible=False |
1016 ) | 593 ) |
1017 | 594 |
1018 try: | 595 try: |
1019 rawdata = resp.read() | 596 rawdata = resp.read() |
1020 finally: | 597 finally: |
1021 resp.close() | 598 resp.close() |
1022 | 599 |
1023 if not ct.startswith(b'application/mercurial-'): | 600 if not ct.startswith(b'application/mercurial-'): |
1024 raise error.ProgrammingError(b'unexpected content-type: %s' % ct) | 601 raise error.ProgrammingError(b'unexpected content-type: %s' % ct) |
1025 | 602 |
1026 if advertisev2: | 603 info = {b'v1capabilities': set(rawdata.split())} |
1027 if ct == b'application/mercurial-cbor': | |
1028 try: | |
1029 info = cborutil.decodeall(rawdata)[0] | |
1030 except cborutil.CBORDecodeError: | |
1031 raise error.Abort( | |
1032 _(b'error decoding CBOR from remote server'), | |
1033 hint=_( | |
1034 b'try again and consider contacting ' | |
1035 b'the server operator' | |
1036 ), | |
1037 ) | |
1038 | |
1039 # We got a legacy response. That's fine. | |
1040 elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'): | |
1041 info = {b'v1capabilities': set(rawdata.split())} | |
1042 | |
1043 else: | |
1044 raise error.RepoError( | |
1045 _(b'unexpected response type from server: %s') % ct | |
1046 ) | |
1047 else: | |
1048 info = {b'v1capabilities': set(rawdata.split())} | |
1049 | 604 |
1050 return respurl, info | 605 return respurl, info |
1051 | 606 |
1052 | 607 |
1053 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): | 608 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): |
1071 | 626 |
1072 opener = opener or urlmod.opener(ui, authinfo) | 627 opener = opener or urlmod.opener(ui, authinfo) |
1073 | 628 |
1074 respurl, info = performhandshake(ui, url, opener, requestbuilder) | 629 respurl, info = performhandshake(ui, url, opener, requestbuilder) |
1075 | 630 |
1076 # Given the intersection of APIs that both we and the server support, | |
1077 # sort by their advertised priority and pick the first one. | |
1078 # | |
1079 # TODO consider making this request-based and interface driven. For | |
1080 # example, the caller could say "I want a peer that does X." It's quite | |
1081 # possible that not all peers would do that. Since we know the service | |
1082 # capabilities, we could filter out services not meeting the | |
1083 # requirements. Possibly by consulting the interfaces defined by the | |
1084 # peer type. | |
1085 apipeerchoices = set(info.get(b'apis', {}).keys()) & set(API_PEERS.keys()) | |
1086 | |
1087 preferredchoices = sorted( | |
1088 apipeerchoices, key=lambda x: API_PEERS[x][b'priority'], reverse=True | |
1089 ) | |
1090 | |
1091 for service in preferredchoices: | |
1092 apipath = b'%s/%s' % (info[b'apibase'].rstrip(b'/'), service) | |
1093 | |
1094 return API_PEERS[service][b'init']( | |
1095 ui, respurl, apipath, opener, requestbuilder, info[b'apis'][service] | |
1096 ) | |
1097 | |
1098 # Failed to construct an API peer. Fall back to legacy. | |
1099 return httppeer( | 631 return httppeer( |
1100 ui, path, respurl, opener, requestbuilder, info[b'v1capabilities'] | 632 ui, path, respurl, opener, requestbuilder, info[b'v1capabilities'] |
1101 ) | 633 ) |
1102 | 634 |
1103 | 635 |