|
1 import io |
|
2 |
|
3 try: |
|
4 import unittest2 as unittest |
|
5 except ImportError: |
|
6 import unittest |
|
7 |
|
8 try: |
|
9 import hypothesis |
|
10 import hypothesis.strategies as strategies |
|
11 except ImportError: |
|
12 hypothesis = None |
|
13 |
|
14 import zstd |
|
15 |
|
16 class TestCompressionParameters(unittest.TestCase): |
|
17 def test_init_bad_arg_type(self): |
|
18 with self.assertRaises(TypeError): |
|
19 zstd.CompressionParameters() |
|
20 |
|
21 with self.assertRaises(TypeError): |
|
22 zstd.CompressionParameters(0, 1) |
|
23 |
|
24 def test_bounds(self): |
|
25 zstd.CompressionParameters(zstd.WINDOWLOG_MIN, |
|
26 zstd.CHAINLOG_MIN, |
|
27 zstd.HASHLOG_MIN, |
|
28 zstd.SEARCHLOG_MIN, |
|
29 zstd.SEARCHLENGTH_MIN, |
|
30 zstd.TARGETLENGTH_MIN, |
|
31 zstd.STRATEGY_FAST) |
|
32 |
|
33 zstd.CompressionParameters(zstd.WINDOWLOG_MAX, |
|
34 zstd.CHAINLOG_MAX, |
|
35 zstd.HASHLOG_MAX, |
|
36 zstd.SEARCHLOG_MAX, |
|
37 zstd.SEARCHLENGTH_MAX, |
|
38 zstd.TARGETLENGTH_MAX, |
|
39 zstd.STRATEGY_BTOPT) |
|
40 |
|
41 def test_get_compression_parameters(self): |
|
42 p = zstd.get_compression_parameters(1) |
|
43 self.assertIsInstance(p, zstd.CompressionParameters) |
|
44 |
|
45 self.assertEqual(p[0], 19) |
|
46 |
|
47 if hypothesis: |
|
48 s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN, |
|
49 max_value=zstd.WINDOWLOG_MAX) |
|
50 s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN, |
|
51 max_value=zstd.CHAINLOG_MAX) |
|
52 s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN, |
|
53 max_value=zstd.HASHLOG_MAX) |
|
54 s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN, |
|
55 max_value=zstd.SEARCHLOG_MAX) |
|
56 s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN, |
|
57 max_value=zstd.SEARCHLENGTH_MAX) |
|
58 s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN, |
|
59 max_value=zstd.TARGETLENGTH_MAX) |
|
60 s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST, |
|
61 zstd.STRATEGY_DFAST, |
|
62 zstd.STRATEGY_GREEDY, |
|
63 zstd.STRATEGY_LAZY, |
|
64 zstd.STRATEGY_LAZY2, |
|
65 zstd.STRATEGY_BTLAZY2, |
|
66 zstd.STRATEGY_BTOPT)) |
|
67 |
|
68 class TestCompressionParametersHypothesis(unittest.TestCase): |
|
69 @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog, |
|
70 s_searchlength, s_targetlength, s_strategy) |
|
71 def test_valid_init(self, windowlog, chainlog, hashlog, searchlog, |
|
72 searchlength, targetlength, strategy): |
|
73 p = zstd.CompressionParameters(windowlog, chainlog, hashlog, |
|
74 searchlog, searchlength, |
|
75 targetlength, strategy) |
|
76 self.assertEqual(tuple(p), |
|
77 (windowlog, chainlog, hashlog, searchlog, |
|
78 searchlength, targetlength, strategy)) |
|
79 |
|
80 # Verify we can instantiate a compressor with the supplied values. |
|
81 # ZSTD_checkCParams moves the goal posts on us from what's advertised |
|
82 # in the constants. So move along with them. |
|
83 if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY): |
|
84 searchlength += 1 |
|
85 p = zstd.CompressionParameters(windowlog, chainlog, hashlog, |
|
86 searchlog, searchlength, |
|
87 targetlength, strategy) |
|
88 elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST: |
|
89 searchlength -= 1 |
|
90 p = zstd.CompressionParameters(windowlog, chainlog, hashlog, |
|
91 searchlog, searchlength, |
|
92 targetlength, strategy) |
|
93 |
|
94 cctx = zstd.ZstdCompressor(compression_params=p) |
|
95 with cctx.write_to(io.BytesIO()): |
|
96 pass |
|
97 |
|
98 @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog, |
|
99 s_searchlength, s_targetlength, s_strategy) |
|
100 def test_estimate_compression_context_size(self, windowlog, chainlog, |
|
101 hashlog, searchlog, |
|
102 searchlength, targetlength, |
|
103 strategy): |
|
104 p = zstd.CompressionParameters(windowlog, chainlog, hashlog, |
|
105 searchlog, searchlength, |
|
106 targetlength, strategy) |
|
107 size = zstd.estimate_compression_context_size(p) |