Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotoframing.py @ 40135:966b5f7fd30b
wireprotov2: remove functions for creating response frames from bytes
All code in the actual server uses oncommandresponsereadyobjects().
Test code was ported to that method. This resulted in a handful of
subtle test changes.
Differential Revision: https://phab.mercurial-scm.org/D4924
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Fri, 05 Oct 2018 10:29:36 -0700 |
parents | 762ef19a07e3 |
children | 3a6d6c54bd81 |
comparison legend:
- equal
- deleted
- inserted
- replaced
40134:cfeba1aafb9d | 40135:966b5f7fd30b |
---|---|
362 flags=flags, | 362 flags=flags, |
363 payload=data) | 363 payload=data) |
364 | 364 |
365 if done: | 365 if done: |
366 break | 366 break |
367 | |
368 def createcommandresponseframesfrombytes(stream, requestid, data, | |
369 maxframesize=DEFAULT_MAX_FRAME_SIZE): | |
370 """Create a raw frame to send a bytes response from static bytes input. | |
371 | |
372 Returns a generator of bytearrays. | |
373 """ | |
374 # Automatically send the overall CBOR response map. | |
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | |
376 if len(overall) > maxframesize: | |
377 raise error.ProgrammingError('not yet implemented') | |
378 | |
379 # Simple case where we can fit the full response in a single frame. | |
380 if len(overall) + len(data) <= maxframesize: | |
381 flags = FLAG_COMMAND_RESPONSE_EOS | |
382 yield stream.makeframe(requestid=requestid, | |
383 typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
384 flags=flags, | |
385 payload=overall + data) | |
386 return | |
387 | |
388 # It's easier to send the overall CBOR map in its own frame than to track | |
389 # offsets. | |
390 yield stream.makeframe(requestid=requestid, | |
391 typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | |
393 payload=overall) | |
394 | |
395 offset = 0 | |
396 while True: | |
397 chunk = data[offset:offset + maxframesize] | |
398 offset += len(chunk) | |
399 done = offset == len(data) | |
400 | |
401 if done: | |
402 flags = FLAG_COMMAND_RESPONSE_EOS | |
403 else: | |
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |
405 | |
406 yield stream.makeframe(requestid=requestid, | |
407 typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
408 flags=flags, | |
409 payload=chunk) | |
410 | |
411 if done: | |
412 break | |
413 | |
414 def createbytesresponseframesfromgen(stream, requestid, gen, | |
415 maxframesize=DEFAULT_MAX_FRAME_SIZE): | |
416 """Generator of frames from a generator of byte chunks. | |
417 | |
418 This assumes that another frame will follow whatever this emits. i.e. | |
419 this always emits the continuation flag and never emits the end-of-stream | |
420 flag. | |
421 """ | |
422 cb = util.chunkbuffer(gen) | |
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |
424 | |
425 while True: | |
426 chunk = cb.read(maxframesize) | |
427 if not chunk: | |
428 break | |
429 | |
430 yield stream.makeframe(requestid=requestid, | |
431 typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
432 flags=flags, | |
433 payload=chunk) | |
434 | |
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION | |
436 | 367 |
437 def createcommandresponseokframe(stream, requestid): | 368 def createcommandresponseokframe(stream, requestid): |
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) |
439 | 370 |
440 return stream.makeframe(requestid=requestid, | 371 return stream.makeframe(requestid=requestid, |
1018 if not meth: | 949 if not meth: |
1019 raise error.ProgrammingError('unhandled state: %s' % self._state) | 950 raise error.ProgrammingError('unhandled state: %s' % self._state) |
1020 | 951 |
1021 return meth(frame) | 952 return meth(frame) |
1022 | 953 |
1023 def oncommandresponseready(self, stream, requestid, data): | |
1024 """Signal that a bytes response is ready to be sent to the client. | |
1025 | |
1026 The raw bytes response is passed as an argument. | |
1027 """ | |
1028 ensureserverstream(stream) | |
1029 | |
1030 def sendframes(): | |
1031 for frame in createcommandresponseframesfrombytes(stream, requestid, | |
1032 data): | |
1033 yield frame | |
1034 | |
1035 self._activecommands.remove(requestid) | |
1036 | |
1037 result = sendframes() | |
1038 | |
1039 if self._deferoutput: | |
1040 self._bufferedframegens.append(result) | |
1041 return 'noop', {} | |
1042 else: | |
1043 return 'sendframes', { | |
1044 'framegen': result, | |
1045 } | |
1046 | |
1047 def oncommandresponsereadyobjects(self, stream, requestid, objs): | 954 def oncommandresponsereadyobjects(self, stream, requestid, objs): |
1048 """Signal that objects are ready to be sent to the client. | 955 """Signal that objects are ready to be sent to the client. |
1049 | 956 |
1050 ``objs`` is an iterable of objects (typically a generator) that will | 957 ``objs`` is an iterable of objects (typically a generator) that will |
1051 be encoded via CBOR and added to frames, which will be sent to the | 958 be encoded via CBOR and added to frames, which will be sent to the |
1052 client. | 959 client. |
1053 """ | 960 """ |
1054 ensureserverstream(stream) | 961 ensureserverstream(stream) |
962 | |
963 # A more robust solution would be to check for objs.{next,__next__}. | |
964 if isinstance(objs, list): | |
965 objs = iter(objs) | |
1055 | 966 |
1056 # We need to take care over exception handling. Uncaught exceptions | 967 # We need to take care over exception handling. Uncaught exceptions |
1057 # when generating frames could lead to premature end of the frame | 968 # when generating frames could lead to premature end of the frame |
1058 # stream and the possibility of the server or client process getting | 969 # stream and the possibility of the server or client process getting |
1059 # in a bad state. | 970 # in a bad state. |