mercurial/wireproto.py
changeset 37614 a81d02ea65db
parent 37613 96d735601ca1
child 37779 379d54eae6eb
equal deleted inserted replaced
37613:96d735601ca1 37614:a81d02ea65db
     5 # This software may be used and distributed according to the terms of the
     5 # This software may be used and distributed according to the terms of the
     6 # GNU General Public License version 2 or any later version.
     6 # GNU General Public License version 2 or any later version.
     7 
     7 
     8 from __future__ import absolute_import
     8 from __future__ import absolute_import
     9 
     9 
    10 import hashlib
       
    11 import os
    10 import os
    12 import tempfile
    11 import tempfile
    13 
    12 
    14 from .i18n import _
    13 from .i18n import _
    15 from .node import (
    14 from .node import (
    16     bin,
       
    17     hex,
    15     hex,
    18     nullid,
    16     nullid,
    19 )
    17 )
    20 
    18 
    21 from . import (
    19 from . import (
    23     changegroup as changegroupmod,
    21     changegroup as changegroupmod,
    24     discovery,
    22     discovery,
    25     encoding,
    23     encoding,
    26     error,
    24     error,
    27     exchange,
    25     exchange,
    28     peer,
       
    29     pushkey as pushkeymod,
    26     pushkey as pushkeymod,
    30     pycompat,
    27     pycompat,
    31     repository,
       
    32     streamclone,
    28     streamclone,
    33     util,
    29     util,
    34     wireprototypes,
    30     wireprototypes,
    35 )
    31 )
    36 
    32 
    44 
    40 
    45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
    41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
    46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
    42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
    47                         'IncompatibleClient')
    43                         'IncompatibleClient')
    48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
    44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
    49 
       
    50 class remoteiterbatcher(peer.iterbatcher):
       
    51     def __init__(self, remote):
       
    52         super(remoteiterbatcher, self).__init__()
       
    53         self._remote = remote
       
    54 
       
    55     def __getattr__(self, name):
       
    56         # Validate this method is batchable, since submit() only supports
       
    57         # batchable methods.
       
    58         fn = getattr(self._remote, name)
       
    59         if not getattr(fn, 'batchable', None):
       
    60             raise error.ProgrammingError('Attempted to batch a non-batchable '
       
    61                                          'call to %r' % name)
       
    62 
       
    63         return super(remoteiterbatcher, self).__getattr__(name)
       
    64 
       
    65     def submit(self):
       
    66         """Break the batch request into many patch calls and pipeline them.
       
    67 
       
    68         This is mostly valuable over http where request sizes can be
       
    69         limited, but can be used in other places as well.
       
    70         """
       
    71         # 2-tuple of (command, arguments) that represents what will be
       
    72         # sent over the wire.
       
    73         requests = []
       
    74 
       
    75         # 4-tuple of (command, final future, @batchable generator, remote
       
    76         # future).
       
    77         results = []
       
    78 
       
    79         for command, args, opts, finalfuture in self.calls:
       
    80             mtd = getattr(self._remote, command)
       
    81             batchable = mtd.batchable(mtd.__self__, *args, **opts)
       
    82 
       
    83             commandargs, fremote = next(batchable)
       
    84             assert fremote
       
    85             requests.append((command, commandargs))
       
    86             results.append((command, finalfuture, batchable, fremote))
       
    87 
       
    88         if requests:
       
    89             self._resultiter = self._remote._submitbatch(requests)
       
    90 
       
    91         self._results = results
       
    92 
       
    93     def results(self):
       
    94         for command, finalfuture, batchable, remotefuture in self._results:
       
    95             # Get the raw result, set it in the remote future, feed it
       
    96             # back into the @batchable generator so it can be decoded, and
       
    97             # set the result on the final future to this value.
       
    98             remoteresult = next(self._resultiter)
       
    99             remotefuture.set(remoteresult)
       
   100             finalfuture.set(next(batchable))
       
   101 
       
   102             # Verify our @batchable generators only emit 2 values.
       
   103             try:
       
   104                 next(batchable)
       
   105             except StopIteration:
       
   106                 pass
       
   107             else:
       
   108                 raise error.ProgrammingError('%s @batchable generator emitted '
       
   109                                              'unexpected value count' % command)
       
   110 
       
   111             yield finalfuture.value
       
   112 
       
   113 # Forward a couple of names from peer to make wireproto interactions
       
   114 # slightly more sensible.
       
   115 batchable = peer.batchable
       
   116 future = peer.future
       
   117 
       
   118 
       
   119 def encodebatchcmds(req):
       
   120     """Return a ``cmds`` argument value for the ``batch`` command."""
       
   121     escapearg = wireprototypes.escapebatcharg
       
   122 
       
   123     cmds = []
       
   124     for op, argsdict in req:
       
   125         # Old servers didn't properly unescape argument names. So prevent
       
   126         # the sending of argument names that may not be decoded properly by
       
   127         # servers.
       
   128         assert all(escapearg(k) == k for k in argsdict)
       
   129 
       
   130         args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
       
   131                         for k, v in argsdict.iteritems())
       
   132         cmds.append('%s %s' % (op, args))
       
   133 
       
   134     return ';'.join(cmds)
       
   135 
    45 
   136 def clientcompressionsupport(proto):
    46 def clientcompressionsupport(proto):
   137     """Returns a list of compression methods supported by the client.
    47     """Returns a list of compression methods supported by the client.
   138 
    48 
   139     Returns a list of the compression methods supported by the client
    49     Returns a list of the compression methods supported by the client
   142     """
    52     """
   143     for cap in proto.getprotocaps():
    53     for cap in proto.getprotocaps():
   144         if cap.startswith('comp='):
    54         if cap.startswith('comp='):
   145             return cap[5:].split(',')
    55             return cap[5:].split(',')
   146     return ['zlib', 'none']
    56     return ['zlib', 'none']
   147 
       
   148 # client side
       
   149 
       
   150 class wirepeer(repository.legacypeer):
       
   151     """Client-side interface for communicating with a peer repository.
       
   152 
       
   153     Methods commonly call wire protocol commands of the same name.
       
   154 
       
   155     See also httppeer.py and sshpeer.py for protocol-specific
       
   156     implementations of this interface.
       
   157     """
       
   158     # Begin of ipeercommands interface.
       
   159 
       
   160     def iterbatch(self):
       
   161         return remoteiterbatcher(self)
       
   162 
       
   163     @batchable
       
   164     def lookup(self, key):
       
   165         self.requirecap('lookup', _('look up remote revision'))
       
   166         f = future()
       
   167         yield {'key': encoding.fromlocal(key)}, f
       
   168         d = f.value
       
   169         success, data = d[:-1].split(" ", 1)
       
   170         if int(success):
       
   171             yield bin(data)
       
   172         else:
       
   173             self._abort(error.RepoError(data))
       
   174 
       
   175     @batchable
       
   176     def heads(self):
       
   177         f = future()
       
   178         yield {}, f
       
   179         d = f.value
       
   180         try:
       
   181             yield wireprototypes.decodelist(d[:-1])
       
   182         except ValueError:
       
   183             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   184 
       
   185     @batchable
       
   186     def known(self, nodes):
       
   187         f = future()
       
   188         yield {'nodes': wireprototypes.encodelist(nodes)}, f
       
   189         d = f.value
       
   190         try:
       
   191             yield [bool(int(b)) for b in d]
       
   192         except ValueError:
       
   193             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   194 
       
   195     @batchable
       
   196     def branchmap(self):
       
   197         f = future()
       
   198         yield {}, f
       
   199         d = f.value
       
   200         try:
       
   201             branchmap = {}
       
   202             for branchpart in d.splitlines():
       
   203                 branchname, branchheads = branchpart.split(' ', 1)
       
   204                 branchname = encoding.tolocal(urlreq.unquote(branchname))
       
   205                 branchheads = wireprototypes.decodelist(branchheads)
       
   206                 branchmap[branchname] = branchheads
       
   207             yield branchmap
       
   208         except TypeError:
       
   209             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   210 
       
   211     @batchable
       
   212     def listkeys(self, namespace):
       
   213         if not self.capable('pushkey'):
       
   214             yield {}, None
       
   215         f = future()
       
   216         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
       
   217         yield {'namespace': encoding.fromlocal(namespace)}, f
       
   218         d = f.value
       
   219         self.ui.debug('received listkey for "%s": %i bytes\n'
       
   220                       % (namespace, len(d)))
       
   221         yield pushkeymod.decodekeys(d)
       
   222 
       
   223     @batchable
       
   224     def pushkey(self, namespace, key, old, new):
       
   225         if not self.capable('pushkey'):
       
   226             yield False, None
       
   227         f = future()
       
   228         self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
       
   229         yield {'namespace': encoding.fromlocal(namespace),
       
   230                'key': encoding.fromlocal(key),
       
   231                'old': encoding.fromlocal(old),
       
   232                'new': encoding.fromlocal(new)}, f
       
   233         d = f.value
       
   234         d, output = d.split('\n', 1)
       
   235         try:
       
   236             d = bool(int(d))
       
   237         except ValueError:
       
   238             raise error.ResponseError(
       
   239                 _('push failed (unexpected response):'), d)
       
   240         for l in output.splitlines(True):
       
   241             self.ui.status(_('remote: '), l)
       
   242         yield d
       
   243 
       
   244     def stream_out(self):
       
   245         return self._callstream('stream_out')
       
   246 
       
   247     def getbundle(self, source, **kwargs):
       
   248         kwargs = pycompat.byteskwargs(kwargs)
       
   249         self.requirecap('getbundle', _('look up remote changes'))
       
   250         opts = {}
       
   251         bundlecaps = kwargs.get('bundlecaps') or set()
       
   252         for key, value in kwargs.iteritems():
       
   253             if value is None:
       
   254                 continue
       
   255             keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
       
   256             if keytype is None:
       
   257                 raise error.ProgrammingError(
       
   258                     'Unexpectedly None keytype for key %s' % key)
       
   259             elif keytype == 'nodes':
       
   260                 value = wireprototypes.encodelist(value)
       
   261             elif keytype == 'csv':
       
   262                 value = ','.join(value)
       
   263             elif keytype == 'scsv':
       
   264                 value = ','.join(sorted(value))
       
   265             elif keytype == 'boolean':
       
   266                 value = '%i' % bool(value)
       
   267             elif keytype != 'plain':
       
   268                 raise KeyError('unknown getbundle option type %s'
       
   269                                % keytype)
       
   270             opts[key] = value
       
   271         f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
       
   272         if any((cap.startswith('HG2') for cap in bundlecaps)):
       
   273             return bundle2.getunbundler(self.ui, f)
       
   274         else:
       
   275             return changegroupmod.cg1unpacker(f, 'UN')
       
   276 
       
   277     def unbundle(self, cg, heads, url):
       
   278         '''Send cg (a readable file-like object representing the
       
   279         changegroup to push, typically a chunkbuffer object) to the
       
   280         remote server as a bundle.
       
   281 
       
   282         When pushing a bundle10 stream, return an integer indicating the
       
   283         result of the push (see changegroup.apply()).
       
   284 
       
   285         When pushing a bundle20 stream, return a bundle20 stream.
       
   286 
       
   287         `url` is the url the client thinks it's pushing to, which is
       
   288         visible to hooks.
       
   289         '''
       
   290 
       
   291         if heads != ['force'] and self.capable('unbundlehash'):
       
   292             heads = wireprototypes.encodelist(
       
   293                 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
       
   294         else:
       
   295             heads = wireprototypes.encodelist(heads)
       
   296 
       
   297         if util.safehasattr(cg, 'deltaheader'):
       
   298             # this a bundle10, do the old style call sequence
       
   299             ret, output = self._callpush("unbundle", cg, heads=heads)
       
   300             if ret == "":
       
   301                 raise error.ResponseError(
       
   302                     _('push failed:'), output)
       
   303             try:
       
   304                 ret = int(ret)
       
   305             except ValueError:
       
   306                 raise error.ResponseError(
       
   307                     _('push failed (unexpected response):'), ret)
       
   308 
       
   309             for l in output.splitlines(True):
       
   310                 self.ui.status(_('remote: '), l)
       
   311         else:
       
   312             # bundle2 push. Send a stream, fetch a stream.
       
   313             stream = self._calltwowaystream('unbundle', cg, heads=heads)
       
   314             ret = bundle2.getunbundler(self.ui, stream)
       
   315         return ret
       
   316 
       
   317     # End of ipeercommands interface.
       
   318 
       
   319     # Begin of ipeerlegacycommands interface.
       
   320 
       
   321     def branches(self, nodes):
       
   322         n = wireprototypes.encodelist(nodes)
       
   323         d = self._call("branches", nodes=n)
       
   324         try:
       
   325             br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
       
   326             return br
       
   327         except ValueError:
       
   328             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   329 
       
   330     def between(self, pairs):
       
   331         batch = 8 # avoid giant requests
       
   332         r = []
       
   333         for i in xrange(0, len(pairs), batch):
       
   334             n = " ".join([wireprototypes.encodelist(p, '-')
       
   335                           for p in pairs[i:i + batch]])
       
   336             d = self._call("between", pairs=n)
       
   337             try:
       
   338                 r.extend(l and wireprototypes.decodelist(l) or []
       
   339                          for l in d.splitlines())
       
   340             except ValueError:
       
   341                 self._abort(error.ResponseError(_("unexpected response:"), d))
       
   342         return r
       
   343 
       
   344     def changegroup(self, nodes, kind):
       
   345         n = wireprototypes.encodelist(nodes)
       
   346         f = self._callcompressable("changegroup", roots=n)
       
   347         return changegroupmod.cg1unpacker(f, 'UN')
       
   348 
       
   349     def changegroupsubset(self, bases, heads, kind):
       
   350         self.requirecap('changegroupsubset', _('look up remote changes'))
       
   351         bases = wireprototypes.encodelist(bases)
       
   352         heads = wireprototypes.encodelist(heads)
       
   353         f = self._callcompressable("changegroupsubset",
       
   354                                    bases=bases, heads=heads)
       
   355         return changegroupmod.cg1unpacker(f, 'UN')
       
   356 
       
   357     # End of ipeerlegacycommands interface.
       
   358 
       
   359     def _submitbatch(self, req):
       
   360         """run batch request <req> on the server
       
   361 
       
   362         Returns an iterator of the raw responses from the server.
       
   363         """
       
   364         ui = self.ui
       
   365         if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
       
   366             ui.debug('devel-peer-request: batched-content\n')
       
   367             for op, args in req:
       
   368                 msg = 'devel-peer-request:    - %s (%d arguments)\n'
       
   369                 ui.debug(msg % (op, len(args)))
       
   370 
       
   371         unescapearg = wireprototypes.unescapebatcharg
       
   372 
       
   373         rsp = self._callstream("batch", cmds=encodebatchcmds(req))
       
   374         chunk = rsp.read(1024)
       
   375         work = [chunk]
       
   376         while chunk:
       
   377             while ';' not in chunk and chunk:
       
   378                 chunk = rsp.read(1024)
       
   379                 work.append(chunk)
       
   380             merged = ''.join(work)
       
   381             while ';' in merged:
       
   382                 one, merged = merged.split(';', 1)
       
   383                 yield unescapearg(one)
       
   384             chunk = rsp.read(1024)
       
   385             work = [merged, chunk]
       
   386         yield unescapearg(''.join(work))
       
   387 
       
   388     def _submitone(self, op, args):
       
   389         return self._call(op, **pycompat.strkwargs(args))
       
   390 
       
   391     def debugwireargs(self, one, two, three=None, four=None, five=None):
       
   392         # don't pass optional arguments left at their default value
       
   393         opts = {}
       
   394         if three is not None:
       
   395             opts[r'three'] = three
       
   396         if four is not None:
       
   397             opts[r'four'] = four
       
   398         return self._call('debugwireargs', one=one, two=two, **opts)
       
   399 
       
   400     def _call(self, cmd, **args):
       
   401         """execute <cmd> on the server
       
   402 
       
   403         The command is expected to return a simple string.
       
   404 
       
   405         returns the server reply as a string."""
       
   406         raise NotImplementedError()
       
   407 
       
   408     def _callstream(self, cmd, **args):
       
   409         """execute <cmd> on the server
       
   410 
       
   411         The command is expected to return a stream. Note that if the
       
   412         command doesn't return a stream, _callstream behaves
       
   413         differently for ssh and http peers.
       
   414 
       
   415         returns the server reply as a file like object.
       
   416         """
       
   417         raise NotImplementedError()
       
   418 
       
   419     def _callcompressable(self, cmd, **args):
       
   420         """execute <cmd> on the server
       
   421 
       
   422         The command is expected to return a stream.
       
   423 
       
   424         The stream may have been compressed in some implementations. This
       
   425         function takes care of the decompression. This is the only difference
       
   426         with _callstream.
       
   427 
       
   428         returns the server reply as a file like object.
       
   429         """
       
   430         raise NotImplementedError()
       
   431 
       
   432     def _callpush(self, cmd, fp, **args):
       
   433         """execute a <cmd> on server
       
   434 
       
   435         The command is expected to be related to a push. Push has a special
       
   436         return method.
       
   437 
       
   438         returns the server reply as a (ret, output) tuple. ret is either
       
   439         empty (error) or a stringified int.
       
   440         """
       
   441         raise NotImplementedError()
       
   442 
       
   443     def _calltwowaystream(self, cmd, fp, **args):
       
   444         """execute <cmd> on server
       
   445 
       
   446         The command will send a stream to the server and get a stream in reply.
       
   447         """
       
   448         raise NotImplementedError()
       
   449 
       
   450     def _abort(self, exception):
       
   451         """clearly abort the wire protocol connection and raise the exception
       
   452         """
       
   453         raise NotImplementedError()
       
   454 
       
   455 # server side
       
   456 
    57 
   457 # wire protocol command can either return a string or one of these classes.
    58 # wire protocol command can either return a string or one of these classes.
   458 
    59 
   459 def getdispatchrepo(repo, proto, command):
    60 def getdispatchrepo(repo, proto, command):
   460     """Obtain the repo used for processing wire protocol commands.
    61     """Obtain the repo used for processing wire protocol commands.