diff -r ad1c07008e0b -r a656cba08a04 mercurial/httppeer.py --- a/mercurial/httppeer.py Sat Apr 14 11:49:57 2018 -0700 +++ b/mercurial/httppeer.py Sat Apr 14 11:50:19 2018 -0700 @@ -13,7 +13,6 @@ import os import socket import struct -import sys import tempfile import weakref @@ -36,6 +35,7 @@ wireprotoframing, wireprototypes, wireprotov1peer, + wireprotov2peer, wireprotov2server, ) @@ -522,6 +522,8 @@ reactor = wireprotoframing.clientreactor(hasmultiplesend=False, buffersends=True) + handler = wireprotov2peer.clienthandler(ui, reactor) + url = '%s/%s' % (apiurl, permission) if len(requests) > 1: @@ -529,20 +531,11 @@ else: url += '/%s' % requests[0][0] - # Request ID to (request, future) - requestmap = {} - for command, args, f in requests: - request, action, meta = reactor.callcommand(command, args) - assert action == 'noop' - - requestmap[request.requestid] = (request, f) - - action, meta = reactor.flushcommands() - assert action == 'sendframes' + assert not list(handler.callcommand(command, args, f)) # TODO stream this. - body = b''.join(map(bytes, meta['framegen'])) + body = b''.join(map(bytes, handler.flushcommands())) # TODO modify user-agent to reflect v2 headers = { @@ -564,7 +557,7 @@ ui.traceback() raise IOError(None, e) - return reactor, requestmap, res + return handler, res class queuedcommandfuture(pycompat.futures.Future): """Wraps result() on command futures to trigger submission on call.""" @@ -684,7 +677,7 @@ 'pull': 'ro', }[permissions.pop()] - reactor, requests, resp = sendv2request( + handler, resp = sendv2request( self._ui, self._opener, self._requestbuilder, self._apiurl, permission, calls) @@ -692,9 +685,7 @@ self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) self._responsef = self._responseexecutor.submit(self._handleresponse, - reactor, - requests, - resp) + handler, resp) def close(self): if self._closed: @@ -723,62 +714,11 @@ self._futures = None - def _handleresponse(self, reactor, requests, resp): + def _handleresponse(self, handler, resp): # Called in a thread to read the response. - results = {k: [] for k in requests} - - while True: - frame = wireprotoframing.readframe(resp) - if frame is None: - break - - self._ui.note(_('received %r\n') % frame) - - # Guard against receiving a frame with a request ID that we - # didn't issue. This should never happen. - request, f = requests.get(frame.requestid, [None, None]) - - action, meta = reactor.onframerecv(frame) - - if action == 'responsedata': - assert request.requestid == meta['request'].requestid - - result = results[request.requestid] - - if meta['cbor']: - payload = util.bytesio(meta['data']) - - decoder = cbor.CBORDecoder(payload) - while payload.tell() + 1 < len(meta['data']): - try: - result.append(decoder.decode()) - except Exception: - pycompat.future_set_exception_info( - f, sys.exc_info()[1:]) - continue - else: - result.append(meta['data']) - - if meta['eos']: - f.set_result(result) - del results[request.requestid] - - elif action == 'error': - e = error.RepoError(meta['message']) - - if f: - f.set_exception(e) - else: - raise e - - else: - e = error.ProgrammingError('unhandled action: %s' % action) - - if f: - f.set_exception(e) - else: - raise e + while handler.readframe(resp): + pass # TODO implement interface for version 2 peers @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,