44 |
40 |
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required') |
41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required') |
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/' |
42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/' |
47 'IncompatibleClient') |
43 'IncompatibleClient') |
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) |
44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) |
49 |
|
50 class remoteiterbatcher(peer.iterbatcher): |
|
51 def __init__(self, remote): |
|
52 super(remoteiterbatcher, self).__init__() |
|
53 self._remote = remote |
|
54 |
|
55 def __getattr__(self, name): |
|
56 # Validate this method is batchable, since submit() only supports |
|
57 # batchable methods. |
|
58 fn = getattr(self._remote, name) |
|
59 if not getattr(fn, 'batchable', None): |
|
60 raise error.ProgrammingError('Attempted to batch a non-batchable ' |
|
61 'call to %r' % name) |
|
62 |
|
63 return super(remoteiterbatcher, self).__getattr__(name) |
|
64 |
|
65 def submit(self): |
|
66 """Break the batch request into many patch calls and pipeline them. |
|
67 |
|
68 This is mostly valuable over http where request sizes can be |
|
69 limited, but can be used in other places as well. |
|
70 """ |
|
71 # 2-tuple of (command, arguments) that represents what will be |
|
72 # sent over the wire. |
|
73 requests = [] |
|
74 |
|
75 # 4-tuple of (command, final future, @batchable generator, remote |
|
76 # future). |
|
77 results = [] |
|
78 |
|
79 for command, args, opts, finalfuture in self.calls: |
|
80 mtd = getattr(self._remote, command) |
|
81 batchable = mtd.batchable(mtd.__self__, *args, **opts) |
|
82 |
|
83 commandargs, fremote = next(batchable) |
|
84 assert fremote |
|
85 requests.append((command, commandargs)) |
|
86 results.append((command, finalfuture, batchable, fremote)) |
|
87 |
|
88 if requests: |
|
89 self._resultiter = self._remote._submitbatch(requests) |
|
90 |
|
91 self._results = results |
|
92 |
|
93 def results(self): |
|
94 for command, finalfuture, batchable, remotefuture in self._results: |
|
95 # Get the raw result, set it in the remote future, feed it |
|
96 # back into the @batchable generator so it can be decoded, and |
|
97 # set the result on the final future to this value. |
|
98 remoteresult = next(self._resultiter) |
|
99 remotefuture.set(remoteresult) |
|
100 finalfuture.set(next(batchable)) |
|
101 |
|
102 # Verify our @batchable generators only emit 2 values. |
|
103 try: |
|
104 next(batchable) |
|
105 except StopIteration: |
|
106 pass |
|
107 else: |
|
108 raise error.ProgrammingError('%s @batchable generator emitted ' |
|
109 'unexpected value count' % command) |
|
110 |
|
111 yield finalfuture.value |
|
112 |
|
113 # Forward a couple of names from peer to make wireproto interactions |
|
114 # slightly more sensible. |
|
115 batchable = peer.batchable |
|
116 future = peer.future |
|
117 |
|
118 |
|
119 def encodebatchcmds(req): |
|
120 """Return a ``cmds`` argument value for the ``batch`` command.""" |
|
121 escapearg = wireprototypes.escapebatcharg |
|
122 |
|
123 cmds = [] |
|
124 for op, argsdict in req: |
|
125 # Old servers didn't properly unescape argument names. So prevent |
|
126 # the sending of argument names that may not be decoded properly by |
|
127 # servers. |
|
128 assert all(escapearg(k) == k for k in argsdict) |
|
129 |
|
130 args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) |
|
131 for k, v in argsdict.iteritems()) |
|
132 cmds.append('%s %s' % (op, args)) |
|
133 |
|
134 return ';'.join(cmds) |
|
135 |
45 |
136 def clientcompressionsupport(proto): |
46 def clientcompressionsupport(proto): |
137 """Returns a list of compression methods supported by the client. |
47 """Returns a list of compression methods supported by the client. |
138 |
48 |
139 Returns a list of the compression methods supported by the client |
49 Returns a list of the compression methods supported by the client |
142 """ |
52 """ |
143 for cap in proto.getprotocaps(): |
53 for cap in proto.getprotocaps(): |
144 if cap.startswith('comp='): |
54 if cap.startswith('comp='): |
145 return cap[5:].split(',') |
55 return cap[5:].split(',') |
146 return ['zlib', 'none'] |
56 return ['zlib', 'none'] |
147 |
|
148 # client side |
|
149 |
|
150 class wirepeer(repository.legacypeer): |
|
151 """Client-side interface for communicating with a peer repository. |
|
152 |
|
153 Methods commonly call wire protocol commands of the same name. |
|
154 |
|
155 See also httppeer.py and sshpeer.py for protocol-specific |
|
156 implementations of this interface. |
|
157 """ |
|
158 # Begin of ipeercommands interface. |
|
159 |
|
160 def iterbatch(self): |
|
161 return remoteiterbatcher(self) |
|
162 |
|
163 @batchable |
|
164 def lookup(self, key): |
|
165 self.requirecap('lookup', _('look up remote revision')) |
|
166 f = future() |
|
167 yield {'key': encoding.fromlocal(key)}, f |
|
168 d = f.value |
|
169 success, data = d[:-1].split(" ", 1) |
|
170 if int(success): |
|
171 yield bin(data) |
|
172 else: |
|
173 self._abort(error.RepoError(data)) |
|
174 |
|
175 @batchable |
|
176 def heads(self): |
|
177 f = future() |
|
178 yield {}, f |
|
179 d = f.value |
|
180 try: |
|
181 yield wireprototypes.decodelist(d[:-1]) |
|
182 except ValueError: |
|
183 self._abort(error.ResponseError(_("unexpected response:"), d)) |
|
184 |
|
185 @batchable |
|
186 def known(self, nodes): |
|
187 f = future() |
|
188 yield {'nodes': wireprototypes.encodelist(nodes)}, f |
|
189 d = f.value |
|
190 try: |
|
191 yield [bool(int(b)) for b in d] |
|
192 except ValueError: |
|
193 self._abort(error.ResponseError(_("unexpected response:"), d)) |
|
194 |
|
195 @batchable |
|
196 def branchmap(self): |
|
197 f = future() |
|
198 yield {}, f |
|
199 d = f.value |
|
200 try: |
|
201 branchmap = {} |
|
202 for branchpart in d.splitlines(): |
|
203 branchname, branchheads = branchpart.split(' ', 1) |
|
204 branchname = encoding.tolocal(urlreq.unquote(branchname)) |
|
205 branchheads = wireprototypes.decodelist(branchheads) |
|
206 branchmap[branchname] = branchheads |
|
207 yield branchmap |
|
208 except TypeError: |
|
209 self._abort(error.ResponseError(_("unexpected response:"), d)) |
|
210 |
|
211 @batchable |
|
212 def listkeys(self, namespace): |
|
213 if not self.capable('pushkey'): |
|
214 yield {}, None |
|
215 f = future() |
|
216 self.ui.debug('preparing listkeys for "%s"\n' % namespace) |
|
217 yield {'namespace': encoding.fromlocal(namespace)}, f |
|
218 d = f.value |
|
219 self.ui.debug('received listkey for "%s": %i bytes\n' |
|
220 % (namespace, len(d))) |
|
221 yield pushkeymod.decodekeys(d) |
|
222 |
|
223 @batchable |
|
224 def pushkey(self, namespace, key, old, new): |
|
225 if not self.capable('pushkey'): |
|
226 yield False, None |
|
227 f = future() |
|
228 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) |
|
229 yield {'namespace': encoding.fromlocal(namespace), |
|
230 'key': encoding.fromlocal(key), |
|
231 'old': encoding.fromlocal(old), |
|
232 'new': encoding.fromlocal(new)}, f |
|
233 d = f.value |
|
234 d, output = d.split('\n', 1) |
|
235 try: |
|
236 d = bool(int(d)) |
|
237 except ValueError: |
|
238 raise error.ResponseError( |
|
239 _('push failed (unexpected response):'), d) |
|
240 for l in output.splitlines(True): |
|
241 self.ui.status(_('remote: '), l) |
|
242 yield d |
|
243 |
|
244 def stream_out(self): |
|
245 return self._callstream('stream_out') |
|
246 |
|
247 def getbundle(self, source, **kwargs): |
|
248 kwargs = pycompat.byteskwargs(kwargs) |
|
249 self.requirecap('getbundle', _('look up remote changes')) |
|
250 opts = {} |
|
251 bundlecaps = kwargs.get('bundlecaps') or set() |
|
252 for key, value in kwargs.iteritems(): |
|
253 if value is None: |
|
254 continue |
|
255 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) |
|
256 if keytype is None: |
|
257 raise error.ProgrammingError( |
|
258 'Unexpectedly None keytype for key %s' % key) |
|
259 elif keytype == 'nodes': |
|
260 value = wireprototypes.encodelist(value) |
|
261 elif keytype == 'csv': |
|
262 value = ','.join(value) |
|
263 elif keytype == 'scsv': |
|
264 value = ','.join(sorted(value)) |
|
265 elif keytype == 'boolean': |
|
266 value = '%i' % bool(value) |
|
267 elif keytype != 'plain': |
|
268 raise KeyError('unknown getbundle option type %s' |
|
269 % keytype) |
|
270 opts[key] = value |
|
271 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) |
|
272 if any((cap.startswith('HG2') for cap in bundlecaps)): |
|
273 return bundle2.getunbundler(self.ui, f) |
|
274 else: |
|
275 return changegroupmod.cg1unpacker(f, 'UN') |
|
276 |
|
277 def unbundle(self, cg, heads, url): |
|
278 '''Send cg (a readable file-like object representing the |
|
279 changegroup to push, typically a chunkbuffer object) to the |
|
280 remote server as a bundle. |
|
281 |
|
282 When pushing a bundle10 stream, return an integer indicating the |
|
283 result of the push (see changegroup.apply()). |
|
284 |
|
285 When pushing a bundle20 stream, return a bundle20 stream. |
|
286 |
|
287 `url` is the url the client thinks it's pushing to, which is |
|
288 visible to hooks. |
|
289 ''' |
|
290 |
|
291 if heads != ['force'] and self.capable('unbundlehash'): |
|
292 heads = wireprototypes.encodelist( |
|
293 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) |
|
294 else: |
|
295 heads = wireprototypes.encodelist(heads) |
|
296 |
|
297 if util.safehasattr(cg, 'deltaheader'): |
|
298 # this a bundle10, do the old style call sequence |
|
299 ret, output = self._callpush("unbundle", cg, heads=heads) |
|
300 if ret == "": |
|
301 raise error.ResponseError( |
|
302 _('push failed:'), output) |
|
303 try: |
|
304 ret = int(ret) |
|
305 except ValueError: |
|
306 raise error.ResponseError( |
|
307 _('push failed (unexpected response):'), ret) |
|
308 |
|
309 for l in output.splitlines(True): |
|
310 self.ui.status(_('remote: '), l) |
|
311 else: |
|
312 # bundle2 push. Send a stream, fetch a stream. |
|
313 stream = self._calltwowaystream('unbundle', cg, heads=heads) |
|
314 ret = bundle2.getunbundler(self.ui, stream) |
|
315 return ret |
|
316 |
|
317 # End of ipeercommands interface. |
|
318 |
|
319 # Begin of ipeerlegacycommands interface. |
|
320 |
|
321 def branches(self, nodes): |
|
322 n = wireprototypes.encodelist(nodes) |
|
323 d = self._call("branches", nodes=n) |
|
324 try: |
|
325 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] |
|
326 return br |
|
327 except ValueError: |
|
328 self._abort(error.ResponseError(_("unexpected response:"), d)) |
|
329 |
|
330 def between(self, pairs): |
|
331 batch = 8 # avoid giant requests |
|
332 r = [] |
|
333 for i in xrange(0, len(pairs), batch): |
|
334 n = " ".join([wireprototypes.encodelist(p, '-') |
|
335 for p in pairs[i:i + batch]]) |
|
336 d = self._call("between", pairs=n) |
|
337 try: |
|
338 r.extend(l and wireprototypes.decodelist(l) or [] |
|
339 for l in d.splitlines()) |
|
340 except ValueError: |
|
341 self._abort(error.ResponseError(_("unexpected response:"), d)) |
|
342 return r |
|
343 |
|
344 def changegroup(self, nodes, kind): |
|
345 n = wireprototypes.encodelist(nodes) |
|
346 f = self._callcompressable("changegroup", roots=n) |
|
347 return changegroupmod.cg1unpacker(f, 'UN') |
|
348 |
|
349 def changegroupsubset(self, bases, heads, kind): |
|
350 self.requirecap('changegroupsubset', _('look up remote changes')) |
|
351 bases = wireprototypes.encodelist(bases) |
|
352 heads = wireprototypes.encodelist(heads) |
|
353 f = self._callcompressable("changegroupsubset", |
|
354 bases=bases, heads=heads) |
|
355 return changegroupmod.cg1unpacker(f, 'UN') |
|
356 |
|
357 # End of ipeerlegacycommands interface. |
|
358 |
|
359 def _submitbatch(self, req): |
|
360 """run batch request <req> on the server |
|
361 |
|
362 Returns an iterator of the raw responses from the server. |
|
363 """ |
|
364 ui = self.ui |
|
365 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): |
|
366 ui.debug('devel-peer-request: batched-content\n') |
|
367 for op, args in req: |
|
368 msg = 'devel-peer-request: - %s (%d arguments)\n' |
|
369 ui.debug(msg % (op, len(args))) |
|
370 |
|
371 unescapearg = wireprototypes.unescapebatcharg |
|
372 |
|
373 rsp = self._callstream("batch", cmds=encodebatchcmds(req)) |
|
374 chunk = rsp.read(1024) |
|
375 work = [chunk] |
|
376 while chunk: |
|
377 while ';' not in chunk and chunk: |
|
378 chunk = rsp.read(1024) |
|
379 work.append(chunk) |
|
380 merged = ''.join(work) |
|
381 while ';' in merged: |
|
382 one, merged = merged.split(';', 1) |
|
383 yield unescapearg(one) |
|
384 chunk = rsp.read(1024) |
|
385 work = [merged, chunk] |
|
386 yield unescapearg(''.join(work)) |
|
387 |
|
388 def _submitone(self, op, args): |
|
389 return self._call(op, **pycompat.strkwargs(args)) |
|
390 |
|
391 def debugwireargs(self, one, two, three=None, four=None, five=None): |
|
392 # don't pass optional arguments left at their default value |
|
393 opts = {} |
|
394 if three is not None: |
|
395 opts[r'three'] = three |
|
396 if four is not None: |
|
397 opts[r'four'] = four |
|
398 return self._call('debugwireargs', one=one, two=two, **opts) |
|
399 |
|
400 def _call(self, cmd, **args): |
|
401 """execute <cmd> on the server |
|
402 |
|
403 The command is expected to return a simple string. |
|
404 |
|
405 returns the server reply as a string.""" |
|
406 raise NotImplementedError() |
|
407 |
|
408 def _callstream(self, cmd, **args): |
|
409 """execute <cmd> on the server |
|
410 |
|
411 The command is expected to return a stream. Note that if the |
|
412 command doesn't return a stream, _callstream behaves |
|
413 differently for ssh and http peers. |
|
414 |
|
415 returns the server reply as a file like object. |
|
416 """ |
|
417 raise NotImplementedError() |
|
418 |
|
419 def _callcompressable(self, cmd, **args): |
|
420 """execute <cmd> on the server |
|
421 |
|
422 The command is expected to return a stream. |
|
423 |
|
424 The stream may have been compressed in some implementations. This |
|
425 function takes care of the decompression. This is the only difference |
|
426 with _callstream. |
|
427 |
|
428 returns the server reply as a file like object. |
|
429 """ |
|
430 raise NotImplementedError() |
|
431 |
|
432 def _callpush(self, cmd, fp, **args): |
|
433 """execute a <cmd> on server |
|
434 |
|
435 The command is expected to be related to a push. Push has a special |
|
436 return method. |
|
437 |
|
438 returns the server reply as a (ret, output) tuple. ret is either |
|
439 empty (error) or a stringified int. |
|
440 """ |
|
441 raise NotImplementedError() |
|
442 |
|
443 def _calltwowaystream(self, cmd, fp, **args): |
|
444 """execute <cmd> on server |
|
445 |
|
446 The command will send a stream to the server and get a stream in reply. |
|
447 """ |
|
448 raise NotImplementedError() |
|
449 |
|
450 def _abort(self, exception): |
|
451 """clearly abort the wire protocol connection and raise the exception |
|
452 """ |
|
453 raise NotImplementedError() |
|
454 |
|
455 # server side |
|
456 |
57 |
457 # wire protocol command can either return a string or one of these classes. |
58 # wire protocol command can either return a string or one of these classes. |
458 |
59 |
459 def getdispatchrepo(repo, proto, command): |
60 def getdispatchrepo(repo, proto, command): |
460 """Obtain the repo used for processing wire protocol commands. |
61 """Obtain the repo used for processing wire protocol commands. |