comparison contrib/python-zstandard/tests/test_compressor.py @ 44147:5e84a96d865b

python-zstandard: blacken at 80 characters

I made this change upstream and it will make it into the next release of python-zstandard. I figured I'd send it Mercurial's way because it will allow us to drop this directory from the black exclusion list.

# skip-blame blackening

Differential Revision: https://phab.mercurial-scm.org/D7937
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 22 Jan 2020 22:23:04 -0800
parents de7838053207
children 9db77d46de79
comparison
equal deleted inserted replaced
44146:45ec64d93b3a 44147:5e84a96d865b
22 else: 22 else:
23 next = lambda it: it.next() 23 next = lambda it: it.next()
24 24
25 25
26 def multithreaded_chunk_size(level, source_size=0): 26 def multithreaded_chunk_size(level, source_size=0):
27 params = zstd.ZstdCompressionParameters.from_level(level, source_size=source_size) 27 params = zstd.ZstdCompressionParameters.from_level(
28 level, source_size=source_size
29 )
28 30
29 return 1 << (params.window_log + 2) 31 return 1 << (params.window_log + 2)
30 32
31 33
32 @make_cffi 34 @make_cffi
84 self.assertEqual(len(result), 999) 86 self.assertEqual(len(result), 999)
85 self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd") 87 self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd")
86 88
87 # This matches the test for read_to_iter() below. 89 # This matches the test for read_to_iter() below.
88 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) 90 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
89 result = cctx.compress(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o") 91 result = cctx.compress(
92 b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o"
93 )
90 self.assertEqual( 94 self.assertEqual(
91 result, 95 result,
92 b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00" 96 b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00"
93 b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0" 97 b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0"
94 b"\x02\x09\x00\x00\x6f", 98 b"\x02\x09\x00\x00\x6f",
97 def test_negative_level(self): 101 def test_negative_level(self):
98 cctx = zstd.ZstdCompressor(level=-4) 102 cctx = zstd.ZstdCompressor(level=-4)
99 result = cctx.compress(b"foo" * 256) 103 result = cctx.compress(b"foo" * 256)
100 104
101 def test_no_magic(self): 105 def test_no_magic(self):
102 params = zstd.ZstdCompressionParameters.from_level(1, format=zstd.FORMAT_ZSTD1) 106 params = zstd.ZstdCompressionParameters.from_level(
107 1, format=zstd.FORMAT_ZSTD1
108 )
103 cctx = zstd.ZstdCompressor(compression_params=params) 109 cctx = zstd.ZstdCompressor(compression_params=params)
104 magic = cctx.compress(b"foobar") 110 magic = cctx.compress(b"foobar")
105 111
106 params = zstd.ZstdCompressionParameters.from_level( 112 params = zstd.ZstdCompressionParameters.from_level(
107 1, format=zstd.FORMAT_ZSTD1_MAGICLESS 113 1, format=zstd.FORMAT_ZSTD1_MAGICLESS
221 self.assertEqual(params.content_size, 3) 227 self.assertEqual(params.content_size, 3)
222 self.assertEqual(params.dict_id, d.dict_id()) 228 self.assertEqual(params.dict_id, d.dict_id())
223 229
224 self.assertEqual( 230 self.assertEqual(
225 result, 231 result,
226 b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" b"\x66\x6f\x6f", 232 b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00"
233 b"\x66\x6f\x6f",
227 ) 234 )
228 235
229 def test_multithreaded_compression_params(self): 236 def test_multithreaded_compression_params(self):
230 params = zstd.ZstdCompressionParameters.from_level(0, threads=2) 237 params = zstd.ZstdCompressionParameters.from_level(0, threads=2)
231 cctx = zstd.ZstdCompressor(compression_params=params) 238 cctx = zstd.ZstdCompressor(compression_params=params)
232 239
233 result = cctx.compress(b"foo") 240 result = cctx.compress(b"foo")
234 params = zstd.get_frame_parameters(result) 241 params = zstd.get_frame_parameters(result)
235 self.assertEqual(params.content_size, 3) 242 self.assertEqual(params.content_size, 3)
236 243
237 self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f") 244 self.assertEqual(
245 result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f"
246 )
238 247
239 248
240 @make_cffi 249 @make_cffi
241 class TestCompressor_compressobj(TestCase): 250 class TestCompressor_compressobj(TestCase):
242 def test_compressobj_empty(self): 251 def test_compressobj_empty(self):
345 cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), 354 cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
346 b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo", 355 b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo",
347 ) 356 )
348 self.assertEqual(cobj.compress(b"bar"), b"") 357 self.assertEqual(cobj.compress(b"bar"), b"")
349 # 3 byte header plus content. 358 # 3 byte header plus content.
350 self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar") 359 self.assertEqual(
360 cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar"
361 )
351 self.assertEqual(cobj.flush(), b"\x01\x00\x00") 362 self.assertEqual(cobj.flush(), b"\x01\x00\x00")
352 363
353 def test_flush_empty_block(self): 364 def test_flush_empty_block(self):
354 cctx = zstd.ZstdCompressor(write_checksum=True) 365 cctx = zstd.ZstdCompressor(write_checksum=True)
355 cobj = cctx.compressobj() 366 cobj = cctx.compressobj()
443 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) 454 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
444 r, w = cctx.copy_stream(source, dest) 455 r, w = cctx.copy_stream(source, dest)
445 self.assertEqual(int(r), 0) 456 self.assertEqual(int(r), 0)
446 self.assertEqual(w, 9) 457 self.assertEqual(w, 9)
447 458
448 self.assertEqual(dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") 459 self.assertEqual(
460 dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00"
461 )
449 462
450 def test_large_data(self): 463 def test_large_data(self):
451 source = io.BytesIO() 464 source = io.BytesIO()
452 for i in range(255): 465 for i in range(255):
453 source.write(struct.Struct(">B").pack(i) * 16384) 466 source.write(struct.Struct(">B").pack(i) * 16384)
476 source.seek(0) 489 source.seek(0)
477 with_checksum = io.BytesIO() 490 with_checksum = io.BytesIO()
478 cctx = zstd.ZstdCompressor(level=1, write_checksum=True) 491 cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
479 cctx.copy_stream(source, with_checksum) 492 cctx.copy_stream(source, with_checksum)
480 493
481 self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) 494 self.assertEqual(
495 len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
496 )
482 497
483 no_params = zstd.get_frame_parameters(no_checksum.getvalue()) 498 no_params = zstd.get_frame_parameters(no_checksum.getvalue())
484 with_params = zstd.get_frame_parameters(with_checksum.getvalue()) 499 with_params = zstd.get_frame_parameters(with_checksum.getvalue())
485 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN) 500 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
486 self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN) 501 self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
583 class TestCompressor_stream_reader(TestCase): 598 class TestCompressor_stream_reader(TestCase):
584 def test_context_manager(self): 599 def test_context_manager(self):
585 cctx = zstd.ZstdCompressor() 600 cctx = zstd.ZstdCompressor()
586 601
587 with cctx.stream_reader(b"foo") as reader: 602 with cctx.stream_reader(b"foo") as reader:
588 with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"): 603 with self.assertRaisesRegex(
604 ValueError, "cannot __enter__ multiple times"
605 ):
589 with reader as reader2: 606 with reader as reader2:
590 pass 607 pass
591 608
592 def test_no_context_manager(self): 609 def test_no_context_manager(self):
593 cctx = zstd.ZstdCompressor() 610 cctx = zstd.ZstdCompressor()
742 cctx = zstd.ZstdCompressor() 759 cctx = zstd.ZstdCompressor()
743 760
744 source = io.BytesIO(b"foobar") 761 source = io.BytesIO(b"foobar")
745 762
746 with cctx.stream_reader(source, size=2) as reader: 763 with cctx.stream_reader(source, size=2) as reader:
747 with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): 764 with self.assertRaisesRegex(
765 zstd.ZstdError, "Src size is incorrect"
766 ):
748 reader.read(10) 767 reader.read(10)
749 768
750 # Try another compression operation. 769 # Try another compression operation.
751 with cctx.stream_reader(source, size=42): 770 with cctx.stream_reader(source, size=42):
752 pass 771 pass
1124 self.assertEqual(no_params.dict_id, 0) 1143 self.assertEqual(no_params.dict_id, 0)
1125 self.assertEqual(with_params.dict_id, 0) 1144 self.assertEqual(with_params.dict_id, 0)
1126 self.assertFalse(no_params.has_checksum) 1145 self.assertFalse(no_params.has_checksum)
1127 self.assertTrue(with_params.has_checksum) 1146 self.assertTrue(with_params.has_checksum)
1128 1147
1129 self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) 1148 self.assertEqual(
1149 len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
1150 )
1130 1151
1131 def test_write_content_size(self): 1152 def test_write_content_size(self):
1132 no_size = NonClosingBytesIO() 1153 no_size = NonClosingBytesIO()
1133 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) 1154 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
1134 with cctx.stream_writer(no_size) as compressor: 1155 with cctx.stream_writer(no_size) as compressor:
1143 # written. 1164 # written.
1144 self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue())) 1165 self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()))
1145 1166
1146 # Declaring size will write the header. 1167 # Declaring size will write the header.
1147 with_size = NonClosingBytesIO() 1168 with_size = NonClosingBytesIO()
1148 with cctx.stream_writer(with_size, size=len(b"foobar" * 256)) as compressor: 1169 with cctx.stream_writer(
1170 with_size, size=len(b"foobar" * 256)
1171 ) as compressor:
1149 self.assertEqual(compressor.write(b"foobar" * 256), 0) 1172 self.assertEqual(compressor.write(b"foobar" * 256), 0)
1150 1173
1151 no_params = zstd.get_frame_parameters(no_size.getvalue()) 1174 no_params = zstd.get_frame_parameters(no_size.getvalue())
1152 with_params = zstd.get_frame_parameters(with_size.getvalue()) 1175 with_params = zstd.get_frame_parameters(with_size.getvalue())
1153 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN) 1176 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
1189 self.assertEqual(no_params.dict_id, 0) 1212 self.assertEqual(no_params.dict_id, 0)
1190 self.assertEqual(with_params.dict_id, d.dict_id()) 1213 self.assertEqual(with_params.dict_id, d.dict_id())
1191 self.assertFalse(no_params.has_checksum) 1214 self.assertFalse(no_params.has_checksum)
1192 self.assertFalse(with_params.has_checksum) 1215 self.assertFalse(with_params.has_checksum)
1193 1216
1194 self.assertEqual(len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4) 1217 self.assertEqual(
1218 len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4
1219 )
1195 1220
1196 def test_memory_size(self): 1221 def test_memory_size(self):
1197 cctx = zstd.ZstdCompressor(level=3) 1222 cctx = zstd.ZstdCompressor(level=3)
1198 buffer = io.BytesIO() 1223 buffer = io.BytesIO()
1199 with cctx.stream_writer(buffer) as compressor: 1224 with cctx.stream_writer(buffer) as compressor:
1335 1360
1336 # Buffer protocol works. 1361 # Buffer protocol works.
1337 for chunk in cctx.read_to_iter(b"foobar"): 1362 for chunk in cctx.read_to_iter(b"foobar"):
1338 pass 1363 pass
1339 1364
1340 with self.assertRaisesRegex(ValueError, "must pass an object with a read"): 1365 with self.assertRaisesRegex(
1366 ValueError, "must pass an object with a read"
1367 ):
1341 for chunk in cctx.read_to_iter(True): 1368 for chunk in cctx.read_to_iter(True):
1342 pass 1369 pass
1343 1370
1344 def test_read_empty(self): 1371 def test_read_empty(self):
1345 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) 1372 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
1511 ], 1538 ],
1512 ) 1539 )
1513 1540
1514 dctx = zstd.ZstdDecompressor() 1541 dctx = zstd.ZstdDecompressor()
1515 1542
1516 self.assertEqual(dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)) 1543 self.assertEqual(
1544 dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)
1545 )
1517 1546
1518 def test_small_chunk_size(self): 1547 def test_small_chunk_size(self):
1519 cctx = zstd.ZstdCompressor() 1548 cctx = zstd.ZstdCompressor()
1520 chunker = cctx.chunker(chunk_size=1) 1549 chunker = cctx.chunker(chunk_size=1)
1521 1550
1531 b"\xfa\xd3\x77\x43", 1560 b"\xfa\xd3\x77\x43",
1532 ) 1561 )
1533 1562
1534 dctx = zstd.ZstdDecompressor() 1563 dctx = zstd.ZstdDecompressor()
1535 self.assertEqual( 1564 self.assertEqual(
1536 dctx.decompress(b"".join(chunks), max_output_size=10000), b"foo" * 1024 1565 dctx.decompress(b"".join(chunks), max_output_size=10000),
1566 b"foo" * 1024,
1537 ) 1567 )
1538 1568
1539 def test_input_types(self): 1569 def test_input_types(self):
1540 cctx = zstd.ZstdCompressor() 1570 cctx = zstd.ZstdCompressor()
1541 1571
1600 1630
1601 list(chunker.compress(b"foo")) 1631 list(chunker.compress(b"foo"))
1602 list(chunker.finish()) 1632 list(chunker.finish())
1603 1633
1604 with self.assertRaisesRegex( 1634 with self.assertRaisesRegex(
1605 zstd.ZstdError, r"cannot call compress\(\) after compression finished" 1635 zstd.ZstdError,
1636 r"cannot call compress\(\) after compression finished",
1606 ): 1637 ):
1607 list(chunker.compress(b"foo")) 1638 list(chunker.compress(b"foo"))
1608 1639
1609 def test_flush_after_finish(self): 1640 def test_flush_after_finish(self):
1610 cctx = zstd.ZstdCompressor() 1641 cctx = zstd.ZstdCompressor()
1642 cctx.multi_compress_to_buffer(True) 1673 cctx.multi_compress_to_buffer(True)
1643 1674
1644 with self.assertRaises(TypeError): 1675 with self.assertRaises(TypeError):
1645 cctx.multi_compress_to_buffer((1, 2)) 1676 cctx.multi_compress_to_buffer((1, 2))
1646 1677
1647 with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"): 1678 with self.assertRaisesRegex(
1679 TypeError, "item 0 not a bytes like object"
1680 ):
1648 cctx.multi_compress_to_buffer([u"foo"]) 1681 cctx.multi_compress_to_buffer([u"foo"])
1649 1682
1650 def test_empty_input(self): 1683 def test_empty_input(self):
1651 cctx = zstd.ZstdCompressor() 1684 cctx = zstd.ZstdCompressor()
1652 1685