mercurial/wireprotoframing.py
changeset 37728 564a3eec6e63
parent 37726 0c184ca594bb
child 39446 36f487a332ad
equal deleted inserted replaced
37727:5cdde6158426 37728:564a3eec6e63
   383                                flags=flags,
   383                                flags=flags,
   384                                payload=chunk)
   384                                payload=chunk)
   385 
   385 
   386         if done:
   386         if done:
   387             break
   387             break
       
   388 
       
   389 def createbytesresponseframesfromgen(stream, requestid, gen,
       
   390                                      maxframesize=DEFAULT_MAX_FRAME_SIZE):
       
   391     overall = cbor.dumps({b'status': b'ok'}, canonical=True)
       
   392 
       
   393     yield stream.makeframe(requestid=requestid,
       
   394                            typeid=FRAME_TYPE_COMMAND_RESPONSE,
       
   395                            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
       
   396                            payload=overall)
       
   397 
       
   398     cb = util.chunkbuffer(gen)
       
   399 
       
   400     flags = 0
       
   401 
       
   402     while True:
       
   403         chunk = cb.read(maxframesize)
       
   404         if not chunk:
       
   405             break
       
   406 
       
   407         yield stream.makeframe(requestid=requestid,
       
   408                                typeid=FRAME_TYPE_COMMAND_RESPONSE,
       
   409                                flags=flags,
       
   410                                payload=chunk)
       
   411 
       
   412         flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
       
   413 
       
   414     flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
       
   415     flags |= FLAG_COMMAND_RESPONSE_EOS
       
   416     yield stream.makeframe(requestid=requestid,
       
   417                            typeid=FRAME_TYPE_COMMAND_RESPONSE,
       
   418                            flags=flags,
       
   419                            payload=b'')
       
   420 
       
   421 def createcommanderrorresponse(stream, requestid, message, args=None):
       
   422     m = {
       
   423         b'status': b'error',
       
   424         b'error': {
       
   425             b'message': message,
       
   426         }
       
   427     }
       
   428 
       
   429     if args:
       
   430         m[b'error'][b'args'] = args
       
   431 
       
   432     overall = cbor.dumps(m, canonical=True)
       
   433 
       
   434     yield stream.makeframe(requestid=requestid,
       
   435                            typeid=FRAME_TYPE_COMMAND_RESPONSE,
       
   436                            flags=FLAG_COMMAND_RESPONSE_EOS,
       
   437                            payload=overall)
   388 
   438 
   389 def createerrorframe(stream, requestid, msg, errtype):
   439 def createerrorframe(stream, requestid, msg, errtype):
   390     # TODO properly handle frame size limits.
   440     # TODO properly handle frame size limits.
   391     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
   441     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
   392 
   442 
   632         else:
   682         else:
   633             return 'sendframes', {
   683             return 'sendframes', {
   634                 'framegen': result,
   684                 'framegen': result,
   635             }
   685             }
   636 
   686 
       
   687     def oncommandresponsereadygen(self, stream, requestid, gen):
       
   688         """Signal that a bytes response is ready, with data as a generator."""
       
   689         ensureserverstream(stream)
       
   690 
       
   691         def sendframes():
       
   692             for frame in createbytesresponseframesfromgen(stream, requestid,
       
   693                                                           gen):
       
   694                 yield frame
       
   695 
       
   696             self._activecommands.remove(requestid)
       
   697 
       
   698         return self._handlesendframes(sendframes())
       
   699 
   637     def oninputeof(self):
   700     def oninputeof(self):
   638         """Signals that end of input has been received.
   701         """Signals that end of input has been received.
   639 
   702 
   640         No more frames will be received. All pending activity should be
   703         No more frames will be received. All pending activity should be
   641         completed.
   704         completed.
   653 
   716 
   654         return 'sendframes', {
   717         return 'sendframes', {
   655             'framegen': makegen(),
   718             'framegen': makegen(),
   656         }
   719         }
   657 
   720 
       
   721     def _handlesendframes(self, framegen):
       
   722         if self._deferoutput:
       
   723             self._bufferedframegens.append(framegen)
       
   724             return 'noop', {}
       
   725         else:
       
   726             return 'sendframes', {
       
   727                 'framegen': framegen,
       
   728             }
       
   729 
   658     def onservererror(self, stream, requestid, msg):
   730     def onservererror(self, stream, requestid, msg):
   659         ensureserverstream(stream)
   731         ensureserverstream(stream)
   660 
   732 
   661         return 'sendframes', {
   733         def sendframes():
   662             'framegen': createerrorframe(stream, requestid, msg,
   734             for frame in createerrorframe(stream, requestid, msg,
   663                                          errtype='server'),
   735                                           errtype='server'):
   664         }
   736                 yield frame
       
   737 
       
   738             self._activecommands.remove(requestid)
       
   739 
       
   740         return self._handlesendframes(sendframes())
       
   741 
       
   742     def oncommanderror(self, stream, requestid, message, args=None):
       
   743         """Called when a command encountered an error before sending output."""
       
   744         ensureserverstream(stream)
       
   745 
       
   746         def sendframes():
       
   747             for frame in createcommanderrorresponse(stream, requestid, message,
       
   748                                                     args):
       
   749                 yield frame
       
   750 
       
   751             self._activecommands.remove(requestid)
       
   752 
       
   753         return self._handlesendframes(sendframes())
   665 
   754 
   666     def makeoutputstream(self):
   755     def makeoutputstream(self):
   667         """Create a stream to be used for sending data to the client."""
   756         """Create a stream to be used for sending data to the client."""
   668         streamid = self._nextoutgoingstreamid
   757         streamid = self._nextoutgoingstreamid
   669         self._nextoutgoingstreamid += 2
   758         self._nextoutgoingstreamid += 2