1 try: |
1 import sys |
2 import unittest2 as unittest |
2 import unittest |
3 except ImportError: |
3 |
4 import unittest |
4 import zstandard as zstd |
5 |
|
6 import zstd |
|
7 |
5 |
8 from . common import ( |
6 from . common import ( |
9 make_cffi, |
7 make_cffi, |
10 ) |
8 ) |
11 |
9 |
12 |
10 |
13 @make_cffi |
11 @make_cffi |
14 class TestCompressionParameters(unittest.TestCase): |
12 class TestCompressionParameters(unittest.TestCase): |
15 def test_init_bad_arg_type(self): |
|
16 with self.assertRaises(TypeError): |
|
17 zstd.CompressionParameters() |
|
18 |
|
19 with self.assertRaises(TypeError): |
|
20 zstd.CompressionParameters(0, 1) |
|
21 |
|
22 def test_bounds(self): |
13 def test_bounds(self): |
23 zstd.CompressionParameters(zstd.WINDOWLOG_MIN, |
14 zstd.ZstdCompressionParameters(window_log=zstd.WINDOWLOG_MIN, |
24 zstd.CHAINLOG_MIN, |
15 chain_log=zstd.CHAINLOG_MIN, |
25 zstd.HASHLOG_MIN, |
16 hash_log=zstd.HASHLOG_MIN, |
26 zstd.SEARCHLOG_MIN, |
17 search_log=zstd.SEARCHLOG_MIN, |
27 zstd.SEARCHLENGTH_MIN + 1, |
18 min_match=zstd.SEARCHLENGTH_MIN + 1, |
28 zstd.TARGETLENGTH_MIN, |
19 target_length=zstd.TARGETLENGTH_MIN, |
29 zstd.STRATEGY_FAST) |
20 compression_strategy=zstd.STRATEGY_FAST) |
30 |
21 |
31 zstd.CompressionParameters(zstd.WINDOWLOG_MAX, |
22 zstd.ZstdCompressionParameters(window_log=zstd.WINDOWLOG_MAX, |
32 zstd.CHAINLOG_MAX, |
23 chain_log=zstd.CHAINLOG_MAX, |
33 zstd.HASHLOG_MAX, |
24 hash_log=zstd.HASHLOG_MAX, |
34 zstd.SEARCHLOG_MAX, |
25 search_log=zstd.SEARCHLOG_MAX, |
35 zstd.SEARCHLENGTH_MAX - 1, |
26 min_match=zstd.SEARCHLENGTH_MAX - 1, |
36 zstd.TARGETLENGTH_MAX, |
27 compression_strategy=zstd.STRATEGY_BTULTRA) |
37 zstd.STRATEGY_BTOPT) |
28 |
38 |
29 def test_from_level(self): |
39 def test_get_compression_parameters(self): |
30 p = zstd.ZstdCompressionParameters.from_level(1) |
40 p = zstd.get_compression_parameters(1) |
|
41 self.assertIsInstance(p, zstd.CompressionParameters) |
31 self.assertIsInstance(p, zstd.CompressionParameters) |
42 |
32 |
43 self.assertEqual(p.window_log, 19) |
33 self.assertEqual(p.window_log, 19) |
44 |
34 |
|
35 p = zstd.ZstdCompressionParameters.from_level(-4) |
|
36 self.assertEqual(p.window_log, 19) |
|
37 self.assertEqual(p.compress_literals, 0) |
|
38 |
45 def test_members(self): |
39 def test_members(self): |
46 p = zstd.CompressionParameters(10, 6, 7, 4, 5, 8, 1) |
40 p = zstd.ZstdCompressionParameters(window_log=10, |
|
41 chain_log=6, |
|
42 hash_log=7, |
|
43 search_log=4, |
|
44 min_match=5, |
|
45 target_length=8, |
|
46 compression_strategy=1) |
47 self.assertEqual(p.window_log, 10) |
47 self.assertEqual(p.window_log, 10) |
48 self.assertEqual(p.chain_log, 6) |
48 self.assertEqual(p.chain_log, 6) |
49 self.assertEqual(p.hash_log, 7) |
49 self.assertEqual(p.hash_log, 7) |
50 self.assertEqual(p.search_log, 4) |
50 self.assertEqual(p.search_log, 4) |
51 self.assertEqual(p.search_length, 5) |
51 self.assertEqual(p.min_match, 5) |
52 self.assertEqual(p.target_length, 8) |
52 self.assertEqual(p.target_length, 8) |
53 self.assertEqual(p.strategy, 1) |
53 self.assertEqual(p.compression_strategy, 1) |
|
54 |
|
55 p = zstd.ZstdCompressionParameters(compression_level=2) |
|
56 self.assertEqual(p.compression_level, 2) |
|
57 |
|
58 p = zstd.ZstdCompressionParameters(threads=4) |
|
59 self.assertEqual(p.threads, 4) |
|
60 |
|
61 p = zstd.ZstdCompressionParameters(threads=2, job_size=1048576, |
|
62 overlap_size_log=6) |
|
63 self.assertEqual(p.threads, 2) |
|
64 self.assertEqual(p.job_size, 1048576) |
|
65 self.assertEqual(p.overlap_size_log, 6) |
|
66 |
|
67 p = zstd.ZstdCompressionParameters(compression_level=2) |
|
68 self.assertEqual(p.compress_literals, 1) |
|
69 |
|
70 p = zstd.ZstdCompressionParameters(compress_literals=False) |
|
71 self.assertEqual(p.compress_literals, 0) |
|
72 |
|
73 p = zstd.ZstdCompressionParameters(compression_level=-1) |
|
74 self.assertEqual(p.compression_level, -1) |
|
75 self.assertEqual(p.compress_literals, 0) |
|
76 |
|
77 p = zstd.ZstdCompressionParameters(compression_level=-2, compress_literals=True) |
|
78 self.assertEqual(p.compression_level, -2) |
|
79 self.assertEqual(p.compress_literals, 1) |
|
80 |
|
81 p = zstd.ZstdCompressionParameters(force_max_window=True) |
|
82 self.assertEqual(p.force_max_window, 1) |
|
83 |
|
84 p = zstd.ZstdCompressionParameters(enable_ldm=True) |
|
85 self.assertEqual(p.enable_ldm, 1) |
|
86 |
|
87 p = zstd.ZstdCompressionParameters(ldm_hash_log=7) |
|
88 self.assertEqual(p.ldm_hash_log, 7) |
|
89 |
|
90 p = zstd.ZstdCompressionParameters(ldm_min_match=6) |
|
91 self.assertEqual(p.ldm_min_match, 6) |
|
92 |
|
93 p = zstd.ZstdCompressionParameters(ldm_bucket_size_log=7) |
|
94 self.assertEqual(p.ldm_bucket_size_log, 7) |
|
95 |
|
96 p = zstd.ZstdCompressionParameters(ldm_hash_every_log=8) |
|
97 self.assertEqual(p.ldm_hash_every_log, 8) |
54 |
98 |
55 def test_estimated_compression_context_size(self): |
99 def test_estimated_compression_context_size(self): |
56 p = zstd.CompressionParameters(20, 16, 17, 1, 5, 16, zstd.STRATEGY_DFAST) |
100 p = zstd.ZstdCompressionParameters(window_log=20, |
|
101 chain_log=16, |
|
102 hash_log=17, |
|
103 search_log=1, |
|
104 min_match=5, |
|
105 target_length=16, |
|
106 compression_strategy=zstd.STRATEGY_DFAST) |
57 |
107 |
58 # 32-bit has slightly different values from 64-bit. |
108 # 32-bit has slightly different values from 64-bit. |
59 self.assertAlmostEqual(p.estimated_compression_context_size(), 1287076, |
109 self.assertAlmostEqual(p.estimated_compression_context_size(), 1294072, |
60 delta=110) |
110 delta=250) |
61 |
111 |
62 |
112 |
63 @make_cffi |
113 @make_cffi |
64 class TestFrameParameters(unittest.TestCase): |
114 class TestFrameParameters(unittest.TestCase): |
65 def test_invalid_type(self): |
115 def test_invalid_type(self): |
66 with self.assertRaises(TypeError): |
116 with self.assertRaises(TypeError): |
67 zstd.get_frame_parameters(None) |
117 zstd.get_frame_parameters(None) |
68 |
118 |
69 with self.assertRaises(TypeError): |
119 # Python 3 doesn't appear to convert unicode to Py_buffer. |
70 zstd.get_frame_parameters(u'foobarbaz') |
120 if sys.version_info[0] >= 3: |
|
121 with self.assertRaises(TypeError): |
|
122 zstd.get_frame_parameters(u'foobarbaz') |
|
123 else: |
|
124 # CPython will convert unicode to Py_buffer. But CFFI won't. |
|
125 if zstd.backend == 'cffi': |
|
126 with self.assertRaises(TypeError): |
|
127 zstd.get_frame_parameters(u'foobarbaz') |
|
128 else: |
|
129 with self.assertRaises(zstd.ZstdError): |
|
130 zstd.get_frame_parameters(u'foobarbaz') |
71 |
131 |
72 def test_invalid_input_sizes(self): |
132 def test_invalid_input_sizes(self): |
73 with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'): |
133 with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'): |
74 zstd.get_frame_parameters(b'') |
134 zstd.get_frame_parameters(b'') |
75 |
135 |
80 with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'): |
140 with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'): |
81 zstd.get_frame_parameters(b'foobarbaz') |
141 zstd.get_frame_parameters(b'foobarbaz') |
82 |
142 |
83 def test_attributes(self): |
143 def test_attributes(self): |
84 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x00') |
144 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x00') |
85 self.assertEqual(params.content_size, 0) |
145 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
86 self.assertEqual(params.window_size, 1024) |
146 self.assertEqual(params.window_size, 1024) |
87 self.assertEqual(params.dict_id, 0) |
147 self.assertEqual(params.dict_id, 0) |
88 self.assertFalse(params.has_checksum) |
148 self.assertFalse(params.has_checksum) |
89 |
149 |
90 # Lowest 2 bits indicate a dictionary and length. Here, the dict id is 1 byte. |
150 # Lowest 2 bits indicate a dictionary and length. Here, the dict id is 1 byte. |
91 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x01\x00\xff') |
151 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x01\x00\xff') |
92 self.assertEqual(params.content_size, 0) |
152 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
93 self.assertEqual(params.window_size, 1024) |
153 self.assertEqual(params.window_size, 1024) |
94 self.assertEqual(params.dict_id, 255) |
154 self.assertEqual(params.dict_id, 255) |
95 self.assertFalse(params.has_checksum) |
155 self.assertFalse(params.has_checksum) |
96 |
156 |
97 # Lowest 3rd bit indicates if checksum is present. |
157 # Lowest 3rd bit indicates if checksum is present. |
98 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x04\x00') |
158 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x04\x00') |
99 self.assertEqual(params.content_size, 0) |
159 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
100 self.assertEqual(params.window_size, 1024) |
160 self.assertEqual(params.window_size, 1024) |
101 self.assertEqual(params.dict_id, 0) |
161 self.assertEqual(params.dict_id, 0) |
102 self.assertTrue(params.has_checksum) |
162 self.assertTrue(params.has_checksum) |
103 |
163 |
104 # Upper 2 bits indicate content size. |
164 # Upper 2 bits indicate content size. |
108 self.assertEqual(params.dict_id, 0) |
168 self.assertEqual(params.dict_id, 0) |
109 self.assertFalse(params.has_checksum) |
169 self.assertFalse(params.has_checksum) |
110 |
170 |
111 # Window descriptor is 2nd byte after frame header. |
171 # Window descriptor is 2nd byte after frame header. |
112 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x40') |
172 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x40') |
113 self.assertEqual(params.content_size, 0) |
173 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
114 self.assertEqual(params.window_size, 262144) |
174 self.assertEqual(params.window_size, 262144) |
115 self.assertEqual(params.dict_id, 0) |
175 self.assertEqual(params.dict_id, 0) |
116 self.assertFalse(params.has_checksum) |
176 self.assertFalse(params.has_checksum) |
117 |
177 |
118 # Set multiple things. |
178 # Set multiple things. |
119 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x45\x40\x0f\x10\x00') |
179 params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x45\x40\x0f\x10\x00') |
120 self.assertEqual(params.content_size, 272) |
180 self.assertEqual(params.content_size, 272) |
121 self.assertEqual(params.window_size, 262144) |
181 self.assertEqual(params.window_size, 262144) |
122 self.assertEqual(params.dict_id, 15) |
182 self.assertEqual(params.dict_id, 15) |
123 self.assertTrue(params.has_checksum) |
183 self.assertTrue(params.has_checksum) |
|
184 |
|
185 def test_input_types(self): |
|
186 v = zstd.FRAME_HEADER + b'\x00\x00' |
|
187 |
|
188 mutable_array = bytearray(len(v)) |
|
189 mutable_array[:] = v |
|
190 |
|
191 sources = [ |
|
192 memoryview(v), |
|
193 bytearray(v), |
|
194 mutable_array, |
|
195 ] |
|
196 |
|
197 for source in sources: |
|
198 params = zstd.get_frame_parameters(source) |
|
199 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) |
|
200 self.assertEqual(params.window_size, 1024) |
|
201 self.assertEqual(params.dict_id, 0) |
|
202 self.assertFalse(params.has_checksum) |