Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/sshpeer.py @ 28438:48fd02dac1d4
wireproto: make iterbatcher behave streamily over http(s)
Unfortunately, the ssh and http implementations are slightly different
due to differences in their _callstream implementations, which
prevents ssh from behaving streamily. We should probably introduce a
new batch command that can stream results over ssh at some point in
the near future.
The streamy behavior of batch over http(s) is an enormous win for
remotefilelog over http: in my testing, it's saving about 40% on file
fetches with a cold cache against a server on localhost.
author | Augie Fackler <augie@google.com> |
---|---|
date | Tue, 01 Mar 2016 18:41:43 -0500 |
parents | 8953e963ce8c |
children | 98e8313dcd9e |
comparison
equal
deleted
inserted
replaced
28437:c3eacee01c7e | 28438:48fd02dac1d4 |
---|---|
229 pass | 229 pass |
230 self.pipee.close() | 230 self.pipee.close() |
231 | 231 |
232 __del__ = cleanup | 232 __del__ = cleanup |
233 | 233 |
234 def _submitbatch(self, req): | |
235 cmds = [] | |
236 for op, argsdict in req: | |
237 args = ','.join('%s=%s' % (wireproto.escapearg(k), | |
238 wireproto.escapearg(v)) | |
239 for k, v in argsdict.iteritems()) | |
240 cmds.append('%s %s' % (op, args)) | |
241 rsp = self._callstream("batch", cmds=';'.join(cmds)) | |
242 available = self._getamount() | |
243 # TODO this response parsing is probably suboptimal for large | |
244 # batches with large responses. | |
245 toread = min(available, 1024) | |
246 work = rsp.read(toread) | |
247 available -= toread | |
248 chunk = work | |
249 while chunk: | |
250 while ';' in work: | |
251 one, work = work.split(';', 1) | |
252 yield wireproto.unescapearg(one) | |
253 toread = min(available, 1024) | |
254 chunk = rsp.read(toread) | |
255 available -= toread | |
256 work += chunk | |
257 yield wireproto.unescapearg(work) | |
258 | |
234 def _callstream(self, cmd, **args): | 259 def _callstream(self, cmd, **args): |
235 self.ui.debug("sending %s command\n" % cmd) | 260 self.ui.debug("sending %s command\n" % cmd) |
236 self.pipeo.write("%s\n" % cmd) | 261 self.pipeo.write("%s\n" % cmd) |
237 _func, names = wireproto.commands[cmd] | 262 _func, names = wireproto.commands[cmd] |
238 keys = names.split() | 263 keys = names.split() |
289 break | 314 break |
290 self._send(d) | 315 self._send(d) |
291 self._send("", flush=True) | 316 self._send("", flush=True) |
292 return self.pipei | 317 return self.pipei |
293 | 318 |
294 def _recv(self): | 319 def _getamount(self): |
295 l = self.pipei.readline() | 320 l = self.pipei.readline() |
296 if l == '\n': | 321 if l == '\n': |
297 self.readerr() | 322 self.readerr() |
298 msg = _('check previous remote output') | 323 msg = _('check previous remote output') |
299 self._abort(error.OutOfBandError(hint=msg)) | 324 self._abort(error.OutOfBandError(hint=msg)) |
300 self.readerr() | 325 self.readerr() |
301 try: | 326 try: |
302 l = int(l) | 327 return int(l) |
303 except ValueError: | 328 except ValueError: |
304 self._abort(error.ResponseError(_("unexpected response:"), l)) | 329 self._abort(error.ResponseError(_("unexpected response:"), l)) |
305 return self.pipei.read(l) | 330 |
331 def _recv(self): | |
332 return self.pipei.read(self._getamount()) | |
306 | 333 |
307 def _send(self, data, flush=False): | 334 def _send(self, data, flush=False): |
308 self.pipeo.write("%d\n" % len(data)) | 335 self.pipeo.write("%d\n" % len(data)) |
309 if data: | 336 if data: |
310 self.pipeo.write(data) | 337 self.pipeo.write(data) |