Mercurial > public > mercurial-scm > hg
comparison mercurial/wireproto.py @ 28438:48fd02dac1d4
wireproto: make iterbatcher behave streamily over http(s)
Unfortunately, the ssh and http implementations differ slightly
because their _callstream implementations differ, and this
prevents ssh from streaming results. We should probably introduce a
new batch command that can stream results over ssh in the near
future.
Streaming batch results over http(s) is an enormous win for
remotefilelog over http: in my testing, it saves about 40% on file
fetches with a cold cache against a server on localhost.
author | Augie Fackler <augie@google.com> |
---|---|
date | Tue, 01 Mar 2016 18:41:43 -0500 |
parents | 8d38eab2777a |
children | fd2acc5046f6 |
comparison
equal
deleted
inserted
replaced
28437:c3eacee01c7e | 28438:48fd02dac1d4 |
---|---|
5 # This software may be used and distributed according to the terms of the | 5 # This software may be used and distributed according to the terms of the |
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import itertools | |
10 import os | 11 import os |
11 import sys | 12 import sys |
12 import tempfile | 13 import tempfile |
13 import urllib | 14 import urllib |
14 | 15 |
117 class remoteiterbatcher(peer.iterbatcher): | 118 class remoteiterbatcher(peer.iterbatcher): |
118 def __init__(self, remote): | 119 def __init__(self, remote): |
119 super(remoteiterbatcher, self).__init__() | 120 super(remoteiterbatcher, self).__init__() |
120 self._remote = remote | 121 self._remote = remote |
121 | 122 |
123 def __getattr__(self, name): | |
124 if not getattr(self._remote, name, False): | |
125 raise AttributeError( | |
126 'Attempted to iterbatch non-batchable call to %r' % name) | |
127 return super(remoteiterbatcher, self).__getattr__(name) | |
128 | |
122 def submit(self): | 129 def submit(self): |
123 """Break the batch request into many patch calls and pipeline them. | 130 """Break the batch request into many patch calls and pipeline them. |
124 | 131 |
125 This is mostly valuable over http where request sizes can be | 132 This is mostly valuable over http where request sizes can be |
126 limited, but can be used in other places as well. | 133 limited, but can be used in other places as well. |
127 """ | 134 """ |
128 rb = self._remote.batch() | 135 req, rsp = [], [] |
129 rb.calls = self.calls | 136 for name, args, opts, resref in self.calls: |
130 rb.submit() | 137 mtd = getattr(self._remote, name) |
138 batchable = mtd.batchable(mtd.im_self, *args, **opts) | |
139 encargsorres, encresref = batchable.next() | |
140 assert encresref | |
141 req.append((name, encargsorres)) | |
142 rsp.append((batchable, encresref)) | |
143 if req: | |
144 self._resultiter = self._remote._submitbatch(req) | |
145 self._rsp = rsp | |
131 | 146 |
132 def results(self): | 147 def results(self): |
133 for name, args, opts, resref in self.calls: | 148 for (batchable, encresref), encres in itertools.izip( |
134 yield resref.value | 149 self._rsp, self._resultiter): |
150 encresref.set(encres) | |
151 yield batchable.next() | |
135 | 152 |
136 # Forward a couple of names from peer to make wireproto interactions | 153 # Forward a couple of names from peer to make wireproto interactions |
137 # slightly more sensible. | 154 # slightly more sensible. |
138 batchable = peer.batchable | 155 batchable = peer.batchable |
139 future = peer.future | 156 future = peer.future |
200 if self.capable('batch'): | 217 if self.capable('batch'): |
201 return remotebatch(self) | 218 return remotebatch(self) |
202 else: | 219 else: |
203 return peer.localbatch(self) | 220 return peer.localbatch(self) |
204 def _submitbatch(self, req): | 221 def _submitbatch(self, req): |
222 """run batch request <req> on the server | |
223 | |
224 Returns an iterator of the raw responses from the server. | |
225 """ | |
205 cmds = [] | 226 cmds = [] |
206 for op, argsdict in req: | 227 for op, argsdict in req: |
207 args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) | 228 args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) |
208 for k, v in argsdict.iteritems()) | 229 for k, v in argsdict.iteritems()) |
209 cmds.append('%s %s' % (op, args)) | 230 cmds.append('%s %s' % (op, args)) |
210 rsp = self._call("batch", cmds=';'.join(cmds)) | 231 rsp = self._callstream("batch", cmds=';'.join(cmds)) |
211 return [unescapearg(r) for r in rsp.split(';')] | 232 # TODO this response parsing is probably suboptimal for large |
233 # batches with large responses. | |
234 work = rsp.read(1024) | |
235 chunk = work | |
236 while chunk: | |
237 while ';' in work: | |
238 one, work = work.split(';', 1) | |
239 yield unescapearg(one) | |
240 chunk = rsp.read(1024) | |
241 work += chunk | |
242 yield unescapearg(work) | |
243 | |
212 def _submitone(self, op, args): | 244 def _submitone(self, op, args): |
213 return self._call(op, **args) | 245 return self._call(op, **args) |
214 | 246 |
215 def iterbatch(self): | 247 def iterbatch(self): |
216 return remoteiterbatcher(self) | 248 return remoteiterbatcher(self) |