--- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py Sun Apr 08 01:08:43 2018 +0200
+++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py Mon Apr 09 10:13:29 2018 -0700
@@ -1,10 +1,6 @@
import io
import os
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
+import unittest
try:
import hypothesis
@@ -12,7 +8,7 @@
except ImportError:
raise unittest.SkipTest('hypothesis not available')
-import zstd
+import zstandard as zstd
from . common import (
make_cffi,
@@ -22,7 +18,57 @@
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
-class TestCompressor_write_to_fuzzing(unittest.TestCase):
+class TestCompressor_stream_reader_fuzzing(unittest.TestCase):
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data())
+ def test_stream_source_read_variance(self, original, level, source_read_size,
+ read_sizes):
+ refctx = zstd.ZstdCompressor(level=level)
+ ref_frame = refctx.compress(original)
+
+ cctx = zstd.ZstdCompressor(level=level)
+ with cctx.stream_reader(io.BytesIO(original), size=len(original),
+ read_size=source_read_size) as reader:
+ chunks = []
+ while True:
+ read_size = read_sizes.draw(strategies.integers(1, 16384))
+ chunk = reader.read(read_size)
+
+ if not chunk:
+ break
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), ref_frame)
+
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data())
+ def test_buffer_source_read_variance(self, original, level, source_read_size,
+ read_sizes):
+
+ refctx = zstd.ZstdCompressor(level=level)
+ ref_frame = refctx.compress(original)
+
+ cctx = zstd.ZstdCompressor(level=level)
+ with cctx.stream_reader(original, size=len(original),
+ read_size=source_read_size) as reader:
+ chunks = []
+ while True:
+ read_size = read_sizes.draw(strategies.integers(1, 16384))
+ chunk = reader.read(read_size)
+ if not chunk:
+ break
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), ref_frame)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestCompressor_stream_writer_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=1048576))
@@ -32,7 +78,7 @@
cctx = zstd.ZstdCompressor(level=level)
b = io.BytesIO()
- with cctx.write_to(b, size=len(original), write_size=write_size) as compressor:
+ with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor:
compressor.write(original)
self.assertEqual(b.getvalue(), ref_frame)
@@ -62,13 +108,12 @@
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_compressobj_fuzzing(unittest.TestCase):
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
- chunk_sizes=strategies.streaming(
- strategies.integers(min_value=1, max_value=4096)))
+ chunk_sizes=strategies.data())
def test_random_input_sizes(self, original, level, chunk_sizes):
- chunk_sizes = iter(chunk_sizes)
-
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
@@ -78,7 +123,7 @@
chunks = []
i = 0
while True:
- chunk_size = next(chunk_sizes)
+ chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
source = original[i:i + chunk_size]
if not source:
break
@@ -93,7 +138,7 @@
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
-class TestCompressor_read_from_fuzzing(unittest.TestCase):
+class TestCompressor_read_to_iter_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
@@ -105,8 +150,9 @@
source = io.BytesIO(original)
cctx = zstd.ZstdCompressor(level=level)
- chunks = list(cctx.read_from(source, size=len(original), read_size=read_size,
- write_size=write_size))
+ chunks = list(cctx.read_to_iter(source, size=len(original),
+ read_size=read_size,
+ write_size=write_size))
self.assertEqual(b''.join(chunks), ref_frame)
@@ -125,7 +171,6 @@
kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
cctx = zstd.ZstdCompressor(level=1,
- write_content_size=True,
write_checksum=True,
**kwargs)