comparison contrib/python-zstandard/tests/test_decompressor_fuzzing.py @ 31799:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0. Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). Updates relevant to Mercurial include: * Support for multi-threaded compression (we can use this for bundle and wire protocol compression). * APIs for batch compression and decompression operations using multiple threads and an optimal memory allocation mechanism. (Can be useful for revlog perf improvements.) * A ``BufferWithSegments`` type that models a single memory buffer containing N discrete items of known lengths. This type can be used for very efficient 0-copy data operations. # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents
children b1fb341d8a61
comparison
equal deleted inserted replaced
31798:2b130e26c3a4 31799:e0dc40530c5a
1 import io
2 import os
3
4 try:
5 import unittest2 as unittest
6 except ImportError:
7 import unittest
8
9 try:
10 import hypothesis
11 import hypothesis.strategies as strategies
12 except ImportError:
13 raise unittest.SkipTest('hypothesis not available')
14
15 import zstd
16
17 from . common import (
18 make_cffi,
19 random_input_data,
20 )
21
22
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_write_to_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      write_size=strategies.integers(min_value=1, max_value=8192),
                      input_sizes=strategies.streaming(
                          strategies.integers(min_value=1, max_value=4096)))
    def test_write_size_variance(self, original, level, write_size, input_sizes):
        """Streaming decompression via write_to() round-trips a frame
        regardless of how the compressed input is chunked or how large the
        decompressor's output write size is."""
        sizes = iter(input_sizes)

        frame = zstd.ZstdCompressor(level=level).compress(original)

        src = io.BytesIO(frame)
        sink = io.BytesIO()

        dctx = zstd.ZstdDecompressor()
        with dctx.write_to(sink, write_size=write_size) as decompressor:
            # Feed randomly sized slices of the frame until EOF (read()
            # returns b'' once the source is exhausted).
            for piece in iter(lambda: src.read(next(sizes)), b''):
                decompressor.write(piece)

        self.assertEqual(sink.getvalue(), original)
51
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_copy_stream_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      read_size=strategies.integers(min_value=1, max_value=8192),
                      write_size=strategies.integers(min_value=1, max_value=8192))
    def test_read_write_size_variance(self, original, level, read_size, write_size):
        """copy_stream() recovers the original data for arbitrary read and
        write buffer sizes."""
        compressed = zstd.ZstdCompressor(level=level).compress(original)

        src = io.BytesIO(compressed)
        sink = io.BytesIO()

        zstd.ZstdDecompressor().copy_stream(src, sink,
                                            read_size=read_size,
                                            write_size=write_size)

        self.assertEqual(sink.getvalue(), original)
71
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      chunk_sizes=strategies.streaming(
                          strategies.integers(min_value=1, max_value=4096)))
    def test_random_input_sizes(self, original, level, chunk_sizes):
        """A decompressobj() produces the original data when fed the frame
        in randomly sized pieces."""
        sizes = iter(chunk_sizes)

        frame = zstd.ZstdCompressor(level=level).compress(original)
        src = io.BytesIO(frame)

        dobj = zstd.ZstdDecompressor().decompressobj()

        # read() returns b'' at EOF, which terminates the sentinel iterator.
        pieces = [dobj.decompress(data)
                  for data in iter(lambda: src.read(next(sizes)), b'')]

        self.assertEqual(b''.join(pieces), original)
100
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_read_from_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
                      level=strategies.integers(min_value=1, max_value=5),
                      read_size=strategies.integers(min_value=1, max_value=4096),
                      write_size=strategies.integers(min_value=1, max_value=4096))
    def test_read_write_size_variance(self, original, level, read_size, write_size):
        """The read_from() iterator yields chunks that reassemble into the
        original data for arbitrary read and write sizes."""
        frame = zstd.ZstdCompressor(level=level).compress(original)
        src = io.BytesIO(frame)

        dctx = zstd.ZstdDecompressor()
        output = b''.join(dctx.read_from(src,
                                         read_size=read_size,
                                         write_size=write_size))

        self.assertEqual(output, original)
119
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase):
    @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
                                                min_size=1, max_size=1024),
                      threads=strategies.integers(min_value=1, max_value=8),
                      use_dict=strategies.booleans())
    def test_data_equivalence(self, original, threads, use_dict):
        """multi_decompress_to_buffer() recovers every input frame, with and
        without a shared dictionary, both from a BufferWithSegments and from
        a plain list of frame bytes, across varying thread counts."""
        kwargs = {}
        if use_dict:
            # Use the first input as the dictionary so compression and
            # decompression share the same dictionary data.
            kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])

        cctx = zstd.ZstdCompressor(level=1,
                                   write_content_size=True,
                                   write_checksum=True,
                                   **kwargs)

        frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)

        dctx = zstd.ZstdDecompressor(**kwargs)

        # Fix: the hypothesis-drawn ``threads`` value was previously never
        # used, so the multithreaded decompression path was never fuzzed.
        result = dctx.multi_decompress_to_buffer(frames_buffer, threads=threads)

        self.assertEqual(len(result), len(original))
        for i, frame in enumerate(result):
            self.assertEqual(frame.tobytes(), original[i])

        # Also exercise the list-of-bytes input variant.
        frames_list = [f.tobytes() for f in frames_buffer]
        result = dctx.multi_decompress_to_buffer(frames_list, threads=threads)

        self.assertEqual(len(result), len(original))
        for i, frame in enumerate(result):
            self.assertEqual(frame.tobytes(), original[i])