mercurial/wireproto.py
changeset 33806 dedab036215d
parent 33767 b47fe9733d76
child 34062 6c6169f71b8d
equal deleted inserted replaced
33805:f913e90f15a0 33806:dedab036215d
    25     error,
    25     error,
    26     exchange,
    26     exchange,
    27     peer,
    27     peer,
    28     pushkey as pushkeymod,
    28     pushkey as pushkeymod,
    29     pycompat,
    29     pycompat,
       
    30     repository,
    30     streamclone,
    31     streamclone,
    31     util,
    32     util,
    32 )
    33 )
    33 
    34 
    34 urlerr = util.urlerr
    35 urlerr = util.urlerr
   210              'cg': 'boolean',
   211              'cg': 'boolean',
   211              'cbattempted': 'boolean'}
   212              'cbattempted': 'boolean'}
   212 
   213 
   213 # client side
   214 # client side
   214 
   215 
   215 class wirepeer(peer.peerrepository):
   216 class wirepeer(repository.legacypeer):
   216     """Client-side interface for communicating with a peer repository.
   217     """Client-side interface for communicating with a peer repository.
   217 
   218 
   218     Methods commonly call wire protocol commands of the same name.
   219     Methods commonly call wire protocol commands of the same name.
   219 
   220 
   220     See also httppeer.py and sshpeer.py for protocol-specific
   221     See also httppeer.py and sshpeer.py for protocol-specific
   221     implementations of this interface.
   222     implementations of this interface.
   222     """
   223     """
   223     def _submitbatch(self, req):
   224     # Begin of basewirepeer interface.
   224         """run batch request <req> on the server
       
   225 
       
   226         Returns an iterator of the raw responses from the server.
       
   227         """
       
   228         rsp = self._callstream("batch", cmds=encodebatchcmds(req))
       
   229         chunk = rsp.read(1024)
       
   230         work = [chunk]
       
   231         while chunk:
       
   232             while ';' not in chunk and chunk:
       
   233                 chunk = rsp.read(1024)
       
   234                 work.append(chunk)
       
   235             merged = ''.join(work)
       
   236             while ';' in merged:
       
   237                 one, merged = merged.split(';', 1)
       
   238                 yield unescapearg(one)
       
   239             chunk = rsp.read(1024)
       
   240             work = [merged, chunk]
       
   241         yield unescapearg(''.join(work))
       
   242 
       
   243     def _submitone(self, op, args):
       
   244         return self._call(op, **args)
       
   245 
   225 
   246     def iterbatch(self):
   226     def iterbatch(self):
   247         return remoteiterbatcher(self)
   227         return remoteiterbatcher(self)
   248 
   228 
   249     @batchable
   229     @batchable
   291                 branchmap[branchname] = branchheads
   271                 branchmap[branchname] = branchheads
   292             yield branchmap
   272             yield branchmap
   293         except TypeError:
   273         except TypeError:
   294             self._abort(error.ResponseError(_("unexpected response:"), d))
   274             self._abort(error.ResponseError(_("unexpected response:"), d))
   295 
   275 
   296     def branches(self, nodes):
   276     @batchable
   297         n = encodelist(nodes)
   277     def listkeys(self, namespace):
   298         d = self._call("branches", nodes=n)
   278         if not self.capable('pushkey'):
   299         try:
   279             yield {}, None
   300             br = [tuple(decodelist(b)) for b in d.splitlines()]
   280         f = future()
   301             return br
   281         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
   302         except ValueError:
   282         yield {'namespace': encoding.fromlocal(namespace)}, f
   303             self._abort(error.ResponseError(_("unexpected response:"), d))
   283         d = f.value
   304 
   284         self.ui.debug('received listkey for "%s": %i bytes\n'
   305     def between(self, pairs):
   285                       % (namespace, len(d)))
   306         batch = 8 # avoid giant requests
   286         yield pushkeymod.decodekeys(d)
   307         r = []
       
   308         for i in xrange(0, len(pairs), batch):
       
   309             n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
       
   310             d = self._call("between", pairs=n)
       
   311             try:
       
   312                 r.extend(l and decodelist(l) or [] for l in d.splitlines())
       
   313             except ValueError:
       
   314                 self._abort(error.ResponseError(_("unexpected response:"), d))
       
   315         return r
       
   316 
   287 
   317     @batchable
   288     @batchable
   318     def pushkey(self, namespace, key, old, new):
   289     def pushkey(self, namespace, key, old, new):
   319         if not self.capable('pushkey'):
   290         if not self.capable('pushkey'):
   320             yield False, None
   291             yield False, None
   333                 _('push failed (unexpected response):'), d)
   304                 _('push failed (unexpected response):'), d)
   334         for l in output.splitlines(True):
   305         for l in output.splitlines(True):
   335             self.ui.status(_('remote: '), l)
   306             self.ui.status(_('remote: '), l)
   336         yield d
   307         yield d
   337 
   308 
   338     @batchable
       
   339     def listkeys(self, namespace):
       
   340         if not self.capable('pushkey'):
       
   341             yield {}, None
       
   342         f = future()
       
   343         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
       
   344         yield {'namespace': encoding.fromlocal(namespace)}, f
       
   345         d = f.value
       
   346         self.ui.debug('received listkey for "%s": %i bytes\n'
       
   347                       % (namespace, len(d)))
       
   348         yield pushkeymod.decodekeys(d)
       
   349 
       
   350     def stream_out(self):
   309     def stream_out(self):
   351         return self._callstream('stream_out')
   310         return self._callstream('stream_out')
   352 
       
   353     def changegroup(self, nodes, kind):
       
   354         n = encodelist(nodes)
       
   355         f = self._callcompressable("changegroup", roots=n)
       
   356         return changegroupmod.cg1unpacker(f, 'UN')
       
   357 
       
   358     def changegroupsubset(self, bases, heads, kind):
       
   359         self.requirecap('changegroupsubset', _('look up remote changes'))
       
   360         bases = encodelist(bases)
       
   361         heads = encodelist(heads)
       
   362         f = self._callcompressable("changegroupsubset",
       
   363                                    bases=bases, heads=heads)
       
   364         return changegroupmod.cg1unpacker(f, 'UN')
       
   365 
   311 
   366     def getbundle(self, source, **kwargs):
   312     def getbundle(self, source, **kwargs):
   367         self.requirecap('getbundle', _('look up remote changes'))
   313         self.requirecap('getbundle', _('look up remote changes'))
   368         opts = {}
   314         opts = {}
   369         bundlecaps = kwargs.get('bundlecaps')
   315         bundlecaps = kwargs.get('bundlecaps')
   430         else:
   376         else:
   431             # bundle2 push. Send a stream, fetch a stream.
   377             # bundle2 push. Send a stream, fetch a stream.
   432             stream = self._calltwowaystream('unbundle', cg, heads=heads)
   378             stream = self._calltwowaystream('unbundle', cg, heads=heads)
   433             ret = bundle2.getunbundler(self.ui, stream)
   379             ret = bundle2.getunbundler(self.ui, stream)
   434         return ret
   380         return ret
       
   381 
       
   382     # End of basewirepeer interface.
       
   383 
       
   384     # Begin of baselegacywirepeer interface.
       
   385 
       
   386     def branches(self, nodes):
       
   387         n = encodelist(nodes)
       
   388         d = self._call("branches", nodes=n)
       
   389         try:
       
   390             br = [tuple(decodelist(b)) for b in d.splitlines()]
       
   391             return br
       
   392         except ValueError:
       
   393             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   394 
       
   395     def between(self, pairs):
       
   396         batch = 8 # avoid giant requests
       
   397         r = []
       
   398         for i in xrange(0, len(pairs), batch):
       
   399             n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
       
   400             d = self._call("between", pairs=n)
       
   401             try:
       
   402                 r.extend(l and decodelist(l) or [] for l in d.splitlines())
       
   403             except ValueError:
       
   404                 self._abort(error.ResponseError(_("unexpected response:"), d))
       
   405         return r
       
   406 
       
   407     def changegroup(self, nodes, kind):
       
   408         n = encodelist(nodes)
       
   409         f = self._callcompressable("changegroup", roots=n)
       
   410         return changegroupmod.cg1unpacker(f, 'UN')
       
   411 
       
   412     def changegroupsubset(self, bases, heads, kind):
       
   413         self.requirecap('changegroupsubset', _('look up remote changes'))
       
   414         bases = encodelist(bases)
       
   415         heads = encodelist(heads)
       
   416         f = self._callcompressable("changegroupsubset",
       
   417                                    bases=bases, heads=heads)
       
   418         return changegroupmod.cg1unpacker(f, 'UN')
       
   419 
       
   420     # End of baselegacywirepeer interface.
       
   421 
       
   422     def _submitbatch(self, req):
       
   423         """run batch request <req> on the server
       
   424 
       
   425         Returns an iterator of the raw responses from the server.
       
   426         """
       
   427         rsp = self._callstream("batch", cmds=encodebatchcmds(req))
       
   428         chunk = rsp.read(1024)
       
   429         work = [chunk]
       
   430         while chunk:
       
   431             while ';' not in chunk and chunk:
       
   432                 chunk = rsp.read(1024)
       
   433                 work.append(chunk)
       
   434             merged = ''.join(work)
       
   435             while ';' in merged:
       
   436                 one, merged = merged.split(';', 1)
       
   437                 yield unescapearg(one)
       
   438             chunk = rsp.read(1024)
       
   439             work = [merged, chunk]
       
   440         yield unescapearg(''.join(work))
       
   441 
       
   442     def _submitone(self, op, args):
       
   443         return self._call(op, **args)
   435 
   444 
   436     def debugwireargs(self, one, two, three=None, four=None, five=None):
   445     def debugwireargs(self, one, two, three=None, four=None, five=None):
   437         # don't pass optional arguments left at their default value
   446         # don't pass optional arguments left at their default value
   438         opts = {}
   447         opts = {}
   439         if three is not None:
   448         if three is not None: