Mercurial > public > mercurial-scm > hg
diff mercurial/wireproto.py @ 37614:a81d02ea65db
wireproto: move version 1 peer functionality to standalone module (API)
wireproto.py contains code for both the client and the server. There
*should* be a somewhat strong separation between the two.
This commit extracts the client-side code from wireproto.py into a new
module - wireprotov1peer.
Differential Revision: https://phab.mercurial-scm.org/D3259
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 12:49:08 -0700 |
parents | 96d735601ca1 |
children | 379d54eae6eb |
line wrap: on
line diff
--- a/mercurial/wireproto.py Wed Apr 11 10:51:38 2018 -0700 +++ b/mercurial/wireproto.py Wed Apr 11 12:49:08 2018 -0700 @@ -7,13 +7,11 @@ from __future__ import absolute_import -import hashlib import os import tempfile from .i18n import _ from .node import ( - bin, hex, nullid, ) @@ -25,10 +23,8 @@ encoding, error, exchange, - peer, pushkey as pushkeymod, pycompat, - repository, streamclone, util, wireprototypes, @@ -47,92 +43,6 @@ 'IncompatibleClient') bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) -class remoteiterbatcher(peer.iterbatcher): - def __init__(self, remote): - super(remoteiterbatcher, self).__init__() - self._remote = remote - - def __getattr__(self, name): - # Validate this method is batchable, since submit() only supports - # batchable methods. - fn = getattr(self._remote, name) - if not getattr(fn, 'batchable', None): - raise error.ProgrammingError('Attempted to batch a non-batchable ' - 'call to %r' % name) - - return super(remoteiterbatcher, self).__getattr__(name) - - def submit(self): - """Break the batch request into many patch calls and pipeline them. - - This is mostly valuable over http where request sizes can be - limited, but can be used in other places as well. - """ - # 2-tuple of (command, arguments) that represents what will be - # sent over the wire. - requests = [] - - # 4-tuple of (command, final future, @batchable generator, remote - # future). - results = [] - - for command, args, opts, finalfuture in self.calls: - mtd = getattr(self._remote, command) - batchable = mtd.batchable(mtd.__self__, *args, **opts) - - commandargs, fremote = next(batchable) - assert fremote - requests.append((command, commandargs)) - results.append((command, finalfuture, batchable, fremote)) - - if requests: - self._resultiter = self._remote._submitbatch(requests) - - self._results = results - - def results(self): - for command, finalfuture, batchable, remotefuture in self._results: - # Get the raw result, set it in the remote future, feed it - # back into the @batchable generator so it can be decoded, and - # set the result on the final future to this value. - remoteresult = next(self._resultiter) - remotefuture.set(remoteresult) - finalfuture.set(next(batchable)) - - # Verify our @batchable generators only emit 2 values. - try: - next(batchable) - except StopIteration: - pass - else: - raise error.ProgrammingError('%s @batchable generator emitted ' - 'unexpected value count' % command) - - yield finalfuture.value - -# Forward a couple of names from peer to make wireproto interactions -# slightly more sensible. -batchable = peer.batchable -future = peer.future - - -def encodebatchcmds(req): - """Return a ``cmds`` argument value for the ``batch`` command.""" - escapearg = wireprototypes.escapebatcharg - - cmds = [] - for op, argsdict in req: - # Old servers didn't properly unescape argument names. So prevent - # the sending of argument names that may not be decoded properly by - # servers. - assert all(escapearg(k) == k for k in argsdict) - - args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) - for k, v in argsdict.iteritems()) - cmds.append('%s %s' % (op, args)) - - return ';'.join(cmds) - def clientcompressionsupport(proto): """Returns a list of compression methods supported by the client. @@ -145,315 +55,6 @@ return cap[5:].split(',') return ['zlib', 'none'] -# client side - -class wirepeer(repository.legacypeer): - """Client-side interface for communicating with a peer repository. - - Methods commonly call wire protocol commands of the same name. - - See also httppeer.py and sshpeer.py for protocol-specific - implementations of this interface. - """ - # Begin of ipeercommands interface. - - def iterbatch(self): - return remoteiterbatcher(self) - - @batchable - def lookup(self, key): - self.requirecap('lookup', _('look up remote revision')) - f = future() - yield {'key': encoding.fromlocal(key)}, f - d = f.value - success, data = d[:-1].split(" ", 1) - if int(success): - yield bin(data) - else: - self._abort(error.RepoError(data)) - - @batchable - def heads(self): - f = future() - yield {}, f - d = f.value - try: - yield wireprototypes.decodelist(d[:-1]) - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - @batchable - def known(self, nodes): - f = future() - yield {'nodes': wireprototypes.encodelist(nodes)}, f - d = f.value - try: - yield [bool(int(b)) for b in d] - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - @batchable - def branchmap(self): - f = future() - yield {}, f - d = f.value - try: - branchmap = {} - for branchpart in d.splitlines(): - branchname, branchheads = branchpart.split(' ', 1) - branchname = encoding.tolocal(urlreq.unquote(branchname)) - branchheads = wireprototypes.decodelist(branchheads) - branchmap[branchname] = branchheads - yield branchmap - except TypeError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - @batchable - def listkeys(self, namespace): - if not self.capable('pushkey'): - yield {}, None - f = future() - self.ui.debug('preparing listkeys for "%s"\n' % namespace) - yield {'namespace': encoding.fromlocal(namespace)}, f - d = f.value - self.ui.debug('received listkey for "%s": %i bytes\n' - % (namespace, len(d))) - yield pushkeymod.decodekeys(d) - - @batchable - def pushkey(self, namespace, key, old, new): - if not self.capable('pushkey'): - yield False, None - f = future() - self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) - yield {'namespace': encoding.fromlocal(namespace), - 'key': encoding.fromlocal(key), - 'old': encoding.fromlocal(old), - 'new': encoding.fromlocal(new)}, f - d = f.value - d, output = d.split('\n', 1) - try: - d = bool(int(d)) - except ValueError: - raise error.ResponseError( - _('push failed (unexpected response):'), d) - for l in output.splitlines(True): - self.ui.status(_('remote: '), l) - yield d - - def stream_out(self): - return self._callstream('stream_out') - - def getbundle(self, source, **kwargs): - kwargs = pycompat.byteskwargs(kwargs) - self.requirecap('getbundle', _('look up remote changes')) - opts = {} - bundlecaps = kwargs.get('bundlecaps') or set() - for key, value in kwargs.iteritems(): - if value is None: - continue - keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) - if keytype is None: - raise error.ProgrammingError( - 'Unexpectedly None keytype for key %s' % key) - elif keytype == 'nodes': - value = wireprototypes.encodelist(value) - elif keytype == 'csv': - value = ','.join(value) - elif keytype == 'scsv': - value = ','.join(sorted(value)) - elif keytype == 'boolean': - value = '%i' % bool(value) - elif keytype != 'plain': - raise KeyError('unknown getbundle option type %s' - % keytype) - opts[key] = value - f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) - if any((cap.startswith('HG2') for cap in bundlecaps)): - return bundle2.getunbundler(self.ui, f) - else: - return changegroupmod.cg1unpacker(f, 'UN') - - def unbundle(self, cg, heads, url): - '''Send cg (a readable file-like object representing the - changegroup to push, typically a chunkbuffer object) to the - remote server as a bundle. - - When pushing a bundle10 stream, return an integer indicating the - result of the push (see changegroup.apply()). - - When pushing a bundle20 stream, return a bundle20 stream. - - `url` is the url the client thinks it's pushing to, which is - visible to hooks. - ''' - - if heads != ['force'] and self.capable('unbundlehash'): - heads = wireprototypes.encodelist( - ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) - else: - heads = wireprototypes.encodelist(heads) - - if util.safehasattr(cg, 'deltaheader'): - # this a bundle10, do the old style call sequence - ret, output = self._callpush("unbundle", cg, heads=heads) - if ret == "": - raise error.ResponseError( - _('push failed:'), output) - try: - ret = int(ret) - except ValueError: - raise error.ResponseError( - _('push failed (unexpected response):'), ret) - - for l in output.splitlines(True): - self.ui.status(_('remote: '), l) - else: - # bundle2 push. Send a stream, fetch a stream. - stream = self._calltwowaystream('unbundle', cg, heads=heads) - ret = bundle2.getunbundler(self.ui, stream) - return ret - - # End of ipeercommands interface. - - # Begin of ipeerlegacycommands interface. - - def branches(self, nodes): - n = wireprototypes.encodelist(nodes) - d = self._call("branches", nodes=n) - try: - br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] - return br - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - def between(self, pairs): - batch = 8 # avoid giant requests - r = [] - for i in xrange(0, len(pairs), batch): - n = " ".join([wireprototypes.encodelist(p, '-') - for p in pairs[i:i + batch]]) - d = self._call("between", pairs=n) - try: - r.extend(l and wireprototypes.decodelist(l) or [] - for l in d.splitlines()) - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - return r - - def changegroup(self, nodes, kind): - n = wireprototypes.encodelist(nodes) - f = self._callcompressable("changegroup", roots=n) - return changegroupmod.cg1unpacker(f, 'UN') - - def changegroupsubset(self, bases, heads, kind): - self.requirecap('changegroupsubset', _('look up remote changes')) - bases = wireprototypes.encodelist(bases) - heads = wireprototypes.encodelist(heads) - f = self._callcompressable("changegroupsubset", - bases=bases, heads=heads) - return changegroupmod.cg1unpacker(f, 'UN') - - # End of ipeerlegacycommands interface. - - def _submitbatch(self, req): - """run batch request <req> on the server - - Returns an iterator of the raw responses from the server. - """ - ui = self.ui - if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): - ui.debug('devel-peer-request: batched-content\n') - for op, args in req: - msg = 'devel-peer-request: - %s (%d arguments)\n' - ui.debug(msg % (op, len(args))) - - unescapearg = wireprototypes.unescapebatcharg - - rsp = self._callstream("batch", cmds=encodebatchcmds(req)) - chunk = rsp.read(1024) - work = [chunk] - while chunk: - while ';' not in chunk and chunk: - chunk = rsp.read(1024) - work.append(chunk) - merged = ''.join(work) - while ';' in merged: - one, merged = merged.split(';', 1) - yield unescapearg(one) - chunk = rsp.read(1024) - work = [merged, chunk] - yield unescapearg(''.join(work)) - - def _submitone(self, op, args): - return self._call(op, **pycompat.strkwargs(args)) - - def debugwireargs(self, one, two, three=None, four=None, five=None): - # don't pass optional arguments left at their default value - opts = {} - if three is not None: - opts[r'three'] = three - if four is not None: - opts[r'four'] = four - return self._call('debugwireargs', one=one, two=two, **opts) - - def _call(self, cmd, **args): - """execute <cmd> on the server - - The command is expected to return a simple string. - - returns the server reply as a string.""" - raise NotImplementedError() - - def _callstream(self, cmd, **args): - """execute <cmd> on the server - - The command is expected to return a stream. Note that if the - command doesn't return a stream, _callstream behaves - differently for ssh and http peers. - - returns the server reply as a file like object. - """ - raise NotImplementedError() - - def _callcompressable(self, cmd, **args): - """execute <cmd> on the server - - The command is expected to return a stream. - - The stream may have been compressed in some implementations. This - function takes care of the decompression. This is the only difference - with _callstream. - - returns the server reply as a file like object. - """ - raise NotImplementedError() - - def _callpush(self, cmd, fp, **args): - """execute a <cmd> on server - - The command is expected to be related to a push. Push has a special - return method. - - returns the server reply as a (ret, output) tuple. ret is either - empty (error) or a stringified int. - """ - raise NotImplementedError() - - def _calltwowaystream(self, cmd, fp, **args): - """execute <cmd> on server - - The command will send a stream to the server and get a stream in reply. - """ - raise NotImplementedError() - - def _abort(self, exception): - """clearly abort the wire protocol connection and raise the exception - """ - raise NotImplementedError() - -# server side - # wire protocol command can either return a string or one of these classes. def getdispatchrepo(repo, proto, command):