Mercurial > public > mercurial-scm > hg
comparison mercurial/httppeer.py @ 37558:8a73132214a3
httppeer: support protocol upgrade
With the new handshake defined and in place on the server, we can
now implement it on the client.
The HTTP handshake mechanism has been taught to add headers advertising
its support for the new capabilities response. Response handling
has been adjusted to allow CBOR responses through, and makepeer()
has been taught to instantiate a mutually supported peer.
The HTTPv2 peer class doesn't implement the full peer interface, so
HTTPv2 is not yet usable as a peer.
Like the server side, we support registering handlers for
different API services. This allows extensions to easily implement
API services and peers. A practical use case for this is to
provide a previous implementation of the experimental version 2
wire protocol to a future version of Mercurial. We know there will
be backwards-compatibility (BC) breaks after 4.6 ships, but someone
could take the peer and server code from 4.6, drop it in an
extension, and allow its use indefinitely.
Differential Revision: https://phab.mercurial-scm.org/D3243
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Tue, 10 Apr 2018 18:16:47 -0700 |
parents | b77aa48ba690 |
children | 01bfe5ad0c53 |
comparison
equal
deleted
inserted
replaced
37557:734515aca84d | 37558:8a73132214a3 |
---|---|
27 statichttprepo, | 27 statichttprepo, |
28 url as urlmod, | 28 url as urlmod, |
29 util, | 29 util, |
30 wireproto, | 30 wireproto, |
31 wireprotoframing, | 31 wireprotoframing, |
32 wireprototypes, | |
32 wireprotov2server, | 33 wireprotov2server, |
33 ) | 34 ) |
34 | 35 |
35 httplib = util.httplib | 36 httplib = util.httplib |
36 urlerr = util.urlerr | 37 urlerr = util.urlerr |
309 # Insert error handlers for common I/O failures. | 310 # Insert error handlers for common I/O failures. |
310 _wraphttpresponse(res) | 311 _wraphttpresponse(res) |
311 | 312 |
312 return res | 313 return res |
313 | 314 |
314 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible): | 315 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible, |
316 allowcbor=False): | |
315 # record the url we got redirected to | 317 # record the url we got redirected to |
316 respurl = pycompat.bytesurl(resp.geturl()) | 318 respurl = pycompat.bytesurl(resp.geturl()) |
317 if respurl.endswith(qs): | 319 if respurl.endswith(qs): |
318 respurl = respurl[:-len(qs)] | 320 respurl = respurl[:-len(qs)] |
319 if baseurl.rstrip('/') != respurl.rstrip('/'): | 321 if baseurl.rstrip('/') != respurl.rstrip('/'): |
337 _("'%s' does not appear to be an hg repository:\n" | 339 _("'%s' does not appear to be an hg repository:\n" |
338 "---%%<--- (%s)\n%s\n---%%<---\n") | 340 "---%%<--- (%s)\n%s\n---%%<---\n") |
339 % (safeurl, proto or 'no content-type', resp.read(1024))) | 341 % (safeurl, proto or 'no content-type', resp.read(1024))) |
340 | 342 |
341 try: | 343 try: |
342 version = proto.split('-', 1)[1] | 344 subtype = proto.split('-', 1)[1] |
343 version_info = tuple([int(n) for n in version.split('.')]) | 345 |
346 # Unless we end up supporting CBOR in the legacy wire protocol, | |
347 # this should ONLY be encountered for the initial capabilities | |
348 # request during handshake. | |
349 if subtype == 'cbor': | |
350 if allowcbor: | |
351 return respurl, proto, resp | |
352 else: | |
353 raise error.RepoError(_('unexpected CBOR response from ' | |
354 'server')) | |
355 | |
356 version_info = tuple([int(n) for n in subtype.split('.')]) | |
344 except ValueError: | 357 except ValueError: |
345 raise error.RepoError(_("'%s' sent a broken Content-Type " | 358 raise error.RepoError(_("'%s' sent a broken Content-Type " |
346 "header (%s)") % (safeurl, proto)) | 359 "header (%s)") % (safeurl, proto)) |
347 | 360 |
348 # TODO consider switching to a decompression reader that uses | 361 # TODO consider switching to a decompression reader that uses |
359 engine = util.compengines.forwiretype(ename) | 372 engine = util.compengines.forwiretype(ename) |
360 | 373 |
361 resp = engine.decompressorreader(resp) | 374 resp = engine.decompressorreader(resp) |
362 else: | 375 else: |
363 raise error.RepoError(_("'%s' uses newer protocol %s") % | 376 raise error.RepoError(_("'%s' uses newer protocol %s") % |
364 (safeurl, version)) | 377 (safeurl, subtype)) |
365 | 378 |
366 return respurl, resp | 379 return respurl, proto, resp |
367 | 380 |
368 class httppeer(wireproto.wirepeer): | 381 class httppeer(wireproto.wirepeer): |
369 def __init__(self, ui, path, url, opener, requestbuilder, caps): | 382 def __init__(self, ui, path, url, opener, requestbuilder, caps): |
370 self.ui = ui | 383 self.ui = ui |
371 self._path = path | 384 self._path = path |
414 self._caps, self.capable, | 427 self._caps, self.capable, |
415 self._url, cmd, args) | 428 self._url, cmd, args) |
416 | 429 |
417 resp = sendrequest(self.ui, self._urlopener, req) | 430 resp = sendrequest(self.ui, self._urlopener, req) |
418 | 431 |
419 self._url, resp = parsev1commandresponse(self.ui, self._url, cu, qs, | 432 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs, |
420 resp, _compressible) | 433 resp, _compressible) |
421 | 434 |
422 return resp | 435 return resp |
423 | 436 |
424 def _call(self, cmd, **args): | 437 def _call(self, cmd, **args): |
425 fp = self._callstream(cmd, **args) | 438 fp = self._callstream(cmd, **args) |
499 def _abort(self, exception): | 512 def _abort(self, exception): |
500 raise exception | 513 raise exception |
501 | 514 |
502 # TODO implement interface for version 2 peers | 515 # TODO implement interface for version 2 peers |
503 class httpv2peer(object): | 516 class httpv2peer(object): |
504 def __init__(self, ui, repourl, opener): | 517 def __init__(self, ui, repourl, apipath, opener, requestbuilder, |
518 apidescriptor): | |
505 self.ui = ui | 519 self.ui = ui |
506 | 520 |
507 if repourl.endswith('/'): | 521 if repourl.endswith('/'): |
508 repourl = repourl[:-1] | 522 repourl = repourl[:-1] |
509 | 523 |
510 self.url = repourl | 524 self.url = repourl |
525 self._apipath = apipath | |
511 self._opener = opener | 526 self._opener = opener |
512 # This is an its own attribute to facilitate extensions overriding | 527 self._requestbuilder = requestbuilder |
513 # the default type. | 528 self._descriptor = apidescriptor |
514 self._requestbuilder = urlreq.request | |
515 | 529 |
516 def close(self): | 530 def close(self): |
517 pass | 531 pass |
518 | 532 |
519 # TODO require to be part of a batched primitive, use futures. | 533 # TODO require to be part of a batched primitive, use futures. |
538 permission = { | 552 permission = { |
539 'push': 'rw', | 553 'push': 'rw', |
540 'pull': 'ro', | 554 'pull': 'ro', |
541 }[permission] | 555 }[permission] |
542 | 556 |
543 url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2, | 557 url = '%s/%s/%s/%s' % (self.url, self._apipath, permission, name) |
544 permission, name) | |
545 | 558 |
546 # TODO this should be part of a generic peer for the frame-based | 559 # TODO this should be part of a generic peer for the frame-based |
547 # protocol. | 560 # protocol. |
548 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | 561 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
549 buffersends=True) | 562 buffersends=True) |
595 else: | 608 else: |
596 error.ProgrammingError('unhandled action: %s' % action) | 609 error.ProgrammingError('unhandled action: %s' % action) |
597 | 610 |
598 return results | 611 return results |
599 | 612 |
613 # Registry of API service names to metadata about peers that handle it. | |
614 # | |
615 # The following keys are meaningful: | |
616 # | |
617 # init | |
618 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder, | |
619 # apidescriptor) to create a peer. | |
620 # | |
621 # priority | |
622 # Integer priority for the service. If we could choose from multiple | |
623 # services, we choose the one with the highest priority. | |
624 API_PEERS = { | |
625 wireprototypes.HTTPV2: { | |
626 'init': httpv2peer, | |
627 'priority': 50, | |
628 }, | |
629 } | |
630 | |
600 def performhandshake(ui, url, opener, requestbuilder): | 631 def performhandshake(ui, url, opener, requestbuilder): |
601 # The handshake is a request to the capabilities command. | 632 # The handshake is a request to the capabilities command. |
602 | 633 |
603 caps = None | 634 caps = None |
604 def capable(x): | 635 def capable(x): |
605 raise error.ProgrammingError('should not be called') | 636 raise error.ProgrammingError('should not be called') |
606 | 637 |
638 args = {} | |
639 | |
640 # The client advertises support for newer protocols by adding an | |
641 # X-HgUpgrade-* header with a list of supported APIs and an | |
642 # X-HgProto-* header advertising which serializing formats it supports. | |
643 # We only support the HTTP version 2 transport and CBOR responses for | |
644 # now. | |
645 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2') | |
646 | |
647 if advertisev2: | |
648 args['headers'] = { | |
649 r'X-HgProto-1': r'cbor', | |
650 } | |
651 | |
652 args['headers'].update( | |
653 encodevalueinheaders(' '.join(sorted(API_PEERS)), | |
654 'X-HgUpgrade', | |
655 # We don't know the header limit this early. | |
656 # So make it small. | |
657 1024)) | |
658 | |
607 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps, | 659 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps, |
608 capable, url, 'capabilities', | 660 capable, url, 'capabilities', |
609 {}) | 661 args) |
610 | 662 |
611 resp = sendrequest(ui, opener, req) | 663 resp = sendrequest(ui, opener, req) |
612 | 664 |
613 respurl, resp = parsev1commandresponse(ui, url, requrl, qs, resp, | 665 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp, |
614 compressible=False) | 666 compressible=False, |
667 allowcbor=advertisev2) | |
615 | 668 |
616 try: | 669 try: |
617 rawcaps = resp.read() | 670 rawdata = resp.read() |
618 finally: | 671 finally: |
619 resp.close() | 672 resp.close() |
620 | 673 |
621 return respurl, set(rawcaps.split()) | 674 if not ct.startswith('application/mercurial-'): |
675 raise error.ProgrammingError('unexpected content-type: %s' % ct) | |
676 | |
677 if advertisev2: | |
678 if ct == 'application/mercurial-cbor': | |
679 try: | |
680 info = cbor.loads(rawdata) | |
681 except cbor.CBORDecodeError: | |
682 raise error.Abort(_('error decoding CBOR from remote server'), | |
683 hint=_('try again and consider contacting ' | |
684 'the server operator')) | |
685 | |
686 # We got a legacy response. That's fine. | |
687 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'): | |
688 info = { | |
689 'v1capabilities': set(rawdata.split()) | |
690 } | |
691 | |
692 else: | |
693 raise error.RepoError( | |
694 _('unexpected response type from server: %s') % ct) | |
695 else: | |
696 info = { | |
697 'v1capabilities': set(rawdata.split()) | |
698 } | |
699 | |
700 return respurl, info | |
622 | 701 |
623 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): | 702 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): |
624 """Construct an appropriate HTTP peer instance. | 703 """Construct an appropriate HTTP peer instance. |
625 | 704 |
626 ``opener`` is an ``url.opener`` that should be used to establish | 705 ``opener`` is an ``url.opener`` that should be used to establish |
638 url, authinfo = u.authinfo() | 717 url, authinfo = u.authinfo() |
639 ui.debug('using %s\n' % url) | 718 ui.debug('using %s\n' % url) |
640 | 719 |
641 opener = opener or urlmod.opener(ui, authinfo) | 720 opener = opener or urlmod.opener(ui, authinfo) |
642 | 721 |
643 respurl, caps = performhandshake(ui, url, opener, requestbuilder) | 722 respurl, info = performhandshake(ui, url, opener, requestbuilder) |
644 | 723 |
645 return httppeer(ui, path, respurl, opener, requestbuilder, caps) | 724 # Given the intersection of APIs that both we and the server support, |
725 # sort by their advertised priority and pick the first one. | |
726 # | |
727 # TODO consider making this request-based and interface driven. For | |
728 # example, the caller could say "I want a peer that does X." It's quite | |
729 # possible that not all peers would do that. Since we know the service | |
730 # capabilities, we could filter out services not meeting the | |
731 # requirements. Possibly by consulting the interfaces defined by the | |
732 # peer type. | |
733 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys()) | |
734 | |
735 preferredchoices = sorted(apipeerchoices, | |
736 key=lambda x: API_PEERS[x]['priority'], | |
737 reverse=True) | |
738 | |
739 for service in preferredchoices: | |
740 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service) | |
741 | |
742 return API_PEERS[service]['init'](ui, respurl, apipath, opener, | |
743 requestbuilder, | |
744 info['apis'][service]) | |
745 | |
746 # Failed to construct an API peer. Fall back to legacy. | |
747 return httppeer(ui, path, respurl, opener, requestbuilder, | |
748 info['v1capabilities']) | |
646 | 749 |
647 def instance(ui, path, create): | 750 def instance(ui, path, create): |
648 if create: | 751 if create: |
649 raise error.Abort(_('cannot create new http repository')) | 752 raise error.Abort(_('cannot create new http repository')) |
650 try: | 753 try: |