comparison mercurial/sshpeer.py @ 28438:48fd02dac1d4

wireproto: make iterbatcher behave streamily over http(s) Unfortunately, the ssh and http implementations are slightly different due to differences in their _callstream implementations, which prevents ssh from behaving streamily. We should probably introduce a new batch command that can stream results over ssh at some point in the near future. The streamy behavior of batch over http(s) is an enormous win for remotefilelog over http: in my testing, it's saving about 40% on file fetches with a cold cache against a server on localhost.
author Augie Fackler <augie@google.com>
date Tue, 01 Mar 2016 18:41:43 -0500
parents 8953e963ce8c
children 98e8313dcd9e
comparison
equal deleted inserted replaced
28437:c3eacee01c7e 28438:48fd02dac1d4
229 pass 229 pass
230 self.pipee.close() 230 self.pipee.close()
231 231
232 __del__ = cleanup 232 __del__ = cleanup
233 233
234 def _submitbatch(self, req):
235 cmds = []
236 for op, argsdict in req:
237 args = ','.join('%s=%s' % (wireproto.escapearg(k),
238 wireproto.escapearg(v))
239 for k, v in argsdict.iteritems())
240 cmds.append('%s %s' % (op, args))
241 rsp = self._callstream("batch", cmds=';'.join(cmds))
242 available = self._getamount()
243 # TODO this response parsing is probably suboptimal for large
244 # batches with large responses.
245 toread = min(available, 1024)
246 work = rsp.read(toread)
247 available -= toread
248 chunk = work
249 while chunk:
250 while ';' in work:
251 one, work = work.split(';', 1)
252 yield wireproto.unescapearg(one)
253 toread = min(available, 1024)
254 chunk = rsp.read(toread)
255 available -= toread
256 work += chunk
257 yield wireproto.unescapearg(work)
258
234 def _callstream(self, cmd, **args): 259 def _callstream(self, cmd, **args):
235 self.ui.debug("sending %s command\n" % cmd) 260 self.ui.debug("sending %s command\n" % cmd)
236 self.pipeo.write("%s\n" % cmd) 261 self.pipeo.write("%s\n" % cmd)
237 _func, names = wireproto.commands[cmd] 262 _func, names = wireproto.commands[cmd]
238 keys = names.split() 263 keys = names.split()
289 break 314 break
290 self._send(d) 315 self._send(d)
291 self._send("", flush=True) 316 self._send("", flush=True)
292 return self.pipei 317 return self.pipei
293 318
294 def _recv(self): 319 def _getamount(self):
295 l = self.pipei.readline() 320 l = self.pipei.readline()
296 if l == '\n': 321 if l == '\n':
297 self.readerr() 322 self.readerr()
298 msg = _('check previous remote output') 323 msg = _('check previous remote output')
299 self._abort(error.OutOfBandError(hint=msg)) 324 self._abort(error.OutOfBandError(hint=msg))
300 self.readerr() 325 self.readerr()
301 try: 326 try:
302 l = int(l) 327 return int(l)
303 except ValueError: 328 except ValueError:
304 self._abort(error.ResponseError(_("unexpected response:"), l)) 329 self._abort(error.ResponseError(_("unexpected response:"), l))
305 return self.pipei.read(l) 330
331 def _recv(self):
332 return self.pipei.read(self._getamount())
306 333
307 def _send(self, data, flush=False): 334 def _send(self, data, flush=False):
308 self.pipeo.write("%d\n" % len(data)) 335 self.pipeo.write("%d\n" % len(data))
309 if data: 336 if data:
310 self.pipeo.write(data) 337 self.pipeo.write(data)