mercurial/wireprotov2peer.py
changeset 39561 d06834e0f48e
parent 39559 07b58266bce3
child 40019 f5a05bb48116
equal deleted inserted replaced
39560:84bf6ded9317 39561:d06834e0f48e
     5 # This software may be used and distributed according to the terms of the
     5 # This software may be used and distributed according to the terms of the
     6 # GNU General Public License version 2 or any later version.
     6 # GNU General Public License version 2 or any later version.
     7 
     7 
     8 from __future__ import absolute_import
     8 from __future__ import absolute_import
     9 
     9 
       
    10 import threading
       
    11 
    10 from .i18n import _
    12 from .i18n import _
    11 from . import (
    13 from . import (
    12     encoding,
    14     encoding,
    13     error,
    15     error,
    14     util,
       
    15     wireprotoframing,
    16     wireprotoframing,
    16 )
    17 )
    17 from .utils import (
    18 from .utils import (
    18     cborutil,
    19     cborutil,
    19 )
    20 )
    32         chunks.append(msg)
    33         chunks.append(msg)
    33 
    34 
    34     return b''.join(chunks)
    35     return b''.join(chunks)
    35 
    36 
    36 class commandresponse(object):
    37 class commandresponse(object):
    37     """Represents the response to a command request."""
    38     """Represents the response to a command request.
       
    39 
       
    40     Instances track the state of the command and hold its results.
       
    41 
       
    42     An external entity is required to update the state of the object when
       
    43     events occur.
       
    44     """
    38 
    45 
    39     def __init__(self, requestid, command):
    46     def __init__(self, requestid, command):
    40         self.requestid = requestid
    47         self.requestid = requestid
    41         self.command = command
    48         self.command = command
    42 
    49 
    43         self.b = util.bytesio()
    50         # Whether all remote input related to this command has been
    44 
    51         # received.
    45     def cborobjects(self):
    52         self._inputcomplete = False
    46         """Obtain decoded CBOR objects from this response."""
    53 
    47         self.b.seek(0)
    54         # We have a lock that is acquired when important object state is
    48 
    55         # mutated. This is to prevent race conditions between 1 thread
    49         for v in cborutil.decodeall(self.b.getvalue()):
    56         # sending us new data and another consuming it.
    50             yield v
    57         self._lock = threading.RLock()
       
    58 
       
    59         # An event is set when state of the object changes. This event
       
    60         # is waited on by the generator emitting objects.
       
    61         self._serviceable = threading.Event()
       
    62 
       
    63         self._pendingevents = []
       
    64         self._decoder = cborutil.bufferingdecoder()
       
    65         self._seeninitial = False
       
    66 
       
    67     def _oninputcomplete(self):
       
    68         with self._lock:
       
    69             self._inputcomplete = True
       
    70             self._serviceable.set()
       
    71 
       
    72     def _onresponsedata(self, data):
       
    73         available, readcount, wanted = self._decoder.decode(data)
       
    74 
       
    75         if not available:
       
    76             return
       
    77 
       
    78         with self._lock:
       
    79             for o in self._decoder.getavailable():
       
    80                 if not self._seeninitial:
       
    81                     self._handleinitial(o)
       
    82                     continue
       
    83 
       
    84                 self._pendingevents.append(o)
       
    85 
       
    86             self._serviceable.set()
       
    87 
       
    88     def _handleinitial(self, o):
       
    89         self._seeninitial = True
       
    90         if o[b'status'] == 'ok':
       
    91             return
       
    92 
       
    93         atoms = [{'msg': o[b'error'][b'message']}]
       
    94         if b'args' in o[b'error']:
       
    95             atoms[0]['args'] = o[b'error'][b'args']
       
    96 
       
    97         raise error.RepoError(formatrichmessage(atoms))
       
    98 
       
    99     def objects(self):
       
   100         """Obtained decoded objects from this response.
       
   101 
       
   102         This is a generator of data structures that were decoded from the
       
   103         command response.
       
   104 
       
   105         Obtaining the next member of the generator may block due to waiting
       
   106         on external data to become available.
       
   107 
       
   108         If the server encountered an error in the middle of serving the data
       
   109         or if another error occurred, an exception may be raised when
       
   110         advancing the generator.
       
   111         """
       
   112         while True:
       
   113             # TODO this can infinite loop if self._inputcomplete is never
       
   114             # set. We likely want to tie the lifetime of this object/state
       
   115             # to that of the background thread receiving frames and updating
       
   116             # our state.
       
   117             self._serviceable.wait(1.0)
       
   118 
       
   119             with self._lock:
       
   120                 self._serviceable.clear()
       
   121 
       
   122                 # Make copies because objects could be mutated during
       
   123                 # iteration.
       
   124                 stop = self._inputcomplete
       
   125                 pending = list(self._pendingevents)
       
   126                 self._pendingevents[:] = []
       
   127 
       
   128             for o in pending:
       
   129                 yield o
       
   130 
       
   131             if stop:
       
   132                 break
    51 
   133 
    52 class clienthandler(object):
   134 class clienthandler(object):
    53     """Object to handle higher-level client activities.
   135     """Object to handle higher-level client activities.
    54 
   136 
    55     The ``clientreactor`` is used to hold low-level state about the frame-based
   137     The ``clientreactor`` is used to hold low-level state about the frame-based
    78             raise error.ProgrammingError('%s not yet supported' % action)
   160             raise error.ProgrammingError('%s not yet supported' % action)
    79 
   161 
    80         rid = request.requestid
   162         rid = request.requestid
    81         self._requests[rid] = request
   163         self._requests[rid] = request
    82         self._futures[rid] = f
   164         self._futures[rid] = f
       
   165         # TODO we need some kind of lifetime on response instances otherwise
       
   166         # objects() may deadlock.
    83         self._responses[rid] = commandresponse(rid, command)
   167         self._responses[rid] = commandresponse(rid, command)
    84 
   168 
    85         return iter(())
   169         return iter(())
    86 
   170 
    87     def flushcommands(self):
   171     def flushcommands(self):
   117         action, meta = self._reactor.onframerecv(frame)
   201         action, meta = self._reactor.onframerecv(frame)
   118 
   202 
   119         if action == 'error':
   203         if action == 'error':
   120             e = error.RepoError(meta['message'])
   204             e = error.RepoError(meta['message'])
   121 
   205 
       
   206             if frame.requestid in self._responses:
       
   207                 self._responses[frame.requestid]._oninputcomplete()
       
   208 
   122             if frame.requestid in self._futures:
   209             if frame.requestid in self._futures:
   123                 self._futures[frame.requestid].set_exception(e)
   210                 self._futures[frame.requestid].set_exception(e)
       
   211                 del self._futures[frame.requestid]
   124             else:
   212             else:
   125                 raise e
   213                 raise e
   126 
   214 
   127             return
   215             return
   128 
   216 
   139             # future tracking the request.
   227             # future tracking the request.
   140             try:
   228             try:
   141                 self._processresponsedata(frame, meta, response)
   229                 self._processresponsedata(frame, meta, response)
   142             except BaseException as e:
   230             except BaseException as e:
   143                 self._futures[frame.requestid].set_exception(e)
   231                 self._futures[frame.requestid].set_exception(e)
       
   232                 del self._futures[frame.requestid]
       
   233                 response._oninputcomplete()
   144         else:
   234         else:
   145             raise error.ProgrammingError(
   235             raise error.ProgrammingError(
   146                 'unhandled action from clientreactor: %s' % action)
   236                 'unhandled action from clientreactor: %s' % action)
   147 
   237 
   148     def _processresponsedata(self, frame, meta, response):
   238     def _processresponsedata(self, frame, meta, response):
   149         # This buffers all data until end of stream is received. This
   239         # This can raise. The caller can handle it.
   150         # is bad for performance.
   240         response._onresponsedata(meta['data'])
   151         # TODO make response data streamable
       
   152         response.b.write(meta['data'])
       
   153 
   241 
   154         if meta['eos']:
   242         if meta['eos']:
   155             # If the command has a decoder, resolve the future to the
   243             response._oninputcomplete()
   156             # decoded value. Otherwise resolve to the rich response object.
       
   157             decoder = COMMAND_DECODERS.get(response.command)
       
   158 
       
   159             # TODO consider always resolving the overall status map.
       
   160             if decoder:
       
   161                 objs = response.cborobjects()
       
   162 
       
   163                 overall = next(objs)
       
   164 
       
   165                 if overall['status'] == 'ok':
       
   166                     self._futures[frame.requestid].set_result(decoder(objs))
       
   167                 else:
       
   168                     atoms = [{'msg': overall['error']['message']}]
       
   169                     if 'args' in overall['error']:
       
   170                         atoms[0]['args'] = overall['error']['args']
       
   171                     e = error.RepoError(formatrichmessage(atoms))
       
   172                     self._futures[frame.requestid].set_exception(e)
       
   173             else:
       
   174                 self._futures[frame.requestid].set_result(response)
       
   175 
       
   176             del self._requests[frame.requestid]
   244             del self._requests[frame.requestid]
       
   245 
       
   246         # If the command has a decoder, we wait until all input has been
       
   247         # received before resolving the future. Otherwise we resolve the
       
   248         # future immediately.
       
   249         if frame.requestid not in self._futures:
       
   250             return
       
   251 
       
   252         if response.command not in COMMAND_DECODERS:
       
   253             self._futures[frame.requestid].set_result(response.objects())
       
   254             del self._futures[frame.requestid]
       
   255         elif response._inputcomplete:
       
   256             decoded = COMMAND_DECODERS[response.command](response.objects())
       
   257             self._futures[frame.requestid].set_result(decoded)
   177             del self._futures[frame.requestid]
   258             del self._futures[frame.requestid]
   178 
   259 
   179 def decodebranchmap(objs):
   260 def decodebranchmap(objs):
   180     # Response should be a single CBOR map of branch name to array of nodes.
   261     # Response should be a single CBOR map of branch name to array of nodes.
   181     bm = next(objs)
   262     bm = next(objs)