Mercurial > public > mercurial-scm > hg
comparison contrib/python-zstandard/tests/test_compressor.py @ 44147:5e84a96d865b
python-zstandard: blacken at 80 characters
I made this change upstream and it will make it into the next
release of python-zstandard. I figured I'd send it Mercurial's
way because it will allow us to drop this directory from the black
exclusion list.
# skip-blame blackening
Differential Revision: https://phab.mercurial-scm.org/D7937
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 22 Jan 2020 22:23:04 -0800 |
parents | de7838053207 |
children | 9db77d46de79 |
comparison
equal
deleted
inserted
replaced
44146:45ec64d93b3a | 44147:5e84a96d865b |
---|---|
22 else: | 22 else: |
23 next = lambda it: it.next() | 23 next = lambda it: it.next() |
24 | 24 |
25 | 25 |
26 def multithreaded_chunk_size(level, source_size=0): | 26 def multithreaded_chunk_size(level, source_size=0): |
27 params = zstd.ZstdCompressionParameters.from_level(level, source_size=source_size) | 27 params = zstd.ZstdCompressionParameters.from_level( |
28 level, source_size=source_size | |
29 ) | |
28 | 30 |
29 return 1 << (params.window_log + 2) | 31 return 1 << (params.window_log + 2) |
30 | 32 |
31 | 33 |
32 @make_cffi | 34 @make_cffi |
84 self.assertEqual(len(result), 999) | 86 self.assertEqual(len(result), 999) |
85 self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd") | 87 self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd") |
86 | 88 |
87 # This matches the test for read_to_iter() below. | 89 # This matches the test for read_to_iter() below. |
88 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) | 90 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) |
89 result = cctx.compress(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o") | 91 result = cctx.compress( |
92 b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o" | |
93 ) | |
90 self.assertEqual( | 94 self.assertEqual( |
91 result, | 95 result, |
92 b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00" | 96 b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00" |
93 b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0" | 97 b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0" |
94 b"\x02\x09\x00\x00\x6f", | 98 b"\x02\x09\x00\x00\x6f", |
97 def test_negative_level(self): | 101 def test_negative_level(self): |
98 cctx = zstd.ZstdCompressor(level=-4) | 102 cctx = zstd.ZstdCompressor(level=-4) |
99 result = cctx.compress(b"foo" * 256) | 103 result = cctx.compress(b"foo" * 256) |
100 | 104 |
101 def test_no_magic(self): | 105 def test_no_magic(self): |
102 params = zstd.ZstdCompressionParameters.from_level(1, format=zstd.FORMAT_ZSTD1) | 106 params = zstd.ZstdCompressionParameters.from_level( |
107 1, format=zstd.FORMAT_ZSTD1 | |
108 ) | |
103 cctx = zstd.ZstdCompressor(compression_params=params) | 109 cctx = zstd.ZstdCompressor(compression_params=params) |
104 magic = cctx.compress(b"foobar") | 110 magic = cctx.compress(b"foobar") |
105 | 111 |
106 params = zstd.ZstdCompressionParameters.from_level( | 112 params = zstd.ZstdCompressionParameters.from_level( |
107 1, format=zstd.FORMAT_ZSTD1_MAGICLESS | 113 1, format=zstd.FORMAT_ZSTD1_MAGICLESS |
221 self.assertEqual(params.content_size, 3) | 227 self.assertEqual(params.content_size, 3) |
222 self.assertEqual(params.dict_id, d.dict_id()) | 228 self.assertEqual(params.dict_id, d.dict_id()) |
223 | 229 |
224 self.assertEqual( | 230 self.assertEqual( |
225 result, | 231 result, |
226 b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" b"\x66\x6f\x6f", | 232 b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" |
233 b"\x66\x6f\x6f", | |
227 ) | 234 ) |
228 | 235 |
229 def test_multithreaded_compression_params(self): | 236 def test_multithreaded_compression_params(self): |
230 params = zstd.ZstdCompressionParameters.from_level(0, threads=2) | 237 params = zstd.ZstdCompressionParameters.from_level(0, threads=2) |
231 cctx = zstd.ZstdCompressor(compression_params=params) | 238 cctx = zstd.ZstdCompressor(compression_params=params) |
232 | 239 |
233 result = cctx.compress(b"foo") | 240 result = cctx.compress(b"foo") |
234 params = zstd.get_frame_parameters(result) | 241 params = zstd.get_frame_parameters(result) |
235 self.assertEqual(params.content_size, 3) | 242 self.assertEqual(params.content_size, 3) |
236 | 243 |
237 self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f") | 244 self.assertEqual( |
245 result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f" | |
246 ) | |
238 | 247 |
239 | 248 |
240 @make_cffi | 249 @make_cffi |
241 class TestCompressor_compressobj(TestCase): | 250 class TestCompressor_compressobj(TestCase): |
242 def test_compressobj_empty(self): | 251 def test_compressobj_empty(self): |
345 cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), | 354 cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), |
346 b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo", | 355 b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo", |
347 ) | 356 ) |
348 self.assertEqual(cobj.compress(b"bar"), b"") | 357 self.assertEqual(cobj.compress(b"bar"), b"") |
349 # 3 byte header plus content. | 358 # 3 byte header plus content. |
350 self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar") | 359 self.assertEqual( |
360 cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar" | |
361 ) | |
351 self.assertEqual(cobj.flush(), b"\x01\x00\x00") | 362 self.assertEqual(cobj.flush(), b"\x01\x00\x00") |
352 | 363 |
353 def test_flush_empty_block(self): | 364 def test_flush_empty_block(self): |
354 cctx = zstd.ZstdCompressor(write_checksum=True) | 365 cctx = zstd.ZstdCompressor(write_checksum=True) |
355 cobj = cctx.compressobj() | 366 cobj = cctx.compressobj() |
443 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) | 454 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) |
444 r, w = cctx.copy_stream(source, dest) | 455 r, w = cctx.copy_stream(source, dest) |
445 self.assertEqual(int(r), 0) | 456 self.assertEqual(int(r), 0) |
446 self.assertEqual(w, 9) | 457 self.assertEqual(w, 9) |
447 | 458 |
448 self.assertEqual(dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") | 459 self.assertEqual( |
460 dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00" | |
461 ) | |
449 | 462 |
450 def test_large_data(self): | 463 def test_large_data(self): |
451 source = io.BytesIO() | 464 source = io.BytesIO() |
452 for i in range(255): | 465 for i in range(255): |
453 source.write(struct.Struct(">B").pack(i) * 16384) | 466 source.write(struct.Struct(">B").pack(i) * 16384) |
476 source.seek(0) | 489 source.seek(0) |
477 with_checksum = io.BytesIO() | 490 with_checksum = io.BytesIO() |
478 cctx = zstd.ZstdCompressor(level=1, write_checksum=True) | 491 cctx = zstd.ZstdCompressor(level=1, write_checksum=True) |
479 cctx.copy_stream(source, with_checksum) | 492 cctx.copy_stream(source, with_checksum) |
480 | 493 |
481 self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) | 494 self.assertEqual( |
495 len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4 | |
496 ) | |
482 | 497 |
483 no_params = zstd.get_frame_parameters(no_checksum.getvalue()) | 498 no_params = zstd.get_frame_parameters(no_checksum.getvalue()) |
484 with_params = zstd.get_frame_parameters(with_checksum.getvalue()) | 499 with_params = zstd.get_frame_parameters(with_checksum.getvalue()) |
485 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN) | 500 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
486 self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN) | 501 self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
583 class TestCompressor_stream_reader(TestCase): | 598 class TestCompressor_stream_reader(TestCase): |
584 def test_context_manager(self): | 599 def test_context_manager(self): |
585 cctx = zstd.ZstdCompressor() | 600 cctx = zstd.ZstdCompressor() |
586 | 601 |
587 with cctx.stream_reader(b"foo") as reader: | 602 with cctx.stream_reader(b"foo") as reader: |
588 with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"): | 603 with self.assertRaisesRegex( |
604 ValueError, "cannot __enter__ multiple times" | |
605 ): | |
589 with reader as reader2: | 606 with reader as reader2: |
590 pass | 607 pass |
591 | 608 |
592 def test_no_context_manager(self): | 609 def test_no_context_manager(self): |
593 cctx = zstd.ZstdCompressor() | 610 cctx = zstd.ZstdCompressor() |
742 cctx = zstd.ZstdCompressor() | 759 cctx = zstd.ZstdCompressor() |
743 | 760 |
744 source = io.BytesIO(b"foobar") | 761 source = io.BytesIO(b"foobar") |
745 | 762 |
746 with cctx.stream_reader(source, size=2) as reader: | 763 with cctx.stream_reader(source, size=2) as reader: |
747 with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): | 764 with self.assertRaisesRegex( |
765 zstd.ZstdError, "Src size is incorrect" | |
766 ): | |
748 reader.read(10) | 767 reader.read(10) |
749 | 768 |
750 # Try another compression operation. | 769 # Try another compression operation. |
751 with cctx.stream_reader(source, size=42): | 770 with cctx.stream_reader(source, size=42): |
752 pass | 771 pass |
1124 self.assertEqual(no_params.dict_id, 0) | 1143 self.assertEqual(no_params.dict_id, 0) |
1125 self.assertEqual(with_params.dict_id, 0) | 1144 self.assertEqual(with_params.dict_id, 0) |
1126 self.assertFalse(no_params.has_checksum) | 1145 self.assertFalse(no_params.has_checksum) |
1127 self.assertTrue(with_params.has_checksum) | 1146 self.assertTrue(with_params.has_checksum) |
1128 | 1147 |
1129 self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) | 1148 self.assertEqual( |
1149 len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4 | |
1150 ) | |
1130 | 1151 |
1131 def test_write_content_size(self): | 1152 def test_write_content_size(self): |
1132 no_size = NonClosingBytesIO() | 1153 no_size = NonClosingBytesIO() |
1133 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) | 1154 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) |
1134 with cctx.stream_writer(no_size) as compressor: | 1155 with cctx.stream_writer(no_size) as compressor: |
1143 # written. | 1164 # written. |
1144 self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue())) | 1165 self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue())) |
1145 | 1166 |
1146 # Declaring size will write the header. | 1167 # Declaring size will write the header. |
1147 with_size = NonClosingBytesIO() | 1168 with_size = NonClosingBytesIO() |
1148 with cctx.stream_writer(with_size, size=len(b"foobar" * 256)) as compressor: | 1169 with cctx.stream_writer( |
1170 with_size, size=len(b"foobar" * 256) | |
1171 ) as compressor: | |
1149 self.assertEqual(compressor.write(b"foobar" * 256), 0) | 1172 self.assertEqual(compressor.write(b"foobar" * 256), 0) |
1150 | 1173 |
1151 no_params = zstd.get_frame_parameters(no_size.getvalue()) | 1174 no_params = zstd.get_frame_parameters(no_size.getvalue()) |
1152 with_params = zstd.get_frame_parameters(with_size.getvalue()) | 1175 with_params = zstd.get_frame_parameters(with_size.getvalue()) |
1153 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN) | 1176 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
1189 self.assertEqual(no_params.dict_id, 0) | 1212 self.assertEqual(no_params.dict_id, 0) |
1190 self.assertEqual(with_params.dict_id, d.dict_id()) | 1213 self.assertEqual(with_params.dict_id, d.dict_id()) |
1191 self.assertFalse(no_params.has_checksum) | 1214 self.assertFalse(no_params.has_checksum) |
1192 self.assertFalse(with_params.has_checksum) | 1215 self.assertFalse(with_params.has_checksum) |
1193 | 1216 |
1194 self.assertEqual(len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4) | 1217 self.assertEqual( |
1218 len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4 | |
1219 ) | |
1195 | 1220 |
1196 def test_memory_size(self): | 1221 def test_memory_size(self): |
1197 cctx = zstd.ZstdCompressor(level=3) | 1222 cctx = zstd.ZstdCompressor(level=3) |
1198 buffer = io.BytesIO() | 1223 buffer = io.BytesIO() |
1199 with cctx.stream_writer(buffer) as compressor: | 1224 with cctx.stream_writer(buffer) as compressor: |
1335 | 1360 |
1336 # Buffer protocol works. | 1361 # Buffer protocol works. |
1337 for chunk in cctx.read_to_iter(b"foobar"): | 1362 for chunk in cctx.read_to_iter(b"foobar"): |
1338 pass | 1363 pass |
1339 | 1364 |
1340 with self.assertRaisesRegex(ValueError, "must pass an object with a read"): | 1365 with self.assertRaisesRegex( |
1366 ValueError, "must pass an object with a read" | |
1367 ): | |
1341 for chunk in cctx.read_to_iter(True): | 1368 for chunk in cctx.read_to_iter(True): |
1342 pass | 1369 pass |
1343 | 1370 |
1344 def test_read_empty(self): | 1371 def test_read_empty(self): |
1345 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) | 1372 cctx = zstd.ZstdCompressor(level=1, write_content_size=False) |
1511 ], | 1538 ], |
1512 ) | 1539 ) |
1513 | 1540 |
1514 dctx = zstd.ZstdDecompressor() | 1541 dctx = zstd.ZstdDecompressor() |
1515 | 1542 |
1516 self.assertEqual(dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)) | 1543 self.assertEqual( |
1544 dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24) | |
1545 ) | |
1517 | 1546 |
1518 def test_small_chunk_size(self): | 1547 def test_small_chunk_size(self): |
1519 cctx = zstd.ZstdCompressor() | 1548 cctx = zstd.ZstdCompressor() |
1520 chunker = cctx.chunker(chunk_size=1) | 1549 chunker = cctx.chunker(chunk_size=1) |
1521 | 1550 |
1531 b"\xfa\xd3\x77\x43", | 1560 b"\xfa\xd3\x77\x43", |
1532 ) | 1561 ) |
1533 | 1562 |
1534 dctx = zstd.ZstdDecompressor() | 1563 dctx = zstd.ZstdDecompressor() |
1535 self.assertEqual( | 1564 self.assertEqual( |
1536 dctx.decompress(b"".join(chunks), max_output_size=10000), b"foo" * 1024 | 1565 dctx.decompress(b"".join(chunks), max_output_size=10000), |
1566 b"foo" * 1024, | |
1537 ) | 1567 ) |
1538 | 1568 |
1539 def test_input_types(self): | 1569 def test_input_types(self): |
1540 cctx = zstd.ZstdCompressor() | 1570 cctx = zstd.ZstdCompressor() |
1541 | 1571 |
1600 | 1630 |
1601 list(chunker.compress(b"foo")) | 1631 list(chunker.compress(b"foo")) |
1602 list(chunker.finish()) | 1632 list(chunker.finish()) |
1603 | 1633 |
1604 with self.assertRaisesRegex( | 1634 with self.assertRaisesRegex( |
1605 zstd.ZstdError, r"cannot call compress\(\) after compression finished" | 1635 zstd.ZstdError, |
1636 r"cannot call compress\(\) after compression finished", | |
1606 ): | 1637 ): |
1607 list(chunker.compress(b"foo")) | 1638 list(chunker.compress(b"foo")) |
1608 | 1639 |
1609 def test_flush_after_finish(self): | 1640 def test_flush_after_finish(self): |
1610 cctx = zstd.ZstdCompressor() | 1641 cctx = zstd.ZstdCompressor() |
1642 cctx.multi_compress_to_buffer(True) | 1673 cctx.multi_compress_to_buffer(True) |
1643 | 1674 |
1644 with self.assertRaises(TypeError): | 1675 with self.assertRaises(TypeError): |
1645 cctx.multi_compress_to_buffer((1, 2)) | 1676 cctx.multi_compress_to_buffer((1, 2)) |
1646 | 1677 |
1647 with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"): | 1678 with self.assertRaisesRegex( |
1679 TypeError, "item 0 not a bytes like object" | |
1680 ): | |
1648 cctx.multi_compress_to_buffer([u"foo"]) | 1681 cctx.multi_compress_to_buffer([u"foo"]) |
1649 | 1682 |
1650 def test_empty_input(self): | 1683 def test_empty_input(self): |
1651 cctx = zstd.ZstdCompressor() | 1684 cctx = zstd.ZstdCompressor() |
1652 | 1685 |