Mercurial > public > mercurial-scm > hg-stable
diff mercurial/httppeer.py @ 48561:04688c51f81f
exchangev2: remove it
As discussed on the mailing list, this is incomplete and unused with little
hope of revival.
Differential Revision: https://phab.mercurial-scm.org/D11954
author | Rapha?l Gom?s <rgomes@octobus.net> |
---|---|
date | Tue, 07 Dec 2021 16:44:22 +0100 |
parents | ffd3e823a7e5 |
children | a0da5075bca3 |
line wrap: on
line diff
--- a/mercurial/httppeer.py Thu Dec 30 13:25:44 2021 +0100 +++ b/mercurial/httppeer.py Tue Dec 07 16:44:22 2021 +0100 @@ -13,7 +13,6 @@ import os import socket import struct -import weakref from .i18n import _ from .pycompat import getattr @@ -25,21 +24,9 @@ statichttprepo, url as urlmod, util, - wireprotoframing, - wireprototypes, wireprotov1peer, - wireprotov2peer, - wireprotov2server, ) -from .interfaces import ( - repository, - util as interfaceutil, -) -from .utils import ( - cborutil, - stringutil, - urlutil, -) +from .utils import urlutil httplib = util.httplib urlerr = util.urlerr @@ -331,9 +318,7 @@ self.respurl = respurl -def parsev1commandresponse( - ui, baseurl, requrl, qs, resp, compressible, allowcbor=False -): +def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible): # record the url we got redirected to redirected = False respurl = pycompat.bytesurl(resp.geturl()) @@ -376,17 +361,6 @@ try: subtype = proto.split(b'-', 1)[1] - # Unless we end up supporting CBOR in the legacy wire protocol, - # this should ONLY be encountered for the initial capabilities - # request during handshake. - if subtype == b'cbor': - if allowcbor: - return respurl, proto, resp - else: - raise error.RepoError( - _(b'unexpected CBOR response from server') - ) - version_info = tuple([int(n) for n in subtype.split(b'.')]) except ValueError: raise error.RepoError( @@ -564,85 +538,6 @@ raise exception -def sendv2request( - ui, opener, requestbuilder, apiurl, permission, requests, redirect -): - wireprotoframing.populatestreamencoders() - - uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order') - - if uiencoders: - encoders = [] - - for encoder in uiencoders: - if encoder not in wireprotoframing.STREAM_ENCODERS: - ui.warn( - _( - b'wire protocol version 2 encoder referenced in ' - b'config (%s) is not known; ignoring\n' - ) - % encoder - ) - else: - encoders.append(encoder) - - else: - encoders = wireprotoframing.STREAM_ENCODERS_ORDER - - reactor = wireprotoframing.clientreactor( - ui, - hasmultiplesend=False, - buffersends=True, - clientcontentencoders=encoders, - ) - - handler = wireprotov2peer.clienthandler( - ui, reactor, opener=opener, requestbuilder=requestbuilder - ) - - url = b'%s/%s' % (apiurl, permission) - - if len(requests) > 1: - url += b'/multirequest' - else: - url += b'/%s' % requests[0][0] - - ui.debug(b'sending %d commands\n' % len(requests)) - for command, args, f in requests: - ui.debug( - b'sending command %s: %s\n' - % (command, stringutil.pprint(args, indent=2)) - ) - assert not list( - handler.callcommand(command, args, f, redirect=redirect) - ) - - # TODO stream this. - body = b''.join(map(bytes, handler.flushcommands())) - - # TODO modify user-agent to reflect v2 - headers = { - 'Accept': wireprotov2server.FRAMINGTYPE, - 'Content-Type': wireprotov2server.FRAMINGTYPE, - } - - req = requestbuilder(pycompat.strurl(url), body, headers) - req.add_unredirected_header('Content-Length', '%d' % len(body)) - - try: - res = opener.open(req) - except urlerr.httperror as e: - if e.code == 401: - raise error.Abort(_(b'authorization failed')) - - raise - except httplib.HTTPException as e: - ui.traceback() - raise IOError(None, e) - - return handler, res - - class queuedcommandfuture(pycompat.futures.Future): """Wraps result() on command futures to trigger submission on call.""" @@ -657,302 +552,6 @@ return self.result(timeout) -@interfaceutil.implementer(repository.ipeercommandexecutor) -class httpv2executor(object): - def __init__( - self, ui, opener, requestbuilder, apiurl, descriptor, redirect - ): - self._ui = ui - self._opener = opener - self._requestbuilder = requestbuilder - self._apiurl = apiurl - self._descriptor = descriptor - self._redirect = redirect - self._sent = False - self._closed = False - self._neededpermissions = set() - self._calls = [] - self._futures = weakref.WeakSet() - self._responseexecutor = None - self._responsef = None - - def __enter__(self): - return self - - def __exit__(self, exctype, excvalue, exctb): - self.close() - - def callcommand(self, command, args): - if self._sent: - raise error.ProgrammingError( - b'callcommand() cannot be used after commands are sent' - ) - - if self._closed: - raise error.ProgrammingError( - b'callcommand() cannot be used after close()' - ) - - # The service advertises which commands are available. So if we attempt - # to call an unknown command or pass an unknown argument, we can screen - # for this. - if command not in self._descriptor[b'commands']: - raise error.ProgrammingError( - b'wire protocol command %s is not available' % command - ) - - cmdinfo = self._descriptor[b'commands'][command] - unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {})) - - if unknownargs: - raise error.ProgrammingError( - b'wire protocol command %s does not accept argument: %s' - % (command, b', '.join(sorted(unknownargs))) - ) - - self._neededpermissions |= set(cmdinfo[b'permissions']) - - # TODO we /could/ also validate types here, since the API descriptor - # includes types... - - f = pycompat.futures.Future() - - # Monkeypatch it so result() triggers sendcommands(), otherwise result() - # could deadlock. - f.__class__ = queuedcommandfuture - f._peerexecutor = self - - self._futures.add(f) - self._calls.append((command, args, f)) - - return f - - def sendcommands(self): - if self._sent: - return - - if not self._calls: - return - - self._sent = True - - # Unhack any future types so caller sees a clean type and so we - # break reference cycle. - for f in self._futures: - if isinstance(f, queuedcommandfuture): - f.__class__ = pycompat.futures.Future - f._peerexecutor = None - - # Mark the future as running and filter out cancelled futures. - calls = [ - (command, args, f) - for command, args, f in self._calls - if f.set_running_or_notify_cancel() - ] - - # Clear out references, prevent improper object usage. - self._calls = None - - if not calls: - return - - permissions = set(self._neededpermissions) - - if b'push' in permissions and b'pull' in permissions: - permissions.remove(b'pull') - - if len(permissions) > 1: - raise error.RepoError( - _(b'cannot make request requiring multiple permissions: %s') - % _(b', ').join(sorted(permissions)) - ) - - permission = { - b'push': b'rw', - b'pull': b'ro', - }[permissions.pop()] - - handler, resp = sendv2request( - self._ui, - self._opener, - self._requestbuilder, - self._apiurl, - permission, - calls, - self._redirect, - ) - - # TODO we probably want to validate the HTTP code, media type, etc. - - self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) - self._responsef = self._responseexecutor.submit( - self._handleresponse, handler, resp - ) - - def close(self): - if self._closed: - return - - self.sendcommands() - - self._closed = True - - if not self._responsef: - return - - # TODO ^C here may not result in immediate program termination. - - try: - self._responsef.result() - finally: - self._responseexecutor.shutdown(wait=True) - self._responsef = None - self._responseexecutor = None - - # If any of our futures are still in progress, mark them as - # errored, otherwise a result() could wait indefinitely. - for f in self._futures: - if not f.done(): - f.set_exception( - error.ResponseError(_(b'unfulfilled command response')) - ) - - self._futures = None - - def _handleresponse(self, handler, resp): - # Called in a thread to read the response. - - while handler.readdata(resp): - pass - - -@interfaceutil.implementer(repository.ipeerv2) -class httpv2peer(object): - - limitedarguments = False - - def __init__( - self, ui, repourl, apipath, opener, requestbuilder, apidescriptor - ): - self.ui = ui - self.apidescriptor = apidescriptor - - if repourl.endswith(b'/'): - repourl = repourl[:-1] - - self._url = repourl - self._apipath = apipath - self._apiurl = b'%s/%s' % (repourl, apipath) - self._opener = opener - self._requestbuilder = requestbuilder - - self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor) - - # Start of ipeerconnection. - - def url(self): - return self._url - - def local(self): - return None - - def peer(self): - return self - - def canpush(self): - # TODO change once implemented. - return False - - def close(self): - self.ui.note( - _( - b'(sent %d HTTP requests and %d bytes; ' - b'received %d bytes in responses)\n' - ) - % ( - self._opener.requestscount, - self._opener.sentbytescount, - self._opener.receivedbytescount, - ) - ) - - # End of ipeerconnection. - - # Start of ipeercapabilities. - - def capable(self, name): - # The capabilities used internally historically map to capabilities - # advertised from the "capabilities" wire protocol command. However, - # version 2 of that command works differently. - - # Maps to commands that are available. - if name in ( - b'branchmap', - b'getbundle', - b'known', - b'lookup', - b'pushkey', - ): - return True - - # Other concepts. - if name in (b'bundle2',): - return True - - # Alias command-* to presence of command of that name. - if name.startswith(b'command-'): - return name[len(b'command-') :] in self.apidescriptor[b'commands'] - - return False - - def requirecap(self, name, purpose): - if self.capable(name): - return - - raise error.CapabilityError( - _( - b'cannot %s; client or remote repository does not support the ' - b'\'%s\' capability' - ) - % (purpose, name) - ) - - # End of ipeercapabilities. - - def _call(self, name, **args): - with self.commandexecutor() as e: - return e.callcommand(name, args).result() - - def commandexecutor(self): - return httpv2executor( - self.ui, - self._opener, - self._requestbuilder, - self._apiurl, - self.apidescriptor, - self._redirect, - ) - - -# Registry of API service names to metadata about peers that handle it. -# -# The following keys are meaningful: -# -# init -# Callable receiving (ui, repourl, servicepath, opener, requestbuilder, -# apidescriptor) to create a peer. -# -# priority -# Integer priority for the service. If we could choose from multiple -# services, we choose the one with the highest priority. -API_PEERS = { - wireprototypes.HTTP_WIREPROTO_V2: { - b'init': httpv2peer, - b'priority': 50, - }, -} - - def performhandshake(ui, url, opener, requestbuilder): # The handshake is a request to the capabilities command. @@ -963,28 +562,6 @@ args = {} - # The client advertises support for newer protocols by adding an - # X-HgUpgrade-* header with a list of supported APIs and an - # X-HgProto-* header advertising which serializing formats it supports. - # We only support the HTTP version 2 transport and CBOR responses for - # now. - advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2') - - if advertisev2: - args[b'headers'] = { - 'X-HgProto-1': 'cbor', - } - - args[b'headers'].update( - encodevalueinheaders( - b' '.join(sorted(API_PEERS)), - b'X-HgUpgrade', - # We don't know the header limit this early. - # So make it small. - 1024, - ) - ) - req, requrl, qs = makev1commandrequest( ui, requestbuilder, caps, capable, url, b'capabilities', args ) @@ -1004,7 +581,7 @@ # redirect that drops the query string to "just work." try: respurl, ct, resp = parsev1commandresponse( - ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 + ui, url, requrl, qs, resp, compressible=False ) except RedirectedRepoError as e: req, requrl, qs = makev1commandrequest( @@ -1012,7 +589,7 @@ ) resp = sendrequest(ui, opener, req) respurl, ct, resp = parsev1commandresponse( - ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 + ui, url, requrl, qs, resp, compressible=False ) try: @@ -1023,29 +600,7 @@ if not ct.startswith(b'application/mercurial-'): raise error.ProgrammingError(b'unexpected content-type: %s' % ct) - if advertisev2: - if ct == b'application/mercurial-cbor': - try: - info = cborutil.decodeall(rawdata)[0] - except cborutil.CBORDecodeError: - raise error.Abort( - _(b'error decoding CBOR from remote server'), - hint=_( - b'try again and consider contacting ' - b'the server operator' - ), - ) - - # We got a legacy response. That's fine. - elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'): - info = {b'v1capabilities': set(rawdata.split())} - - else: - raise error.RepoError( - _(b'unexpected response type from server: %s') % ct - ) - else: - info = {b'v1capabilities': set(rawdata.split())} + info = {b'v1capabilities': set(rawdata.split())} return respurl, info @@ -1073,29 +628,6 @@ respurl, info = performhandshake(ui, url, opener, requestbuilder) - # Given the intersection of APIs that both we and the server support, - # sort by their advertised priority and pick the first one. - # - # TODO consider making this request-based and interface driven. For - # example, the caller could say "I want a peer that does X." It's quite - # possible that not all peers would do that. Since we know the service - # capabilities, we could filter out services not meeting the - # requirements. Possibly by consulting the interfaces defined by the - # peer type. - apipeerchoices = set(info.get(b'apis', {}).keys()) & set(API_PEERS.keys()) - - preferredchoices = sorted( - apipeerchoices, key=lambda x: API_PEERS[x][b'priority'], reverse=True - ) - - for service in preferredchoices: - apipath = b'%s/%s' % (info[b'apibase'].rstrip(b'/'), service) - - return API_PEERS[service][b'init']( - ui, respurl, apipath, opener, requestbuilder, info[b'apis'][service] - ) - - # Failed to construct an API peer. Fall back to legacy. return httppeer( ui, path, respurl, opener, requestbuilder, info[b'v1capabilities'] )