325 Indicates that nothing of interest happened and the server is waiting on |
325 Indicates that nothing of interest happened and the server is waiting on |
326 more frames from the client before anything interesting can be done. |
326 more frames from the client before anything interesting can be done. |
327 |
327 |
328 noop |
328 noop |
329 Indicates no additional action is required. |
329 Indicates no additional action is required. |
|
330 |
|
331 Known Issues |
|
332 ------------ |
|
333 |
|
334 There are no limits to the number of partially received commands or their |
|
335 size. A malicious client could stream command request data and exhaust the |
|
336 server's memory. |
|
337 |
|
338 Partially received commands are not acted upon when end of input is |
|
339 reached. Should the server error if it receives a partial request? |
|
340 Should the client send a message to abort a partially transmitted request |
|
341 to facilitate graceful shutdown? |
|
342 |
|
343 Active requests that haven't been responded to aren't tracked. This means |
|
344 that if we receive a command and instruct its dispatch, another command |
|
345 with its request ID can come in over the wire and there will be a race |
|
346 between who responds to what. |
330 """ |
347 """ |
331 |
348 |
332 def __init__(self, deferoutput=False): |
349 def __init__(self, deferoutput=False): |
333 """Construct a new server reactor. |
350 """Construct a new server reactor. |
334 |
351 |
340 sender cannot receive until all data has been transmitted. |
357 sender cannot receive until all data has been transmitted. |
341 """ |
358 """ |
342 self._deferoutput = deferoutput |
359 self._deferoutput = deferoutput |
343 self._state = 'idle' |
360 self._state = 'idle' |
344 self._bufferedframegens = [] |
361 self._bufferedframegens = [] |
345 self._activerequestid = None |
362 # request id -> dict of commands that are actively being received. |
346 self._activecommand = None |
363 self._receivingcommands = {} |
347 self._activeargs = None |
|
348 self._activedata = None |
|
349 self._expectingargs = None |
|
350 self._expectingdata = None |
|
351 self._activeargname = None |
|
352 self._activeargchunks = None |
|
353 |
364 |
354 def onframerecv(self, requestid, frametype, frameflags, payload): |
365 def onframerecv(self, requestid, frametype, frameflags, payload): |
355 """Process a frame that has been received off the wire. |
366 """Process a frame that has been received off the wire. |
356 |
367 |
357 Returns a dict with an ``action`` key that details what action, |
368 Returns a dict with an ``action`` key that details what action, |
358 if any, the consumer should take next. |
369 if any, the consumer should take next. |
359 """ |
370 """ |
360 handlers = { |
371 handlers = { |
361 'idle': self._onframeidle, |
372 'idle': self._onframeidle, |
362 'command-receiving-args': self._onframereceivingargs, |
373 'command-receiving': self._onframecommandreceiving, |
363 'command-receiving-data': self._onframereceivingdata, |
|
364 'errored': self._onframeerrored, |
374 'errored': self._onframeerrored, |
365 } |
375 } |
366 |
376 |
367 meth = handlers.get(self._state) |
377 meth = handlers.get(self._state) |
368 if not meth: |
378 if not meth: |
412 def _makeerrorresult(self, msg): |
424 def _makeerrorresult(self, msg): |
413 return 'error', { |
425 return 'error', { |
414 'message': msg, |
426 'message': msg, |
415 } |
427 } |
416 |
428 |
417 def _makeruncommandresult(self): |
429 def _makeruncommandresult(self, requestid): |
|
430 entry = self._receivingcommands[requestid] |
|
431 del self._receivingcommands[requestid] |
|
432 |
|
433 if self._receivingcommands: |
|
434 self._state = 'command-receiving' |
|
435 else: |
|
436 self._state = 'idle' |
|
437 |
418 return 'runcommand', { |
438 return 'runcommand', { |
419 'requestid': self._activerequestid, |
439 'requestid': requestid, |
420 'command': self._activecommand, |
440 'command': entry['command'], |
421 'args': self._activeargs, |
441 'args': entry['args'], |
422 'data': self._activedata.getvalue() if self._activedata else None, |
442 'data': entry['data'].getvalue() if entry['data'] else None, |
423 } |
443 } |
424 |
444 |
425 def _makewantframeresult(self): |
445 def _makewantframeresult(self): |
426 return 'wantframe', { |
446 return 'wantframe', { |
427 'state': self._state, |
447 'state': self._state, |
433 if frametype != FRAME_TYPE_COMMAND_NAME: |
453 if frametype != FRAME_TYPE_COMMAND_NAME: |
434 self._state = 'errored' |
454 self._state = 'errored' |
435 return self._makeerrorresult( |
455 return self._makeerrorresult( |
436 _('expected command frame; got %d') % frametype) |
456 _('expected command frame; got %d') % frametype) |
437 |
457 |
438 self._activerequestid = requestid |
458 if requestid in self._receivingcommands: |
439 self._activecommand = payload |
459 self._state = 'errored' |
440 self._activeargs = {} |
460 return self._makeerrorresult( |
441 self._activedata = None |
461 _('request with ID %d already received') % requestid) |
|
462 |
|
463 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) |
|
464 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) |
|
465 |
|
466 self._receivingcommands[requestid] = { |
|
467 'command': payload, |
|
468 'args': {}, |
|
469 'data': None, |
|
470 'expectingargs': expectingargs, |
|
471 'expectingdata': expectingdata, |
|
472 } |
442 |
473 |
443 if frameflags & FLAG_COMMAND_NAME_EOS: |
474 if frameflags & FLAG_COMMAND_NAME_EOS: |
444 return self._makeruncommandresult() |
475 return self._makeruncommandresult(requestid) |
445 |
476 |
446 self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) |
477 if expectingargs or expectingdata: |
447 self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) |
478 self._state = 'command-receiving' |
448 |
|
449 if self._expectingargs: |
|
450 self._state = 'command-receiving-args' |
|
451 return self._makewantframeresult() |
|
452 elif self._expectingdata: |
|
453 self._activedata = util.bytesio() |
|
454 self._state = 'command-receiving-data' |
|
455 return self._makewantframeresult() |
479 return self._makewantframeresult() |
456 else: |
480 else: |
457 self._state = 'errored' |
481 self._state = 'errored' |
458 return self._makeerrorresult(_('missing frame flags on ' |
482 return self._makeerrorresult(_('missing frame flags on ' |
459 'command frame')) |
483 'command frame')) |
460 |
484 |
461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload): |
485 def _onframecommandreceiving(self, requestid, frametype, frameflags, |
462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: |
486 payload): |
463 self._state = 'errored' |
487 # It could be a new command request. Process it as such. |
464 return self._makeerrorresult(_('expected command argument ' |
488 if frametype == FRAME_TYPE_COMMAND_NAME: |
465 'frame; got %d') % frametype) |
489 return self._onframeidle(requestid, frametype, frameflags, payload) |
|
490 |
|
491 # All other frames should be related to a command that is currently |
|
492 # receiving. |
|
493 if requestid not in self._receivingcommands: |
|
494 self._state = 'errored' |
|
495 return self._makeerrorresult( |
|
496 _('received frame for request that is not receiving: %d') % |
|
497 requestid) |
|
498 |
|
499 entry = self._receivingcommands[requestid] |
|
500 |
|
501 if frametype == FRAME_TYPE_COMMAND_ARGUMENT: |
|
502 if not entry['expectingargs']: |
|
503 self._state = 'errored' |
|
504 return self._makeerrorresult(_( |
|
505 'received command argument frame for request that is not ' |
|
506 'expecting arguments: %d') % requestid) |
|
507 |
|
508 return self._handlecommandargsframe(requestid, entry, frametype, |
|
509 frameflags, payload) |
|
510 |
|
511 elif frametype == FRAME_TYPE_COMMAND_DATA: |
|
512 if not entry['expectingdata']: |
|
513 self._state = 'errored' |
|
514 return self._makeerrorresult(_( |
|
515 'received command data frame for request that is not ' |
|
516 'expecting data: %d') % requestid) |
|
517 |
|
518 if entry['data'] is None: |
|
519 entry['data'] = util.bytesio() |
|
520 |
|
521 return self._handlecommanddataframe(requestid, entry, frametype, |
|
522 frameflags, payload) |
|
523 |
|
524 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags, |
|
525 payload): |
|
526 # The frame and state of command should have already been validated. |
|
527 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT |
466 |
528 |
467 offset = 0 |
529 offset = 0 |
468 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) |
530 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) |
469 offset += ARGUMENT_FRAME_HEADER.size |
531 offset += ARGUMENT_FRAME_HEADER.size |
470 |
532 |
481 |
543 |
482 # Argument value spans multiple frames. Record our active state |
544 # Argument value spans multiple frames. Record our active state |
483 # and wait for the next frame. |
545 # and wait for the next frame. |
484 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: |
546 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: |
485 raise error.ProgrammingError('not yet implemented') |
547 raise error.ProgrammingError('not yet implemented') |
486 self._activeargname = argname |
|
487 self._activeargchunks = [argvalue] |
|
488 self._state = 'command-arg-continuation' |
|
489 return self._makewantframeresult() |
|
490 |
548 |
491 # Common case: the argument value is completely contained in this |
549 # Common case: the argument value is completely contained in this |
492 # frame. |
550 # frame. |
493 |
551 |
494 if len(argvalue) != valuesize: |
552 if len(argvalue) != valuesize: |
495 self._state = 'errored' |
553 self._state = 'errored' |
496 return self._makeerrorresult(_('malformed argument frame: ' |
554 return self._makeerrorresult(_('malformed argument frame: ' |
497 'partial argument value')) |
555 'partial argument value')) |
498 |
556 |
499 self._activeargs[argname] = argvalue |
557 entry['args'][argname] = argvalue |
500 |
558 |
501 if frameflags & FLAG_COMMAND_ARGUMENT_EOA: |
559 if frameflags & FLAG_COMMAND_ARGUMENT_EOA: |
502 if self._expectingdata: |
560 if entry['expectingdata']: |
503 self._state = 'command-receiving-data' |
|
504 self._activedata = util.bytesio() |
|
505 # TODO signal request to run a command once we don't |
561 # TODO signal request to run a command once we don't |
506 # buffer data frames. |
562 # buffer data frames. |
507 return self._makewantframeresult() |
563 return self._makewantframeresult() |
508 else: |
564 else: |
509 self._state = 'waiting' |
565 return self._makeruncommandresult(requestid) |
510 return self._makeruncommandresult() |
|
511 else: |
566 else: |
512 return self._makewantframeresult() |
567 return self._makewantframeresult() |
513 |
568 |
514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload): |
569 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags, |
515 if frametype != FRAME_TYPE_COMMAND_DATA: |
570 payload): |
516 self._state = 'errored' |
571 assert frametype == FRAME_TYPE_COMMAND_DATA |
517 return self._makeerrorresult(_('expected command data frame; ' |
|
518 'got %d') % frametype) |
|
519 |
572 |
520 # TODO support streaming data instead of buffering it. |
573 # TODO support streaming data instead of buffering it. |
521 self._activedata.write(payload) |
574 entry['data'].write(payload) |
522 |
575 |
523 if frameflags & FLAG_COMMAND_DATA_CONTINUATION: |
576 if frameflags & FLAG_COMMAND_DATA_CONTINUATION: |
524 return self._makewantframeresult() |
577 return self._makewantframeresult() |
525 elif frameflags & FLAG_COMMAND_DATA_EOS: |
578 elif frameflags & FLAG_COMMAND_DATA_EOS: |
526 self._activedata.seek(0) |
579 entry['data'].seek(0) |
527 self._state = 'idle' |
580 return self._makeruncommandresult(requestid) |
528 return self._makeruncommandresult() |
|
529 else: |
581 else: |
530 self._state = 'errored' |
582 self._state = 'errored' |
531 return self._makeerrorresult(_('command data frame without ' |
583 return self._makeerrorresult(_('command data frame without ' |
532 'flags')) |
584 'flags')) |
533 |
585 |