contrib/python-zstandard/tests/test_data_structures.py
changeset 30435 b86a448a2965
child 30895 c32454d69b85
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/contrib/python-zstandard/tests/test_data_structures.py	Thu Nov 10 22:15:58 2016 -0800
@@ -0,0 +1,107 @@
+import io
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+try:
+    import hypothesis
+    import hypothesis.strategies as strategies
+except ImportError:
+    hypothesis = None
+
+import zstd
+
+class TestCompressionParameters(unittest.TestCase):
+    def test_init_bad_arg_type(self):
+        with self.assertRaises(TypeError):
+            zstd.CompressionParameters()
+
+        with self.assertRaises(TypeError):
+            zstd.CompressionParameters(0, 1)
+
+    def test_bounds(self):
+        zstd.CompressionParameters(zstd.WINDOWLOG_MIN,
+                                   zstd.CHAINLOG_MIN,
+                                   zstd.HASHLOG_MIN,
+                                   zstd.SEARCHLOG_MIN,
+                                   zstd.SEARCHLENGTH_MIN,
+                                   zstd.TARGETLENGTH_MIN,
+                                   zstd.STRATEGY_FAST)
+
+        zstd.CompressionParameters(zstd.WINDOWLOG_MAX,
+                                   zstd.CHAINLOG_MAX,
+                                   zstd.HASHLOG_MAX,
+                                   zstd.SEARCHLOG_MAX,
+                                   zstd.SEARCHLENGTH_MAX,
+                                   zstd.TARGETLENGTH_MAX,
+                                   zstd.STRATEGY_BTOPT)
+
+    def test_get_compression_parameters(self):
+        p = zstd.get_compression_parameters(1)
+        self.assertIsInstance(p, zstd.CompressionParameters)
+
+        self.assertEqual(p[0], 19)
+
+if hypothesis:
+    s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN,
+                                      max_value=zstd.WINDOWLOG_MAX)
+    s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN,
+                                     max_value=zstd.CHAINLOG_MAX)
+    s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN,
+                                    max_value=zstd.HASHLOG_MAX)
+    s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN,
+                                      max_value=zstd.SEARCHLOG_MAX)
+    s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN,
+                                         max_value=zstd.SEARCHLENGTH_MAX)
+    s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN,
+                                         max_value=zstd.TARGETLENGTH_MAX)
+    s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST,
+                                          zstd.STRATEGY_DFAST,
+                                          zstd.STRATEGY_GREEDY,
+                                          zstd.STRATEGY_LAZY,
+                                          zstd.STRATEGY_LAZY2,
+                                          zstd.STRATEGY_BTLAZY2,
+                                          zstd.STRATEGY_BTOPT))
+
+    class TestCompressionParametersHypothesis(unittest.TestCase):
+        @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
+                          s_searchlength, s_targetlength, s_strategy)
+        def test_valid_init(self, windowlog, chainlog, hashlog, searchlog,
+                            searchlength, targetlength, strategy):
+            p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
+                                           searchlog, searchlength,
+                                           targetlength, strategy)
+            self.assertEqual(tuple(p),
+                             (windowlog, chainlog, hashlog, searchlog,
+                              searchlength, targetlength, strategy))
+
+            # Verify we can instantiate a compressor with the supplied values.
+            # ZSTD_checkCParams moves the goal posts on us from what's advertised
+            # in the constants. So move along with them.
+            if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
+                searchlength += 1
+                p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
+                                searchlog, searchlength,
+                                targetlength, strategy)
+            elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
+                searchlength -= 1
+                p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
+                                searchlog, searchlength,
+                                targetlength, strategy)
+
+            cctx = zstd.ZstdCompressor(compression_params=p)
+            with cctx.write_to(io.BytesIO()):
+                pass
+
+        @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
+                          s_searchlength, s_targetlength, s_strategy)
+        def test_estimate_compression_context_size(self, windowlog, chainlog,
+                                                   hashlog, searchlog,
+                                                   searchlength, targetlength,
+                                                   strategy):
+            p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
+                                searchlog, searchlength,
+                                targetlength, strategy)
+            size = zstd.estimate_compression_context_size(p)