Mercurial > public > mercurial-scm > hg-stable
comparison contrib/python-zstandard/tests/test_decompressor_fuzzing.py @ 44232:5e84a96d865b
python-zstandard: blacken at 80 characters
I made this change upstream and it will make it into the next
release of python-zstandard. I figured I'd send it Mercurial's
way because it will allow us to drop this directory from the black
exclusion list.
# skip-blame blackening
Differential Revision: https://phab.mercurial-scm.org/D7937
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 22 Jan 2020 22:23:04 -0800 |
parents | de7838053207 |
children | 493034cc3265 |
comparison legend: equal | deleted | inserted | replaced
44231:45ec64d93b3a | 44232:5e84a96d865b |
---|---|
194 original=strategies.sampled_from(random_input_data()), | 194 original=strategies.sampled_from(random_input_data()), |
195 level=strategies.integers(min_value=1, max_value=5), | 195 level=strategies.integers(min_value=1, max_value=5), |
196 streaming=strategies.booleans(), | 196 streaming=strategies.booleans(), |
197 source_read_size=strategies.integers(1, 1048576), | 197 source_read_size=strategies.integers(1, 1048576), |
198 ) | 198 ) |
199 def test_stream_source_readall(self, original, level, streaming, source_read_size): | 199 def test_stream_source_readall( |
200 self, original, level, streaming, source_read_size | |
201 ): | |
200 cctx = zstd.ZstdCompressor(level=level) | 202 cctx = zstd.ZstdCompressor(level=level) |
201 | 203 |
202 if streaming: | 204 if streaming: |
203 source = io.BytesIO() | 205 source = io.BytesIO() |
204 writer = cctx.stream_writer(source) | 206 writer = cctx.stream_writer(source) |
396 original=strategies.sampled_from(random_input_data()), | 398 original=strategies.sampled_from(random_input_data()), |
397 level=strategies.integers(min_value=1, max_value=5), | 399 level=strategies.integers(min_value=1, max_value=5), |
398 write_size=strategies.integers(min_value=1, max_value=8192), | 400 write_size=strategies.integers(min_value=1, max_value=8192), |
399 input_sizes=strategies.data(), | 401 input_sizes=strategies.data(), |
400 ) | 402 ) |
401 def test_write_size_variance(self, original, level, write_size, input_sizes): | 403 def test_write_size_variance( |
404 self, original, level, write_size, input_sizes | |
405 ): | |
402 cctx = zstd.ZstdCompressor(level=level) | 406 cctx = zstd.ZstdCompressor(level=level) |
403 frame = cctx.compress(original) | 407 frame = cctx.compress(original) |
404 | 408 |
405 dctx = zstd.ZstdDecompressor() | 409 dctx = zstd.ZstdDecompressor() |
406 source = io.BytesIO(frame) | 410 source = io.BytesIO(frame) |
431 original=strategies.sampled_from(random_input_data()), | 435 original=strategies.sampled_from(random_input_data()), |
432 level=strategies.integers(min_value=1, max_value=5), | 436 level=strategies.integers(min_value=1, max_value=5), |
433 read_size=strategies.integers(min_value=1, max_value=8192), | 437 read_size=strategies.integers(min_value=1, max_value=8192), |
434 write_size=strategies.integers(min_value=1, max_value=8192), | 438 write_size=strategies.integers(min_value=1, max_value=8192), |
435 ) | 439 ) |
436 def test_read_write_size_variance(self, original, level, read_size, write_size): | 440 def test_read_write_size_variance( |
441 self, original, level, read_size, write_size | |
442 ): | |
437 cctx = zstd.ZstdCompressor(level=level) | 443 cctx = zstd.ZstdCompressor(level=level) |
438 frame = cctx.compress(original) | 444 frame = cctx.compress(original) |
439 | 445 |
440 source = io.BytesIO(frame) | 446 source = io.BytesIO(frame) |
441 dest = io.BytesIO() | 447 dest = io.BytesIO() |
442 | 448 |
443 dctx = zstd.ZstdDecompressor() | 449 dctx = zstd.ZstdDecompressor() |
444 dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size) | 450 dctx.copy_stream( |
451 source, dest, read_size=read_size, write_size=write_size | |
452 ) | |
445 | 453 |
446 self.assertEqual(dest.getvalue(), original) | 454 self.assertEqual(dest.getvalue(), original) |
447 | 455 |
448 | 456 |
449 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") | 457 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") |
488 ) | 496 ) |
489 @hypothesis.given( | 497 @hypothesis.given( |
490 original=strategies.sampled_from(random_input_data()), | 498 original=strategies.sampled_from(random_input_data()), |
491 level=strategies.integers(min_value=1, max_value=5), | 499 level=strategies.integers(min_value=1, max_value=5), |
492 write_size=strategies.integers( | 500 write_size=strategies.integers( |
493 min_value=1, max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE | 501 min_value=1, |
502 max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, | |
494 ), | 503 ), |
495 chunk_sizes=strategies.data(), | 504 chunk_sizes=strategies.data(), |
496 ) | 505 ) |
497 def test_random_output_sizes(self, original, level, write_size, chunk_sizes): | 506 def test_random_output_sizes( |
507 self, original, level, write_size, chunk_sizes | |
508 ): | |
498 cctx = zstd.ZstdCompressor(level=level) | 509 cctx = zstd.ZstdCompressor(level=level) |
499 frame = cctx.compress(original) | 510 frame = cctx.compress(original) |
500 | 511 |
501 source = io.BytesIO(frame) | 512 source = io.BytesIO(frame) |
502 | 513 |
522 original=strategies.sampled_from(random_input_data()), | 533 original=strategies.sampled_from(random_input_data()), |
523 level=strategies.integers(min_value=1, max_value=5), | 534 level=strategies.integers(min_value=1, max_value=5), |
524 read_size=strategies.integers(min_value=1, max_value=4096), | 535 read_size=strategies.integers(min_value=1, max_value=4096), |
525 write_size=strategies.integers(min_value=1, max_value=4096), | 536 write_size=strategies.integers(min_value=1, max_value=4096), |
526 ) | 537 ) |
527 def test_read_write_size_variance(self, original, level, read_size, write_size): | 538 def test_read_write_size_variance( |
539 self, original, level, read_size, write_size | |
540 ): | |
528 cctx = zstd.ZstdCompressor(level=level) | 541 cctx = zstd.ZstdCompressor(level=level) |
529 frame = cctx.compress(original) | 542 frame = cctx.compress(original) |
530 | 543 |
531 source = io.BytesIO(frame) | 544 source = io.BytesIO(frame) |
532 | 545 |
533 dctx = zstd.ZstdDecompressor() | 546 dctx = zstd.ZstdDecompressor() |
534 chunks = list( | 547 chunks = list( |
535 dctx.read_to_iter(source, read_size=read_size, write_size=write_size) | 548 dctx.read_to_iter( |
549 source, read_size=read_size, write_size=write_size | |
550 ) | |
536 ) | 551 ) |
537 | 552 |
538 self.assertEqual(b"".join(chunks), original) | 553 self.assertEqual(b"".join(chunks), original) |
539 | 554 |
540 | 555 |
541 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") | 556 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") |
542 class TestDecompressor_multi_decompress_to_buffer_fuzzing(TestCase): | 557 class TestDecompressor_multi_decompress_to_buffer_fuzzing(TestCase): |
543 @hypothesis.given( | 558 @hypothesis.given( |
544 original=strategies.lists( | 559 original=strategies.lists( |
545 strategies.sampled_from(random_input_data()), min_size=1, max_size=1024 | 560 strategies.sampled_from(random_input_data()), |
561 min_size=1, | |
562 max_size=1024, | |
546 ), | 563 ), |
547 threads=strategies.integers(min_value=1, max_value=8), | 564 threads=strategies.integers(min_value=1, max_value=8), |
548 use_dict=strategies.booleans(), | 565 use_dict=strategies.booleans(), |
549 ) | 566 ) |
550 def test_data_equivalence(self, original, threads, use_dict): | 567 def test_data_equivalence(self, original, threads, use_dict): |