Mercurial > public > mercurial-scm > hg
diff contrib/python-zstandard/tests/test_compressor_fuzzing.py @ 37495:b1fb341d8a61
zstandard: vendor python-zstandard 0.9.0
This was just released. It features a number of goodies. More info at
https://gregoryszorc.com/blog/2018/04/09/release-of-python-zstandard-0.9/.
The clang-format ignore list was updated to reflect the new source
of files.
The project contains a vendored copy of zstandard 1.3.4. The old
version was 1.1.3. One of the changes between those versions is that
zstandard is now dual licensed BSD + GPLv2 and the patent rights grant
has been removed. Good riddance.
The API should be backwards compatible. So no changes in core
should be needed. However, there were a number of changes in the
library that we'll want to adapt to. Those will be addressed in
subsequent commits.
Differential Revision: https://phab.mercurial-scm.org/D3198
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 09 Apr 2018 10:13:29 -0700 |
parents | e0dc40530c5a |
children | 73fef626dae3 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py Sun Apr 08 01:08:43 2018 +0200 +++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py Mon Apr 09 10:13:29 2018 -0700 @@ -1,10 +1,6 @@ import io import os - -try: - import unittest2 as unittest -except ImportError: - import unittest +import unittest try: import hypothesis @@ -12,7 +8,7 @@ except ImportError: raise unittest.SkipTest('hypothesis not available') -import zstd +import zstandard as zstd from . common import ( make_cffi, @@ -22,7 +18,57 @@ @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') @make_cffi -class TestCompressor_write_to_fuzzing(unittest.TestCase): +class TestCompressor_stream_reader_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data()) + def test_stream_source_read_variance(self, original, level, source_read_size, + read_sizes): + refctx = zstd.ZstdCompressor(level=level) + ref_frame = refctx.compress(original) + + cctx = zstd.ZstdCompressor(level=level) + with cctx.stream_reader(io.BytesIO(original), size=len(original), + read_size=source_read_size) as reader: + chunks = [] + while True: + read_size = read_sizes.draw(strategies.integers(1, 16384)) + chunk = reader.read(read_size) + + if not chunk: + break + chunks.append(chunk) + + self.assertEqual(b''.join(chunks), ref_frame) + + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data()) + def test_buffer_source_read_variance(self, original, level, source_read_size, + read_sizes): + + refctx = zstd.ZstdCompressor(level=level) + ref_frame = refctx.compress(original) + + cctx = zstd.ZstdCompressor(level=level) + with cctx.stream_reader(original, size=len(original), + read_size=source_read_size) as reader: + chunks = [] + while True: + read_size = read_sizes.draw(strategies.integers(1, 16384)) + chunk = reader.read(read_size) + if not chunk: + break + chunks.append(chunk) + + self.assertEqual(b''.join(chunks), ref_frame) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestCompressor_stream_writer_fuzzing(unittest.TestCase): @hypothesis.given(original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), write_size=strategies.integers(min_value=1, max_value=1048576)) @@ -32,7 +78,7 @@ cctx = zstd.ZstdCompressor(level=level) b = io.BytesIO() - with cctx.write_to(b, size=len(original), write_size=write_size) as compressor: + with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor: compressor.write(original) self.assertEqual(b.getvalue(), ref_frame) @@ -62,13 +108,12 @@ @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') @make_cffi class TestCompressor_compressobj_fuzzing(unittest.TestCase): + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) @hypothesis.given(original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), - chunk_sizes=strategies.streaming( - strategies.integers(min_value=1, max_value=4096))) + chunk_sizes=strategies.data()) def test_random_input_sizes(self, original, level, chunk_sizes): - chunk_sizes = iter(chunk_sizes) - refctx = zstd.ZstdCompressor(level=level) ref_frame = refctx.compress(original) @@ -78,7 +123,7 @@ chunks = [] i = 0 while True: - chunk_size = next(chunk_sizes) + chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) source = original[i:i + chunk_size] if not source: break @@ -93,7 +138,7 @@ @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') @make_cffi -class TestCompressor_read_from_fuzzing(unittest.TestCase): +class TestCompressor_read_to_iter_fuzzing(unittest.TestCase): @hypothesis.given(original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), read_size=strategies.integers(min_value=1, max_value=4096), @@ -105,8 +150,9 @@ source = io.BytesIO(original) cctx = zstd.ZstdCompressor(level=level) - chunks = list(cctx.read_from(source, size=len(original), read_size=read_size, - write_size=write_size)) + chunks = list(cctx.read_to_iter(source, size=len(original), + read_size=read_size, + write_size=write_size)) self.assertEqual(b''.join(chunks), ref_frame) @@ -125,7 +171,6 @@ kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) cctx = zstd.ZstdCompressor(level=1, - write_content_size=True, write_checksum=True, **kwargs)