contrib/python-zstandard/tests/test_decompressor.py
changeset 31796 e0dc40530c5a
parent 30895 c32454d69b85
child 37495 b1fb341d8a61
equal deleted inserted replaced
31795:2b130e26c3a4 31796:e0dc40530c5a
    47     def test_content_size_present(self):
    47     def test_content_size_present(self):
    48         cctx = zstd.ZstdCompressor(write_content_size=True)
    48         cctx = zstd.ZstdCompressor(write_content_size=True)
    49         compressed = cctx.compress(b'foobar')
    49         compressed = cctx.compress(b'foobar')
    50 
    50 
    51         dctx = zstd.ZstdDecompressor()
    51         dctx = zstd.ZstdDecompressor()
    52         decompressed  = dctx.decompress(compressed)
    52         decompressed = dctx.decompress(compressed)
    53         self.assertEqual(decompressed, b'foobar')
    53         self.assertEqual(decompressed, b'foobar')
    54 
    54 
    55     def test_max_output_size(self):
    55     def test_max_output_size(self):
    56         cctx = zstd.ZstdCompressor(write_content_size=False)
    56         cctx = zstd.ZstdCompressor(write_content_size=False)
    57         source = b'foobar' * 256
    57         source = b'foobar' * 256
   290             s = struct.Struct('>B')
   290             s = struct.Struct('>B')
   291             for c in source:
   291             for c in source:
   292                 if not isinstance(c, str):
   292                 if not isinstance(c, str):
   293                     c = s.pack(c)
   293                     c = s.pack(c)
   294                 decompressor.write(c)
   294                 decompressor.write(c)
   295 
       
   296 
   295 
   297         self.assertEqual(dest.getvalue(), b'foobarfoobar')
   296         self.assertEqual(dest.getvalue(), b'foobarfoobar')
   298         self.assertEqual(dest._write_count, len(dest.getvalue()))
   297         self.assertEqual(dest._write_count, len(dest.getvalue()))
   299 
   298 
   300 
   299 
   573             chain = chunks[0:i]
   572             chain = chunks[0:i]
   574             expected = original[i - 1]
   573             expected = original[i - 1]
   575             dctx = zstd.ZstdDecompressor()
   574             dctx = zstd.ZstdDecompressor()
   576             decompressed = dctx.decompress_content_dict_chain(chain)
   575             decompressed = dctx.decompress_content_dict_chain(chain)
   577             self.assertEqual(decompressed, expected)
   576             self.assertEqual(decompressed, expected)
       
   577 
       
   578 
       
   579 # TODO enable for CFFI
       
   580 class TestDecompressor_multi_decompress_to_buffer(unittest.TestCase):
       
   581     def test_invalid_inputs(self):
       
   582         dctx = zstd.ZstdDecompressor()
       
   583 
       
   584         with self.assertRaises(TypeError):
       
   585             dctx.multi_decompress_to_buffer(True)
       
   586 
       
   587         with self.assertRaises(TypeError):
       
   588             dctx.multi_decompress_to_buffer((1, 2))
       
   589 
       
   590         with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
       
   591             dctx.multi_decompress_to_buffer([u'foo'])
       
   592 
       
   593         with self.assertRaisesRegexp(ValueError, 'could not determine decompressed size of item 0'):
       
   594             dctx.multi_decompress_to_buffer([b'foobarbaz'])
       
   595 
       
   596     def test_list_input(self):
       
   597         cctx = zstd.ZstdCompressor(write_content_size=True)
       
   598 
       
   599         original = [b'foo' * 4, b'bar' * 6]
       
   600         frames = [cctx.compress(d) for d in original]
       
   601 
       
   602         dctx = zstd.ZstdDecompressor()
       
   603         result = dctx.multi_decompress_to_buffer(frames)
       
   604 
       
   605         self.assertEqual(len(result), len(frames))
       
   606         self.assertEqual(result.size(), sum(map(len, original)))
       
   607 
       
   608         for i, data in enumerate(original):
       
   609             self.assertEqual(result[i].tobytes(), data)
       
   610 
       
   611         self.assertEqual(result[0].offset, 0)
       
   612         self.assertEqual(len(result[0]), 12)
       
   613         self.assertEqual(result[1].offset, 12)
       
   614         self.assertEqual(len(result[1]), 18)
       
   615 
       
   616     def test_list_input_frame_sizes(self):
       
   617         cctx = zstd.ZstdCompressor(write_content_size=False)
       
   618 
       
   619         original = [b'foo' * 4, b'bar' * 6, b'baz' * 8]
       
   620         frames = [cctx.compress(d) for d in original]
       
   621         sizes = struct.pack('=' + 'Q' * len(original), *map(len, original))
       
   622 
       
   623         dctx = zstd.ZstdDecompressor()
       
   624         result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes)
       
   625 
       
   626         self.assertEqual(len(result), len(frames))
       
   627         self.assertEqual(result.size(), sum(map(len, original)))
       
   628 
       
   629         for i, data in enumerate(original):
       
   630             self.assertEqual(result[i].tobytes(), data)
       
   631 
       
   632     def test_buffer_with_segments_input(self):
       
   633         cctx = zstd.ZstdCompressor(write_content_size=True)
       
   634 
       
   635         original = [b'foo' * 4, b'bar' * 6]
       
   636         frames = [cctx.compress(d) for d in original]
       
   637 
       
   638         dctx = zstd.ZstdDecompressor()
       
   639 
       
   640         segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1]))
       
   641         b = zstd.BufferWithSegments(b''.join(frames), segments)
       
   642 
       
   643         result = dctx.multi_decompress_to_buffer(b)
       
   644 
       
   645         self.assertEqual(len(result), len(frames))
       
   646         self.assertEqual(result[0].offset, 0)
       
   647         self.assertEqual(len(result[0]), 12)
       
   648         self.assertEqual(result[1].offset, 12)
       
   649         self.assertEqual(len(result[1]), 18)
       
   650 
       
   651     def test_buffer_with_segments_sizes(self):
       
   652         cctx = zstd.ZstdCompressor(write_content_size=False)
       
   653         original = [b'foo' * 4, b'bar' * 6, b'baz' * 8]
       
   654         frames = [cctx.compress(d) for d in original]
       
   655         sizes = struct.pack('=' + 'Q' * len(original), *map(len, original))
       
   656 
       
   657         segments = struct.pack('=QQQQQQ', 0, len(frames[0]),
       
   658                                len(frames[0]), len(frames[1]),
       
   659                                len(frames[0]) + len(frames[1]), len(frames[2]))
       
   660         b = zstd.BufferWithSegments(b''.join(frames), segments)
       
   661 
       
   662         dctx = zstd.ZstdDecompressor()
       
   663         result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes)
       
   664 
       
   665         self.assertEqual(len(result), len(frames))
       
   666         self.assertEqual(result.size(), sum(map(len, original)))
       
   667 
       
   668         for i, data in enumerate(original):
       
   669             self.assertEqual(result[i].tobytes(), data)
       
   670 
       
   671     def test_buffer_with_segments_collection_input(self):
       
   672         cctx = zstd.ZstdCompressor(write_content_size=True)
       
   673 
       
   674         original = [
       
   675             b'foo0' * 2,
       
   676             b'foo1' * 3,
       
   677             b'foo2' * 4,
       
   678             b'foo3' * 5,
       
   679             b'foo4' * 6,
       
   680         ]
       
   681 
       
   682         frames = cctx.multi_compress_to_buffer(original)
       
   683 
       
   684         # Check round trip.
       
   685         dctx = zstd.ZstdDecompressor()
       
   686         decompressed = dctx.multi_decompress_to_buffer(frames, threads=3)
       
   687 
       
   688         self.assertEqual(len(decompressed), len(original))
       
   689 
       
   690         for i, data in enumerate(original):
       
   691             self.assertEqual(data, decompressed[i].tobytes())
       
   692 
       
   693         # And a manual mode.
       
   694         b = b''.join([frames[0].tobytes(), frames[1].tobytes()])
       
   695         b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
       
   696                                                     0, len(frames[0]),
       
   697                                                     len(frames[0]), len(frames[1])))
       
   698 
       
   699         b = b''.join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()])
       
   700         b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
       
   701                                                     0, len(frames[2]),
       
   702                                                     len(frames[2]), len(frames[3]),
       
   703                                                     len(frames[2]) + len(frames[3]), len(frames[4])))
       
   704 
       
   705         c = zstd.BufferWithSegmentsCollection(b1, b2)
       
   706 
       
   707         dctx = zstd.ZstdDecompressor()
       
   708         decompressed = dctx.multi_decompress_to_buffer(c)
       
   709 
       
   710         self.assertEqual(len(decompressed), 5)
       
   711         for i in range(5):
       
   712             self.assertEqual(decompressed[i].tobytes(), original[i])
       
   713 
       
   714     def test_multiple_threads(self):
       
   715         cctx = zstd.ZstdCompressor(write_content_size=True)
       
   716 
       
   717         frames = []
       
   718         frames.extend(cctx.compress(b'x' * 64) for i in range(256))
       
   719         frames.extend(cctx.compress(b'y' * 64) for i in range(256))
       
   720 
       
   721         dctx = zstd.ZstdDecompressor()
       
   722         result = dctx.multi_decompress_to_buffer(frames, threads=-1)
       
   723 
       
   724         self.assertEqual(len(result), len(frames))
       
   725         self.assertEqual(result.size(), 2 * 64 * 256)
       
   726         self.assertEqual(result[0].tobytes(), b'x' * 64)
       
   727         self.assertEqual(result[256].tobytes(), b'y' * 64)
       
   728 
       
   729     def test_item_failure(self):
       
   730         cctx = zstd.ZstdCompressor(write_content_size=True)
       
   731         frames = [cctx.compress(b'x' * 128), cctx.compress(b'y' * 128)]
       
   732 
       
   733         frames[1] = frames[1] + b'extra'
       
   734 
       
   735         dctx = zstd.ZstdDecompressor()
       
   736 
       
   737         with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'):
       
   738             dctx.multi_decompress_to_buffer(frames)
       
   739 
       
   740         with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'):
       
   741             dctx.multi_decompress_to_buffer(frames, threads=2)