Mercurial > public > mercurial-scm > hg
comparison mercurial/httppeer.py @ 37719:a656cba08a04
wireprotov2: move response handling out of httppeer
And fix some bugs while we're here.
The code for processing response data from the unified framing
protocol is mostly peer agnostic. The peer-specific bits are the
configuration of the client reactor and how I/O is performed. I
initially implemented things in httppeer for expediency.
This commit establishes a module for holding the peer API level
code for the framing based protocol. Inside this module we have
a class to help coordinate higher-level activities, such as managing
response object.
The client handler bits could be rolled into clientreactor. However,
I want clientreactor to be sans I/O and I want it to only be
concerned with protocol-level details, not higher-level concepts
like how protocol events are converted into peer API concepts. I
want clientreactor to receive a frame and then tell the caller what
should probably be done about it. If we start putting things like
future resolution into clientreactor, we'll constrain how the protocol
can be used (e.g. by requiring futures).
The new code is loosely based on what was in httppeer before. I
changed things a bit around response handling. We now buffer the
entire response "body" and then handle it as one atomic unit. This
fixed a bug around decoding CBOR data that spanned multiple frames.
I also fixed an off-by-one bug where we failed to read a single byte
CBOR value at the end of the stream. That's why tests have changed.
The new state of httppeer is much cleaner. It is largely agnostic
about framing protocol implementation details. That's how it should
be: the framing protocol is designed to be largely transport
agnostic. We want peers merely putting bytes on the wire and telling
the framing protocol where to read response data from.
There's still a bit of work to be done here, especially for
representing responses. But at least we're a step closer to having a
higher-level peer interface that can be plugged into the SSH peer
someday.
I initially added this class to wireprotoframing. However, we'll
eventually need version 2 specific functions to convert CBOR responses
into data structures expected by the code calling commands. This
needs to live somewhere. Since that code would be shared across peers,
we need a common module. We have wireprotov1peer for the equivalent
version 1 code. So I decided to establish wireprotov2peer.
Differential Revision: https://phab.mercurial-scm.org/D3379
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 14 Apr 2018 11:50:19 -0700 |
parents | 0664be4f0c1f |
children | e10b695b9c41 |
comparison
equal
deleted
inserted
replaced
37718:ad1c07008e0b | 37719:a656cba08a04 |
---|---|
11 import errno | 11 import errno |
12 import io | 12 import io |
13 import os | 13 import os |
14 import socket | 14 import socket |
15 import struct | 15 import struct |
16 import sys | |
17 import tempfile | 16 import tempfile |
18 import weakref | 17 import weakref |
19 | 18 |
20 from .i18n import _ | 19 from .i18n import _ |
21 from .thirdparty import ( | 20 from .thirdparty import ( |
34 url as urlmod, | 33 url as urlmod, |
35 util, | 34 util, |
36 wireprotoframing, | 35 wireprotoframing, |
37 wireprototypes, | 36 wireprototypes, |
38 wireprotov1peer, | 37 wireprotov1peer, |
38 wireprotov2peer, | |
39 wireprotov2server, | 39 wireprotov2server, |
40 ) | 40 ) |
41 | 41 |
42 httplib = util.httplib | 42 httplib = util.httplib |
43 urlerr = util.urlerr | 43 urlerr = util.urlerr |
520 | 520 |
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): | 521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): |
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | 522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
523 buffersends=True) | 523 buffersends=True) |
524 | 524 |
525 handler = wireprotov2peer.clienthandler(ui, reactor) | |
526 | |
525 url = '%s/%s' % (apiurl, permission) | 527 url = '%s/%s' % (apiurl, permission) |
526 | 528 |
527 if len(requests) > 1: | 529 if len(requests) > 1: |
528 url += '/multirequest' | 530 url += '/multirequest' |
529 else: | 531 else: |
530 url += '/%s' % requests[0][0] | 532 url += '/%s' % requests[0][0] |
531 | 533 |
532 # Request ID to (request, future) | |
533 requestmap = {} | |
534 | |
535 for command, args, f in requests: | 534 for command, args, f in requests: |
536 request, action, meta = reactor.callcommand(command, args) | 535 assert not list(handler.callcommand(command, args, f)) |
537 assert action == 'noop' | |
538 | |
539 requestmap[request.requestid] = (request, f) | |
540 | |
541 action, meta = reactor.flushcommands() | |
542 assert action == 'sendframes' | |
543 | 536 |
544 # TODO stream this. | 537 # TODO stream this. |
545 body = b''.join(map(bytes, meta['framegen'])) | 538 body = b''.join(map(bytes, handler.flushcommands())) |
546 | 539 |
547 # TODO modify user-agent to reflect v2 | 540 # TODO modify user-agent to reflect v2 |
548 headers = { | 541 headers = { |
549 r'Accept': wireprotov2server.FRAMINGTYPE, | 542 r'Accept': wireprotov2server.FRAMINGTYPE, |
550 r'Content-Type': wireprotov2server.FRAMINGTYPE, | 543 r'Content-Type': wireprotov2server.FRAMINGTYPE, |
562 raise | 555 raise |
563 except httplib.HTTPException as e: | 556 except httplib.HTTPException as e: |
564 ui.traceback() | 557 ui.traceback() |
565 raise IOError(None, e) | 558 raise IOError(None, e) |
566 | 559 |
567 return reactor, requestmap, res | 560 return handler, res |
568 | 561 |
569 class queuedcommandfuture(pycompat.futures.Future): | 562 class queuedcommandfuture(pycompat.futures.Future): |
570 """Wraps result() on command futures to trigger submission on call.""" | 563 """Wraps result() on command futures to trigger submission on call.""" |
571 | 564 |
572 def result(self, timeout=None): | 565 def result(self, timeout=None): |
682 permission = { | 675 permission = { |
683 'push': 'rw', | 676 'push': 'rw', |
684 'pull': 'ro', | 677 'pull': 'ro', |
685 }[permissions.pop()] | 678 }[permissions.pop()] |
686 | 679 |
687 reactor, requests, resp = sendv2request( | 680 handler, resp = sendv2request( |
688 self._ui, self._opener, self._requestbuilder, self._apiurl, | 681 self._ui, self._opener, self._requestbuilder, self._apiurl, |
689 permission, calls) | 682 permission, calls) |
690 | 683 |
691 # TODO we probably want to validate the HTTP code, media type, etc. | 684 # TODO we probably want to validate the HTTP code, media type, etc. |
692 | 685 |
693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | 686 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) |
694 self._responsef = self._responseexecutor.submit(self._handleresponse, | 687 self._responsef = self._responseexecutor.submit(self._handleresponse, |
695 reactor, | 688 handler, resp) |
696 requests, | |
697 resp) | |
698 | 689 |
699 def close(self): | 690 def close(self): |
700 if self._closed: | 691 if self._closed: |
701 return | 692 return |
702 | 693 |
721 f.set_exception(error.ResponseError( | 712 f.set_exception(error.ResponseError( |
722 _('unfulfilled command response'))) | 713 _('unfulfilled command response'))) |
723 | 714 |
724 self._futures = None | 715 self._futures = None |
725 | 716 |
726 def _handleresponse(self, reactor, requests, resp): | 717 def _handleresponse(self, handler, resp): |
727 # Called in a thread to read the response. | 718 # Called in a thread to read the response. |
728 | 719 |
729 results = {k: [] for k in requests} | 720 while handler.readframe(resp): |
730 | 721 pass |
731 while True: | |
732 frame = wireprotoframing.readframe(resp) | |
733 if frame is None: | |
734 break | |
735 | |
736 self._ui.note(_('received %r\n') % frame) | |
737 | |
738 # Guard against receiving a frame with a request ID that we | |
739 # didn't issue. This should never happen. | |
740 request, f = requests.get(frame.requestid, [None, None]) | |
741 | |
742 action, meta = reactor.onframerecv(frame) | |
743 | |
744 if action == 'responsedata': | |
745 assert request.requestid == meta['request'].requestid | |
746 | |
747 result = results[request.requestid] | |
748 | |
749 if meta['cbor']: | |
750 payload = util.bytesio(meta['data']) | |
751 | |
752 decoder = cbor.CBORDecoder(payload) | |
753 while payload.tell() + 1 < len(meta['data']): | |
754 try: | |
755 result.append(decoder.decode()) | |
756 except Exception: | |
757 pycompat.future_set_exception_info( | |
758 f, sys.exc_info()[1:]) | |
759 continue | |
760 else: | |
761 result.append(meta['data']) | |
762 | |
763 if meta['eos']: | |
764 f.set_result(result) | |
765 del results[request.requestid] | |
766 | |
767 elif action == 'error': | |
768 e = error.RepoError(meta['message']) | |
769 | |
770 if f: | |
771 f.set_exception(e) | |
772 else: | |
773 raise e | |
774 | |
775 else: | |
776 e = error.ProgrammingError('unhandled action: %s' % action) | |
777 | |
778 if f: | |
779 f.set_exception(e) | |
780 else: | |
781 raise e | |
782 | 722 |
783 # TODO implement interface for version 2 peers | 723 # TODO implement interface for version 2 peers |
784 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, | 724 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
785 repository.ipeerrequests) | 725 repository.ipeerrequests) |
786 class httpv2peer(object): | 726 class httpv2peer(object): |