32 chunks.append(msg) |
33 chunks.append(msg) |
33 |
34 |
34 return b''.join(chunks) |
35 return b''.join(chunks) |
35 |
36 |
class commandresponse(object):
    """Tracks the response to a single command request.

    An instance records whether all remote input for the command has
    arrived and buffers the objects decoded from the response so far.

    The instance does not read anything by itself: an external entity has
    to feed it through ``_onresponsedata()`` and ``_oninputcomplete()`` as
    events occur.
    """

    def __init__(self, requestid, command):
        self.requestid = requestid
        self.command = command

        # Acquired whenever important state is mutated, so that a thread
        # delivering new data and a thread consuming it do not race.
        self._lock = threading.RLock()

        # Set when the state of this object changes. The generator
        # returned by objects() waits on it.
        self._serviceable = threading.Event()

        # Whether all remote input related to this command has been
        # received.
        self._inputcomplete = False

        # Whether the first decoded object was already handled.
        self._seeninitial = False

        # Decoded objects that objects() has not handed out yet.
        self._pendingevents = []
        self._decoder = cborutil.bufferingdecoder()

    def _oninputcomplete(self):
        """Record that no more input will arrive and signal waiters."""
        with self._lock:
            self._inputcomplete = True
            self._serviceable.set()

    def _onresponsedata(self, data):
        """Feed ``data`` to the decoder and queue the objects it produced.

        The first object ever decoded is routed to ``_handleinitial()``
        instead of being queued; an exception raised there propagates to
        the caller.
        """
        hasobjects, _readcount, _wanted = self._decoder.decode(data)

        if hasobjects:
            with self._lock:
                for obj in self._decoder.getavailable():
                    if self._seeninitial:
                        self._pendingevents.append(obj)
                    else:
                        self._handleinitial(obj)

                self._serviceable.set()

    def _handleinitial(self, o):
        """Handle the first decoded object of the response.

        Marks the initial object as seen. Unless its status is ``ok``, a
        ``RepoError`` built from the object's error message (and error
        arguments, when present) is raised.
        """
        self._seeninitial = True

        if o[b'status'] != 'ok':
            failure = o[b'error']
            atom = {'msg': failure[b'message']}
            if b'args' in failure:
                atom['args'] = failure[b'args']

            raise error.RepoError(formatrichmessage([atom]))

    def objects(self):
        """Obtain decoded objects from this response.

        This is a generator of the data structures decoded from the
        command response. Advancing it may block while waiting for
        external data to become available.

        If the server encountered an error while serving the data, or if
        another error occurred, an exception may be raised when advancing
        the generator.
        """
        finished = False

        while not finished:
            # TODO this can infinite loop if self._inputcomplete is never
            # set. We likely want to tie the lifetime of this object/state
            # to that of the background thread receiving frames and updating
            # our state.
            self._serviceable.wait(1.0)

            with self._lock:
                self._serviceable.clear()

                # Snapshot the state under the lock, because it can be
                # mutated while we are yielding.
                finished = self._inputcomplete
                batch = self._pendingevents[:]
                del self._pendingevents[:]

            for obj in batch:
                yield obj
51 |
133 |
52 class clienthandler(object): |
134 class clienthandler(object): |
53 """Object to handle higher-level client activities. |
135 """Object to handle higher-level client activities. |
54 |
136 |
55 The ``clientreactor`` is used to hold low-level state about the frame-based |
137 The ``clientreactor`` is used to hold low-level state about the frame-based |
78 raise error.ProgrammingError('%s not yet supported' % action) |
160 raise error.ProgrammingError('%s not yet supported' % action) |
79 |
161 |
80 rid = request.requestid |
162 rid = request.requestid |
81 self._requests[rid] = request |
163 self._requests[rid] = request |
82 self._futures[rid] = f |
164 self._futures[rid] = f |
|
165 # TODO we need some kind of lifetime on response instances otherwise |
|
166 # objects() may deadlock. |
83 self._responses[rid] = commandresponse(rid, command) |
167 self._responses[rid] = commandresponse(rid, command) |
84 |
168 |
85 return iter(()) |
169 return iter(()) |
86 |
170 |
87 def flushcommands(self): |
171 def flushcommands(self): |
139 # future tracking the request. |
227 # future tracking the request. |
140 try: |
228 try: |
141 self._processresponsedata(frame, meta, response) |
229 self._processresponsedata(frame, meta, response) |
142 except BaseException as e: |
230 except BaseException as e: |
143 self._futures[frame.requestid].set_exception(e) |
231 self._futures[frame.requestid].set_exception(e) |
|
232 del self._futures[frame.requestid] |
|
233 response._oninputcomplete() |
144 else: |
234 else: |
145 raise error.ProgrammingError( |
235 raise error.ProgrammingError( |
146 'unhandled action from clientreactor: %s' % action) |
236 'unhandled action from clientreactor: %s' % action) |
147 |
237 |
def _processresponsedata(self, frame, meta, response):
    """Hand a frame's payload to ``response`` and resolve its future.

    ``meta['data']`` is passed to the response object and, when
    ``meta['eos']`` is set, the response is marked input-complete and the
    request is dropped from ``self._requests``.

    A command without an entry in ``COMMAND_DECODERS`` has its future
    resolved right away with the ``response.objects()`` generator. A
    command with a decoder is resolved only once all input was received,
    with the decoder's return value. A resolved future is removed from
    ``self._futures``.
    """
    rid = frame.requestid

    # This can raise. The caller can handle it.
    response._onresponsedata(meta['data'])

    if meta['eos']:
        response._oninputcomplete()
        del self._requests[rid]

    # Nothing left to resolve for this request.
    if rid not in self._futures:
        return

    if response.command in COMMAND_DECODERS:
        # Decoded commands wait until the whole response has arrived.
        if not response._inputcomplete:
            return

        result = COMMAND_DECODERS[response.command](response.objects())
    else:
        result = response.objects()

    self._futures[rid].set_result(result)
    del self._futures[rid]
178 |
259 |
179 def decodebranchmap(objs): |
260 def decodebranchmap(objs): |
180 # Response should be a single CBOR map of branch name to array of nodes. |
261 # Response should be a single CBOR map of branch name to array of nodes. |
181 bm = next(objs) |
262 bm = next(objs) |