--- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py Fri Dec 27 18:54:57 2019 -0500
+++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py Sat Dec 28 09:55:45 2019 -0800
@@ -6,28 +6,31 @@
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
- raise unittest.SkipTest('hypothesis not available')
+ raise unittest.SkipTest("hypothesis not available")
import zstandard as zstd
-from . common import (
+from .common import (
make_cffi,
NonClosingBytesIO,
random_input_data,
+ TestCase,
)
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
@make_cffi
-class TestCompressor_stream_reader_fuzzing(unittest.TestCase):
+class TestCompressor_stream_reader_fuzzing(TestCase):
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_stream_source_read(self, original, level, source_read_size,
- read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_stream_source_read(self, original, level, source_read_size, read_size):
if read_size == 0:
read_size = -1
@@ -35,8 +38,9 @@
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
chunk = reader.read(read_size)
@@ -45,16 +49,18 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_buffer_source_read(self, original, level, source_read_size,
- read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_buffer_source_read(self, original, level, source_read_size, read_size):
if read_size == 0:
read_size = -1
@@ -62,8 +68,9 @@
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
chunk = reader.read(read_size)
@@ -72,22 +79,30 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_stream_source_read_variance(self, original, level, source_read_size,
- read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_stream_source_read_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
@@ -97,23 +112,31 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_buffer_source_read_variance(self, original, level, source_read_size,
- read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_buffer_source_read_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
@@ -123,22 +146,25 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_stream_source_readinto(self, original, level,
- source_read_size, read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_stream_source_readinto(self, original, level, source_read_size, read_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
b = bytearray(read_size)
@@ -149,23 +175,26 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_buffer_source_readinto(self, original, level,
- source_read_size, read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_buffer_source_readinto(self, original, level, source_read_size, read_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
b = bytearray(read_size)
@@ -176,22 +205,30 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_stream_source_readinto_variance(self, original, level,
- source_read_size, read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_stream_source_readinto_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
@@ -203,23 +240,31 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_buffer_source_readinto_variance(self, original, level,
- source_read_size, read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_buffer_source_readinto_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
@@ -231,16 +276,18 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_stream_source_read1(self, original, level, source_read_size,
- read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_stream_source_read1(self, original, level, source_read_size, read_size):
if read_size == 0:
read_size = -1
@@ -248,8 +295,9 @@
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
chunk = reader.read1(read_size)
@@ -258,16 +306,18 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_buffer_source_read1(self, original, level, source_read_size,
- read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_buffer_source_read1(self, original, level, source_read_size, read_size):
if read_size == 0:
read_size = -1
@@ -275,8 +325,9 @@
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
chunk = reader.read1(read_size)
@@ -285,22 +336,30 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_stream_source_read1_variance(self, original, level, source_read_size,
- read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_stream_source_read1_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
@@ -310,23 +369,31 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_buffer_source_read1_variance(self, original, level, source_read_size,
- read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_buffer_source_read1_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
@@ -336,17 +403,20 @@
chunks.append(chunk)
- self.assertEqual(b''.join(chunks), ref_frame)
-
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_stream_source_readinto1(self, original, level, source_read_size,
- read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_stream_source_readinto1(
+ self, original, level, source_read_size, read_size
+ ):
if read_size == 0:
read_size = -1
@@ -354,8 +424,9 @@
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
b = bytearray(read_size)
@@ -366,16 +437,20 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE))
- def test_buffer_source_readinto1(self, original, level, source_read_size,
- read_size):
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
+ )
+ def test_buffer_source_readinto1(
+ self, original, level, source_read_size, read_size
+ ):
if read_size == 0:
read_size = -1
@@ -383,8 +458,9 @@
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
b = bytearray(read_size)
@@ -395,22 +471,30 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_stream_source_readinto1_variance(self, original, level, source_read_size,
- read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_stream_source_readinto1_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(io.BytesIO(original), size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ io.BytesIO(original), size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
@@ -422,23 +506,31 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- source_read_size=strategies.integers(1, 16384),
- read_sizes=strategies.data())
- def test_buffer_source_readinto1_variance(self, original, level, source_read_size,
- read_sizes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ source_read_size=strategies.integers(1, 16384),
+ read_sizes=strategies.data(),
+ )
+ def test_buffer_source_readinto1_variance(
+ self, original, level, source_read_size, read_sizes
+ ):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
- with cctx.stream_reader(original, size=len(original),
- read_size=source_read_size) as reader:
+ with cctx.stream_reader(
+ original, size=len(original), read_size=source_read_size
+ ) as reader:
chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
@@ -450,35 +542,40 @@
chunks.append(bytes(b[0:count]))
- self.assertEqual(b''.join(chunks), ref_frame)
-
+ self.assertEqual(b"".join(chunks), ref_frame)
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
@make_cffi
-class TestCompressor_stream_writer_fuzzing(unittest.TestCase):
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- write_size=strategies.integers(min_value=1, max_value=1048576))
+class TestCompressor_stream_writer_fuzzing(TestCase):
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ write_size=strategies.integers(min_value=1, max_value=1048576),
+ )
def test_write_size_variance(self, original, level, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
b = NonClosingBytesIO()
- with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor:
+ with cctx.stream_writer(
+ b, size=len(original), write_size=write_size
+ ) as compressor:
compressor.write(original)
self.assertEqual(b.getvalue(), ref_frame)
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
@make_cffi
-class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- read_size=strategies.integers(min_value=1, max_value=1048576),
- write_size=strategies.integers(min_value=1, max_value=1048576))
+class TestCompressor_copy_stream_fuzzing(TestCase):
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ read_size=strategies.integers(min_value=1, max_value=1048576),
+ write_size=strategies.integers(min_value=1, max_value=1048576),
+ )
def test_read_write_size_variance(self, original, level, read_size, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
@@ -487,20 +584,27 @@
source = io.BytesIO(original)
dest = io.BytesIO()
- cctx.copy_stream(source, dest, size=len(original), read_size=read_size,
- write_size=write_size)
+ cctx.copy_stream(
+ source, dest, size=len(original), read_size=read_size, write_size=write_size
+ )
self.assertEqual(dest.getvalue(), ref_frame)
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
@make_cffi
-class TestCompressor_compressobj_fuzzing(unittest.TestCase):
+class TestCompressor_compressobj_fuzzing(TestCase):
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- chunk_sizes=strategies.data())
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_sizes=strategies.data(),
+ )
def test_random_input_sizes(self, original, level, chunk_sizes):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
@@ -512,7 +616,7 @@
i = 0
while True:
chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
- source = original[i:i + chunk_size]
+ source = original[i : i + chunk_size]
if not source:
break
@@ -521,14 +625,20 @@
chunks.append(cobj.flush())
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- chunk_sizes=strategies.data(),
- flushes=strategies.data())
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_sizes=strategies.data(),
+ flushes=strategies.data(),
+ )
def test_flush_block(self, original, level, chunk_sizes, flushes):
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj()
@@ -541,7 +651,7 @@
i = 0
while True:
input_size = chunk_sizes.draw(strategies.integers(1, 4096))
- source = original[i:i + input_size]
+ source = original[i : i + input_size]
if not source:
break
@@ -558,24 +668,28 @@
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
- self.assertEqual(b''.join(decompressed_chunks), original[0:i])
+ self.assertEqual(b"".join(decompressed_chunks), original[0:i])
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
- self.assertEqual(dctx.decompress(b''.join(compressed_chunks),
- max_output_size=len(original)),
- original)
- self.assertEqual(b''.join(decompressed_chunks), original)
+ self.assertEqual(
+ dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)),
+ original,
+ )
+ self.assertEqual(b"".join(decompressed_chunks), original)
+
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
@make_cffi
-class TestCompressor_read_to_iter_fuzzing(unittest.TestCase):
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- read_size=strategies.integers(min_value=1, max_value=4096),
- write_size=strategies.integers(min_value=1, max_value=4096))
+class TestCompressor_read_to_iter_fuzzing(TestCase):
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ read_size=strategies.integers(min_value=1, max_value=4096),
+ write_size=strategies.integers(min_value=1, max_value=4096),
+ )
def test_read_write_size_variance(self, original, level, read_size, write_size):
refcctx = zstd.ZstdCompressor(level=level)
ref_frame = refcctx.compress(original)
@@ -583,32 +697,35 @@
source = io.BytesIO(original)
cctx = zstd.ZstdCompressor(level=level)
- chunks = list(cctx.read_to_iter(source, size=len(original),
- read_size=read_size,
- write_size=write_size))
+ chunks = list(
+ cctx.read_to_iter(
+ source, size=len(original), read_size=read_size, write_size=write_size
+ )
+ )
- self.assertEqual(b''.join(chunks), ref_frame)
+ self.assertEqual(b"".join(chunks), ref_frame)
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
-class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
- @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
- min_size=1, max_size=1024),
- threads=strategies.integers(min_value=1, max_value=8),
- use_dict=strategies.booleans())
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
+class TestCompressor_multi_compress_to_buffer_fuzzing(TestCase):
+ @hypothesis.given(
+ original=strategies.lists(
+ strategies.sampled_from(random_input_data()), min_size=1, max_size=1024
+ ),
+ threads=strategies.integers(min_value=1, max_value=8),
+ use_dict=strategies.booleans(),
+ )
def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
# Use a content dictionary because it is cheap to create.
if use_dict:
- kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
+ kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0])
- cctx = zstd.ZstdCompressor(level=1,
- write_checksum=True,
- **kwargs)
+ cctx = zstd.ZstdCompressor(level=1, write_checksum=True, **kwargs)
- if not hasattr(cctx, 'multi_compress_to_buffer'):
- self.skipTest('multi_compress_to_buffer not available')
+ if not hasattr(cctx, "multi_compress_to_buffer"):
+ self.skipTest("multi_compress_to_buffer not available")
result = cctx.multi_compress_to_buffer(original, threads=-1)
@@ -624,17 +741,21 @@
self.assertEqual(dctx.decompress(frame), original[i])
-@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
@make_cffi
-class TestCompressor_chunker_fuzzing(unittest.TestCase):
+class TestCompressor_chunker_fuzzing(TestCase):
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- chunk_size=strategies.integers(
- min_value=1,
- max_value=32 * 1048576),
- input_sizes=strategies.data())
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
+ input_sizes=strategies.data(),
+ )
def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
@@ -643,7 +764,7 @@
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
- source = original[i:i + input_size]
+ source = original[i : i + input_size]
if not source:
break
@@ -654,23 +775,26 @@
dctx = zstd.ZstdDecompressor()
- self.assertEqual(dctx.decompress(b''.join(chunks),
- max_output_size=len(original)),
- original)
+ self.assertEqual(
+ dctx.decompress(b"".join(chunks), max_output_size=len(original)), original
+ )
self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1]))
@hypothesis.settings(
- suppress_health_check=[hypothesis.HealthCheck.large_base_example])
- @hypothesis.given(original=strategies.sampled_from(random_input_data()),
- level=strategies.integers(min_value=1, max_value=5),
- chunk_size=strategies.integers(
- min_value=1,
- max_value=32 * 1048576),
- input_sizes=strategies.data(),
- flushes=strategies.data())
- def test_flush_block(self, original, level, chunk_size, input_sizes,
- flushes):
+ suppress_health_check=[
+ hypothesis.HealthCheck.large_base_example,
+ hypothesis.HealthCheck.too_slow,
+ ]
+ )
+ @hypothesis.given(
+ original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
+ input_sizes=strategies.data(),
+ flushes=strategies.data(),
+ )
+ def test_flush_block(self, original, level, chunk_size, input_sizes, flushes):
cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
@@ -682,7 +806,7 @@
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
- source = original[i:i + input_size]
+ source = original[i : i + input_size]
if not source:
break
@@ -690,22 +814,23 @@
chunks = list(chunker.compress(source))
compressed_chunks.extend(chunks)
- decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+ decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
if not flushes.draw(strategies.booleans()):
continue
chunks = list(chunker.flush())
compressed_chunks.extend(chunks)
- decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+ decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
- self.assertEqual(b''.join(decompressed_chunks), original[0:i])
+ self.assertEqual(b"".join(decompressed_chunks), original[0:i])
chunks = list(chunker.finish())
compressed_chunks.extend(chunks)
- decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+ decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
- self.assertEqual(dctx.decompress(b''.join(compressed_chunks),
- max_output_size=len(original)),
- original)
- self.assertEqual(b''.join(decompressed_chunks), original)
\ No newline at end of file
+ self.assertEqual(
+ dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)),
+ original,
+ )
+ self.assertEqual(b"".join(decompressed_chunks), original)