comparison mercurial/httppeer.py @ 37558:8a73132214a3

httppeer: support protocol upgrade

With the new handshake defined and in place on the server, we can now implement it on the client. The HTTP handshake mechanism has been taught to add headers advertising its support for the new capabilities response. Response handling has been adjusted to allow CBOR responses through. And makepeer() has been taught to instantiate a mutually supported peer. The HTTPv2 peer class doesn't implement the full peer interface. So HTTPv2 is not yet usable as a peer. Like the server side, we support registering handlers for different API services. This allows extensions to easily implement API services and peers. A practical use case for this is to provide a previous implementation of the experimental version 2 wire protocol to a future version of Mercurial. We know there will be BC breaks after 4.6 ships. But someone could take the peer and server code from 4.6, drop it in an extension, and allow its use indefinitely.

Differential Revision: https://phab.mercurial-scm.org/D3243
author Gregory Szorc <gregory.szorc@gmail.com>
date Tue, 10 Apr 2018 18:16:47 -0700
parents b77aa48ba690
children 01bfe5ad0c53
comparison
equal deleted inserted replaced
37557:734515aca84d 37558:8a73132214a3
27 statichttprepo, 27 statichttprepo,
28 url as urlmod, 28 url as urlmod,
29 util, 29 util,
30 wireproto, 30 wireproto,
31 wireprotoframing, 31 wireprotoframing,
32 wireprototypes,
32 wireprotov2server, 33 wireprotov2server,
33 ) 34 )
34 35
35 httplib = util.httplib 36 httplib = util.httplib
36 urlerr = util.urlerr 37 urlerr = util.urlerr
309 # Insert error handlers for common I/O failures. 310 # Insert error handlers for common I/O failures.
310 _wraphttpresponse(res) 311 _wraphttpresponse(res)
311 312
312 return res 313 return res
313 314
314 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible): 315 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
316 allowcbor=False):
315 # record the url we got redirected to 317 # record the url we got redirected to
316 respurl = pycompat.bytesurl(resp.geturl()) 318 respurl = pycompat.bytesurl(resp.geturl())
317 if respurl.endswith(qs): 319 if respurl.endswith(qs):
318 respurl = respurl[:-len(qs)] 320 respurl = respurl[:-len(qs)]
319 if baseurl.rstrip('/') != respurl.rstrip('/'): 321 if baseurl.rstrip('/') != respurl.rstrip('/'):
337 _("'%s' does not appear to be an hg repository:\n" 339 _("'%s' does not appear to be an hg repository:\n"
338 "---%%<--- (%s)\n%s\n---%%<---\n") 340 "---%%<--- (%s)\n%s\n---%%<---\n")
339 % (safeurl, proto or 'no content-type', resp.read(1024))) 341 % (safeurl, proto or 'no content-type', resp.read(1024)))
340 342
341 try: 343 try:
342 version = proto.split('-', 1)[1] 344 subtype = proto.split('-', 1)[1]
343 version_info = tuple([int(n) for n in version.split('.')]) 345
346 # Unless we end up supporting CBOR in the legacy wire protocol,
347 # this should ONLY be encountered for the initial capabilities
348 # request during handshake.
349 if subtype == 'cbor':
350 if allowcbor:
351 return respurl, proto, resp
352 else:
353 raise error.RepoError(_('unexpected CBOR response from '
354 'server'))
355
356 version_info = tuple([int(n) for n in subtype.split('.')])
344 except ValueError: 357 except ValueError:
345 raise error.RepoError(_("'%s' sent a broken Content-Type " 358 raise error.RepoError(_("'%s' sent a broken Content-Type "
346 "header (%s)") % (safeurl, proto)) 359 "header (%s)") % (safeurl, proto))
347 360
348 # TODO consider switching to a decompression reader that uses 361 # TODO consider switching to a decompression reader that uses
359 engine = util.compengines.forwiretype(ename) 372 engine = util.compengines.forwiretype(ename)
360 373
361 resp = engine.decompressorreader(resp) 374 resp = engine.decompressorreader(resp)
362 else: 375 else:
363 raise error.RepoError(_("'%s' uses newer protocol %s") % 376 raise error.RepoError(_("'%s' uses newer protocol %s") %
364 (safeurl, version)) 377 (safeurl, subtype))
365 378
366 return respurl, resp 379 return respurl, proto, resp
367 380
368 class httppeer(wireproto.wirepeer): 381 class httppeer(wireproto.wirepeer):
369 def __init__(self, ui, path, url, opener, requestbuilder, caps): 382 def __init__(self, ui, path, url, opener, requestbuilder, caps):
370 self.ui = ui 383 self.ui = ui
371 self._path = path 384 self._path = path
414 self._caps, self.capable, 427 self._caps, self.capable,
415 self._url, cmd, args) 428 self._url, cmd, args)
416 429
417 resp = sendrequest(self.ui, self._urlopener, req) 430 resp = sendrequest(self.ui, self._urlopener, req)
418 431
419 self._url, resp = parsev1commandresponse(self.ui, self._url, cu, qs, 432 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
420 resp, _compressible) 433 resp, _compressible)
421 434
422 return resp 435 return resp
423 436
424 def _call(self, cmd, **args): 437 def _call(self, cmd, **args):
425 fp = self._callstream(cmd, **args) 438 fp = self._callstream(cmd, **args)
499 def _abort(self, exception): 512 def _abort(self, exception):
500 raise exception 513 raise exception
501 514
502 # TODO implement interface for version 2 peers 515 # TODO implement interface for version 2 peers
503 class httpv2peer(object): 516 class httpv2peer(object):
504 def __init__(self, ui, repourl, opener): 517 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
518 apidescriptor):
505 self.ui = ui 519 self.ui = ui
506 520
507 if repourl.endswith('/'): 521 if repourl.endswith('/'):
508 repourl = repourl[:-1] 522 repourl = repourl[:-1]
509 523
510 self.url = repourl 524 self.url = repourl
525 self._apipath = apipath
511 self._opener = opener 526 self._opener = opener
512 # This is an its own attribute to facilitate extensions overriding 527 self._requestbuilder = requestbuilder
513 # the default type. 528 self._descriptor = apidescriptor
514 self._requestbuilder = urlreq.request
515 529
516 def close(self): 530 def close(self):
517 pass 531 pass
518 532
519 # TODO require to be part of a batched primitive, use futures. 533 # TODO require to be part of a batched primitive, use futures.
538 permission = { 552 permission = {
539 'push': 'rw', 553 'push': 'rw',
540 'pull': 'ro', 554 'pull': 'ro',
541 }[permission] 555 }[permission]
542 556
543 url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2, 557 url = '%s/%s/%s/%s' % (self.url, self._apipath, permission, name)
544 permission, name)
545 558
546 # TODO this should be part of a generic peer for the frame-based 559 # TODO this should be part of a generic peer for the frame-based
547 # protocol. 560 # protocol.
548 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, 561 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
549 buffersends=True) 562 buffersends=True)
595 else: 608 else:
596 error.ProgrammingError('unhandled action: %s' % action) 609 error.ProgrammingError('unhandled action: %s' % action)
597 610
598 return results 611 return results
599 612
613 # Registry of API service names to metadata about peers that handle it.
614 #
615 # The following keys are meaningful:
616 #
617 # init
618 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
619 # apidescriptor) to create a peer.
620 #
621 # priority
622 # Integer priority for the service. If we could choose from multiple
623 # services, we choose the one with the highest priority.
624 API_PEERS = {
625 wireprototypes.HTTPV2: {
626 'init': httpv2peer,
627 'priority': 50,
628 },
629 }
630
600 def performhandshake(ui, url, opener, requestbuilder): 631 def performhandshake(ui, url, opener, requestbuilder):
601 # The handshake is a request to the capabilities command. 632 # The handshake is a request to the capabilities command.
602 633
603 caps = None 634 caps = None
604 def capable(x): 635 def capable(x):
605 raise error.ProgrammingError('should not be called') 636 raise error.ProgrammingError('should not be called')
606 637
638 args = {}
639
640 # The client advertises support for newer protocols by adding an
641 # X-HgUpgrade-* header with a list of supported APIs and an
642 # X-HgProto-* header advertising which serializing formats it supports.
643 # We only support the HTTP version 2 transport and CBOR responses for
644 # now.
645 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
646
647 if advertisev2:
648 args['headers'] = {
649 r'X-HgProto-1': r'cbor',
650 }
651
652 args['headers'].update(
653 encodevalueinheaders(' '.join(sorted(API_PEERS)),
654 'X-HgUpgrade',
655 # We don't know the header limit this early.
656 # So make it small.
657 1024))
658
607 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps, 659 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
608 capable, url, 'capabilities', 660 capable, url, 'capabilities',
609 {}) 661 args)
610 662
611 resp = sendrequest(ui, opener, req) 663 resp = sendrequest(ui, opener, req)
612 664
613 respurl, resp = parsev1commandresponse(ui, url, requrl, qs, resp, 665 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
614 compressible=False) 666 compressible=False,
667 allowcbor=advertisev2)
615 668
616 try: 669 try:
617 rawcaps = resp.read() 670 rawdata = resp.read()
618 finally: 671 finally:
619 resp.close() 672 resp.close()
620 673
621 return respurl, set(rawcaps.split()) 674 if not ct.startswith('application/mercurial-'):
675 raise error.ProgrammingError('unexpected content-type: %s' % ct)
676
677 if advertisev2:
678 if ct == 'application/mercurial-cbor':
679 try:
680 info = cbor.loads(rawdata)
681 except cbor.CBORDecodeError:
682 raise error.Abort(_('error decoding CBOR from remote server'),
683 hint=_('try again and consider contacting '
684 'the server operator'))
685
686 # We got a legacy response. That's fine.
687 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
688 info = {
689 'v1capabilities': set(rawdata.split())
690 }
691
692 else:
693 raise error.RepoError(
694 _('unexpected response type from server: %s') % ct)
695 else:
696 info = {
697 'v1capabilities': set(rawdata.split())
698 }
699
700 return respurl, info
622 701
623 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): 702 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
624 """Construct an appropriate HTTP peer instance. 703 """Construct an appropriate HTTP peer instance.
625 704
626 ``opener`` is an ``url.opener`` that should be used to establish 705 ``opener`` is an ``url.opener`` that should be used to establish
638 url, authinfo = u.authinfo() 717 url, authinfo = u.authinfo()
639 ui.debug('using %s\n' % url) 718 ui.debug('using %s\n' % url)
640 719
641 opener = opener or urlmod.opener(ui, authinfo) 720 opener = opener or urlmod.opener(ui, authinfo)
642 721
643 respurl, caps = performhandshake(ui, url, opener, requestbuilder) 722 respurl, info = performhandshake(ui, url, opener, requestbuilder)
644 723
645 return httppeer(ui, path, respurl, opener, requestbuilder, caps) 724 # Given the intersection of APIs that both we and the server support,
725 # sort by their advertised priority and pick the first one.
726 #
727 # TODO consider making this request-based and interface driven. For
728 # example, the caller could say "I want a peer that does X." It's quite
729 # possible that not all peers would do that. Since we know the service
730 # capabilities, we could filter out services not meeting the
731 # requirements. Possibly by consulting the interfaces defined by the
732 # peer type.
733 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
734
735 preferredchoices = sorted(apipeerchoices,
736 key=lambda x: API_PEERS[x]['priority'],
737 reverse=True)
738
739 for service in preferredchoices:
740 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
741
742 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
743 requestbuilder,
744 info['apis'][service])
745
746 # Failed to construct an API peer. Fall back to legacy.
747 return httppeer(ui, path, respurl, opener, requestbuilder,
748 info['v1capabilities'])
646 749
647 def instance(ui, path, create): 750 def instance(ui, path, create):
648 if create: 751 if create:
649 raise error.Abort(_('cannot create new http repository')) 752 raise error.Abort(_('cannot create new http repository'))
650 try: 753 try: