Mercurial > public > mercurial-scm > hg-stable
diff mercurial/wireprotov1peer.py @ 37614:a81d02ea65db
wireproto: move version 1 peer functionality to standalone module (API)
wireproto.py contains code for both the client and the server. There
*should* be a somewhat strong separation between the two.
This commit extracts the client-side code from wireproto.py into a new
module - wireprotov1peer.
Differential Revision: https://phab.mercurial-scm.org/D3259
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 12:49:08 -0700 |
parents | mercurial/wireproto.py@96d735601ca1 |
children | f3dc8239e3a9 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mercurial/wireprotov1peer.py Wed Apr 11 12:49:08 2018 -0700 @@ -0,0 +1,420 @@ +# wireprotov1peer.py - Client-side functionality for wire protocol version 1. +# +# Copyright 2005-2010 Matt Mackall <mpm@selenic.com> +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +from __future__ import absolute_import + +import hashlib + +from .i18n import _ +from .node import ( + bin, +) + +from . import ( + bundle2, + changegroup as changegroupmod, + encoding, + error, + peer, + pushkey as pushkeymod, + pycompat, + repository, + util, + wireprototypes, +) + +urlreq = util.urlreq + +class remoteiterbatcher(peer.iterbatcher): + def __init__(self, remote): + super(remoteiterbatcher, self).__init__() + self._remote = remote + + def __getattr__(self, name): + # Validate this method is batchable, since submit() only supports + # batchable methods. + fn = getattr(self._remote, name) + if not getattr(fn, 'batchable', None): + raise error.ProgrammingError('Attempted to batch a non-batchable ' + 'call to %r' % name) + + return super(remoteiterbatcher, self).__getattr__(name) + + def submit(self): + """Break the batch request into many patch calls and pipeline them. + + This is mostly valuable over http where request sizes can be + limited, but can be used in other places as well. + """ + # 2-tuple of (command, arguments) that represents what will be + # sent over the wire. + requests = [] + + # 4-tuple of (command, final future, @batchable generator, remote + # future). + results = [] + + for command, args, opts, finalfuture in self.calls: + mtd = getattr(self._remote, command) + batchable = mtd.batchable(mtd.__self__, *args, **opts) + + commandargs, fremote = next(batchable) + assert fremote + requests.append((command, commandargs)) + results.append((command, finalfuture, batchable, fremote)) + + if requests: + self._resultiter = self._remote._submitbatch(requests) + + self._results = results + + def results(self): + for command, finalfuture, batchable, remotefuture in self._results: + # Get the raw result, set it in the remote future, feed it + # back into the @batchable generator so it can be decoded, and + # set the result on the final future to this value. + remoteresult = next(self._resultiter) + remotefuture.set(remoteresult) + finalfuture.set(next(batchable)) + + # Verify our @batchable generators only emit 2 values. + try: + next(batchable) + except StopIteration: + pass + else: + raise error.ProgrammingError('%s @batchable generator emitted ' + 'unexpected value count' % command) + + yield finalfuture.value + +# Forward a couple of names from peer to make wireproto interactions +# slightly more sensible. +batchable = peer.batchable +future = peer.future + +def encodebatchcmds(req): + """Return a ``cmds`` argument value for the ``batch`` command.""" + escapearg = wireprototypes.escapebatcharg + + cmds = [] + for op, argsdict in req: + # Old servers didn't properly unescape argument names. So prevent + # the sending of argument names that may not be decoded properly by + # servers. + assert all(escapearg(k) == k for k in argsdict) + + args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) + for k, v in argsdict.iteritems()) + cmds.append('%s %s' % (op, args)) + + return ';'.join(cmds) + +class wirepeer(repository.legacypeer): + """Client-side interface for communicating with a peer repository. + + Methods commonly call wire protocol commands of the same name. + + See also httppeer.py and sshpeer.py for protocol-specific + implementations of this interface. + """ + # Begin of ipeercommands interface. + + def iterbatch(self): + return remoteiterbatcher(self) + + @batchable + def lookup(self, key): + self.requirecap('lookup', _('look up remote revision')) + f = future() + yield {'key': encoding.fromlocal(key)}, f + d = f.value + success, data = d[:-1].split(" ", 1) + if int(success): + yield bin(data) + else: + self._abort(error.RepoError(data)) + + @batchable + def heads(self): + f = future() + yield {}, f + d = f.value + try: + yield wireprototypes.decodelist(d[:-1]) + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + @batchable + def known(self, nodes): + f = future() + yield {'nodes': wireprototypes.encodelist(nodes)}, f + d = f.value + try: + yield [bool(int(b)) for b in d] + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + @batchable + def branchmap(self): + f = future() + yield {}, f + d = f.value + try: + branchmap = {} + for branchpart in d.splitlines(): + branchname, branchheads = branchpart.split(' ', 1) + branchname = encoding.tolocal(urlreq.unquote(branchname)) + branchheads = wireprototypes.decodelist(branchheads) + branchmap[branchname] = branchheads + yield branchmap + except TypeError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + @batchable + def listkeys(self, namespace): + if not self.capable('pushkey'): + yield {}, None + f = future() + self.ui.debug('preparing listkeys for "%s"\n' % namespace) + yield {'namespace': encoding.fromlocal(namespace)}, f + d = f.value + self.ui.debug('received listkey for "%s": %i bytes\n' + % (namespace, len(d))) + yield pushkeymod.decodekeys(d) + + @batchable + def pushkey(self, namespace, key, old, new): + if not self.capable('pushkey'): + yield False, None + f = future() + self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) + yield {'namespace': encoding.fromlocal(namespace), + 'key': encoding.fromlocal(key), + 'old': encoding.fromlocal(old), + 'new': encoding.fromlocal(new)}, f + d = f.value + d, output = d.split('\n', 1) + try: + d = bool(int(d)) + except ValueError: + raise error.ResponseError( + _('push failed (unexpected response):'), d) + for l in output.splitlines(True): + self.ui.status(_('remote: '), l) + yield d + + def stream_out(self): + return self._callstream('stream_out') + + def getbundle(self, source, **kwargs): + kwargs = pycompat.byteskwargs(kwargs) + self.requirecap('getbundle', _('look up remote changes')) + opts = {} + bundlecaps = kwargs.get('bundlecaps') or set() + for key, value in kwargs.iteritems(): + if value is None: + continue + keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) + if keytype is None: + raise error.ProgrammingError( + 'Unexpectedly None keytype for key %s' % key) + elif keytype == 'nodes': + value = wireprototypes.encodelist(value) + elif keytype == 'csv': + value = ','.join(value) + elif keytype == 'scsv': + value = ','.join(sorted(value)) + elif keytype == 'boolean': + value = '%i' % bool(value) + elif keytype != 'plain': + raise KeyError('unknown getbundle option type %s' + % keytype) + opts[key] = value + f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) + if any((cap.startswith('HG2') for cap in bundlecaps)): + return bundle2.getunbundler(self.ui, f) + else: + return changegroupmod.cg1unpacker(f, 'UN') + + def unbundle(self, cg, heads, url): + '''Send cg (a readable file-like object representing the + changegroup to push, typically a chunkbuffer object) to the + remote server as a bundle. + + When pushing a bundle10 stream, return an integer indicating the + result of the push (see changegroup.apply()). + + When pushing a bundle20 stream, return a bundle20 stream. + + `url` is the url the client thinks it's pushing to, which is + visible to hooks. + ''' + + if heads != ['force'] and self.capable('unbundlehash'): + heads = wireprototypes.encodelist( + ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) + else: + heads = wireprototypes.encodelist(heads) + + if util.safehasattr(cg, 'deltaheader'): + # this a bundle10, do the old style call sequence + ret, output = self._callpush("unbundle", cg, heads=heads) + if ret == "": + raise error.ResponseError( + _('push failed:'), output) + try: + ret = int(ret) + except ValueError: + raise error.ResponseError( + _('push failed (unexpected response):'), ret) + + for l in output.splitlines(True): + self.ui.status(_('remote: '), l) + else: + # bundle2 push. Send a stream, fetch a stream. + stream = self._calltwowaystream('unbundle', cg, heads=heads) + ret = bundle2.getunbundler(self.ui, stream) + return ret + + # End of ipeercommands interface. + + # Begin of ipeerlegacycommands interface. + + def branches(self, nodes): + n = wireprototypes.encodelist(nodes) + d = self._call("branches", nodes=n) + try: + br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] + return br + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + def between(self, pairs): + batch = 8 # avoid giant requests + r = [] + for i in xrange(0, len(pairs), batch): + n = " ".join([wireprototypes.encodelist(p, '-') + for p in pairs[i:i + batch]]) + d = self._call("between", pairs=n) + try: + r.extend(l and wireprototypes.decodelist(l) or [] + for l in d.splitlines()) + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + return r + + def changegroup(self, nodes, kind): + n = wireprototypes.encodelist(nodes) + f = self._callcompressable("changegroup", roots=n) + return changegroupmod.cg1unpacker(f, 'UN') + + def changegroupsubset(self, bases, heads, kind): + self.requirecap('changegroupsubset', _('look up remote changes')) + bases = wireprototypes.encodelist(bases) + heads = wireprototypes.encodelist(heads) + f = self._callcompressable("changegroupsubset", + bases=bases, heads=heads) + return changegroupmod.cg1unpacker(f, 'UN') + + # End of ipeerlegacycommands interface. + + def _submitbatch(self, req): + """run batch request <req> on the server + + Returns an iterator of the raw responses from the server. + """ + ui = self.ui + if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): + ui.debug('devel-peer-request: batched-content\n') + for op, args in req: + msg = 'devel-peer-request: - %s (%d arguments)\n' + ui.debug(msg % (op, len(args))) + + unescapearg = wireprototypes.unescapebatcharg + + rsp = self._callstream("batch", cmds=encodebatchcmds(req)) + chunk = rsp.read(1024) + work = [chunk] + while chunk: + while ';' not in chunk and chunk: + chunk = rsp.read(1024) + work.append(chunk) + merged = ''.join(work) + while ';' in merged: + one, merged = merged.split(';', 1) + yield unescapearg(one) + chunk = rsp.read(1024) + work = [merged, chunk] + yield unescapearg(''.join(work)) + + def _submitone(self, op, args): + return self._call(op, **pycompat.strkwargs(args)) + + def debugwireargs(self, one, two, three=None, four=None, five=None): + # don't pass optional arguments left at their default value + opts = {} + if three is not None: + opts[r'three'] = three + if four is not None: + opts[r'four'] = four + return self._call('debugwireargs', one=one, two=two, **opts) + + def _call(self, cmd, **args): + """execute <cmd> on the server + + The command is expected to return a simple string. + + returns the server reply as a string.""" + raise NotImplementedError() + + def _callstream(self, cmd, **args): + """execute <cmd> on the server + + The command is expected to return a stream. Note that if the + command doesn't return a stream, _callstream behaves + differently for ssh and http peers. + + returns the server reply as a file like object. + """ + raise NotImplementedError() + + def _callcompressable(self, cmd, **args): + """execute <cmd> on the server + + The command is expected to return a stream. + + The stream may have been compressed in some implementations. This + function takes care of the decompression. This is the only difference + with _callstream. + + returns the server reply as a file like object. + """ + raise NotImplementedError() + + def _callpush(self, cmd, fp, **args): + """execute a <cmd> on server + + The command is expected to be related to a push. Push has a special + return method. + + returns the server reply as a (ret, output) tuple. ret is either + empty (error) or a stringified int. + """ + raise NotImplementedError() + + def _calltwowaystream(self, cmd, fp, **args): + """execute <cmd> on server + + The command will send a stream to the server and get a stream in reply. + """ + raise NotImplementedError() + + def _abort(self, exception): + """clearly abort the wire protocol connection and raise the exception + """ + raise NotImplementedError()