87 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, |
87 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, |
88 } |
88 } |
89 |
89 |
90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') |
90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') |
91 |
91 |
92 def makeframe(frametype, frameflags, payload): |
92 def makeframe(requestid, frametype, frameflags, payload): |
93 """Assemble a frame into a byte array.""" |
93 """Assemble a frame into a byte array.""" |
94 # TODO assert size of payload. |
94 # TODO assert size of payload. |
95 frame = bytearray(FRAME_HEADER_SIZE + len(payload)) |
95 frame = bytearray(FRAME_HEADER_SIZE + len(payload)) |
96 |
96 |
|
97 # 24 bits length |
|
98 # 16 bits request id |
|
99 # 4 bits type |
|
100 # 4 bits flags |
|
101 |
97 l = struct.pack(r'<I', len(payload)) |
102 l = struct.pack(r'<I', len(payload)) |
98 frame[0:3] = l[0:3] |
103 frame[0:3] = l[0:3] |
99 frame[3] = (frametype << 4) | frameflags |
104 struct.pack_into(r'<H', frame, 3, requestid) |
100 frame[4:] = payload |
105 frame[5] = (frametype << 4) | frameflags |
|
106 frame[6:] = payload |
101 |
107 |
102 return frame |
108 return frame |
103 |
109 |
104 def makeframefromhumanstring(s): |
110 def makeframefromhumanstring(s): |
105 """Given a string of the form: <type> <flags> <payload>, creates a frame. |
111 """Create a frame from a human readable string |
|
112 |
|
113 Strings have the form: |
|
114 |
|
115 <request-id> <type> <flags> <payload> |
106 |
116 |
107 This can be used by user-facing applications and tests for creating |
117 This can be used by user-facing applications and tests for creating |
108 frames easily without having to type out a bunch of constants. |
118 frames easily without having to type out a bunch of constants. |
109 |
119 |
|
120 Request ID is an integer. |
|
121 |
110 Frame type and flags can be specified by integer or named constant. |
122 Frame type and flags can be specified by integer or named constant. |
|
123 |
111 Flags can be delimited by `|` to bitwise OR them together. |
124 Flags can be delimited by `|` to bitwise OR them together. |
112 """ |
125 """ |
113 frametype, frameflags, payload = s.split(b' ', 2) |
126 requestid, frametype, frameflags, payload = s.split(b' ', 3) |
|
127 |
|
128 requestid = int(requestid) |
114 |
129 |
115 if frametype in FRAME_TYPES: |
130 if frametype in FRAME_TYPES: |
116 frametype = FRAME_TYPES[frametype] |
131 frametype = FRAME_TYPES[frametype] |
117 else: |
132 else: |
118 frametype = int(frametype) |
133 frametype = int(frametype) |
125 else: |
140 else: |
126 finalflags |= int(flag) |
141 finalflags |= int(flag) |
127 |
142 |
128 payload = util.unescapestr(payload) |
143 payload = util.unescapestr(payload) |
129 |
144 |
130 return makeframe(frametype, finalflags, payload) |
145 return makeframe(requestid, frametype, finalflags, payload) |
131 |
146 |
132 def parseheader(data): |
147 def parseheader(data): |
133 """Parse a unified framing protocol frame header from a buffer. |
148 """Parse a unified framing protocol frame header from a buffer. |
134 |
149 |
135 The header is expected to be in the buffer at offset 0 and the |
150 The header is expected to be in the buffer at offset 0 and the |
138 # 24 bits payload length (little endian) |
153 # 24 bits payload length (little endian), then 16 bits request ID (little endian) |
139 # 4 bits frame type |
154 # 4 bits frame type |
140 # 4 bits frame flags |
155 # 4 bits frame flags |
141 # ... payload |
156 # ... payload |
142 framelength = data[0] + 256 * data[1] + 16384 * data[2] |
157 framelength = data[0] + 256 * data[1] + 16384 * data[2] |
143 typeflags = data[3] |
158 requestid = struct.unpack_from(r'<H', data, 3)[0] |
|
159 typeflags = data[5] |
144 |
160 |
145 frametype = (typeflags & 0xf0) >> 4 |
161 frametype = (typeflags & 0xf0) >> 4 |
146 frameflags = typeflags & 0x0f |
162 frameflags = typeflags & 0x0f |
147 |
163 |
148 return frametype, frameflags, framelength |
164 return requestid, frametype, frameflags, framelength |
149 |
165 |
150 def readframe(fh): |
166 def readframe(fh): |
151 """Read a unified framing protocol frame from a file object. |
167 """Read a unified framing protocol frame from a file object. |
152 |
168 |
153 Returns a 3-tuple of (type, flags, payload) for the decoded frame or |
169 Returns a 4-tuple of (request id, type, flags, payload) for the decoded frame or |
163 |
179 |
164 if readcount != FRAME_HEADER_SIZE: |
180 if readcount != FRAME_HEADER_SIZE: |
165 raise error.Abort(_('received incomplete frame: got %d bytes: %s') % |
181 raise error.Abort(_('received incomplete frame: got %d bytes: %s') % |
166 (readcount, header)) |
182 (readcount, header)) |
167 |
183 |
168 frametype, frameflags, framelength = parseheader(header) |
184 requestid, frametype, frameflags, framelength = parseheader(header) |
169 |
185 |
170 payload = fh.read(framelength) |
186 payload = fh.read(framelength) |
171 if len(payload) != framelength: |
187 if len(payload) != framelength: |
172 raise error.Abort(_('frame length error: expected %d; got %d') % |
188 raise error.Abort(_('frame length error: expected %d; got %d') % |
173 (framelength, len(payload))) |
189 (framelength, len(payload))) |
174 |
190 |
175 return frametype, frameflags, payload |
191 return requestid, frametype, frameflags, payload |
176 |
192 |
177 def createcommandframes(cmd, args, datafh=None): |
193 def createcommandframes(requestid, cmd, args, datafh=None): |
178 """Create frames necessary to transmit a request to run a command. |
194 """Create frames necessary to transmit a request to run a command. |
179 |
195 |
180 This is a generator of bytearrays. Each item represents a frame |
196 This is a generator of bytearrays. Each item represents a frame |
181 ready to be sent over the wire to a peer. |
197 ready to be sent over the wire to a peer. |
182 """ |
198 """ |
217 else: |
233 else: |
218 flags = FLAG_COMMAND_DATA_EOS |
234 flags = FLAG_COMMAND_DATA_EOS |
219 assert datafh.read(1) == b'' |
235 assert datafh.read(1) == b'' |
220 done = True |
236 done = True |
221 |
237 |
222 yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data) |
238 yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data) |
223 |
239 |
224 if done: |
240 if done: |
225 break |
241 break |
226 |
242 |
227 def createbytesresponseframesfrombytes(data, |
243 def createbytesresponseframesfrombytes(requestid, data, |
228 maxframesize=DEFAULT_MAX_FRAME_SIZE): |
244 maxframesize=DEFAULT_MAX_FRAME_SIZE): |
229 """Create a raw frame to send a bytes response from static bytes input. |
245 """Create a raw frame to send a bytes response from static bytes input. |
230 |
246 |
231 Returns a generator of bytearrays. |
247 Returns a generator of bytearrays. |
232 """ |
248 """ |
233 |
249 |
234 # Simple case of a single frame. |
250 # Simple case of a single frame. |
235 if len(data) <= maxframesize: |
251 if len(data) <= maxframesize: |
236 yield makeframe(FRAME_TYPE_BYTES_RESPONSE, |
252 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, |
237 FLAG_BYTES_RESPONSE_EOS, data) |
253 FLAG_BYTES_RESPONSE_EOS, data) |
238 return |
254 return |
239 |
255 |
240 offset = 0 |
256 offset = 0 |
241 while True: |
257 while True: |
246 if done: |
262 if done: |
247 flags = FLAG_BYTES_RESPONSE_EOS |
263 flags = FLAG_BYTES_RESPONSE_EOS |
248 else: |
264 else: |
249 flags = FLAG_BYTES_RESPONSE_CONTINUATION |
265 flags = FLAG_BYTES_RESPONSE_CONTINUATION |
250 |
266 |
251 yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk) |
267 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk) |
252 |
268 |
253 if done: |
269 if done: |
254 break |
270 break |
255 |
271 |
256 def createerrorframe(msg, protocol=False, application=False): |
272 def createerrorframe(requestid, msg, protocol=False, application=False): |
257 # TODO properly handle frame size limits. |
273 # TODO properly handle frame size limits. |
258 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
274 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
259 |
275 |
260 flags = 0 |
276 flags = 0 |
261 if protocol: |
277 if protocol: |
262 flags |= FLAG_ERROR_RESPONSE_PROTOCOL |
278 flags |= FLAG_ERROR_RESPONSE_PROTOCOL |
263 if application: |
279 if application: |
264 flags |= FLAG_ERROR_RESPONSE_APPLICATION |
280 flags |= FLAG_ERROR_RESPONSE_APPLICATION |
265 |
281 |
266 yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg) |
282 yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg) |
267 |
283 |
268 class serverreactor(object): |
284 class serverreactor(object): |
269 """Holds state of a server handling frame-based protocol requests. |
285 """Holds state of a server handling frame-based protocol requests. |
270 |
286 |
271 This class is the "brain" of the unified frame-based protocol server |
287 This class is the "brain" of the unified frame-based protocol server |
324 sender cannot receive until all data has been transmitted. |
340 sender cannot receive until all data has been transmitted. |
325 """ |
341 """ |
326 self._deferoutput = deferoutput |
342 self._deferoutput = deferoutput |
327 self._state = 'idle' |
343 self._state = 'idle' |
328 self._bufferedframegens = [] |
344 self._bufferedframegens = [] |
|
345 self._activerequestid = None |
329 self._activecommand = None |
346 self._activecommand = None |
330 self._activeargs = None |
347 self._activeargs = None |
331 self._activedata = None |
348 self._activedata = None |
332 self._expectingargs = None |
349 self._expectingargs = None |
333 self._expectingdata = None |
350 self._expectingdata = None |
334 self._activeargname = None |
351 self._activeargname = None |
335 self._activeargchunks = None |
352 self._activeargchunks = None |
336 |
353 |
337 def onframerecv(self, frametype, frameflags, payload): |
354 def onframerecv(self, requestid, frametype, frameflags, payload): |
338 """Process a frame that has been received off the wire. |
355 """Process a frame that has been received off the wire. |
339 |
356 |
340 Returns a dict with an ``action`` key that details what action, |
357 Returns a dict with an ``action`` key that details what action, |
341 if any, the consumer should take next. |
358 if any, the consumer should take next. |
342 """ |
359 """ |
349 |
366 |
350 meth = handlers.get(self._state) |
367 meth = handlers.get(self._state) |
351 if not meth: |
368 if not meth: |
352 raise error.ProgrammingError('unhandled state: %s' % self._state) |
369 raise error.ProgrammingError('unhandled state: %s' % self._state) |
353 |
370 |
354 return meth(frametype, frameflags, payload) |
371 return meth(requestid, frametype, frameflags, payload) |
355 |
372 |
356 def onbytesresponseready(self, data): |
373 def onbytesresponseready(self, requestid, data): |
357 """Signal that a bytes response is ready to be sent to the client. |
374 """Signal that a bytes response is ready to be sent to the client. |
358 |
375 |
359 The raw bytes response is passed as an argument. |
376 The raw bytes response is passed as an argument. |
360 """ |
377 """ |
361 framegen = createbytesresponseframesfrombytes(data) |
378 framegen = createbytesresponseframesfrombytes(requestid, data) |
362 |
379 |
363 if self._deferoutput: |
380 if self._deferoutput: |
364 self._bufferedframegens.append(framegen) |
381 self._bufferedframegens.append(framegen) |
365 return 'noop', {} |
382 return 'noop', {} |
366 else: |
383 else: |
385 |
402 |
386 return 'sendframes', { |
403 return 'sendframes', { |
387 'framegen': makegen(), |
404 'framegen': makegen(), |
388 } |
405 } |
389 |
406 |
390 def onapplicationerror(self, msg): |
407 def onapplicationerror(self, requestid, msg): |
391 return 'sendframes', { |
408 return 'sendframes', { |
392 'framegen': createerrorframe(msg, application=True), |
409 'framegen': createerrorframe(requestid, msg, application=True), |
393 } |
410 } |
394 |
411 |
395 def _makeerrorresult(self, msg): |
412 def _makeerrorresult(self, msg): |
396 return 'error', { |
413 return 'error', { |
397 'message': msg, |
414 'message': msg, |
398 } |
415 } |
399 |
416 |
400 def _makeruncommandresult(self): |
417 def _makeruncommandresult(self): |
401 return 'runcommand', { |
418 return 'runcommand', { |
|
419 'requestid': self._activerequestid, |
402 'command': self._activecommand, |
420 'command': self._activecommand, |
403 'args': self._activeargs, |
421 'args': self._activeargs, |
404 'data': self._activedata.getvalue() if self._activedata else None, |
422 'data': self._activedata.getvalue() if self._activedata else None, |
405 } |
423 } |
406 |
424 |
407 def _makewantframeresult(self): |
425 def _makewantframeresult(self): |
408 return 'wantframe', { |
426 return 'wantframe', { |
409 'state': self._state, |
427 'state': self._state, |
410 } |
428 } |
411 |
429 |
412 def _onframeidle(self, frametype, frameflags, payload): |
430 def _onframeidle(self, requestid, frametype, frameflags, payload): |
413 # The only frame type that should be received in this state is a |
431 # The only frame type that should be received in this state is a |
414 # command request. |
432 # command request. |
415 if frametype != FRAME_TYPE_COMMAND_NAME: |
433 if frametype != FRAME_TYPE_COMMAND_NAME: |
416 self._state = 'errored' |
434 self._state = 'errored' |
417 return self._makeerrorresult( |
435 return self._makeerrorresult( |
418 _('expected command frame; got %d') % frametype) |
436 _('expected command frame; got %d') % frametype) |
419 |
437 |
|
438 self._activerequestid = requestid |
420 self._activecommand = payload |
439 self._activecommand = payload |
421 self._activeargs = {} |
440 self._activeargs = {} |
422 self._activedata = None |
441 self._activedata = None |
423 |
442 |
424 if frameflags & FLAG_COMMAND_NAME_EOS: |
443 if frameflags & FLAG_COMMAND_NAME_EOS: |
437 else: |
456 else: |
438 self._state = 'errored' |
457 self._state = 'errored' |
439 return self._makeerrorresult(_('missing frame flags on ' |
458 return self._makeerrorresult(_('missing frame flags on ' |
440 'command frame')) |
459 'command frame')) |
441 |
460 |
442 def _onframereceivingargs(self, frametype, frameflags, payload): |
461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload): |
443 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: |
462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: |
444 self._state = 'errored' |
463 self._state = 'errored' |
445 return self._makeerrorresult(_('expected command argument ' |
464 return self._makeerrorresult(_('expected command argument ' |
446 'frame; got %d') % frametype) |
465 'frame; got %d') % frametype) |
447 |
466 |
490 self._state = 'waiting' |
509 self._state = 'waiting' |
491 return self._makeruncommandresult() |
510 return self._makeruncommandresult() |
492 else: |
511 else: |
493 return self._makewantframeresult() |
512 return self._makewantframeresult() |
494 |
513 |
495 def _onframereceivingdata(self, frametype, frameflags, payload): |
514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload): |
496 if frametype != FRAME_TYPE_COMMAND_DATA: |
515 if frametype != FRAME_TYPE_COMMAND_DATA: |
497 self._state = 'errored' |
516 self._state = 'errored' |
498 return self._makeerrorresult(_('expected command data frame; ' |
517 return self._makeerrorresult(_('expected command data frame; ' |
499 'got %d') % frametype) |
518 'got %d') % frametype) |
500 |
519 |