133 # An event is set when state of the object changes. This event |
133 # An event is set when state of the object changes. This event |
134 # is waited on by the generator emitting objects. |
134 # is waited on by the generator emitting objects. |
135 self._serviceable = threading.Event() |
135 self._serviceable = threading.Event() |
136 |
136 |
137 self._pendingevents = [] |
137 self._pendingevents = [] |
|
138 self._pendingerror = None |
138 self._decoder = cborutil.bufferingdecoder() |
139 self._decoder = cborutil.bufferingdecoder() |
139 self._seeninitial = False |
140 self._seeninitial = False |
140 self._redirect = None |
141 self._redirect = None |
141 |
142 |
142 def _oninputcomplete(self): |
143 def _oninputcomplete(self): |
165 'after content redirect; the remote is ' |
166 'after content redirect; the remote is ' |
166 'buggy')) |
167 'buggy')) |
167 |
168 |
168 self._pendingevents.append(o) |
169 self._pendingevents.append(o) |
169 |
170 |
|
171 self._serviceable.set() |
|
172 |
|
173 def _onerror(self, e): |
|
174 self._pendingerror = e |
|
175 |
|
176 with self._lock: |
170 self._serviceable.set() |
177 self._serviceable.set() |
171 |
178 |
172 def _handleinitial(self, o): |
179 def _handleinitial(self, o): |
173 self._seeninitial = True |
180 self._seeninitial = True |
174 if o[b'status'] == b'ok': |
181 if o[b'status'] == b'ok': |
209 # TODO this can infinite loop if self._inputcomplete is never |
216 # TODO this can infinite loop if self._inputcomplete is never |
210 # set. We likely want to tie the lifetime of this object/state |
217 # set. We likely want to tie the lifetime of this object/state |
211 # to that of the background thread receiving frames and updating |
218 # to that of the background thread receiving frames and updating |
212 # our state. |
219 # our state. |
213 self._serviceable.wait(1.0) |
220 self._serviceable.wait(1.0) |
|
221 |
|
222 if self._pendingerror: |
|
223 raise self._pendingerror |
214 |
224 |
215 with self._lock: |
225 with self._lock: |
216 self._serviceable.clear() |
226 self._serviceable.clear() |
217 |
227 |
218 # Make copies because objects could be mutated during |
228 # Make copies because objects could be mutated during |
340 # Any failures processing this frame should bubble up to the |
350 # Any failures processing this frame should bubble up to the |
341 # future tracking the request. |
351 # future tracking the request. |
342 try: |
352 try: |
343 self._processresponsedata(frame, meta, response) |
353 self._processresponsedata(frame, meta, response) |
344 except BaseException as e: |
354 except BaseException as e: |
345 self._futures[frame.requestid].set_exception(e) |
355 # If an exception occurs before the future is resolved, |
346 del self._futures[frame.requestid] |
356 # fail the future. Otherwise, we stuff the exception on |
347 response._oninputcomplete() |
357 # the response object so it can be raised during objects() |
|
358 # iteration. If nothing is consuming objects(), we could |
|
359 # silently swallow this exception. That's a risk we'll have to |
|
360 # take. |
|
361 if frame.requestid in self._futures: |
|
362 self._futures[frame.requestid].set_exception(e) |
|
363 del self._futures[frame.requestid] |
|
364 response._oninputcomplete() |
|
365 else: |
|
366 response._onerror(e) |
348 else: |
367 else: |
349 raise error.ProgrammingError( |
368 raise error.ProgrammingError( |
350 'unhandled action from clientreactor: %s' % action) |
369 'unhandled action from clientreactor: %s' % action) |
351 |
370 |
352 def _processresponsedata(self, frame, meta, response): |
371 def _processresponsedata(self, frame, meta, response): |