Mercurial > public > mercurial-scm > hg
comparison contrib/python-zstandard/tests/test_compressor_fuzzing.py @ 31796:e0dc40530c5a
zstd: vendor python-zstandard 0.8.0
Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
Updates relevant to Mercurial include:
* Support for multi-threaded compression (we can use this for
bundle and wire protocol compression).
* APIs for batch compression and decompression operations using
multiple threads and an optimal memory allocation mechanism. (Can
be useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
containing N discrete items of known lengths. This type can be
used for very efficient 0-copy data operations.
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 15:24:03 -0700 |
parents | |
children | b1fb341d8a61 |
comparison
equal
deleted
inserted
replaced
31795:2b130e26c3a4 | 31796:e0dc40530c5a |
---|---|
1 import io | |
2 import os | |
3 | |
4 try: | |
5 import unittest2 as unittest | |
6 except ImportError: | |
7 import unittest | |
8 | |
9 try: | |
10 import hypothesis | |
11 import hypothesis.strategies as strategies | |
12 except ImportError: | |
13 raise unittest.SkipTest('hypothesis not available') | |
14 | |
15 import zstd | |
16 | |
17 from . common import ( | |
18 make_cffi, | |
19 random_input_data, | |
20 ) | |
21 | |
22 | |
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_write_to_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.write_to()`` against one-shot ``compress()``."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_write_size_variance(self, original, level, write_size):
        """The streamed output must equal the reference frame for any write_size."""
        # Reference frame produced by a separate compressor in a single call.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        sink = io.BytesIO()
        streaming_cctx = zstd.ZstdCompressor(level=level)
        writer = streaming_cctx.write_to(sink, size=len(original),
                                         write_size=write_size)
        with writer as compressor:
            compressor.write(original)

        self.assertEqual(sink.getvalue(), expected)
39 | |
40 | |
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.copy_stream()`` against one-shot ``compress()``."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        read_size=strategies.integers(min_value=1, max_value=1048576),
        write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_read_write_size_variance(self, original, level, read_size,
                                      write_size):
        """The copied stream must equal the reference frame for any buffer sizes."""
        # Reference frame produced by a separate compressor in a single call.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        streaming_cctx = zstd.ZstdCompressor(level=level)
        reader = io.BytesIO(original)
        writer = io.BytesIO()

        streaming_cctx.copy_stream(
            reader,
            writer,
            size=len(original),
            read_size=read_size,
            write_size=write_size)

        self.assertEqual(writer.getvalue(), expected)
60 | |
61 | |
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_compressobj_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.compressobj()`` with randomly sized input chunks."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        chunk_sizes=strategies.streaming(
            strategies.integers(min_value=1, max_value=4096)))
    def test_random_input_sizes(self, original, level, chunk_sizes):
        """Feeding the input in arbitrary slices must yield the reference frame."""
        sizes = iter(chunk_sizes)

        # Reference frame produced by a separate compressor in a single call.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        streaming_cctx = zstd.ZstdCompressor(level=level)
        cobj = streaming_cctx.compressobj(size=len(original))

        pieces = []
        offset = 0
        while True:
            # A size is drawn on every pass, including the final one that
            # discovers the input has been fully consumed.
            step = next(sizes)
            piece = original[offset:offset + step]
            if len(piece) == 0:
                break

            offset += step
            pieces.append(cobj.compress(piece))

        pieces.append(cobj.flush())

        self.assertEqual(b''.join(pieces), expected)
92 | |
93 | |
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_read_from_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.read_from()`` against one-shot ``compress()``."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        read_size=strategies.integers(min_value=1, max_value=4096),
        write_size=strategies.integers(min_value=1, max_value=4096))
    def test_read_write_size_variance(self, original, level, read_size,
                                      write_size):
        """The joined output chunks must equal the reference frame."""
        # Reference frame produced by a separate compressor in a single call.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        reader = io.BytesIO(original)
        streaming_cctx = zstd.ZstdCompressor(level=level)

        produced = streaming_cctx.read_from(
            reader,
            size=len(original),
            read_size=read_size,
            write_size=write_size)
        pieces = list(produced)

        self.assertEqual(b''.join(pieces), expected)
112 | |
113 | |
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.multi_compress_to_buffer()`` via round-tripping."""

    @hypothesis.given(
        original=strategies.lists(
            strategies.sampled_from(random_input_data()),
            min_size=1, max_size=1024),
        threads=strategies.integers(min_value=1, max_value=8),
        use_dict=strategies.booleans())
    def test_data_equivalence(self, original, threads, use_dict):
        """Every batch-compressed frame must decompress to its input.

        ``original`` is a non-empty list of inputs, ``threads`` is the thread
        count handed to the batch API, and ``use_dict`` selects whether a
        dictionary built from the first input is used for both compression
        and decompression.
        """
        kwargs = {}

        # Use a content dictionary because it is cheap to create.
        if use_dict:
            kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])

        cctx = zstd.ZstdCompressor(level=1,
                                   write_content_size=True,
                                   write_checksum=True,
                                   **kwargs)

        # Pass the fuzzed thread count through; previously this was hard-coded
        # to -1, so the generated ``threads`` value was never exercised.
        result = cctx.multi_compress_to_buffer(original, threads=threads)

        self.assertEqual(len(result), len(original))

        # The frame produced via the batch APIs may not be bit identical to that
        # produced by compress() because compression parameters are adjusted
        # from the first input in batch mode. So the only thing we can do is
        # verify the decompressed data matches the input.
        dctx = zstd.ZstdDecompressor(**kwargs)

        for i, frame in enumerate(result):
            self.assertEqual(dctx.decompress(frame), original[i])