comparison contrib/python-zstandard/tests/test_compressor_fuzzing.py @ 31796:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0

Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).

Updates relevant to Mercurial include:

* Support for multi-threaded compression (we can use this for bundle
  and wire protocol compression).
* APIs for batch compression and decompression operations using
  multiple threads and optimal memory allocation mechanism. (Can be
  useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
  containing N discrete items of known lengths. This type can be used
  for very efficient 0-copy data operations.

# no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents 31795:2b130e26c3a4
children b1fb341d8a61
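Before the vendored test file itself, here is a minimal sketch (not part of the import) of the batch compression API mentioned in the commit message and exercised by TestCompressor_multi_compress_to_buffer_fuzzing below. The example payloads are made up; the calls mirror those in the test.

import zstd

# Hypothetical example inputs; any list of byte strings works.
inputs = [b'foo' * 1000, b'bar' * 1000, b'baz' * 1000]

cctx = zstd.ZstdCompressor(level=1,
                           write_content_size=True,
                           write_checksum=True)

# Compress every input in one call. threads=-1 asks for one worker
# thread per logical CPU, as in the test below.
result = cctx.multi_compress_to_buffer(inputs, threads=-1)
assert len(result) == len(inputs)

# Each produced frame round-trips through an ordinary decompressor.
dctx = zstd.ZstdDecompressor()
for i, frame in enumerate(result):
    assert dctx.decompress(frame) == inputs[i]

Note that the test classes below are skipped unless ZSTD_SLOW_TESTS is set in the environment, since hypothesis-driven fuzzing is slow.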
import io
import os

try:
    import unittest2 as unittest
except ImportError:
    import unittest

try:
    import hypothesis
    import hypothesis.strategies as strategies
except ImportError:
    raise unittest.SkipTest('hypothesis not available')

import zstd

from . common import (
    make_cffi,
    random_input_data,
)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_write_to_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_write_size_variance(self, original, level, write_size):
        refctx = zstd.ZstdCompressor(level=level)
        ref_frame = refctx.compress(original)

        cctx = zstd.ZstdCompressor(level=level)
        b = io.BytesIO()
        with cctx.write_to(b, size=len(original), write_size=write_size) as compressor:
            compressor.write(original)

        self.assertEqual(b.getvalue(), ref_frame)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      read_size=strategies.integers(min_value=1, max_value=1048576),
                      write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_read_write_size_variance(self, original, level, read_size, write_size):
        refctx = zstd.ZstdCompressor(level=level)
        ref_frame = refctx.compress(original)

        cctx = zstd.ZstdCompressor(level=level)
        source = io.BytesIO(original)
        dest = io.BytesIO()

        cctx.copy_stream(source, dest, size=len(original), read_size=read_size,
                         write_size=write_size)

        self.assertEqual(dest.getvalue(), ref_frame)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_compressobj_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      chunk_sizes=strategies.streaming(
                          strategies.integers(min_value=1, max_value=4096)))
    def test_random_input_sizes(self, original, level, chunk_sizes):
        chunk_sizes = iter(chunk_sizes)

        refctx = zstd.ZstdCompressor(level=level)
        ref_frame = refctx.compress(original)

        cctx = zstd.ZstdCompressor(level=level)
        cobj = cctx.compressobj(size=len(original))

        chunks = []
        i = 0
        while True:
            chunk_size = next(chunk_sizes)
            source = original[i:i + chunk_size]
            if not source:
                break

            chunks.append(cobj.compress(source))
            i += chunk_size

        chunks.append(cobj.flush())

        self.assertEqual(b''.join(chunks), ref_frame)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_read_from_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      read_size=strategies.integers(min_value=1, max_value=4096),
                      write_size=strategies.integers(min_value=1, max_value=4096))
    def test_read_write_size_variance(self, original, level, read_size, write_size):
        refcctx = zstd.ZstdCompressor(level=level)
        ref_frame = refcctx.compress(original)

        source = io.BytesIO(original)

        cctx = zstd.ZstdCompressor(level=level)
        chunks = list(cctx.read_from(source, size=len(original), read_size=read_size,
                                     write_size=write_size))

        self.assertEqual(b''.join(chunks), ref_frame)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
                                                min_size=1, max_size=1024),
                      threads=strategies.integers(min_value=1, max_value=8),
                      use_dict=strategies.booleans())
    def test_data_equivalence(self, original, threads, use_dict):
        kwargs = {}

        # Use a content dictionary because it is cheap to create.
        if use_dict:
            kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])

        cctx = zstd.ZstdCompressor(level=1,
                                   write_content_size=True,
                                   write_checksum=True,
                                   **kwargs)

        result = cctx.multi_compress_to_buffer(original, threads=-1)

        self.assertEqual(len(result), len(original))

        # The frame produced via the batch APIs may not be bit identical to that
        # produced by compress() because compression parameters are adjusted
        # from the first input in batch mode. So the only thing we can do is
        # verify the decompressed data matches the input.
        dctx = zstd.ZstdDecompressor(**kwargs)

        for i, frame in enumerate(result):
            self.assertEqual(dctx.decompress(frame), original[i])