comparison mercurial/wireprotoserver.py @ 37289:5fadc63ac99f

wireproto: explicit API to create outgoing streams It is better to create outgoing streams through the reactor so the reactor knows about what streams are active and can track them accordingly. Test output changes slightly because frames from subsequent responses no longer have the "stream begin" stream flag set because the stream is now used across all responses. Differential Revision: https://phab.mercurial-scm.org/D2947
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 26 Mar 2018 13:59:56 -0700
parents 9bfcbe4f4745
children 3d0e2cd86e05
comparison
equal deleted inserted replaced
37288:9bfcbe4f4745 37289:5fadc63ac99f
430 # the entire request is transmitted. Figure out a way to indicate support 430 # the entire request is transmitted. Figure out a way to indicate support
431 # for that so we can opt into full duplex mode. 431 # for that so we can opt into full duplex mode.
432 reactor = wireprotoframing.serverreactor(deferoutput=True) 432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 seencommand = False 433 seencommand = False
434 434
435 outstream = reactor.makeoutputstream()
436
435 while True: 437 while True:
436 frame = wireprotoframing.readframe(req.bodyfh) 438 frame = wireprotoframing.readframe(req.bodyfh)
437 if not frame: 439 if not frame:
438 break 440 break
439 441
442 if action == 'wantframe': 444 if action == 'wantframe':
443 # Need more data before we can do anything. 445 # Need more data before we can do anything.
444 continue 446 continue
445 elif action == 'runcommand': 447 elif action == 'runcommand':
446 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm, 448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
447 reqcommand, reactor, meta, 449 reqcommand, reactor, outstream,
448 issubsequent=seencommand) 450 meta, issubsequent=seencommand)
449 451
450 if sentoutput: 452 if sentoutput:
451 return 453 return
452 454
453 seencommand = True 455 seencommand = True
474 else: 476 else:
475 raise error.ProgrammingError('unhandled action from frame processor: %s' 477 raise error.ProgrammingError('unhandled action from frame processor: %s'
476 % action) 478 % action)
477 479
478 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, 480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
479 command, issubsequent): 481 outstream, command, issubsequent):
480 """Dispatch a wire protocol command made from HTTPv2 requests. 482 """Dispatch a wire protocol command made from HTTPv2 requests.
481 483
482 The authenticated permission (``authedperm``) along with the original 484 The authenticated permission (``authedperm``) along with the original
483 command from the URL (``reqcommand``) are passed in. 485 command from the URL (``reqcommand``) are passed in.
484 """ 486 """
544 546
545 rsp = wireproto.dispatch(repo, proto, command['command']) 547 rsp = wireproto.dispatch(repo, proto, command['command'])
546 548
547 res.status = b'200 OK' 549 res.status = b'200 OK'
548 res.headers[b'Content-Type'] = FRAMINGTYPE 550 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream(2)
550 551
551 if isinstance(rsp, wireprototypes.bytesresponse): 552 if isinstance(rsp, wireprototypes.bytesresponse):
552 action, meta = reactor.onbytesresponseready(stream, 553 action, meta = reactor.onbytesresponseready(outstream,
553 command['requestid'], 554 command['requestid'],
554 rsp.data) 555 rsp.data)
555 else: 556 else:
556 action, meta = reactor.onapplicationerror( 557 action, meta = reactor.onapplicationerror(
557 _('unhandled response type from wire proto command')) 558 _('unhandled response type from wire proto command'))