comparison mercurial/wireprotoframing.py @ 40135:966b5f7fd30b

wireprotov2: remove functions for creating response frames from bytes. All code in the actual server uses oncommandresponsereadyobjects(). Test code was ported to that method. This resulted in a handful of subtle test changes. Differential Revision: https://phab.mercurial-scm.org/D4924
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 05 Oct 2018 10:29:36 -0700
parents 762ef19a07e3
children 3a6d6c54bd81
comparison
equal deleted inserted replaced
40134:cfeba1aafb9d 40135:966b5f7fd30b
362 flags=flags, 362 flags=flags,
363 payload=data) 363 payload=data)
364 364
365 if done: 365 if done:
366 break 366 break
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
371
372 Returns a generator of bytearrays.
373 """
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
378
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
385 payload=overall + data)
386 return
387
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
394
395 offset = 0
396 while True:
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
399 done = offset == len(data)
400
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
409 payload=chunk)
410
411 if done:
412 break
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
417
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
421 """
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
425 while True:
426 chunk = cb.read(maxframesize)
427 if not chunk:
428 break
429
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
433 payload=chunk)
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436 367
437 def createcommandresponseokframe(stream, requestid): 368 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 370
440 return stream.makeframe(requestid=requestid, 371 return stream.makeframe(requestid=requestid,
1018 if not meth: 949 if not meth:
1019 raise error.ProgrammingError('unhandled state: %s' % self._state) 950 raise error.ProgrammingError('unhandled state: %s' % self._state)
1020 951
1021 return meth(frame) 952 return meth(frame)
1022 953
1023 def oncommandresponseready(self, stream, requestid, data):
1024 """Signal that a bytes response is ready to be sent to the client.
1025
1026 The raw bytes response is passed as an argument.
1027 """
1028 ensureserverstream(stream)
1029
1030 def sendframes():
1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
1032 data):
1033 yield frame
1034
1035 self._activecommands.remove(requestid)
1036
1037 result = sendframes()
1038
1039 if self._deferoutput:
1040 self._bufferedframegens.append(result)
1041 return 'noop', {}
1042 else:
1043 return 'sendframes', {
1044 'framegen': result,
1045 }
1046
1047 def oncommandresponsereadyobjects(self, stream, requestid, objs): 954 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1048 """Signal that objects are ready to be sent to the client. 955 """Signal that objects are ready to be sent to the client.
1049 956
1050 ``objs`` is an iterable of objects (typically a generator) that will 957 ``objs`` is an iterable of objects (typically a generator) that will
1051 be encoded via CBOR and added to frames, which will be sent to the 958 be encoded via CBOR and added to frames, which will be sent to the
1052 client. 959 client.
1053 """ 960 """
1054 ensureserverstream(stream) 961 ensureserverstream(stream)
962
963 # A more robust solution would be to check for objs.{next,__next__}.
964 if isinstance(objs, list):
965 objs = iter(objs)
1055 966
1056 # We need to take care over exception handling. Uncaught exceptions 967 # We need to take care over exception handling. Uncaught exceptions
1057 # when generating frames could lead to premature end of the frame 968 # when generating frames could lead to premature end of the frame
1058 # stream and the possibility of the server or client process getting 969 # stream and the possibility of the server or client process getting
1059 # in a bad state. 970 # in a bad state.