mercurial/httppeer.py
changeset 37651 950294e28136
parent 37644 77c9ee77687c
child 37654 8cea0d57bf37
equal deleted inserted replaced
37650:62ebfda864de 37651:950294e28136
    11 import errno
    11 import errno
    12 import io
    12 import io
    13 import os
    13 import os
    14 import socket
    14 import socket
    15 import struct
    15 import struct
       
    16 import sys
    16 import tempfile
    17 import tempfile
       
    18 import weakref
    17 
    19 
    18 from .i18n import _
    20 from .i18n import _
    19 from .thirdparty import (
    21 from .thirdparty import (
    20     cbor,
    22     cbor,
    21 )
    23 )
    29     pycompat,
    31     pycompat,
    30     repository,
    32     repository,
    31     statichttprepo,
    33     statichttprepo,
    32     url as urlmod,
    34     url as urlmod,
    33     util,
    35     util,
    34     wireproto,
       
    35     wireprotoframing,
    36     wireprotoframing,
    36     wireprototypes,
    37     wireprototypes,
    37     wireprotov1peer,
    38     wireprotov1peer,
    38     wireprotov2server,
    39     wireprotov2server,
    39 )
    40 )
   515         return self._callstream(cmd, _compressible=True, **args)
   516         return self._callstream(cmd, _compressible=True, **args)
   516 
   517 
   517     def _abort(self, exception):
   518     def _abort(self, exception):
   518         raise exception
   519         raise exception
   519 
   520 
       
   521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
       
   522     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
       
   523                                              buffersends=True)
       
   524 
       
   525     url = '%s/%s' % (apiurl, permission)
       
   526 
       
   527     if len(requests) > 1:
       
   528         url += '/multirequest'
       
   529     else:
       
   530         url += '/%s' % requests[0][0]
       
   531 
       
   532     # Request ID to (request, future)
       
   533     requestmap = {}
       
   534 
       
   535     for command, args, f in requests:
       
   536         request, action, meta = reactor.callcommand(command, args)
       
   537         assert action == 'noop'
       
   538 
       
   539         requestmap[request.requestid] = (request, f)
       
   540 
       
   541     action, meta = reactor.flushcommands()
       
   542     assert action == 'sendframes'
       
   543 
       
   544     # TODO stream this.
       
   545     body = b''.join(map(bytes, meta['framegen']))
       
   546 
       
   547     # TODO modify user-agent to reflect v2
       
   548     headers = {
       
   549         r'Accept': wireprotov2server.FRAMINGTYPE,
       
   550         r'Content-Type': wireprotov2server.FRAMINGTYPE,
       
   551     }
       
   552 
       
   553     req = requestbuilder(pycompat.strurl(url), body, headers)
       
   554     req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
       
   555 
       
   556     try:
       
   557         res = opener.open(req)
       
   558     except urlerr.httperror as e:
       
   559         if e.code == 401:
       
   560             raise error.Abort(_('authorization failed'))
       
   561 
       
   562         raise
       
   563     except httplib.HTTPException as e:
       
   564         ui.traceback()
       
   565         raise IOError(None, e)
       
   566 
       
   567     return reactor, requestmap, res
       
   568 
       
   569 class queuedcommandfuture(pycompat.futures.Future):
       
   570     """Wraps result() on command futures to trigger submission on call."""
       
   571 
       
   572     def result(self, timeout=None):
       
   573         if self.done():
       
   574             return pycompat.futures.Future.result(self, timeout)
       
   575 
       
   576         self._peerexecutor.sendcommands()
       
   577 
       
   578         # sendcommands() will restore the original __class__ and self.result
       
   579         # will resolve to Future.result.
       
   580         return self.result(timeout)
       
   581 
       
   582 @zi.implementer(repository.ipeercommandexecutor)
       
   583 class httpv2executor(object):
       
   584     def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
       
   585         self._ui = ui
       
   586         self._opener = opener
       
   587         self._requestbuilder = requestbuilder
       
   588         self._apiurl = apiurl
       
   589         self._descriptor = descriptor
       
   590         self._sent = False
       
   591         self._closed = False
       
   592         self._neededpermissions = set()
       
   593         self._calls = []
       
   594         self._futures = weakref.WeakSet()
       
   595         self._responseexecutor = None
       
   596         self._responsef = None
       
   597 
       
   598     def __enter__(self):
       
   599         return self
       
   600 
       
   601     def __exit__(self, exctype, excvalue, exctb):
       
   602         self.close()
       
   603 
       
   604     def callcommand(self, command, args):
       
   605         if self._sent:
       
   606             raise error.ProgrammingError('callcommand() cannot be used after '
       
   607                                          'commands are sent')
       
   608 
       
   609         if self._closed:
       
   610             raise error.ProgrammingError('callcommand() cannot be used after '
       
   611                                          'close()')
       
   612 
       
   613         # The service advertises which commands are available. So if we attempt
       
   614         # to call an unknown command or pass an unknown argument, we can screen
       
   615         # for this.
       
   616         if command not in self._descriptor['commands']:
       
   617             raise error.ProgrammingError(
       
   618                 'wire protocol command %s is not available' % command)
       
   619 
       
   620         cmdinfo = self._descriptor['commands'][command]
       
   621         unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
       
   622 
       
   623         if unknownargs:
       
   624             raise error.ProgrammingError(
       
   625                 'wire protocol command %s does not accept argument: %s' % (
       
   626                     command, ', '.join(sorted(unknownargs))))
       
   627 
       
   628         self._neededpermissions |= set(cmdinfo['permissions'])
       
   629 
       
   630         # TODO we /could/ also validate types here, since the API descriptor
       
   631         # includes types...
       
   632 
       
   633         f = pycompat.futures.Future()
       
   634 
       
   635         # Monkeypatch it so result() triggers sendcommands(), otherwise result()
       
   636         # could deadlock.
       
   637         f.__class__ = queuedcommandfuture
       
   638         f._peerexecutor = self
       
   639 
       
   640         self._futures.add(f)
       
   641         self._calls.append((command, args, f))
       
   642 
       
   643         return f
       
   644 
       
   645     def sendcommands(self):
       
   646         if self._sent:
       
   647             return
       
   648 
       
   649         if not self._calls:
       
   650             return
       
   651 
       
   652         self._sent = True
       
   653 
       
   654         # Unhack any future types so caller sees a clean type and so we
       
   655         # break reference cycle.
       
   656         for f in self._futures:
       
   657             if isinstance(f, queuedcommandfuture):
       
   658                 f.__class__ = pycompat.futures.Future
       
   659                 f._peerexecutor = None
       
   660 
       
   661         # Mark the future as running and filter out cancelled futures.
       
   662         calls = [(command, args, f)
       
   663                  for command, args, f in self._calls
       
   664                  if f.set_running_or_notify_cancel()]
       
   665 
       
   666         # Clear out references, prevent improper object usage.
       
   667         self._calls = None
       
   668 
       
   669         if not calls:
       
   670             return
       
   671 
       
   672         permissions = set(self._neededpermissions)
       
   673 
       
   674         if 'push' in permissions and 'pull' in permissions:
       
   675             permissions.remove('pull')
       
   676 
       
   677         if len(permissions) > 1:
       
   678             raise error.RepoError(_('cannot make request requiring multiple '
       
   679                                     'permissions: %s') %
       
   680                                   _(', ').join(sorted(permissions)))
       
   681 
       
   682         permission = {
       
   683             'push': 'rw',
       
   684             'pull': 'ro',
       
   685         }[permissions.pop()]
       
   686 
       
   687         reactor, requests, resp = sendv2request(
       
   688             self._ui, self._opener, self._requestbuilder, self._apiurl,
       
   689             permission, calls)
       
   690 
       
   691         # TODO we probably want to validate the HTTP code, media type, etc.
       
   692 
       
   693         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
       
   694         self._responsef = self._responseexecutor.submit(self._handleresponse,
       
   695                                                         reactor,
       
   696                                                         requests,
       
   697                                                         resp)
       
   698 
       
   699     def close(self):
       
   700         if self._closed:
       
   701             return
       
   702 
       
   703         self.sendcommands()
       
   704 
       
   705         self._closed = True
       
   706 
       
   707         if not self._responsef:
       
   708             return
       
   709 
       
   710         try:
       
   711             self._responsef.result()
       
   712         finally:
       
   713             self._responseexecutor.shutdown(wait=True)
       
   714             self._responsef = None
       
   715             self._responseexecutor = None
       
   716 
       
   717             # If any of our futures are still in progress, mark them as
       
   718             # errored, otherwise a result() could wait indefinitely.
       
   719             for f in self._futures:
       
   720                 if not f.done():
       
   721                     f.set_exception(error.ResponseError(
       
   722                         _('unfulfilled command response')))
       
   723 
       
   724             self._futures = None
       
   725 
       
   726     def _handleresponse(self, reactor, requests, resp):
       
   727         # Called in a thread to read the response.
       
   728 
       
   729         results = {k: [] for k in requests}
       
   730 
       
   731         while True:
       
   732             frame = wireprotoframing.readframe(resp)
       
   733             if frame is None:
       
   734                 break
       
   735 
       
   736             self._ui.note(_('received %r\n') % frame)
       
   737 
       
   738             # Guard against receiving a frame with a request ID that we
       
   739             # didn't issue. This should never happen.
       
   740             request, f = requests.get(frame.requestid, [None, None])
       
   741 
       
   742             action, meta = reactor.onframerecv(frame)
       
   743 
       
   744             if action == 'responsedata':
       
   745                 assert request.requestid == meta['request'].requestid
       
   746 
       
   747                 result = results[request.requestid]
       
   748 
       
   749                 if meta['cbor']:
       
   750                     payload = util.bytesio(meta['data'])
       
   751 
       
   752                     decoder = cbor.CBORDecoder(payload)
       
   753                     while payload.tell() + 1 < len(meta['data']):
       
   754                         try:
       
   755                             result.append(decoder.decode())
       
   756                         except Exception:
       
   757                             f.set_exception_info(*sys.exc_info()[1:])
       
   758                             continue
       
   759                 else:
       
   760                     result.append(meta['data'])
       
   761 
       
   762                 if meta['eos']:
       
   763                     f.set_result(result)
       
   764                     del results[request.requestid]
       
   765 
       
   766             else:
       
   767                 e = error.ProgrammingError('unhandled action: %s' % action)
       
   768 
       
   769                 if f:
       
   770                     f.set_exception(e)
       
   771                 else:
       
   772                     raise e
       
   773 
   520 # TODO implement interface for version 2 peers
   774 # TODO implement interface for version 2 peers
   521 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
   775 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
       
   776                 repository.ipeerrequests)
   522 class httpv2peer(object):
   777 class httpv2peer(object):
   523     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
   778     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
   524                  apidescriptor):
   779                  apidescriptor):
   525         self.ui = ui
   780         self.ui = ui
   526 
   781 
   527         if repourl.endswith('/'):
   782         if repourl.endswith('/'):
   528             repourl = repourl[:-1]
   783             repourl = repourl[:-1]
   529 
   784 
   530         self._url = repourl
   785         self._url = repourl
   531         self._apipath = apipath
   786         self._apipath = apipath
       
   787         self._apiurl = '%s/%s' % (repourl, apipath)
   532         self._opener = opener
   788         self._opener = opener
   533         self._requestbuilder = requestbuilder
   789         self._requestbuilder = requestbuilder
   534         self._descriptor = apidescriptor
   790         self._descriptor = apidescriptor
   535 
   791 
   536     # Start of ipeerconnection.
   792     # Start of ipeerconnection.
   578             _('cannot %s; client or remote repository does not support the %r '
   834             _('cannot %s; client or remote repository does not support the %r '
   579               'capability') % (purpose, name))
   835               'capability') % (purpose, name))
   580 
   836 
   581     # End of ipeercapabilities.
   837     # End of ipeercapabilities.
   582 
   838 
   583     # TODO require to be part of a batched primitive, use futures.
       
   584     def _call(self, name, **args):
   839     def _call(self, name, **args):
   585         """Call a wire protocol command with arguments."""
   840         with self.commandexecutor() as e:
   586 
   841             return e.callcommand(name, args).result()
   587         # Having this early has a side-effect of importing wireprotov2server,
   842 
   588         # which has the side-effect of ensuring commands are registered.
   843     def commandexecutor(self):
   589 
   844         return httpv2executor(self.ui, self._opener, self._requestbuilder,
   590         # TODO modify user-agent to reflect v2.
   845                               self._apiurl, self._descriptor)
   591         headers = {
       
   592             r'Accept': wireprotov2server.FRAMINGTYPE,
       
   593             r'Content-Type': wireprotov2server.FRAMINGTYPE,
       
   594         }
       
   595 
       
   596         # TODO permissions should come from capabilities results.
       
   597         permission = wireproto.commandsv2[name].permission
       
   598         if permission not in ('push', 'pull'):
       
   599             raise error.ProgrammingError('unknown permission type: %s' %
       
   600                                          permission)
       
   601 
       
   602         permission = {
       
   603             'push': 'rw',
       
   604             'pull': 'ro',
       
   605         }[permission]
       
   606 
       
   607         url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
       
   608 
       
   609         # TODO this should be part of a generic peer for the frame-based
       
   610         # protocol.
       
   611         reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
       
   612                                                  buffersends=True)
       
   613 
       
   614         request, action, meta = reactor.callcommand(name, args)
       
   615         assert action == 'noop'
       
   616 
       
   617         action, meta = reactor.flushcommands()
       
   618         assert action == 'sendframes'
       
   619 
       
   620         body = b''.join(map(bytes, meta['framegen']))
       
   621         req = self._requestbuilder(pycompat.strurl(url), body, headers)
       
   622         req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
       
   623 
       
   624         # TODO unify this code with httppeer.
       
   625         try:
       
   626             res = self._opener.open(req)
       
   627         except urlerr.httperror as e:
       
   628             if e.code == 401:
       
   629                 raise error.Abort(_('authorization failed'))
       
   630 
       
   631             raise
       
   632         except httplib.HTTPException as e:
       
   633             self.ui.traceback()
       
   634             raise IOError(None, e)
       
   635 
       
   636         # TODO validate response type, wrap response to handle I/O errors.
       
   637         # TODO more robust frame receiver.
       
   638         results = []
       
   639 
       
   640         while True:
       
   641             frame = wireprotoframing.readframe(res)
       
   642             if frame is None:
       
   643                 break
       
   644 
       
   645             self.ui.note(_('received %r\n') % frame)
       
   646 
       
   647             action, meta = reactor.onframerecv(frame)
       
   648 
       
   649             if action == 'responsedata':
       
   650                 if meta['cbor']:
       
   651                     payload = util.bytesio(meta['data'])
       
   652 
       
   653                     decoder = cbor.CBORDecoder(payload)
       
   654                     while payload.tell() + 1 < len(meta['data']):
       
   655                         results.append(decoder.decode())
       
   656                 else:
       
   657                     results.append(meta['data'])
       
   658             else:
       
   659                 error.ProgrammingError('unhandled action: %s' % action)
       
   660 
       
   661         return results
       
   662 
   846 
   663 # Registry of API service names to metadata about peers that handle it.
   847 # Registry of API service names to metadata about peers that handle it.
   664 #
   848 #
   665 # The following keys are meaningful:
   849 # The following keys are meaningful:
   666 #
   850 #