Mercurial > public > mercurial-scm > hg-stable
diff contrib/python-zstandard/tests/test_compressor_fuzzing.py @ 43999:de7838053207
zstandard: vendor python-zstandard 0.13.0
Version 0.13.0 of the package was just released. It contains
an upgraded zstd C library which can result in some performance
wins, official support for Python 3.8, and a blackened code base.
There were no meaningful code or functionality changes in this
release of python-zstandard: just reformatting and an upgraded
zstd library version. So the diff seems much larger than what it
is.
Files were added without modifications.
The clang-format-ignorelist file was updated to reflect a new
header file in the zstd distribution.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D7770
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 28 Dec 2019 09:55:45 -0800 |
parents | 675775c33ab6 |
children | 5e84a96d865b |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py Fri Dec 27 18:54:57 2019 -0500 +++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py Sat Dec 28 09:55:45 2019 -0800 @@ -6,28 +6,31 @@ import hypothesis import hypothesis.strategies as strategies except ImportError: - raise unittest.SkipTest('hypothesis not available') + raise unittest.SkipTest("hypothesis not available") import zstandard as zstd -from . common import ( +from .common import ( make_cffi, NonClosingBytesIO, random_input_data, + TestCase, ) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestCompressor_stream_reader_fuzzing(unittest.TestCase): +class TestCompressor_stream_reader_fuzzing(TestCase): @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_stream_source_read(self, original, level, source_read_size, - read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_stream_source_read(self, original, level, source_read_size, read_size): if read_size == 0: read_size = -1 @@ -35,8 +38,9 @@ ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: chunk = reader.read(read_size) @@ -45,16 +49,18 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_buffer_source_read(self, original, level, source_read_size, - read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_buffer_source_read(self, original, level, source_read_size, read_size): if read_size == 0: read_size = -1 @@ -62,8 +68,9 @@ ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: chunk = reader.read(read_size) @@ -72,22 +79,30 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_stream_source_read_variance(self, original, level, source_read_size, - read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_stream_source_read_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(-1, 16384)) @@ -97,23 +112,31 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_buffer_source_read_variance(self, original, level, source_read_size, - read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_buffer_source_read_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(-1, 16384)) @@ -123,22 +146,25 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_stream_source_readinto(self, original, level, - source_read_size, read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_stream_source_readinto(self, original, level, source_read_size, read_size): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: b = bytearray(read_size) @@ -149,23 +175,26 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_buffer_source_readinto(self, original, level, - source_read_size, read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_buffer_source_readinto(self, original, level, source_read_size, read_size): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: b = bytearray(read_size) @@ -176,22 +205,30 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_stream_source_readinto_variance(self, original, level, - source_read_size, read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_stream_source_readinto_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(1, 16384)) @@ -203,23 +240,31 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_buffer_source_readinto_variance(self, original, level, - source_read_size, read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_buffer_source_readinto_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(1, 16384)) @@ -231,16 +276,18 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_stream_source_read1(self, original, level, source_read_size, - read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_stream_source_read1(self, original, level, source_read_size, read_size): if read_size == 0: read_size = -1 @@ -248,8 +295,9 @@ ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: chunk = reader.read1(read_size) @@ -258,16 +306,18 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_buffer_source_read1(self, original, level, source_read_size, - read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_buffer_source_read1(self, original, level, source_read_size, read_size): if read_size == 0: read_size = -1 @@ -275,8 +325,9 @@ ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: chunk = reader.read1(read_size) @@ -285,22 +336,30 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_stream_source_read1_variance(self, original, level, source_read_size, - read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_stream_source_read1_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(-1, 16384)) @@ -310,23 +369,31 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_buffer_source_read1_variance(self, original, level, source_read_size, - read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_buffer_source_read1_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(-1, 16384)) @@ -336,17 +403,20 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), ref_frame) - + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_stream_source_readinto1(self, original, level, source_read_size, - read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_stream_source_readinto1( + self, original, level, source_read_size, read_size + ): if read_size == 0: read_size = -1 @@ -354,8 +424,9 @@ ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: b = bytearray(read_size) @@ -366,16 +437,20 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) - def test_buffer_source_readinto1(self, original, level, source_read_size, - read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE), + ) + def test_buffer_source_readinto1( + self, original, level, source_read_size, read_size + ): if read_size == 0: read_size = -1 @@ -383,8 +458,9 @@ ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: b = bytearray(read_size) @@ -395,22 +471,30 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_stream_source_readinto1_variance(self, original, level, source_read_size, - read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_stream_source_readinto1_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(io.BytesIO(original), size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + io.BytesIO(original), size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(1, 16384)) @@ -422,23 +506,31 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - source_read_size=strategies.integers(1, 16384), - read_sizes=strategies.data()) - def test_buffer_source_readinto1_variance(self, original, level, source_read_size, - read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data(), + ) + def test_buffer_source_readinto1_variance( + self, original, level, source_read_size, read_sizes + ): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) - with cctx.stream_reader(original, size=len(original), - read_size=source_read_size) as reader: + with cctx.stream_reader( + original, size=len(original), read_size=source_read_size + ) as reader: chunks = [] while True: read_size = read_sizes.draw(strategies.integers(1, 16384)) @@ -450,35 +542,40 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), ref_frame) - + self.assertEqual(b"".join(chunks), ref_frame) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestCompressor_stream_writer_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - write_size=strategies.integers(min_value=1, max_value=1048576)) +class TestCompressor_stream_writer_fuzzing(TestCase): + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + write_size=strategies.integers(min_value=1, max_value=1048576), + ) def test_write_size_variance(self, original, level, write_size): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) cctx = zstd.ZstdCompressor(level=level) b = NonClosingBytesIO() - with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor: + with cctx.stream_writer( + b, size=len(original), write_size=write_size + ) as compressor: compressor.write(original) self.assertEqual(b.getvalue(), ref_frame) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestCompressor_copy_stream_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - read_size=strategies.integers(min_value=1, max_value=1048576), - write_size=strategies.integers(min_value=1, max_value=1048576)) +class TestCompressor_copy_stream_fuzzing(TestCase): + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=1048576), + write_size=strategies.integers(min_value=1, max_value=1048576), + ) def test_read_write_size_variance(self, original, level, read_size, write_size): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) @@ -487,20 +584,27 @@ source = io.BytesIO(original) dest = io.BytesIO() - cctx.copy_stream(source, dest, size=len(original), read_size=read_size, - write_size=write_size) + cctx.copy_stream( + source, dest, size=len(original), read_size=read_size, write_size=write_size + ) self.assertEqual(dest.getvalue(), ref_frame) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestCompressor_compressobj_fuzzing(unittest.TestCase): +class TestCompressor_compressobj_fuzzing(TestCase): @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - chunk_sizes=strategies.data()) + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_sizes=strategies.data(), + ) def test_random_input_sizes(self, original, level, chunk_sizes): refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) @@ -512,7 +616,7 @@ i = 0 while True: chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) - source = original[i:i + chunk_size] + source = original[i : i + chunk_size] if not source: break @@ -521,14 +625,20 @@ chunks.append(cobj.flush()) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - chunk_sizes=strategies.data(), - flushes=strategies.data()) + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_sizes=strategies.data(), + flushes=strategies.data(), + ) def test_flush_block(self, original, level, chunk_sizes, flushes): cctx = zstd.ZstdCompressor(level=level) cobj = cctx.compressobj() @@ -541,7 +651,7 @@ i = 0 while True: input_size = chunk_sizes.draw(strategies.integers(1, 4096)) - source = original[i:i + input_size] + source = original[i : i + input_size] if not source: break @@ -558,24 +668,28 @@ compressed_chunks.append(chunk) decompressed_chunks.append(dobj.decompress(chunk)) - self.assertEqual(b''.join(decompressed_chunks), original[0:i]) + self.assertEqual(b"".join(decompressed_chunks), original[0:i]) chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH) compressed_chunks.append(chunk) decompressed_chunks.append(dobj.decompress(chunk)) - self.assertEqual(dctx.decompress(b''.join(compressed_chunks), - max_output_size=len(original)), - original) - self.assertEqual(b''.join(decompressed_chunks), original) + self.assertEqual( + dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)), + original, + ) + self.assertEqual(b"".join(decompressed_chunks), original) + -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestCompressor_read_to_iter_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - read_size=strategies.integers(min_value=1, max_value=4096), - write_size=strategies.integers(min_value=1, max_value=4096)) +class TestCompressor_read_to_iter_fuzzing(TestCase): + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=4096), + write_size=strategies.integers(min_value=1, max_value=4096), + ) def test_read_write_size_variance(self, original, level, read_size, write_size): refcctx = zstd.ZstdCompressor(level=level) ref_frame = refcctx.compress(original) @@ -583,32 +697,35 @@ source = io.BytesIO(original) cctx = zstd.ZstdCompressor(level=level) - chunks = list(cctx.read_to_iter(source, size=len(original), - read_size=read_size, - write_size=write_size)) + chunks = list( + cctx.read_to_iter( + source, size=len(original), read_size=read_size, write_size=write_size + ) + ) - self.assertEqual(b''.join(chunks), ref_frame) + self.assertEqual(b"".join(chunks), ref_frame) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') -class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()), - min_size=1, max_size=1024), - threads=strategies.integers(min_value=1, max_value=8), - use_dict=strategies.booleans()) +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") +class TestCompressor_multi_compress_to_buffer_fuzzing(TestCase): + @hypothesis.given( + original=strategies.lists( + strategies.sampled_from(random_input_data()), min_size=1, max_size=1024 + ), + threads=strategies.integers(min_value=1, max_value=8), + use_dict=strategies.booleans(), + ) def test_data_equivalence(self, original, threads, use_dict): kwargs = {} # Use a content dictionary because it is cheap to create. if use_dict: - kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) + kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0]) - cctx = zstd.ZstdCompressor(level=1, - write_checksum=True, - **kwargs) + cctx = zstd.ZstdCompressor(level=1, write_checksum=True, **kwargs) - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") result = cctx.multi_compress_to_buffer(original, threads=-1) @@ -624,17 +741,21 @@ self.assertEqual(dctx.decompress(frame), original[i]) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestCompressor_chunker_fuzzing(unittest.TestCase): +class TestCompressor_chunker_fuzzing(TestCase): @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - chunk_size=strategies.integers( - min_value=1, - max_value=32 * 1048576), - input_sizes=strategies.data()) + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576), + input_sizes=strategies.data(), + ) def test_random_input_sizes(self, original, level, chunk_size, input_sizes): cctx = zstd.ZstdCompressor(level=level) chunker = cctx.chunker(chunk_size=chunk_size) @@ -643,7 +764,7 @@ i = 0 while True: input_size = input_sizes.draw(strategies.integers(1, 4096)) - source = original[i:i + input_size] + source = original[i : i + input_size] if not source: break @@ -654,23 +775,26 @@ dctx = zstd.ZstdDecompressor() - self.assertEqual(dctx.decompress(b''.join(chunks), - max_output_size=len(original)), - original) + self.assertEqual( + dctx.decompress(b"".join(chunks), max_output_size=len(original)), original + ) self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1])) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - chunk_size=strategies.integers( - min_value=1, - max_value=32 * 1048576), - input_sizes=strategies.data(), - flushes=strategies.data()) - def test_flush_block(self, original, level, chunk_size, input_sizes, - flushes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576), + input_sizes=strategies.data(), + flushes=strategies.data(), + ) + def test_flush_block(self, original, level, chunk_size, input_sizes, flushes): cctx = zstd.ZstdCompressor(level=level) chunker = cctx.chunker(chunk_size=chunk_size) @@ -682,7 +806,7 @@ i = 0 while True: input_size = input_sizes.draw(strategies.integers(1, 4096)) - source = original[i:i + input_size] + source = original[i : i + input_size] if not source: break @@ -690,22 +814,23 @@ chunks = list(chunker.compress(source)) compressed_chunks.extend(chunks) - decompressed_chunks.append(dobj.decompress(b''.join(chunks))) + decompressed_chunks.append(dobj.decompress(b"".join(chunks))) if not flushes.draw(strategies.booleans()): continue chunks = list(chunker.flush()) compressed_chunks.extend(chunks) - decompressed_chunks.append(dobj.decompress(b''.join(chunks))) + decompressed_chunks.append(dobj.decompress(b"".join(chunks))) - self.assertEqual(b''.join(decompressed_chunks), original[0:i]) + self.assertEqual(b"".join(decompressed_chunks), original[0:i]) chunks = list(chunker.finish()) compressed_chunks.extend(chunks) - decompressed_chunks.append(dobj.decompress(b''.join(chunks))) + decompressed_chunks.append(dobj.decompress(b"".join(chunks))) - self.assertEqual(dctx.decompress(b''.join(compressed_chunks), - max_output_size=len(original)), - original) - self.assertEqual(b''.join(decompressed_chunks), original) \ No newline at end of file + self.assertEqual( + dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)), + original, + ) + self.assertEqual(b"".join(decompressed_chunks), original)