--- a/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Sun Apr 08 01:08:43 2018 +0200
+++ b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Mon Apr 09 10:13:29 2018 -0700
@@ -1,10 +1,6 @@
import io
import os
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
+import unittest
try:
import hypothesis
@@ -12,7 +8,7 @@
except ImportError:
raise unittest.SkipTest('hypothesis not available')
-import zstd
+import zstandard as zstd
from . common import (
make_cffi,
@@ -22,15 +18,96 @@
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
-class TestDecompressor_write_to_fuzzing(unittest.TestCase):
+class TestDecompressor_stream_reader_fuzzing(unittest.TestCase):
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data())
+ def test_stream_source_read_variance(self, original, level, source_read_size,
+ read_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+ frame = cctx.compress(original)
+
+ dctx = zstd.ZstdDecompressor()
+ source = io.BytesIO(frame)
+
+ chunks = []
+ with dctx.stream_reader(source, read_size=source_read_size) as reader:
+ while True:
+ read_size = read_sizes.draw(strategies.integers(1, 16384))
+ chunk = reader.read(read_size)
+ if not chunk:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), original)
+
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data())
+ def test_buffer_source_read_variance(self, original, level, source_read_size,
+ read_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+ frame = cctx.compress(original)
+
+ dctx = zstd.ZstdDecompressor()
+ chunks = []
+
+ with dctx.stream_reader(frame, read_size=source_read_size) as reader:
+ while True:
+ read_size = read_sizes.draw(strategies.integers(1, 16384))
+ chunk = reader.read(read_size)
+ if not chunk:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), original)
+
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ seek_amounts=strategies.data(),
+ read_sizes=strategies.data())
+ def test_relative_seeks(self, original, level, source_read_size, seek_amounts,
+ read_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+ frame = cctx.compress(original)
+
+ dctx = zstd.ZstdDecompressor()
+
+ with dctx.stream_reader(frame, read_size=source_read_size) as reader:
+ while True:
+ amount = seek_amounts.draw(strategies.integers(0, 16384))
+ reader.seek(amount, os.SEEK_CUR)
+
+ offset = reader.tell()
+ read_amount = read_sizes.draw(strategies.integers(1, 16384))
+ chunk = reader.read(read_amount)
+
+ if not chunk:
+ break
+
+ self.assertEqual(original[offset:offset + len(chunk)], chunk)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestDecompressor_stream_writer_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=8192),
- input_sizes=strategies.streaming(
- strategies.integers(min_value=1, max_value=4096)))
+ input_sizes=strategies.data())
def test_write_size_variance(self, original, level, write_size, input_sizes):
- input_sizes = iter(input_sizes)
-
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
@@ -38,9 +115,10 @@
source = io.BytesIO(frame)
dest = io.BytesIO()
- with dctx.write_to(dest, write_size=write_size) as decompressor:
+ with dctx.stream_writer(dest, write_size=write_size) as decompressor:
while True:
- chunk = source.read(next(input_sizes))
+ input_size = input_sizes.draw(strategies.integers(1, 4096))
+ chunk = source.read(input_size)
if not chunk:
break
@@ -74,11 +152,8 @@
class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
- chunk_sizes=strategies.streaming(
- strategies.integers(min_value=1, max_value=4096)))
+ chunk_sizes=strategies.data())
def test_random_input_sizes(self, original, level, chunk_sizes):
- chunk_sizes = iter(chunk_sizes)
-
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
@@ -89,7 +164,33 @@
chunks = []
while True:
- chunk = source.read(next(chunk_sizes))
+ chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
+ chunk = source.read(chunk_size)
+ if not chunk:
+ break
+
+ chunks.append(dobj.decompress(chunk))
+
+ self.assertEqual(b''.join(chunks), original)
+
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ write_size=strategies.integers(min_value=1,
+ max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ chunk_sizes=strategies.data())
+ def test_random_output_sizes(self, original, level, write_size, chunk_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+ frame = cctx.compress(original)
+
+ source = io.BytesIO(frame)
+
+ dctx = zstd.ZstdDecompressor()
+ dobj = dctx.decompressobj(write_size=write_size)
+
+ chunks = []
+ while True:
+ chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
+ chunk = source.read(chunk_size)
if not chunk:
break
@@ -100,7 +201,7 @@
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
-class TestDecompressor_read_from_fuzzing(unittest.TestCase):
+class TestDecompressor_read_to_iter_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
@@ -112,7 +213,7 @@
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
- chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size))
+ chunks = list(dctx.read_to_iter(source, read_size=read_size, write_size=write_size))
self.assertEqual(b''.join(chunks), original)