diff mercurial/sshpeer.py @ 28438:48fd02dac1d4

wireproto: make iterbatcher behave streamily over http(s) Unfortunately, the ssh and http implementations are slightly different due to differences in their _callstream implementations, which prevents ssh from behaving streamily. We should probably introduce a new batch command that can stream results over ssh at some point in the near future. The streamy behavior of batch over http(s) is an enormous win for remotefilelog over http: in my testing, it's saving about 40% on file fetches with a cold cache against a server on localhost.
author Augie Fackler <augie@google.com>
date Tue, 01 Mar 2016 18:41:43 -0500
parents 8953e963ce8c
children 98e8313dcd9e
line wrap: on
line diff
--- a/mercurial/sshpeer.py	Tue Mar 01 17:44:41 2016 -0500
+++ b/mercurial/sshpeer.py	Tue Mar 01 18:41:43 2016 -0500
@@ -231,6 +231,31 @@
 
     __del__ = cleanup
 
+    def _submitbatch(self, req):
+        cmds = []
+        for op, argsdict in req:
+            args = ','.join('%s=%s' % (wireproto.escapearg(k),
+                                       wireproto.escapearg(v))
+                            for k, v in argsdict.iteritems())
+            cmds.append('%s %s' % (op, args))
+        rsp = self._callstream("batch", cmds=';'.join(cmds))
+        available = self._getamount()
+        # TODO this response parsing is probably suboptimal for large
+        # batches with large responses.
+        toread = min(available, 1024)
+        work = rsp.read(toread)
+        available -= toread
+        chunk = work
+        while chunk:
+            while ';' in work:
+                one, work = work.split(';', 1)
+                yield wireproto.unescapearg(one)
+            toread = min(available, 1024)
+            chunk = rsp.read(toread)
+            available -= toread
+            work += chunk
+        yield wireproto.unescapearg(work)
+
     def _callstream(self, cmd, **args):
         self.ui.debug("sending %s command\n" % cmd)
         self.pipeo.write("%s\n" % cmd)
@@ -291,7 +316,7 @@
         self._send("", flush=True)
         return self.pipei
 
-    def _recv(self):
+    def _getamount(self):
         l = self.pipei.readline()
         if l == '\n':
             self.readerr()
@@ -299,10 +324,12 @@
             self._abort(error.OutOfBandError(hint=msg))
         self.readerr()
         try:
-            l = int(l)
+            return int(l)
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), l))
-        return self.pipei.read(l)
+
+    def _recv(self):
+        return self.pipei.read(self._getamount())
 
     def _send(self, data, flush=False):
         self.pipeo.write("%d\n" % len(data))