mercurial/wireprotov2peer.py
changeset 40137 ed4ebbb98ca0
parent 40134 cfeba1aafb9d
child 40139 17223d8e7d75
equal deleted inserted replaced
40136:3a6d6c54bd81 40137:ed4ebbb98ca0
   133         # An event is set when state of the object changes. This event
   133         # An event is set when state of the object changes. This event
   134         # is waited on by the generator emitting objects.
   134         # is waited on by the generator emitting objects.
   135         self._serviceable = threading.Event()
   135         self._serviceable = threading.Event()
   136 
   136 
   137         self._pendingevents = []
   137         self._pendingevents = []
       
   138         self._pendingerror = None
   138         self._decoder = cborutil.bufferingdecoder()
   139         self._decoder = cborutil.bufferingdecoder()
   139         self._seeninitial = False
   140         self._seeninitial = False
   140         self._redirect = None
   141         self._redirect = None
   141 
   142 
   142     def _oninputcomplete(self):
   143     def _oninputcomplete(self):
   165                                         'after content redirect; the remote is '
   166                                         'after content redirect; the remote is '
   166                                         'buggy'))
   167                                         'buggy'))
   167 
   168 
   168                 self._pendingevents.append(o)
   169                 self._pendingevents.append(o)
   169 
   170 
       
   171             self._serviceable.set()
       
   172 
       
   173     def _onerror(self, e):
       
   174         self._pendingerror = e
       
   175 
       
   176         with self._lock:
   170             self._serviceable.set()
   177             self._serviceable.set()
   171 
   178 
   172     def _handleinitial(self, o):
   179     def _handleinitial(self, o):
   173         self._seeninitial = True
   180         self._seeninitial = True
   174         if o[b'status'] == b'ok':
   181         if o[b'status'] == b'ok':
   209             # TODO this can infinite loop if self._inputcomplete is never
   216             # TODO this can infinite loop if self._inputcomplete is never
   210             # set. We likely want to tie the lifetime of this object/state
   217             # set. We likely want to tie the lifetime of this object/state
   211             # to that of the background thread receiving frames and updating
   218             # to that of the background thread receiving frames and updating
   212             # our state.
   219             # our state.
   213             self._serviceable.wait(1.0)
   220             self._serviceable.wait(1.0)
       
   221 
       
   222             if self._pendingerror:
       
   223                 raise self._pendingerror
   214 
   224 
   215             with self._lock:
   225             with self._lock:
   216                 self._serviceable.clear()
   226                 self._serviceable.clear()
   217 
   227 
   218                 # Make copies because objects could be mutated during
   228                 # Make copies because objects could be mutated during
   340             # Any failures processing this frame should bubble up to the
   350             # Any failures processing this frame should bubble up to the
   341             # future tracking the request.
   351             # future tracking the request.
   342             try:
   352             try:
   343                 self._processresponsedata(frame, meta, response)
   353                 self._processresponsedata(frame, meta, response)
   344             except BaseException as e:
   354             except BaseException as e:
   345                 self._futures[frame.requestid].set_exception(e)
   355                 # If an exception occurs before the future is resolved,
   346                 del self._futures[frame.requestid]
   356                 # fail the future. Otherwise, we stuff the exception on
   347                 response._oninputcomplete()
   357                 # the response object so it can be raised during objects()
       
   358                 # iteration. If nothing is consuming objects(), we could
       
   359                 # silently swallow this exception. That's a risk we'll have to
       
   360                 # take.
       
   361                 if frame.requestid in self._futures:
       
   362                     self._futures[frame.requestid].set_exception(e)
       
   363                     del self._futures[frame.requestid]
       
   364                     response._oninputcomplete()
       
   365                 else:
       
   366                     response._onerror(e)
   348         else:
   367         else:
   349             raise error.ProgrammingError(
   368             raise error.ProgrammingError(
   350                 'unhandled action from clientreactor: %s' % action)
   369                 'unhandled action from clientreactor: %s' % action)
   351 
   370 
   352     def _processresponsedata(self, frame, meta, response):
   371     def _processresponsedata(self, frame, meta, response):