--- a/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Thu Apr 04 15:24:03 2019 -0700
+++ b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Thu Apr 04 17:34:43 2019 -0700
@@ -12,6 +12,7 @@
from . common import (
make_cffi,
+ NonClosingBytesIO,
random_input_data,
)
@@ -23,22 +24,200 @@
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data())
- def test_stream_source_read_variance(self, original, level, source_read_size,
- read_sizes):
+ def test_stream_source_read_variance(self, original, level, streaming,
+ source_read_size, read_sizes):
cctx = zstd.ZstdCompressor(level=level)
- frame = cctx.compress(original)
+
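+            # Both branches produce a complete zstd frame: either incrementally via
+            # stream_writer() plus an explicit FLUSH_FRAME, or in one shot via compress().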
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ source.seek(0)
+ else:
+ frame = cctx.compress(original)
+ source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
- source = io.BytesIO(frame)
chunks = []
with dctx.stream_reader(source, read_size=source_read_size) as reader:
while True:
- read_size = read_sizes.draw(strategies.integers(1, 16384))
+ read_size = read_sizes.draw(strategies.integers(-1, 131072))
+ chunk = reader.read(read_size)
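+                # A read_size of 0 legitimately returns b'' without signalling EOF, so
+                # only treat an empty chunk as end-of-stream when data was actually requested.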
+ if not chunk and read_size:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), original)
+
+    # Similar to the above, except we use a constant read() size.
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576),
+ read_size=strategies.integers(-1, 131072))
+ def test_stream_source_read_size(self, original, level, streaming,
+ source_read_size, read_size):
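+        # A constant read size of 0 would never make progress, so bump it to 1.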
+ if read_size == 0:
+ read_size = 1
+
+ cctx = zstd.ZstdCompressor(level=level)
+
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ source.seek(0)
+ else:
+ frame = cctx.compress(original)
+ source = io.BytesIO(frame)
+
+ dctx = zstd.ZstdDecompressor()
+
+ chunks = []
+ reader = dctx.stream_reader(source, read_size=source_read_size)
+ while True:
+ chunk = reader.read(read_size)
+ if not chunk and read_size:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), original)
+
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576),
+ read_sizes=strategies.data())
+ def test_buffer_source_read_variance(self, original, level, streaming,
+ source_read_size, read_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ frame = source.getvalue()
+ else:
+ frame = cctx.compress(original)
+
+ dctx = zstd.ZstdDecompressor()
+ chunks = []
+
+ with dctx.stream_reader(frame, read_size=source_read_size) as reader:
+ while True:
+ read_size = read_sizes.draw(strategies.integers(-1, 131072))
chunk = reader.read(read_size)
- if not chunk:
+ if not chunk and read_size:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), original)
+
+    # Similar to the above, except we use a constant read() size.
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576),
+ read_size=strategies.integers(-1, 131072))
+ def test_buffer_source_constant_read_size(self, original, level, streaming,
+ source_read_size, read_size):
+ if read_size == 0:
+ read_size = -1
+
+ cctx = zstd.ZstdCompressor(level=level)
+
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ frame = source.getvalue()
+ else:
+ frame = cctx.compress(original)
+
+ dctx = zstd.ZstdDecompressor()
+ chunks = []
+
+ reader = dctx.stream_reader(frame, read_size=source_read_size)
+ while True:
+ chunk = reader.read(read_size)
+ if not chunk and read_size:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(b''.join(chunks), original)
+
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576))
+ def test_stream_source_readall(self, original, level, streaming,
+ source_read_size):
+ cctx = zstd.ZstdCompressor(level=level)
+
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ source.seek(0)
+ else:
+ frame = cctx.compress(original)
+ source = io.BytesIO(frame)
+
+ dctx = zstd.ZstdDecompressor()
+
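+        # readall() should drain the reader to EOF in a single call, independent
+        # of the internal read_size.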
+ data = dctx.stream_reader(source, read_size=source_read_size).readall()
+ self.assertEqual(data, original)
+
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576),
+ read_sizes=strategies.data())
+ def test_stream_source_read1_variance(self, original, level, streaming,
+ source_read_size, read_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ source.seek(0)
+ else:
+ frame = cctx.compress(original)
+ source = io.BytesIO(frame)
+
+ dctx = zstd.ZstdDecompressor()
+
+ chunks = []
+ with dctx.stream_reader(source, read_size=source_read_size) as reader:
+ while True:
+ read_size = read_sizes.draw(strategies.integers(-1, 131072))
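+                # read1() performs at most one read of the underlying source, so it
+                # may return fewer bytes than requested.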
+ chunk = reader.read1(read_size)
+ if not chunk and read_size:
break
chunks.append(chunk)
@@ -49,24 +228,36 @@
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
+ streaming=strategies.booleans(),
+ source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data())
- def test_buffer_source_read_variance(self, original, level, source_read_size,
- read_sizes):
+ def test_stream_source_readinto1_variance(self, original, level, streaming,
+ source_read_size, read_sizes):
cctx = zstd.ZstdCompressor(level=level)
- frame = cctx.compress(original)
+
+ if streaming:
+ source = io.BytesIO()
+ writer = cctx.stream_writer(source)
+ writer.write(original)
+ writer.flush(zstd.FLUSH_FRAME)
+ source.seek(0)
+ else:
+ frame = cctx.compress(original)
+ source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
+
chunks = []
-
- with dctx.stream_reader(frame, read_size=source_read_size) as reader:
+ with dctx.stream_reader(source, read_size=source_read_size) as reader:
while True:
- read_size = read_sizes.draw(strategies.integers(1, 16384))
- chunk = reader.read(read_size)
- if not chunk:
+ read_size = read_sizes.draw(strategies.integers(1, 131072))
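+                # readinto1() fills the caller-supplied buffer and returns the byte
+                # count; the test treats a count of 0 as end of stream.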
+ b = bytearray(read_size)
+ count = reader.readinto1(b)
+
+ if not count:
break
- chunks.append(chunk)
+ chunks.append(bytes(b[0:count]))
self.assertEqual(b''.join(chunks), original)
@@ -75,7 +266,7 @@
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
+ source_read_size=strategies.integers(1, 1048576),
seek_amounts=strategies.data(),
read_sizes=strategies.data())
def test_relative_seeks(self, original, level, source_read_size, seek_amounts,
@@ -99,6 +290,46 @@
self.assertEqual(original[offset:offset + len(chunk)], chunk)
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(
+ originals=strategies.data(),
+ frame_count=strategies.integers(min_value=2, max_value=10),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 1048576),
+ read_sizes=strategies.data())
+ def test_multiple_frames(self, originals, frame_count, level,
+ source_read_size, read_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+ source = io.BytesIO()
+ buffer = io.BytesIO()
+ writer = cctx.stream_writer(buffer)
+
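+        # Each drawn input becomes its own zstd frame (FLUSH_FRAME ends the current
+        # frame), while `source` accumulates the uncompressed data for comparison.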
+ for i in range(frame_count):
+ data = originals.draw(strategies.sampled_from(random_input_data()))
+ source.write(data)
+ writer.write(data)
+ writer.flush(zstd.FLUSH_FRAME)
+
+ dctx = zstd.ZstdDecompressor()
+ buffer.seek(0)
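+        # read_across_frames=True lets the reader continue past a frame boundary
+        # instead of stopping at the end of the first frame.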
+ reader = dctx.stream_reader(buffer, read_size=source_read_size,
+ read_across_frames=True)
+
+ chunks = []
+
+ while True:
+ read_amount = read_sizes.draw(strategies.integers(-1, 16384))
+ chunk = reader.read(read_amount)
+
+ if not chunk and read_amount:
+ break
+
+ chunks.append(chunk)
+
+ self.assertEqual(source.getvalue(), b''.join(chunks))
+
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
@@ -113,7 +344,7 @@
dctx = zstd.ZstdDecompressor()
source = io.BytesIO(frame)
- dest = io.BytesIO()
+ dest = NonClosingBytesIO()
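+        # NonClosingBytesIO keeps its buffer readable even if the writer closes
+        # the destination when the context manager exits.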
with dctx.stream_writer(dest, write_size=write_size) as decompressor:
while True:
@@ -234,10 +465,12 @@
write_checksum=True,
**kwargs)
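+        # Not every backend implements multi_compress_to_buffer (notably the CFFI
+        # backend may lack it), so skip rather than error out.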
+ if not hasattr(cctx, 'multi_compress_to_buffer'):
+ self.skipTest('multi_compress_to_buffer not available')
+
frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)
dctx = zstd.ZstdDecompressor(**kwargs)
-
result = dctx.multi_decompress_to_buffer(frames_buffer)
self.assertEqual(len(result), len(original))