Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotoframing.py @ 37058:c5e9c3b47366
wireproto: support for receiving multiple requests
Now that we have request IDs on each frame and a specification
that allows multiple requests to be issued simultaneously,
possibly interleaved, let's teach the server to deal with that.
Instead of tracking the state for *the* active command request,
we instead track the state of each receiving command by its
request ID. The multiple states our state machine previously used to
process a command have been collapsed into a single "receiving
commands" state.
Tests have been added so that branch coverage includes all meaningful
branches.
However, we did lose some logical coverage. This new feature opens
the door to the server holding partially received command requests
when end of input is reached. We will probably
want a mechanism to deal with partial requests. For now, I've
tracked that as a known issue in the class docstring. I've
also noted an abuse vector that becomes a little bit easier to
exploit with this feature.
Differential Revision: https://phab.mercurial-scm.org/D2870
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 14 Mar 2018 16:53:30 -0700 |
parents | 2ec1fb9de638 |
children | 0a6c5cc09a88 |
comparison
equal
deleted
inserted
replaced
37057:2ec1fb9de638 | 37058:c5e9c3b47366 |
---|---|
325 Indicates that nothing of interest happened and the server is waiting on | 325 Indicates that nothing of interest happened and the server is waiting on |
326 more frames from the client before anything interesting can be done. | 326 more frames from the client before anything interesting can be done. |
327 | 327 |
328 noop | 328 noop |
329 Indicates no additional action is required. | 329 Indicates no additional action is required. |
330 | |
331 Known Issues | |
332 ------------ | |
333 | |
334 There are no limits to the number of partially received commands or their | |
335 size. A malicious client could stream command request data and exhaust the | |
336 server's memory. | |
337 | |
338 Partially received commands are not acted upon when end of input is | |
339 reached. Should the server error if it receives a partial request? | |
340 Should the client send a message to abort a partially transmitted request | |
341 to facilitate graceful shutdown? | |
342 | |
343 Active requests that haven't been responded to aren't tracked. This means | |
344 that if we receive a command and instruct its dispatch, another command | |
345 with its request ID can come in over the wire and there will be a race | |
346 between who responds to what. | |
330 """ | 347 """ |
331 | 348 |
332 def __init__(self, deferoutput=False): | 349 def __init__(self, deferoutput=False): |
333 """Construct a new server reactor. | 350 """Construct a new server reactor. |
334 | 351 |
340 sender cannot receive until all data has been transmitted. | 357 sender cannot receive until all data has been transmitted. |
341 """ | 358 """ |
342 self._deferoutput = deferoutput | 359 self._deferoutput = deferoutput |
343 self._state = 'idle' | 360 self._state = 'idle' |
344 self._bufferedframegens = [] | 361 self._bufferedframegens = [] |
345 self._activerequestid = None | 362 # request id -> dict of commands that are actively being received. |
346 self._activecommand = None | 363 self._receivingcommands = {} |
347 self._activeargs = None | |
348 self._activedata = None | |
349 self._expectingargs = None | |
350 self._expectingdata = None | |
351 self._activeargname = None | |
352 self._activeargchunks = None | |
353 | 364 |
354 def onframerecv(self, requestid, frametype, frameflags, payload): | 365 def onframerecv(self, requestid, frametype, frameflags, payload): |
355 """Process a frame that has been received off the wire. | 366 """Process a frame that has been received off the wire. |
356 | 367 |
357 Returns a dict with an ``action`` key that details what action, | 368 Returns a dict with an ``action`` key that details what action, |
358 if any, the consumer should take next. | 369 if any, the consumer should take next. |
359 """ | 370 """ |
360 handlers = { | 371 handlers = { |
361 'idle': self._onframeidle, | 372 'idle': self._onframeidle, |
362 'command-receiving-args': self._onframereceivingargs, | 373 'command-receiving': self._onframecommandreceiving, |
363 'command-receiving-data': self._onframereceivingdata, | |
364 'errored': self._onframeerrored, | 374 'errored': self._onframeerrored, |
365 } | 375 } |
366 | 376 |
367 meth = handlers.get(self._state) | 377 meth = handlers.get(self._state) |
368 if not meth: | 378 if not meth: |
389 """Signals that end of input has been received. | 399 """Signals that end of input has been received. |
390 | 400 |
391 No more frames will be received. All pending activity should be | 401 No more frames will be received. All pending activity should be |
392 completed. | 402 completed. |
393 """ | 403 """ |
404 # TODO should we do anything about in-flight commands? | |
405 | |
394 if not self._deferoutput or not self._bufferedframegens: | 406 if not self._deferoutput or not self._bufferedframegens: |
395 return 'noop', {} | 407 return 'noop', {} |
396 | 408 |
397 # If we buffered all our responses, emit those. | 409 # If we buffered all our responses, emit those. |
398 def makegen(): | 410 def makegen(): |
412 def _makeerrorresult(self, msg): | 424 def _makeerrorresult(self, msg): |
413 return 'error', { | 425 return 'error', { |
414 'message': msg, | 426 'message': msg, |
415 } | 427 } |
416 | 428 |
417 def _makeruncommandresult(self): | 429 def _makeruncommandresult(self, requestid): |
430 entry = self._receivingcommands[requestid] | |
431 del self._receivingcommands[requestid] | |
432 | |
433 if self._receivingcommands: | |
434 self._state = 'command-receiving' | |
435 else: | |
436 self._state = 'idle' | |
437 | |
418 return 'runcommand', { | 438 return 'runcommand', { |
419 'requestid': self._activerequestid, | 439 'requestid': requestid, |
420 'command': self._activecommand, | 440 'command': entry['command'], |
421 'args': self._activeargs, | 441 'args': entry['args'], |
422 'data': self._activedata.getvalue() if self._activedata else None, | 442 'data': entry['data'].getvalue() if entry['data'] else None, |
423 } | 443 } |
424 | 444 |
425 def _makewantframeresult(self): | 445 def _makewantframeresult(self): |
426 return 'wantframe', { | 446 return 'wantframe', { |
427 'state': self._state, | 447 'state': self._state, |
433 if frametype != FRAME_TYPE_COMMAND_NAME: | 453 if frametype != FRAME_TYPE_COMMAND_NAME: |
434 self._state = 'errored' | 454 self._state = 'errored' |
435 return self._makeerrorresult( | 455 return self._makeerrorresult( |
436 _('expected command frame; got %d') % frametype) | 456 _('expected command frame; got %d') % frametype) |
437 | 457 |
438 self._activerequestid = requestid | 458 if requestid in self._receivingcommands: |
439 self._activecommand = payload | 459 self._state = 'errored' |
440 self._activeargs = {} | 460 return self._makeerrorresult( |
441 self._activedata = None | 461 _('request with ID %d already received') % requestid) |
462 | |
463 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) | |
464 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) | |
465 | |
466 self._receivingcommands[requestid] = { | |
467 'command': payload, | |
468 'args': {}, | |
469 'data': None, | |
470 'expectingargs': expectingargs, | |
471 'expectingdata': expectingdata, | |
472 } | |
442 | 473 |
443 if frameflags & FLAG_COMMAND_NAME_EOS: | 474 if frameflags & FLAG_COMMAND_NAME_EOS: |
444 return self._makeruncommandresult() | 475 return self._makeruncommandresult(requestid) |
445 | 476 |
446 self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) | 477 if expectingargs or expectingdata: |
447 self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) | 478 self._state = 'command-receiving' |
448 | |
449 if self._expectingargs: | |
450 self._state = 'command-receiving-args' | |
451 return self._makewantframeresult() | |
452 elif self._expectingdata: | |
453 self._activedata = util.bytesio() | |
454 self._state = 'command-receiving-data' | |
455 return self._makewantframeresult() | 479 return self._makewantframeresult() |
456 else: | 480 else: |
457 self._state = 'errored' | 481 self._state = 'errored' |
458 return self._makeerrorresult(_('missing frame flags on ' | 482 return self._makeerrorresult(_('missing frame flags on ' |
459 'command frame')) | 483 'command frame')) |
460 | 484 |
461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload): | 485 def _onframecommandreceiving(self, requestid, frametype, frameflags, |
462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: | 486 payload): |
463 self._state = 'errored' | 487 # It could be a new command request. Process it as such. |
464 return self._makeerrorresult(_('expected command argument ' | 488 if frametype == FRAME_TYPE_COMMAND_NAME: |
465 'frame; got %d') % frametype) | 489 return self._onframeidle(requestid, frametype, frameflags, payload) |
490 | |
491 # All other frames should be related to a command that is currently | |
492 # receiving. | |
493 if requestid not in self._receivingcommands: | |
494 self._state = 'errored' | |
495 return self._makeerrorresult( | |
496 _('received frame for request that is not receiving: %d') % | |
497 requestid) | |
498 | |
499 entry = self._receivingcommands[requestid] | |
500 | |
501 if frametype == FRAME_TYPE_COMMAND_ARGUMENT: | |
502 if not entry['expectingargs']: | |
503 self._state = 'errored' | |
504 return self._makeerrorresult(_( | |
505 'received command argument frame for request that is not ' | |
506 'expecting arguments: %d') % requestid) | |
507 | |
508 return self._handlecommandargsframe(requestid, entry, frametype, | |
509 frameflags, payload) | |
510 | |
511 elif frametype == FRAME_TYPE_COMMAND_DATA: | |
512 if not entry['expectingdata']: | |
513 self._state = 'errored' | |
514 return self._makeerrorresult(_( | |
515 'received command data frame for request that is not ' | |
516 'expecting data: %d') % requestid) | |
517 | |
518 if entry['data'] is None: | |
519 entry['data'] = util.bytesio() | |
520 | |
521 return self._handlecommanddataframe(requestid, entry, frametype, | |
522 frameflags, payload) | |
523 | |
524 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags, | |
525 payload): | |
526 # The frame and state of command should have already been validated. | |
527 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT | |
466 | 528 |
467 offset = 0 | 529 offset = 0 |
468 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) | 530 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) |
469 offset += ARGUMENT_FRAME_HEADER.size | 531 offset += ARGUMENT_FRAME_HEADER.size |
470 | 532 |
481 | 543 |
482 # Argument value spans multiple frames. Record our active state | 544 # Argument value spans multiple frames. Record our active state |
483 # and wait for the next frame. | 545 # and wait for the next frame. |
484 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: | 546 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: |
485 raise error.ProgrammingError('not yet implemented') | 547 raise error.ProgrammingError('not yet implemented') |
486 self._activeargname = argname | |
487 self._activeargchunks = [argvalue] | |
488 self._state = 'command-arg-continuation' | |
489 return self._makewantframeresult() | |
490 | 548 |
491 # Common case: the argument value is completely contained in this | 549 # Common case: the argument value is completely contained in this |
492 # frame. | 550 # frame. |
493 | 551 |
494 if len(argvalue) != valuesize: | 552 if len(argvalue) != valuesize: |
495 self._state = 'errored' | 553 self._state = 'errored' |
496 return self._makeerrorresult(_('malformed argument frame: ' | 554 return self._makeerrorresult(_('malformed argument frame: ' |
497 'partial argument value')) | 555 'partial argument value')) |
498 | 556 |
499 self._activeargs[argname] = argvalue | 557 entry['args'][argname] = argvalue |
500 | 558 |
501 if frameflags & FLAG_COMMAND_ARGUMENT_EOA: | 559 if frameflags & FLAG_COMMAND_ARGUMENT_EOA: |
502 if self._expectingdata: | 560 if entry['expectingdata']: |
503 self._state = 'command-receiving-data' | |
504 self._activedata = util.bytesio() | |
505 # TODO signal request to run a command once we don't | 561 # TODO signal request to run a command once we don't |
506 # buffer data frames. | 562 # buffer data frames. |
507 return self._makewantframeresult() | 563 return self._makewantframeresult() |
508 else: | 564 else: |
509 self._state = 'waiting' | 565 return self._makeruncommandresult(requestid) |
510 return self._makeruncommandresult() | |
511 else: | 566 else: |
512 return self._makewantframeresult() | 567 return self._makewantframeresult() |
513 | 568 |
514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload): | 569 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags, |
515 if frametype != FRAME_TYPE_COMMAND_DATA: | 570 payload): |
516 self._state = 'errored' | 571 assert frametype == FRAME_TYPE_COMMAND_DATA |
517 return self._makeerrorresult(_('expected command data frame; ' | |
518 'got %d') % frametype) | |
519 | 572 |
520 # TODO support streaming data instead of buffering it. | 573 # TODO support streaming data instead of buffering it. |
521 self._activedata.write(payload) | 574 entry['data'].write(payload) |
522 | 575 |
523 if frameflags & FLAG_COMMAND_DATA_CONTINUATION: | 576 if frameflags & FLAG_COMMAND_DATA_CONTINUATION: |
524 return self._makewantframeresult() | 577 return self._makewantframeresult() |
525 elif frameflags & FLAG_COMMAND_DATA_EOS: | 578 elif frameflags & FLAG_COMMAND_DATA_EOS: |
526 self._activedata.seek(0) | 579 entry['data'].seek(0) |
527 self._state = 'idle' | 580 return self._makeruncommandresult(requestid) |
528 return self._makeruncommandresult() | |
529 else: | 581 else: |
530 self._state = 'errored' | 582 self._state = 'errored' |
531 return self._makeerrorresult(_('command data frame without ' | 583 return self._makeerrorresult(_('command data frame without ' |
532 'flags')) | 584 'flags')) |
533 | 585 |