comparison mercurial/httppeer.py @ 37719:a656cba08a04

wireprotov2: move response handling out of httppeer And fix some bugs while we're here. The code for processing response data from the unified framing protocol is mostly peer agnostic. The peer-specific bits are the configuration of the client reactor and how I/O is performed. I initially implemented things in httppeer for expediency. This commit establishes a module for holding the peer API level code for the framing based protocol. Inside this module we have a class to help coordinate higher-level activities, such as managing response object. The client handler bits could be rolled into clientreactor. However, I want clientreactor to be sans I/O and I want it to only be concerned with protocol-level details, not higher-level concepts like how protocol events are converted into peer API concepts. I want clientreactor to receive a frame and then tell the caller what should probably be done about it. If we start putting things like future resolution into clientreactor, we'll constrain how the protocol can be used (e.g. by requiring futures). The new code is loosely based on what was in httppeer before. I changed things a bit around response handling. We now buffer the entire response "body" and then handle it as one atomic unit. This fixed a bug around decoding CBOR data that spanned multiple frames. I also fixed an off-by-one bug where we failed to read a single byte CBOR value at the end of the stream. That's why tests have changed. The new state of httppeer is much cleaner. It is largely agnostic about framing protocol implementation details. That's how it should be: the framing protocol is designed to be largely transport agnostic. We want peers merely putting bytes on the wire and telling the framing protocol where to read response data from. There's still a bit of work to be done here, especially for representing responses. But at least we're a step closer to having a higher-level peer interface that can be plugged into the SSH peer someday. I initially added this class to wireprotoframing. However, we'll eventually need version 2 specific functions to convert CBOR responses into data structures expected by the code calling commands. This needs to live somewhere. Since that code would be shared across peers, we need a common module. We have wireprotov1peer for the equivalent version 1 code. So I decided to establish wireprotov2peer. Differential Revision: https://phab.mercurial-scm.org/D3379
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 14 Apr 2018 11:50:19 -0700
parents 0664be4f0c1f
children e10b695b9c41
comparison
equal deleted inserted replaced
37718:ad1c07008e0b 37719:a656cba08a04
11 import errno 11 import errno
12 import io 12 import io
13 import os 13 import os
14 import socket 14 import socket
15 import struct 15 import struct
16 import sys
17 import tempfile 16 import tempfile
18 import weakref 17 import weakref
19 18
20 from .i18n import _ 19 from .i18n import _
21 from .thirdparty import ( 20 from .thirdparty import (
34 url as urlmod, 33 url as urlmod,
35 util, 34 util,
36 wireprotoframing, 35 wireprotoframing,
37 wireprototypes, 36 wireprototypes,
38 wireprotov1peer, 37 wireprotov1peer,
38 wireprotov2peer,
39 wireprotov2server, 39 wireprotov2server,
40 ) 40 )
41 41
42 httplib = util.httplib 42 httplib = util.httplib
43 urlerr = util.urlerr 43 urlerr = util.urlerr
520 520
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): 521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, 522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
523 buffersends=True) 523 buffersends=True)
524 524
525 handler = wireprotov2peer.clienthandler(ui, reactor)
526
525 url = '%s/%s' % (apiurl, permission) 527 url = '%s/%s' % (apiurl, permission)
526 528
527 if len(requests) > 1: 529 if len(requests) > 1:
528 url += '/multirequest' 530 url += '/multirequest'
529 else: 531 else:
530 url += '/%s' % requests[0][0] 532 url += '/%s' % requests[0][0]
531 533
532 # Request ID to (request, future)
533 requestmap = {}
534
535 for command, args, f in requests: 534 for command, args, f in requests:
536 request, action, meta = reactor.callcommand(command, args) 535 assert not list(handler.callcommand(command, args, f))
537 assert action == 'noop'
538
539 requestmap[request.requestid] = (request, f)
540
541 action, meta = reactor.flushcommands()
542 assert action == 'sendframes'
543 536
544 # TODO stream this. 537 # TODO stream this.
545 body = b''.join(map(bytes, meta['framegen'])) 538 body = b''.join(map(bytes, handler.flushcommands()))
546 539
547 # TODO modify user-agent to reflect v2 540 # TODO modify user-agent to reflect v2
548 headers = { 541 headers = {
549 r'Accept': wireprotov2server.FRAMINGTYPE, 542 r'Accept': wireprotov2server.FRAMINGTYPE,
550 r'Content-Type': wireprotov2server.FRAMINGTYPE, 543 r'Content-Type': wireprotov2server.FRAMINGTYPE,
562 raise 555 raise
563 except httplib.HTTPException as e: 556 except httplib.HTTPException as e:
564 ui.traceback() 557 ui.traceback()
565 raise IOError(None, e) 558 raise IOError(None, e)
566 559
567 return reactor, requestmap, res 560 return handler, res
568 561
569 class queuedcommandfuture(pycompat.futures.Future): 562 class queuedcommandfuture(pycompat.futures.Future):
570 """Wraps result() on command futures to trigger submission on call.""" 563 """Wraps result() on command futures to trigger submission on call."""
571 564
572 def result(self, timeout=None): 565 def result(self, timeout=None):
682 permission = { 675 permission = {
683 'push': 'rw', 676 'push': 'rw',
684 'pull': 'ro', 677 'pull': 'ro',
685 }[permissions.pop()] 678 }[permissions.pop()]
686 679
687 reactor, requests, resp = sendv2request( 680 handler, resp = sendv2request(
688 self._ui, self._opener, self._requestbuilder, self._apiurl, 681 self._ui, self._opener, self._requestbuilder, self._apiurl,
689 permission, calls) 682 permission, calls)
690 683
691 # TODO we probably want to validate the HTTP code, media type, etc. 684 # TODO we probably want to validate the HTTP code, media type, etc.
692 685
693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) 686 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
694 self._responsef = self._responseexecutor.submit(self._handleresponse, 687 self._responsef = self._responseexecutor.submit(self._handleresponse,
695 reactor, 688 handler, resp)
696 requests,
697 resp)
698 689
699 def close(self): 690 def close(self):
700 if self._closed: 691 if self._closed:
701 return 692 return
702 693
721 f.set_exception(error.ResponseError( 712 f.set_exception(error.ResponseError(
722 _('unfulfilled command response'))) 713 _('unfulfilled command response')))
723 714
724 self._futures = None 715 self._futures = None
725 716
726 def _handleresponse(self, reactor, requests, resp): 717 def _handleresponse(self, handler, resp):
727 # Called in a thread to read the response. 718 # Called in a thread to read the response.
728 719
729 results = {k: [] for k in requests} 720 while handler.readframe(resp):
730 721 pass
731 while True:
732 frame = wireprotoframing.readframe(resp)
733 if frame is None:
734 break
735
736 self._ui.note(_('received %r\n') % frame)
737
738 # Guard against receiving a frame with a request ID that we
739 # didn't issue. This should never happen.
740 request, f = requests.get(frame.requestid, [None, None])
741
742 action, meta = reactor.onframerecv(frame)
743
744 if action == 'responsedata':
745 assert request.requestid == meta['request'].requestid
746
747 result = results[request.requestid]
748
749 if meta['cbor']:
750 payload = util.bytesio(meta['data'])
751
752 decoder = cbor.CBORDecoder(payload)
753 while payload.tell() + 1 < len(meta['data']):
754 try:
755 result.append(decoder.decode())
756 except Exception:
757 pycompat.future_set_exception_info(
758 f, sys.exc_info()[1:])
759 continue
760 else:
761 result.append(meta['data'])
762
763 if meta['eos']:
764 f.set_result(result)
765 del results[request.requestid]
766
767 elif action == 'error':
768 e = error.RepoError(meta['message'])
769
770 if f:
771 f.set_exception(e)
772 else:
773 raise e
774
775 else:
776 e = error.ProgrammingError('unhandled action: %s' % action)
777
778 if f:
779 f.set_exception(e)
780 else:
781 raise e
782 722
783 # TODO implement interface for version 2 peers 723 # TODO implement interface for version 2 peers
784 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, 724 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
785 repository.ipeerrequests) 725 repository.ipeerrequests)
786 class httpv2peer(object): 726 class httpv2peer(object):