    TODO consider adding limits as to the maximum amount of data that can
    be buffered.
    """
914 def __init__(self): |
914 def __init__(self): |
915 self._decoder = sansiodecoder() |
915 self._decoder = sansiodecoder() |
916 self._leftover = None |
916 self._chunks = [] |
|
917 self._wanted = 0 |
917 |
918 |
918 def decode(self, b): |
919 def decode(self, b): |
919 """Attempt to decode bytes to CBOR values. |
920 """Attempt to decode bytes to CBOR values. |
920 |
921 |
921 Returns a tuple with the following fields: |
922 Returns a tuple with the following fields: |
922 |
923 |
923 * Bool indicating whether new values are available for retrieval. |
924 * Bool indicating whether new values are available for retrieval. |
924 * Integer number of bytes decoded from the new input. |
925 * Integer number of bytes decoded from the new input. |
925 * Integer number of bytes wanted to decode the next value. |
926 * Integer number of bytes wanted to decode the next value. |
926 """ |
927 """ |
927 |
928 # Our strategy for buffering is to aggregate the incoming chunks in a |
928 if self._leftover: |
929 # list until we've received enough data to decode the next item. |
929 oldlen = len(self._leftover) |
930 # This is slightly more complicated than using an ``io.BytesIO`` |
930 b = self._leftover + b |
931 # or continuously concatenating incoming data. However, because it |
931 self._leftover = None |
932 # isn't constantly reallocating backing memory for a growing buffer, |
|
933 # it prevents excessive memory thrashing and is significantly faster, |
|
934 # especially in cases where the percentage of input chunks that don't |
|
935 # decode into a full item is high. |
|
936 |
|
937 if self._chunks: |
|
938 # A previous call said we needed N bytes to decode the next item. |
|
939 # But this call doesn't provide enough data. We buffer the incoming |
|
940 # chunk without attempting to decode. |
|
941 if len(b) < self._wanted: |
|
942 self._chunks.append(b) |
|
943 self._wanted -= len(b) |
|
944 return False, 0, self._wanted |
|
945 |
|
946 # Else we may have enough data to decode the next item. Aggregate |
|
947 # old data with new and reset the buffer. |
|
948 newlen = len(b) |
|
949 self._chunks.append(b) |
|
950 b = b''.join(self._chunks) |
|
951 self._chunks = [] |
|
952 oldlen = len(b) - newlen |
|
953 |
932 else: |
954 else: |
933 b = b |
|
934 oldlen = 0 |
955 oldlen = 0 |
935 |
956 |
936 available, readcount, wanted = self._decoder.decode(b) |
957 available, readcount, wanted = self._decoder.decode(b) |
|
958 self._wanted = wanted |
937 |
959 |
938 if readcount < len(b): |
960 if readcount < len(b): |
939 self._leftover = b[readcount:] |
961 self._chunks.append(b[readcount:]) |
940 |
962 |
941 return available, readcount - oldlen, wanted |
963 return available, readcount - oldlen, wanted |
942 |
964 |
943 def getavailable(self): |
965 def getavailable(self): |
944 return self._decoder.getavailable() |
966 return self._decoder.getavailable() |