contrib/python-zstandard/tests/test_compressor.py
changeset 44147 5e84a96d865b
parent 43994 de7838053207
child 52639 9db77d46de79
equal deleted inserted replaced
44146:45ec64d93b3a 44147:5e84a96d865b
    22 else:
    22 else:
    23     next = lambda it: it.next()
    23     next = lambda it: it.next()
    24 
    24 
    25 
    25 
    26 def multithreaded_chunk_size(level, source_size=0):
    26 def multithreaded_chunk_size(level, source_size=0):
    27     params = zstd.ZstdCompressionParameters.from_level(level, source_size=source_size)
    27     params = zstd.ZstdCompressionParameters.from_level(
       
    28         level, source_size=source_size
       
    29     )
    28 
    30 
    29     return 1 << (params.window_log + 2)
    31     return 1 << (params.window_log + 2)
    30 
    32 
    31 
    33 
    32 @make_cffi
    34 @make_cffi
    84         self.assertEqual(len(result), 999)
    86         self.assertEqual(len(result), 999)
    85         self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd")
    87         self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd")
    86 
    88 
    87         # This matches the test for read_to_iter() below.
    89         # This matches the test for read_to_iter() below.
    88         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
    90         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
    89         result = cctx.compress(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o")
    91         result = cctx.compress(
       
    92             b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o"
       
    93         )
    90         self.assertEqual(
    94         self.assertEqual(
    91             result,
    95             result,
    92             b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00"
    96             b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00"
    93             b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0"
    97             b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0"
    94             b"\x02\x09\x00\x00\x6f",
    98             b"\x02\x09\x00\x00\x6f",
    97     def test_negative_level(self):
   101     def test_negative_level(self):
    98         cctx = zstd.ZstdCompressor(level=-4)
   102         cctx = zstd.ZstdCompressor(level=-4)
    99         result = cctx.compress(b"foo" * 256)
   103         result = cctx.compress(b"foo" * 256)
   100 
   104 
   101     def test_no_magic(self):
   105     def test_no_magic(self):
   102         params = zstd.ZstdCompressionParameters.from_level(1, format=zstd.FORMAT_ZSTD1)
   106         params = zstd.ZstdCompressionParameters.from_level(
       
   107             1, format=zstd.FORMAT_ZSTD1
       
   108         )
   103         cctx = zstd.ZstdCompressor(compression_params=params)
   109         cctx = zstd.ZstdCompressor(compression_params=params)
   104         magic = cctx.compress(b"foobar")
   110         magic = cctx.compress(b"foobar")
   105 
   111 
   106         params = zstd.ZstdCompressionParameters.from_level(
   112         params = zstd.ZstdCompressionParameters.from_level(
   107             1, format=zstd.FORMAT_ZSTD1_MAGICLESS
   113             1, format=zstd.FORMAT_ZSTD1_MAGICLESS
   221         self.assertEqual(params.content_size, 3)
   227         self.assertEqual(params.content_size, 3)
   222         self.assertEqual(params.dict_id, d.dict_id())
   228         self.assertEqual(params.dict_id, d.dict_id())
   223 
   229 
   224         self.assertEqual(
   230         self.assertEqual(
   225             result,
   231             result,
   226             b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" b"\x66\x6f\x6f",
   232             b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00"
       
   233             b"\x66\x6f\x6f",
   227         )
   234         )
   228 
   235 
   229     def test_multithreaded_compression_params(self):
   236     def test_multithreaded_compression_params(self):
   230         params = zstd.ZstdCompressionParameters.from_level(0, threads=2)
   237         params = zstd.ZstdCompressionParameters.from_level(0, threads=2)
   231         cctx = zstd.ZstdCompressor(compression_params=params)
   238         cctx = zstd.ZstdCompressor(compression_params=params)
   232 
   239 
   233         result = cctx.compress(b"foo")
   240         result = cctx.compress(b"foo")
   234         params = zstd.get_frame_parameters(result)
   241         params = zstd.get_frame_parameters(result)
   235         self.assertEqual(params.content_size, 3)
   242         self.assertEqual(params.content_size, 3)
   236 
   243 
   237         self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f")
   244         self.assertEqual(
       
   245             result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f"
       
   246         )
   238 
   247 
   239 
   248 
   240 @make_cffi
   249 @make_cffi
   241 class TestCompressor_compressobj(TestCase):
   250 class TestCompressor_compressobj(TestCase):
   242     def test_compressobj_empty(self):
   251     def test_compressobj_empty(self):
   345             cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
   354             cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
   346             b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo",
   355             b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo",
   347         )
   356         )
   348         self.assertEqual(cobj.compress(b"bar"), b"")
   357         self.assertEqual(cobj.compress(b"bar"), b"")
   349         # 3 byte header plus content.
   358         # 3 byte header plus content.
   350         self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar")
   359         self.assertEqual(
       
   360             cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar"
       
   361         )
   351         self.assertEqual(cobj.flush(), b"\x01\x00\x00")
   362         self.assertEqual(cobj.flush(), b"\x01\x00\x00")
   352 
   363 
   353     def test_flush_empty_block(self):
   364     def test_flush_empty_block(self):
   354         cctx = zstd.ZstdCompressor(write_checksum=True)
   365         cctx = zstd.ZstdCompressor(write_checksum=True)
   355         cobj = cctx.compressobj()
   366         cobj = cctx.compressobj()
   443         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
   454         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
   444         r, w = cctx.copy_stream(source, dest)
   455         r, w = cctx.copy_stream(source, dest)
   445         self.assertEqual(int(r), 0)
   456         self.assertEqual(int(r), 0)
   446         self.assertEqual(w, 9)
   457         self.assertEqual(w, 9)
   447 
   458 
   448         self.assertEqual(dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00")
   459         self.assertEqual(
       
   460             dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00"
       
   461         )
   449 
   462 
   450     def test_large_data(self):
   463     def test_large_data(self):
   451         source = io.BytesIO()
   464         source = io.BytesIO()
   452         for i in range(255):
   465         for i in range(255):
   453             source.write(struct.Struct(">B").pack(i) * 16384)
   466             source.write(struct.Struct(">B").pack(i) * 16384)
   476         source.seek(0)
   489         source.seek(0)
   477         with_checksum = io.BytesIO()
   490         with_checksum = io.BytesIO()
   478         cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
   491         cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
   479         cctx.copy_stream(source, with_checksum)
   492         cctx.copy_stream(source, with_checksum)
   480 
   493 
   481         self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4)
   494         self.assertEqual(
       
   495             len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
       
   496         )
   482 
   497 
   483         no_params = zstd.get_frame_parameters(no_checksum.getvalue())
   498         no_params = zstd.get_frame_parameters(no_checksum.getvalue())
   484         with_params = zstd.get_frame_parameters(with_checksum.getvalue())
   499         with_params = zstd.get_frame_parameters(with_checksum.getvalue())
   485         self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
   500         self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
   486         self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
   501         self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
   583 class TestCompressor_stream_reader(TestCase):
   598 class TestCompressor_stream_reader(TestCase):
   584     def test_context_manager(self):
   599     def test_context_manager(self):
   585         cctx = zstd.ZstdCompressor()
   600         cctx = zstd.ZstdCompressor()
   586 
   601 
   587         with cctx.stream_reader(b"foo") as reader:
   602         with cctx.stream_reader(b"foo") as reader:
   588             with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"):
   603             with self.assertRaisesRegex(
       
   604                 ValueError, "cannot __enter__ multiple times"
       
   605             ):
   589                 with reader as reader2:
   606                 with reader as reader2:
   590                     pass
   607                     pass
   591 
   608 
   592     def test_no_context_manager(self):
   609     def test_no_context_manager(self):
   593         cctx = zstd.ZstdCompressor()
   610         cctx = zstd.ZstdCompressor()
   742         cctx = zstd.ZstdCompressor()
   759         cctx = zstd.ZstdCompressor()
   743 
   760 
   744         source = io.BytesIO(b"foobar")
   761         source = io.BytesIO(b"foobar")
   745 
   762 
   746         with cctx.stream_reader(source, size=2) as reader:
   763         with cctx.stream_reader(source, size=2) as reader:
   747             with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"):
   764             with self.assertRaisesRegex(
       
   765                 zstd.ZstdError, "Src size is incorrect"
       
   766             ):
   748                 reader.read(10)
   767                 reader.read(10)
   749 
   768 
   750         # Try another compression operation.
   769         # Try another compression operation.
   751         with cctx.stream_reader(source, size=42):
   770         with cctx.stream_reader(source, size=42):
   752             pass
   771             pass
  1124         self.assertEqual(no_params.dict_id, 0)
  1143         self.assertEqual(no_params.dict_id, 0)
  1125         self.assertEqual(with_params.dict_id, 0)
  1144         self.assertEqual(with_params.dict_id, 0)
  1126         self.assertFalse(no_params.has_checksum)
  1145         self.assertFalse(no_params.has_checksum)
  1127         self.assertTrue(with_params.has_checksum)
  1146         self.assertTrue(with_params.has_checksum)
  1128 
  1147 
  1129         self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4)
  1148         self.assertEqual(
       
  1149             len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
       
  1150         )
  1130 
  1151 
  1131     def test_write_content_size(self):
  1152     def test_write_content_size(self):
  1132         no_size = NonClosingBytesIO()
  1153         no_size = NonClosingBytesIO()
  1133         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
  1154         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
  1134         with cctx.stream_writer(no_size) as compressor:
  1155         with cctx.stream_writer(no_size) as compressor:
  1143         # written.
  1164         # written.
  1144         self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()))
  1165         self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()))
  1145 
  1166 
  1146         # Declaring size will write the header.
  1167         # Declaring size will write the header.
  1147         with_size = NonClosingBytesIO()
  1168         with_size = NonClosingBytesIO()
  1148         with cctx.stream_writer(with_size, size=len(b"foobar" * 256)) as compressor:
  1169         with cctx.stream_writer(
       
  1170             with_size, size=len(b"foobar" * 256)
       
  1171         ) as compressor:
  1149             self.assertEqual(compressor.write(b"foobar" * 256), 0)
  1172             self.assertEqual(compressor.write(b"foobar" * 256), 0)
  1150 
  1173 
  1151         no_params = zstd.get_frame_parameters(no_size.getvalue())
  1174         no_params = zstd.get_frame_parameters(no_size.getvalue())
  1152         with_params = zstd.get_frame_parameters(with_size.getvalue())
  1175         with_params = zstd.get_frame_parameters(with_size.getvalue())
  1153         self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
  1176         self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
  1189         self.assertEqual(no_params.dict_id, 0)
  1212         self.assertEqual(no_params.dict_id, 0)
  1190         self.assertEqual(with_params.dict_id, d.dict_id())
  1213         self.assertEqual(with_params.dict_id, d.dict_id())
  1191         self.assertFalse(no_params.has_checksum)
  1214         self.assertFalse(no_params.has_checksum)
  1192         self.assertFalse(with_params.has_checksum)
  1215         self.assertFalse(with_params.has_checksum)
  1193 
  1216 
  1194         self.assertEqual(len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4)
  1217         self.assertEqual(
       
  1218             len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4
       
  1219         )
  1195 
  1220 
  1196     def test_memory_size(self):
  1221     def test_memory_size(self):
  1197         cctx = zstd.ZstdCompressor(level=3)
  1222         cctx = zstd.ZstdCompressor(level=3)
  1198         buffer = io.BytesIO()
  1223         buffer = io.BytesIO()
  1199         with cctx.stream_writer(buffer) as compressor:
  1224         with cctx.stream_writer(buffer) as compressor:
  1335 
  1360 
  1336         # Buffer protocol works.
  1361         # Buffer protocol works.
  1337         for chunk in cctx.read_to_iter(b"foobar"):
  1362         for chunk in cctx.read_to_iter(b"foobar"):
  1338             pass
  1363             pass
  1339 
  1364 
  1340         with self.assertRaisesRegex(ValueError, "must pass an object with a read"):
  1365         with self.assertRaisesRegex(
       
  1366             ValueError, "must pass an object with a read"
       
  1367         ):
  1341             for chunk in cctx.read_to_iter(True):
  1368             for chunk in cctx.read_to_iter(True):
  1342                 pass
  1369                 pass
  1343 
  1370 
  1344     def test_read_empty(self):
  1371     def test_read_empty(self):
  1345         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
  1372         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
  1511             ],
  1538             ],
  1512         )
  1539         )
  1513 
  1540 
  1514         dctx = zstd.ZstdDecompressor()
  1541         dctx = zstd.ZstdDecompressor()
  1515 
  1542 
  1516         self.assertEqual(dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24))
  1543         self.assertEqual(
       
  1544             dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)
       
  1545         )
  1517 
  1546 
  1518     def test_small_chunk_size(self):
  1547     def test_small_chunk_size(self):
  1519         cctx = zstd.ZstdCompressor()
  1548         cctx = zstd.ZstdCompressor()
  1520         chunker = cctx.chunker(chunk_size=1)
  1549         chunker = cctx.chunker(chunk_size=1)
  1521 
  1550 
  1531             b"\xfa\xd3\x77\x43",
  1560             b"\xfa\xd3\x77\x43",
  1532         )
  1561         )
  1533 
  1562 
  1534         dctx = zstd.ZstdDecompressor()
  1563         dctx = zstd.ZstdDecompressor()
  1535         self.assertEqual(
  1564         self.assertEqual(
  1536             dctx.decompress(b"".join(chunks), max_output_size=10000), b"foo" * 1024
  1565             dctx.decompress(b"".join(chunks), max_output_size=10000),
       
  1566             b"foo" * 1024,
  1537         )
  1567         )
  1538 
  1568 
  1539     def test_input_types(self):
  1569     def test_input_types(self):
  1540         cctx = zstd.ZstdCompressor()
  1570         cctx = zstd.ZstdCompressor()
  1541 
  1571 
  1600 
  1630 
  1601         list(chunker.compress(b"foo"))
  1631         list(chunker.compress(b"foo"))
  1602         list(chunker.finish())
  1632         list(chunker.finish())
  1603 
  1633 
  1604         with self.assertRaisesRegex(
  1634         with self.assertRaisesRegex(
  1605             zstd.ZstdError, r"cannot call compress\(\) after compression finished"
  1635             zstd.ZstdError,
       
  1636             r"cannot call compress\(\) after compression finished",
  1606         ):
  1637         ):
  1607             list(chunker.compress(b"foo"))
  1638             list(chunker.compress(b"foo"))
  1608 
  1639 
  1609     def test_flush_after_finish(self):
  1640     def test_flush_after_finish(self):
  1610         cctx = zstd.ZstdCompressor()
  1641         cctx = zstd.ZstdCompressor()
  1642             cctx.multi_compress_to_buffer(True)
  1673             cctx.multi_compress_to_buffer(True)
  1643 
  1674 
  1644         with self.assertRaises(TypeError):
  1675         with self.assertRaises(TypeError):
  1645             cctx.multi_compress_to_buffer((1, 2))
  1676             cctx.multi_compress_to_buffer((1, 2))
  1646 
  1677 
  1647         with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"):
  1678         with self.assertRaisesRegex(
       
  1679             TypeError, "item 0 not a bytes like object"
       
  1680         ):
  1648             cctx.multi_compress_to_buffer([u"foo"])
  1681             cctx.multi_compress_to_buffer([u"foo"])
  1649 
  1682 
  1650     def test_empty_input(self):
  1683     def test_empty_input(self):
  1651         cctx = zstd.ZstdCompressor()
  1684         cctx = zstd.ZstdCompressor()
  1652 
  1685