comparison mercurial/httppeer.py @ 48561:04688c51f81f

exchangev2: remove it. As discussed on the mailing list, this is incomplete and unused, with little hope of revival. Differential Revision: https://phab.mercurial-scm.org/D11954
author Raphaël Gomès <rgomes@octobus.net>
date Tue, 07 Dec 2021 16:44:22 +0100
parents ffd3e823a7e5
children a0da5075bca3
comparison
equal deleted inserted replaced
48560:d6c53b40b078 48561:04688c51f81f
11 import errno 11 import errno
12 import io 12 import io
13 import os 13 import os
14 import socket 14 import socket
15 import struct 15 import struct
16 import weakref
17 16
18 from .i18n import _ 17 from .i18n import _
19 from .pycompat import getattr 18 from .pycompat import getattr
20 from . import ( 19 from . import (
21 bundle2, 20 bundle2,
23 httpconnection, 22 httpconnection,
24 pycompat, 23 pycompat,
25 statichttprepo, 24 statichttprepo,
26 url as urlmod, 25 url as urlmod,
27 util, 26 util,
28 wireprotoframing,
29 wireprototypes,
30 wireprotov1peer, 27 wireprotov1peer,
31 wireprotov2peer,
32 wireprotov2server,
33 ) 28 )
34 from .interfaces import ( 29 from .utils import urlutil
35 repository,
36 util as interfaceutil,
37 )
38 from .utils import (
39 cborutil,
40 stringutil,
41 urlutil,
42 )
43 30
44 httplib = util.httplib 31 httplib = util.httplib
45 urlerr = util.urlerr 32 urlerr = util.urlerr
46 urlreq = util.urlreq 33 urlreq = util.urlreq
47 34
329 def __init__(self, msg, respurl): 316 def __init__(self, msg, respurl):
330 super(RedirectedRepoError, self).__init__(msg) 317 super(RedirectedRepoError, self).__init__(msg)
331 self.respurl = respurl 318 self.respurl = respurl
332 319
333 320
334 def parsev1commandresponse( 321 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible):
335 ui, baseurl, requrl, qs, resp, compressible, allowcbor=False
336 ):
337 # record the url we got redirected to 322 # record the url we got redirected to
338 redirected = False 323 redirected = False
339 respurl = pycompat.bytesurl(resp.geturl()) 324 respurl = pycompat.bytesurl(resp.geturl())
340 if respurl.endswith(qs): 325 if respurl.endswith(qs):
341 respurl = respurl[: -len(qs)] 326 respurl = respurl[: -len(qs)]
373 else: 358 else:
374 raise error.RepoError(msg) 359 raise error.RepoError(msg)
375 360
376 try: 361 try:
377 subtype = proto.split(b'-', 1)[1] 362 subtype = proto.split(b'-', 1)[1]
378
379 # Unless we end up supporting CBOR in the legacy wire protocol,
380 # this should ONLY be encountered for the initial capabilities
381 # request during handshake.
382 if subtype == b'cbor':
383 if allowcbor:
384 return respurl, proto, resp
385 else:
386 raise error.RepoError(
387 _(b'unexpected CBOR response from server')
388 )
389 363
390 version_info = tuple([int(n) for n in subtype.split(b'.')]) 364 version_info = tuple([int(n) for n in subtype.split(b'.')])
391 except ValueError: 365 except ValueError:
392 raise error.RepoError( 366 raise error.RepoError(
393 _(b"'%s' sent a broken Content-Type header (%s)") % (safeurl, proto) 367 _(b"'%s' sent a broken Content-Type header (%s)") % (safeurl, proto)
562 536
563 def _abort(self, exception): 537 def _abort(self, exception):
564 raise exception 538 raise exception
565 539
566 540
567 def sendv2request(
568 ui, opener, requestbuilder, apiurl, permission, requests, redirect
569 ):
570 wireprotoframing.populatestreamencoders()
571
572 uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
573
574 if uiencoders:
575 encoders = []
576
577 for encoder in uiencoders:
578 if encoder not in wireprotoframing.STREAM_ENCODERS:
579 ui.warn(
580 _(
581 b'wire protocol version 2 encoder referenced in '
582 b'config (%s) is not known; ignoring\n'
583 )
584 % encoder
585 )
586 else:
587 encoders.append(encoder)
588
589 else:
590 encoders = wireprotoframing.STREAM_ENCODERS_ORDER
591
592 reactor = wireprotoframing.clientreactor(
593 ui,
594 hasmultiplesend=False,
595 buffersends=True,
596 clientcontentencoders=encoders,
597 )
598
599 handler = wireprotov2peer.clienthandler(
600 ui, reactor, opener=opener, requestbuilder=requestbuilder
601 )
602
603 url = b'%s/%s' % (apiurl, permission)
604
605 if len(requests) > 1:
606 url += b'/multirequest'
607 else:
608 url += b'/%s' % requests[0][0]
609
610 ui.debug(b'sending %d commands\n' % len(requests))
611 for command, args, f in requests:
612 ui.debug(
613 b'sending command %s: %s\n'
614 % (command, stringutil.pprint(args, indent=2))
615 )
616 assert not list(
617 handler.callcommand(command, args, f, redirect=redirect)
618 )
619
620 # TODO stream this.
621 body = b''.join(map(bytes, handler.flushcommands()))
622
623 # TODO modify user-agent to reflect v2
624 headers = {
625 'Accept': wireprotov2server.FRAMINGTYPE,
626 'Content-Type': wireprotov2server.FRAMINGTYPE,
627 }
628
629 req = requestbuilder(pycompat.strurl(url), body, headers)
630 req.add_unredirected_header('Content-Length', '%d' % len(body))
631
632 try:
633 res = opener.open(req)
634 except urlerr.httperror as e:
635 if e.code == 401:
636 raise error.Abort(_(b'authorization failed'))
637
638 raise
639 except httplib.HTTPException as e:
640 ui.traceback()
641 raise IOError(None, e)
642
643 return handler, res
644
645
646 class queuedcommandfuture(pycompat.futures.Future): 541 class queuedcommandfuture(pycompat.futures.Future):
647 """Wraps result() on command futures to trigger submission on call.""" 542 """Wraps result() on command futures to trigger submission on call."""
648 543
649 def result(self, timeout=None): 544 def result(self, timeout=None):
650 if self.done(): 545 if self.done():
655 # sendcommands() will restore the original __class__ and self.result 550 # sendcommands() will restore the original __class__ and self.result
656 # will resolve to Future.result. 551 # will resolve to Future.result.
657 return self.result(timeout) 552 return self.result(timeout)
658 553
659 554
660 @interfaceutil.implementer(repository.ipeercommandexecutor)
661 class httpv2executor(object):
662 def __init__(
663 self, ui, opener, requestbuilder, apiurl, descriptor, redirect
664 ):
665 self._ui = ui
666 self._opener = opener
667 self._requestbuilder = requestbuilder
668 self._apiurl = apiurl
669 self._descriptor = descriptor
670 self._redirect = redirect
671 self._sent = False
672 self._closed = False
673 self._neededpermissions = set()
674 self._calls = []
675 self._futures = weakref.WeakSet()
676 self._responseexecutor = None
677 self._responsef = None
678
679 def __enter__(self):
680 return self
681
682 def __exit__(self, exctype, excvalue, exctb):
683 self.close()
684
685 def callcommand(self, command, args):
686 if self._sent:
687 raise error.ProgrammingError(
688 b'callcommand() cannot be used after commands are sent'
689 )
690
691 if self._closed:
692 raise error.ProgrammingError(
693 b'callcommand() cannot be used after close()'
694 )
695
696 # The service advertises which commands are available. So if we attempt
697 # to call an unknown command or pass an unknown argument, we can screen
698 # for this.
699 if command not in self._descriptor[b'commands']:
700 raise error.ProgrammingError(
701 b'wire protocol command %s is not available' % command
702 )
703
704 cmdinfo = self._descriptor[b'commands'][command]
705 unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {}))
706
707 if unknownargs:
708 raise error.ProgrammingError(
709 b'wire protocol command %s does not accept argument: %s'
710 % (command, b', '.join(sorted(unknownargs)))
711 )
712
713 self._neededpermissions |= set(cmdinfo[b'permissions'])
714
715 # TODO we /could/ also validate types here, since the API descriptor
716 # includes types...
717
718 f = pycompat.futures.Future()
719
720 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
721 # could deadlock.
722 f.__class__ = queuedcommandfuture
723 f._peerexecutor = self
724
725 self._futures.add(f)
726 self._calls.append((command, args, f))
727
728 return f
729
730 def sendcommands(self):
731 if self._sent:
732 return
733
734 if not self._calls:
735 return
736
737 self._sent = True
738
739 # Unhack any future types so caller sees a clean type and so we
740 # break reference cycle.
741 for f in self._futures:
742 if isinstance(f, queuedcommandfuture):
743 f.__class__ = pycompat.futures.Future
744 f._peerexecutor = None
745
746 # Mark the future as running and filter out cancelled futures.
747 calls = [
748 (command, args, f)
749 for command, args, f in self._calls
750 if f.set_running_or_notify_cancel()
751 ]
752
753 # Clear out references, prevent improper object usage.
754 self._calls = None
755
756 if not calls:
757 return
758
759 permissions = set(self._neededpermissions)
760
761 if b'push' in permissions and b'pull' in permissions:
762 permissions.remove(b'pull')
763
764 if len(permissions) > 1:
765 raise error.RepoError(
766 _(b'cannot make request requiring multiple permissions: %s')
767 % _(b', ').join(sorted(permissions))
768 )
769
770 permission = {
771 b'push': b'rw',
772 b'pull': b'ro',
773 }[permissions.pop()]
774
775 handler, resp = sendv2request(
776 self._ui,
777 self._opener,
778 self._requestbuilder,
779 self._apiurl,
780 permission,
781 calls,
782 self._redirect,
783 )
784
785 # TODO we probably want to validate the HTTP code, media type, etc.
786
787 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
788 self._responsef = self._responseexecutor.submit(
789 self._handleresponse, handler, resp
790 )
791
792 def close(self):
793 if self._closed:
794 return
795
796 self.sendcommands()
797
798 self._closed = True
799
800 if not self._responsef:
801 return
802
803 # TODO ^C here may not result in immediate program termination.
804
805 try:
806 self._responsef.result()
807 finally:
808 self._responseexecutor.shutdown(wait=True)
809 self._responsef = None
810 self._responseexecutor = None
811
812 # If any of our futures are still in progress, mark them as
813 # errored, otherwise a result() could wait indefinitely.
814 for f in self._futures:
815 if not f.done():
816 f.set_exception(
817 error.ResponseError(_(b'unfulfilled command response'))
818 )
819
820 self._futures = None
821
822 def _handleresponse(self, handler, resp):
823 # Called in a thread to read the response.
824
825 while handler.readdata(resp):
826 pass
827
828
829 @interfaceutil.implementer(repository.ipeerv2)
830 class httpv2peer(object):
831
832 limitedarguments = False
833
834 def __init__(
835 self, ui, repourl, apipath, opener, requestbuilder, apidescriptor
836 ):
837 self.ui = ui
838 self.apidescriptor = apidescriptor
839
840 if repourl.endswith(b'/'):
841 repourl = repourl[:-1]
842
843 self._url = repourl
844 self._apipath = apipath
845 self._apiurl = b'%s/%s' % (repourl, apipath)
846 self._opener = opener
847 self._requestbuilder = requestbuilder
848
849 self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
850
851 # Start of ipeerconnection.
852
853 def url(self):
854 return self._url
855
856 def local(self):
857 return None
858
859 def peer(self):
860 return self
861
862 def canpush(self):
863 # TODO change once implemented.
864 return False
865
866 def close(self):
867 self.ui.note(
868 _(
869 b'(sent %d HTTP requests and %d bytes; '
870 b'received %d bytes in responses)\n'
871 )
872 % (
873 self._opener.requestscount,
874 self._opener.sentbytescount,
875 self._opener.receivedbytescount,
876 )
877 )
878
879 # End of ipeerconnection.
880
881 # Start of ipeercapabilities.
882
883 def capable(self, name):
884 # The capabilities used internally historically map to capabilities
885 # advertised from the "capabilities" wire protocol command. However,
886 # version 2 of that command works differently.
887
888 # Maps to commands that are available.
889 if name in (
890 b'branchmap',
891 b'getbundle',
892 b'known',
893 b'lookup',
894 b'pushkey',
895 ):
896 return True
897
898 # Other concepts.
899 if name in (b'bundle2',):
900 return True
901
902 # Alias command-* to presence of command of that name.
903 if name.startswith(b'command-'):
904 return name[len(b'command-') :] in self.apidescriptor[b'commands']
905
906 return False
907
908 def requirecap(self, name, purpose):
909 if self.capable(name):
910 return
911
912 raise error.CapabilityError(
913 _(
914 b'cannot %s; client or remote repository does not support the '
915 b'\'%s\' capability'
916 )
917 % (purpose, name)
918 )
919
920 # End of ipeercapabilities.
921
922 def _call(self, name, **args):
923 with self.commandexecutor() as e:
924 return e.callcommand(name, args).result()
925
926 def commandexecutor(self):
927 return httpv2executor(
928 self.ui,
929 self._opener,
930 self._requestbuilder,
931 self._apiurl,
932 self.apidescriptor,
933 self._redirect,
934 )
935
936
937 # Registry of API service names to metadata about peers that handle it.
938 #
939 # The following keys are meaningful:
940 #
941 # init
942 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
943 # apidescriptor) to create a peer.
944 #
945 # priority
946 # Integer priority for the service. If we could choose from multiple
947 # services, we choose the one with the highest priority.
948 API_PEERS = {
949 wireprototypes.HTTP_WIREPROTO_V2: {
950 b'init': httpv2peer,
951 b'priority': 50,
952 },
953 }
954
955
956 def performhandshake(ui, url, opener, requestbuilder): 555 def performhandshake(ui, url, opener, requestbuilder):
957 # The handshake is a request to the capabilities command. 556 # The handshake is a request to the capabilities command.
958 557
959 caps = None 558 caps = None
960 559
961 def capable(x): 560 def capable(x):
962 raise error.ProgrammingError(b'should not be called') 561 raise error.ProgrammingError(b'should not be called')
963 562
964 args = {} 563 args = {}
965
966 # The client advertises support for newer protocols by adding an
967 # X-HgUpgrade-* header with a list of supported APIs and an
968 # X-HgProto-* header advertising which serializing formats it supports.
969 # We only support the HTTP version 2 transport and CBOR responses for
970 # now.
971 advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2')
972
973 if advertisev2:
974 args[b'headers'] = {
975 'X-HgProto-1': 'cbor',
976 }
977
978 args[b'headers'].update(
979 encodevalueinheaders(
980 b' '.join(sorted(API_PEERS)),
981 b'X-HgUpgrade',
982 # We don't know the header limit this early.
983 # So make it small.
984 1024,
985 )
986 )
987 564
988 req, requrl, qs = makev1commandrequest( 565 req, requrl, qs = makev1commandrequest(
989 ui, requestbuilder, caps, capable, url, b'capabilities', args 566 ui, requestbuilder, caps, capable, url, b'capabilities', args
990 ) 567 )
991 resp = sendrequest(ui, opener, req) 568 resp = sendrequest(ui, opener, req)
1002 # issue without behavior degradation. And according to issue 5860, it may 579 # issue without behavior degradation. And according to issue 5860, it may
1003 # be a longstanding bug in some server implementations. So we allow a 580 # be a longstanding bug in some server implementations. So we allow a
1004 # redirect that drops the query string to "just work." 581 # redirect that drops the query string to "just work."
1005 try: 582 try:
1006 respurl, ct, resp = parsev1commandresponse( 583 respurl, ct, resp = parsev1commandresponse(
1007 ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 584 ui, url, requrl, qs, resp, compressible=False
1008 ) 585 )
1009 except RedirectedRepoError as e: 586 except RedirectedRepoError as e:
1010 req, requrl, qs = makev1commandrequest( 587 req, requrl, qs = makev1commandrequest(
1011 ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args 588 ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args
1012 ) 589 )
1013 resp = sendrequest(ui, opener, req) 590 resp = sendrequest(ui, opener, req)
1014 respurl, ct, resp = parsev1commandresponse( 591 respurl, ct, resp = parsev1commandresponse(
1015 ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 592 ui, url, requrl, qs, resp, compressible=False
1016 ) 593 )
1017 594
1018 try: 595 try:
1019 rawdata = resp.read() 596 rawdata = resp.read()
1020 finally: 597 finally:
1021 resp.close() 598 resp.close()
1022 599
1023 if not ct.startswith(b'application/mercurial-'): 600 if not ct.startswith(b'application/mercurial-'):
1024 raise error.ProgrammingError(b'unexpected content-type: %s' % ct) 601 raise error.ProgrammingError(b'unexpected content-type: %s' % ct)
1025 602
1026 if advertisev2: 603 info = {b'v1capabilities': set(rawdata.split())}
1027 if ct == b'application/mercurial-cbor':
1028 try:
1029 info = cborutil.decodeall(rawdata)[0]
1030 except cborutil.CBORDecodeError:
1031 raise error.Abort(
1032 _(b'error decoding CBOR from remote server'),
1033 hint=_(
1034 b'try again and consider contacting '
1035 b'the server operator'
1036 ),
1037 )
1038
1039 # We got a legacy response. That's fine.
1040 elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'):
1041 info = {b'v1capabilities': set(rawdata.split())}
1042
1043 else:
1044 raise error.RepoError(
1045 _(b'unexpected response type from server: %s') % ct
1046 )
1047 else:
1048 info = {b'v1capabilities': set(rawdata.split())}
1049 604
1050 return respurl, info 605 return respurl, info
1051 606
1052 607
1053 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): 608 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
1071 626
1072 opener = opener or urlmod.opener(ui, authinfo) 627 opener = opener or urlmod.opener(ui, authinfo)
1073 628
1074 respurl, info = performhandshake(ui, url, opener, requestbuilder) 629 respurl, info = performhandshake(ui, url, opener, requestbuilder)
1075 630
1076 # Given the intersection of APIs that both we and the server support,
1077 # sort by their advertised priority and pick the first one.
1078 #
1079 # TODO consider making this request-based and interface driven. For
1080 # example, the caller could say "I want a peer that does X." It's quite
1081 # possible that not all peers would do that. Since we know the service
1082 # capabilities, we could filter out services not meeting the
1083 # requirements. Possibly by consulting the interfaces defined by the
1084 # peer type.
1085 apipeerchoices = set(info.get(b'apis', {}).keys()) & set(API_PEERS.keys())
1086
1087 preferredchoices = sorted(
1088 apipeerchoices, key=lambda x: API_PEERS[x][b'priority'], reverse=True
1089 )
1090
1091 for service in preferredchoices:
1092 apipath = b'%s/%s' % (info[b'apibase'].rstrip(b'/'), service)
1093
1094 return API_PEERS[service][b'init'](
1095 ui, respurl, apipath, opener, requestbuilder, info[b'apis'][service]
1096 )
1097
1098 # Failed to construct an API peer. Fall back to legacy.
1099 return httppeer( 631 return httppeer(
1100 ui, path, respurl, opener, requestbuilder, info[b'v1capabilities'] 632 ui, path, respurl, opener, requestbuilder, info[b'v1capabilities']
1101 ) 633 )
1102 634
1103 635