--- a/mercurial/wireproto.py Sun Nov 20 13:55:53 2016 -0800
+++ b/mercurial/wireproto.py Sun Nov 20 13:50:45 2016 -0800
@@ -78,21 +78,6 @@
# """
# raise NotImplementedError()
- def groupchunks(self, fh):
- """Generator of chunks to send to the client.
-
- Some protocols may have compressed the contents.
- """
- raise NotImplementedError()
-
- def compresschunks(self, chunks):
- """Generator of possible compressed chunks to send to the client.
-
- This is like ``groupchunks()`` except it accepts a generator as
- its argument.
- """
- raise NotImplementedError()
-
class remotebatch(peer.batcher):
'''batches the queued calls; uses as few roundtrips as possible'''
def __init__(self, remote):
@@ -529,10 +514,19 @@
"""wireproto reply: binary stream
The call was successful and the result is a stream.
- Iterate on the `self.gen` attribute to retrieve chunks.
+
+ Accepts either a generator or an object with a ``read(size)`` method.
+
+ ``v1compressible`` indicates whether this data can be compressed to
+ "version 1" clients (technically: HTTP peers using
+ application/mercurial-0.1 media type). This flag should NOT be used on
+ new commands because new clients should support a more modern compression
+ mechanism.
"""
- def __init__(self, gen):
+ def __init__(self, gen=None, reader=None, v1compressible=False):
self.gen = gen
+ self.reader = reader
+ self.v1compressible = v1compressible
class pushres(object):
"""wireproto reply: success with simple integer return
@@ -739,14 +733,14 @@
def changegroup(repo, proto, roots):
nodes = decodelist(roots)
cg = changegroupmod.changegroup(repo, nodes, 'serve')
- return streamres(proto.groupchunks(cg))
+ return streamres(reader=cg, v1compressible=True)
@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
bases = decodelist(bases)
heads = decodelist(heads)
cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
- return streamres(proto.groupchunks(cg))
+ return streamres(reader=cg, v1compressible=True)
@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
@@ -781,7 +775,7 @@
return ooberror(bundle2required)
chunks = exchange.getbundlechunks(repo, 'serve', **opts)
- return streamres(proto.compresschunks(chunks))
+ return streamres(gen=chunks, v1compressible=True)
@wireprotocommand('heads')
def heads(repo, proto):
@@ -870,7 +864,7 @@
# LockError may be raised before the first result is yielded. Don't
# emit output until we're sure we got the lock successfully.
it = streamclone.generatev1wireproto(repo)
- return streamres(getstream(it))
+ return streamres(gen=getstream(it))
except error.LockError:
return '2\n'
@@ -900,7 +894,7 @@
if util.safehasattr(r, 'addpart'):
# The return looks streamable, we are in the bundle2 case and
# should return a stream.
- return streamres(r.getchunks())
+ return streamres(gen=r.getchunks())
return pushres(r)
finally:
@@ -962,4 +956,4 @@
manargs, advargs))
except error.PushRaced as exc:
bundler.newpart('error:pushraced', [('message', str(exc))])
- return streamres(bundler.getchunks())
+ return streamres(gen=bundler.getchunks())