Mercurial > public > mercurial-scm > hg-stable
diff contrib/python-zstandard/tests/test_decompressor_fuzzing.py @ 43999:de7838053207
zstandard: vendor python-zstandard 0.13.0
Version 0.13.0 of the package was just released. It contains
an upgraded zstd C library which can result in some performance
wins, official support for Python 3.8, and a blackened code base.
There were no meaningful code or functionality changes in this
release of python-zstandard: just reformatting and an upgraded
zstd library version. So the diff seems much larger than what it
is.
Files were added without modifications.
The clang-format-ignorelist file was updated to reflect a new
header file in the zstd distribution.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D7770
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 28 Dec 2019 09:55:45 -0800 |
parents | 675775c33ab6 |
children | 5e84a96d865b |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Fri Dec 27 18:54:57 2019 -0500 +++ b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Sat Dec 28 09:55:45 2019 -0800 @@ -6,29 +6,37 @@ import hypothesis import hypothesis.strategies as strategies except ImportError: - raise unittest.SkipTest('hypothesis not available') + raise unittest.SkipTest("hypothesis not available") import zstandard as zstd -from . common import ( +from .common import ( make_cffi, NonClosingBytesIO, random_input_data, + TestCase, ) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestDecompressor_stream_reader_fuzzing(unittest.TestCase): +class TestDecompressor_stream_reader_fuzzing(TestCase): @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576), - read_sizes=strategies.data()) - def test_stream_source_read_variance(self, original, level, streaming, - source_read_size, read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + read_sizes=strategies.data(), + ) + def test_stream_source_read_variance( + self, original, level, streaming, source_read_size, read_sizes + ): cctx = zstd.ZstdCompressor(level=level) if streaming: @@ -53,18 +61,22 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) # Similar to above except we have a constant read() size. @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576), - read_size=strategies.integers(-1, 131072)) - def test_stream_source_read_size(self, original, level, streaming, - source_read_size, read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + read_size=strategies.integers(-1, 131072), + ) + def test_stream_source_read_size( + self, original, level, streaming, source_read_size, read_size + ): if read_size == 0: read_size = 1 @@ -91,17 +103,24 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576), - read_sizes=strategies.data()) - def test_buffer_source_read_variance(self, original, level, streaming, - source_read_size, read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + read_sizes=strategies.data(), + ) + def test_buffer_source_read_variance( + self, original, level, streaming, source_read_size, read_sizes + ): cctx = zstd.ZstdCompressor(level=level) if streaming: @@ -125,18 +144,22 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) # Similar to above except we have a constant read() size. @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576), - read_size=strategies.integers(-1, 131072)) - def test_buffer_source_constant_read_size(self, original, level, streaming, - source_read_size, read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + read_size=strategies.integers(-1, 131072), + ) + def test_buffer_source_constant_read_size( + self, original, level, streaming, source_read_size, read_size + ): if read_size == 0: read_size = -1 @@ -162,16 +185,18 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576)) - def test_stream_source_readall(self, original, level, streaming, - source_read_size): + suppress_health_check=[hypothesis.HealthCheck.large_base_example] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + ) + def test_stream_source_readall(self, original, level, streaming, source_read_size): cctx = zstd.ZstdCompressor(level=level) if streaming: @@ -190,14 +215,21 @@ self.assertEqual(data, original) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576), - read_sizes=strategies.data()) - def test_stream_source_read1_variance(self, original, level, streaming, - source_read_size, read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + read_sizes=strategies.data(), + ) + def test_stream_source_read1_variance( + self, original, level, streaming, source_read_size, read_sizes + ): cctx = zstd.ZstdCompressor(level=level) if streaming: @@ -222,17 +254,24 @@ chunks.append(chunk) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - streaming=strategies.booleans(), - source_read_size=strategies.integers(1, 1048576), - read_sizes=strategies.data()) - def test_stream_source_readinto1_variance(self, original, level, streaming, - source_read_size, read_sizes): + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + streaming=strategies.booleans(), + source_read_size=strategies.integers(1, 1048576), + read_sizes=strategies.data(), + ) + def test_stream_source_readinto1_variance( + self, original, level, streaming, source_read_size, read_sizes + ): cctx = zstd.ZstdCompressor(level=level) if streaming: @@ -259,18 +298,24 @@ chunks.append(bytes(b[0:count])) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) @hypothesis.given( original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), source_read_size=strategies.integers(1, 1048576), seek_amounts=strategies.data(), - read_sizes=strategies.data()) - def test_relative_seeks(self, original, level, source_read_size, seek_amounts, - read_sizes): + read_sizes=strategies.data(), + ) + def test_relative_seeks( + self, original, level, source_read_size, seek_amounts, read_sizes + ): cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -288,18 +333,24 @@ if not chunk: break - self.assertEqual(original[offset:offset + len(chunk)], chunk) + self.assertEqual(original[offset : offset + len(chunk)], chunk) @hypothesis.settings( - suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) @hypothesis.given( originals=strategies.data(), frame_count=strategies.integers(min_value=2, max_value=10), level=strategies.integers(min_value=1, max_value=5), source_read_size=strategies.integers(1, 1048576), - read_sizes=strategies.data()) - def test_multiple_frames(self, originals, frame_count, level, - source_read_size, read_sizes): + read_sizes=strategies.data(), + ) + def test_multiple_frames( + self, originals, frame_count, level, source_read_size, read_sizes + ): cctx = zstd.ZstdCompressor(level=level) source = io.BytesIO() @@ -314,8 +365,9 @@ dctx = zstd.ZstdDecompressor() buffer.seek(0) - reader = dctx.stream_reader(buffer, read_size=source_read_size, - read_across_frames=True) + reader = dctx.stream_reader( + buffer, read_size=source_read_size, read_across_frames=True + ) chunks = [] @@ -328,16 +380,24 @@ chunks.append(chunk) - self.assertEqual(source.getvalue(), b''.join(chunks)) + self.assertEqual(source.getvalue(), b"".join(chunks)) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestDecompressor_stream_writer_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - write_size=strategies.integers(min_value=1, max_value=8192), - input_sizes=strategies.data()) +class TestDecompressor_stream_writer_fuzzing(TestCase): + @hypothesis.settings( + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + write_size=strategies.integers(min_value=1, max_value=8192), + input_sizes=strategies.data(), + ) def test_write_size_variance(self, original, level, write_size, input_sizes): cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -358,13 +418,21 @@ self.assertEqual(dest.getvalue(), original) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestDecompressor_copy_stream_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - read_size=strategies.integers(min_value=1, max_value=8192), - write_size=strategies.integers(min_value=1, max_value=8192)) +class TestDecompressor_copy_stream_fuzzing(TestCase): + @hypothesis.settings( + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=8192), + write_size=strategies.integers(min_value=1, max_value=8192), + ) def test_read_write_size_variance(self, original, level, read_size, write_size): cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -378,12 +446,20 @@ self.assertEqual(dest.getvalue(), original) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestDecompressor_decompressobj_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - chunk_sizes=strategies.data()) +class TestDecompressor_decompressobj_fuzzing(TestCase): + @hypothesis.settings( + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_sizes=strategies.data(), + ) def test_random_input_sizes(self, original, level, chunk_sizes): cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -402,13 +478,22 @@ chunks.append(dobj.decompress(chunk)) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - write_size=strategies.integers(min_value=1, - max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE), - chunk_sizes=strategies.data()) + @hypothesis.settings( + suppress_health_check=[ + hypothesis.HealthCheck.large_base_example, + hypothesis.HealthCheck.too_slow, + ] + ) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + write_size=strategies.integers( + min_value=1, max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE + ), + chunk_sizes=strategies.data(), + ) def test_random_output_sizes(self, original, level, write_size, chunk_sizes): cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -427,16 +512,18 @@ chunks.append(dobj.decompress(chunk)) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") @make_cffi -class TestDecompressor_read_to_iter_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.sampled_from(random_input_data()), - level=strategies.integers(min_value=1, max_value=5), - read_size=strategies.integers(min_value=1, max_value=4096), - write_size=strategies.integers(min_value=1, max_value=4096)) +class TestDecompressor_read_to_iter_fuzzing(TestCase): + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=4096), + write_size=strategies.integers(min_value=1, max_value=4096), + ) def test_read_write_size_variance(self, original, level, read_size, write_size): cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -444,29 +531,33 @@ source = io.BytesIO(frame) dctx = zstd.ZstdDecompressor() - chunks = list(dctx.read_to_iter(source, read_size=read_size, write_size=write_size)) + chunks = list( + dctx.read_to_iter(source, read_size=read_size, write_size=write_size) + ) - self.assertEqual(b''.join(chunks), original) + self.assertEqual(b"".join(chunks), original) -@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') -class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase): - @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()), - min_size=1, max_size=1024), - threads=strategies.integers(min_value=1, max_value=8), - use_dict=strategies.booleans()) +@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") +class TestDecompressor_multi_decompress_to_buffer_fuzzing(TestCase): + @hypothesis.given( + original=strategies.lists( + strategies.sampled_from(random_input_data()), min_size=1, max_size=1024 + ), + threads=strategies.integers(min_value=1, max_value=8), + use_dict=strategies.booleans(), + ) def test_data_equivalence(self, original, threads, use_dict): kwargs = {} if use_dict: - kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) + kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0]) - cctx = zstd.ZstdCompressor(level=1, - write_content_size=True, - write_checksum=True, - **kwargs) + cctx = zstd.ZstdCompressor( + level=1, write_content_size=True, write_checksum=True, **kwargs + ) - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)