Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotoserver.py @ 37289:5fadc63ac99f
wireproto: explicit API to create outgoing streams
It is better to create outgoing streams through the reactor so the
reactor knows about what streams are active and can track them
accordingly.
Test output changes slightly because frames from subsequent responses
no longer have the "stream begin" stream flag set because the stream
is now used across all responses.
Differential Revision: https://phab.mercurial-scm.org/D2947
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 26 Mar 2018 13:59:56 -0700 |
parents | 9bfcbe4f4745 |
children | 3d0e2cd86e05 |
comparison
equal
deleted
inserted
replaced
37288:9bfcbe4f4745 | 37289:5fadc63ac99f |
---|---|
430 # the entire request is transmitted. Figure out a way to indicate support | 430 # the entire request is transmitted. Figure out a way to indicate support |
431 # for that so we can opt into full duplex mode. | 431 # for that so we can opt into full duplex mode. |
432 reactor = wireprotoframing.serverreactor(deferoutput=True) | 432 reactor = wireprotoframing.serverreactor(deferoutput=True) |
433 seencommand = False | 433 seencommand = False |
434 | 434 |
435 outstream = reactor.makeoutputstream() | |
436 | |
435 while True: | 437 while True: |
436 frame = wireprotoframing.readframe(req.bodyfh) | 438 frame = wireprotoframing.readframe(req.bodyfh) |
437 if not frame: | 439 if not frame: |
438 break | 440 break |
439 | 441 |
442 if action == 'wantframe': | 444 if action == 'wantframe': |
443 # Need more data before we can do anything. | 445 # Need more data before we can do anything. |
444 continue | 446 continue |
445 elif action == 'runcommand': | 447 elif action == 'runcommand': |
446 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm, | 448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm, |
447 reqcommand, reactor, meta, | 449 reqcommand, reactor, outstream, |
448 issubsequent=seencommand) | 450 meta, issubsequent=seencommand) |
449 | 451 |
450 if sentoutput: | 452 if sentoutput: |
451 return | 453 return |
452 | 454 |
453 seencommand = True | 455 seencommand = True |
474 else: | 476 else: |
475 raise error.ProgrammingError('unhandled action from frame processor: %s' | 477 raise error.ProgrammingError('unhandled action from frame processor: %s' |
476 % action) | 478 % action) |
477 | 479 |
478 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, | 480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, |
479 command, issubsequent): | 481 outstream, command, issubsequent): |
480 """Dispatch a wire protocol command made from HTTPv2 requests. | 482 """Dispatch a wire protocol command made from HTTPv2 requests. |
481 | 483 |
482 The authenticated permission (``authedperm``) along with the original | 484 The authenticated permission (``authedperm``) along with the original |
483 command from the URL (``reqcommand``) are passed in. | 485 command from the URL (``reqcommand``) are passed in. |
484 """ | 486 """ |
544 | 546 |
545 rsp = wireproto.dispatch(repo, proto, command['command']) | 547 rsp = wireproto.dispatch(repo, proto, command['command']) |
546 | 548 |
547 res.status = b'200 OK' | 549 res.status = b'200 OK' |
548 res.headers[b'Content-Type'] = FRAMINGTYPE | 550 res.headers[b'Content-Type'] = FRAMINGTYPE |
549 stream = wireprotoframing.stream(2) | |
550 | 551 |
551 if isinstance(rsp, wireprototypes.bytesresponse): | 552 if isinstance(rsp, wireprototypes.bytesresponse): |
552 action, meta = reactor.onbytesresponseready(stream, | 553 action, meta = reactor.onbytesresponseready(outstream, |
553 command['requestid'], | 554 command['requestid'], |
554 rsp.data) | 555 rsp.data) |
555 else: | 556 else: |
556 action, meta = reactor.onapplicationerror( | 557 action, meta = reactor.onapplicationerror( |
557 _('unhandled response type from wire proto command')) | 558 _('unhandled response type from wire proto command')) |