contrib/python-zstandard/tests/test_data_structures.py
changeset 37495 b1fb341d8a61
parent 31796 e0dc40530c5a
child 40121 73fef626dae3
equal deleted inserted replaced
37494:1ce7a55b09d1 37495:b1fb341d8a61
     1 try:
     1 import sys
     2     import unittest2 as unittest
     2 import unittest
     3 except ImportError:
     3 
     4     import unittest
     4 import zstandard as zstd
     5 
       
     6 import zstd
       
     7 
     5 
     8 from . common import (
     6 from . common import (
     9     make_cffi,
     7     make_cffi,
    10 )
     8 )
    11 
     9 
    12 
    10 
    13 @make_cffi
    11 @make_cffi
    14 class TestCompressionParameters(unittest.TestCase):
    12 class TestCompressionParameters(unittest.TestCase):
    15     def test_init_bad_arg_type(self):
       
    16         with self.assertRaises(TypeError):
       
    17             zstd.CompressionParameters()
       
    18 
       
    19         with self.assertRaises(TypeError):
       
    20             zstd.CompressionParameters(0, 1)
       
    21 
       
    22     def test_bounds(self):
    13     def test_bounds(self):
    23         zstd.CompressionParameters(zstd.WINDOWLOG_MIN,
    14         zstd.ZstdCompressionParameters(window_log=zstd.WINDOWLOG_MIN,
    24                                    zstd.CHAINLOG_MIN,
    15                                        chain_log=zstd.CHAINLOG_MIN,
    25                                    zstd.HASHLOG_MIN,
    16                                        hash_log=zstd.HASHLOG_MIN,
    26                                    zstd.SEARCHLOG_MIN,
    17                                        search_log=zstd.SEARCHLOG_MIN,
    27                                    zstd.SEARCHLENGTH_MIN + 1,
    18                                        min_match=zstd.SEARCHLENGTH_MIN + 1,
    28                                    zstd.TARGETLENGTH_MIN,
    19                                        target_length=zstd.TARGETLENGTH_MIN,
    29                                    zstd.STRATEGY_FAST)
    20                                        compression_strategy=zstd.STRATEGY_FAST)
    30 
    21 
    31         zstd.CompressionParameters(zstd.WINDOWLOG_MAX,
    22         zstd.ZstdCompressionParameters(window_log=zstd.WINDOWLOG_MAX,
    32                                    zstd.CHAINLOG_MAX,
    23                                        chain_log=zstd.CHAINLOG_MAX,
    33                                    zstd.HASHLOG_MAX,
    24                                        hash_log=zstd.HASHLOG_MAX,
    34                                    zstd.SEARCHLOG_MAX,
    25                                        search_log=zstd.SEARCHLOG_MAX,
    35                                    zstd.SEARCHLENGTH_MAX - 1,
    26                                        min_match=zstd.SEARCHLENGTH_MAX - 1,
    36                                    zstd.TARGETLENGTH_MAX,
    27                                        compression_strategy=zstd.STRATEGY_BTULTRA)
    37                                    zstd.STRATEGY_BTOPT)
    28 
    38 
    29     def test_from_level(self):
    39     def test_get_compression_parameters(self):
    30         p = zstd.ZstdCompressionParameters.from_level(1)
    40         p = zstd.get_compression_parameters(1)
       
    41         self.assertIsInstance(p, zstd.CompressionParameters)
    31         self.assertIsInstance(p, zstd.CompressionParameters)
    42 
    32 
    43         self.assertEqual(p.window_log, 19)
    33         self.assertEqual(p.window_log, 19)
    44 
    34 
       
    35         p = zstd.ZstdCompressionParameters.from_level(-4)
       
    36         self.assertEqual(p.window_log, 19)
       
    37         self.assertEqual(p.compress_literals, 0)
       
    38 
    45     def test_members(self):
    39     def test_members(self):
    46         p = zstd.CompressionParameters(10, 6, 7, 4, 5, 8, 1)
    40         p = zstd.ZstdCompressionParameters(window_log=10,
       
    41                                            chain_log=6,
       
    42                                            hash_log=7,
       
    43                                            search_log=4,
       
    44                                            min_match=5,
       
    45                                            target_length=8,
       
    46                                            compression_strategy=1)
    47         self.assertEqual(p.window_log, 10)
    47         self.assertEqual(p.window_log, 10)
    48         self.assertEqual(p.chain_log, 6)
    48         self.assertEqual(p.chain_log, 6)
    49         self.assertEqual(p.hash_log, 7)
    49         self.assertEqual(p.hash_log, 7)
    50         self.assertEqual(p.search_log, 4)
    50         self.assertEqual(p.search_log, 4)
    51         self.assertEqual(p.search_length, 5)
    51         self.assertEqual(p.min_match, 5)
    52         self.assertEqual(p.target_length, 8)
    52         self.assertEqual(p.target_length, 8)
    53         self.assertEqual(p.strategy, 1)
    53         self.assertEqual(p.compression_strategy, 1)
       
    54 
       
    55         p = zstd.ZstdCompressionParameters(compression_level=2)
       
    56         self.assertEqual(p.compression_level, 2)
       
    57 
       
    58         p = zstd.ZstdCompressionParameters(threads=4)
       
    59         self.assertEqual(p.threads, 4)
       
    60 
       
    61         p = zstd.ZstdCompressionParameters(threads=2, job_size=1048576,
       
    62                                        overlap_size_log=6)
       
    63         self.assertEqual(p.threads, 2)
       
    64         self.assertEqual(p.job_size, 1048576)
       
    65         self.assertEqual(p.overlap_size_log, 6)
       
    66 
       
    67         p = zstd.ZstdCompressionParameters(compression_level=2)
       
    68         self.assertEqual(p.compress_literals, 1)
       
    69 
       
    70         p = zstd.ZstdCompressionParameters(compress_literals=False)
       
    71         self.assertEqual(p.compress_literals, 0)
       
    72 
       
    73         p = zstd.ZstdCompressionParameters(compression_level=-1)
       
    74         self.assertEqual(p.compression_level, -1)
       
    75         self.assertEqual(p.compress_literals, 0)
       
    76 
       
    77         p = zstd.ZstdCompressionParameters(compression_level=-2, compress_literals=True)
       
    78         self.assertEqual(p.compression_level, -2)
       
    79         self.assertEqual(p.compress_literals, 1)
       
    80 
       
    81         p = zstd.ZstdCompressionParameters(force_max_window=True)
       
    82         self.assertEqual(p.force_max_window, 1)
       
    83 
       
    84         p = zstd.ZstdCompressionParameters(enable_ldm=True)
       
    85         self.assertEqual(p.enable_ldm, 1)
       
    86 
       
    87         p = zstd.ZstdCompressionParameters(ldm_hash_log=7)
       
    88         self.assertEqual(p.ldm_hash_log, 7)
       
    89 
       
    90         p = zstd.ZstdCompressionParameters(ldm_min_match=6)
       
    91         self.assertEqual(p.ldm_min_match, 6)
       
    92 
       
    93         p = zstd.ZstdCompressionParameters(ldm_bucket_size_log=7)
       
    94         self.assertEqual(p.ldm_bucket_size_log, 7)
       
    95 
       
    96         p = zstd.ZstdCompressionParameters(ldm_hash_every_log=8)
       
    97         self.assertEqual(p.ldm_hash_every_log, 8)
    54 
    98 
    55     def test_estimated_compression_context_size(self):
    99     def test_estimated_compression_context_size(self):
    56         p = zstd.CompressionParameters(20, 16, 17,  1,  5, 16, zstd.STRATEGY_DFAST)
   100         p = zstd.ZstdCompressionParameters(window_log=20,
       
   101                                            chain_log=16,
       
   102                                            hash_log=17,
       
   103                                            search_log=1,
       
   104                                            min_match=5,
       
   105                                            target_length=16,
       
   106                                            compression_strategy=zstd.STRATEGY_DFAST)
    57 
   107 
    58         # 32-bit has slightly different values from 64-bit.
   108         # 32-bit has slightly different values from 64-bit.
    59         self.assertAlmostEqual(p.estimated_compression_context_size(), 1287076,
   109         self.assertAlmostEqual(p.estimated_compression_context_size(), 1294072,
    60                                delta=110)
   110                                delta=250)
    61 
   111 
    62 
   112 
    63 @make_cffi
   113 @make_cffi
    64 class TestFrameParameters(unittest.TestCase):
   114 class TestFrameParameters(unittest.TestCase):
    65     def test_invalid_type(self):
   115     def test_invalid_type(self):
    66         with self.assertRaises(TypeError):
   116         with self.assertRaises(TypeError):
    67             zstd.get_frame_parameters(None)
   117             zstd.get_frame_parameters(None)
    68 
   118 
    69         with self.assertRaises(TypeError):
   119         # Python 3 doesn't appear to convert unicode to Py_buffer.
    70             zstd.get_frame_parameters(u'foobarbaz')
   120         if sys.version_info[0] >= 3:
       
   121             with self.assertRaises(TypeError):
       
   122                 zstd.get_frame_parameters(u'foobarbaz')
       
   123         else:
       
   124             # CPython will convert unicode to Py_buffer. But CFFI won't.
       
   125             if zstd.backend == 'cffi':
       
   126                 with self.assertRaises(TypeError):
       
   127                     zstd.get_frame_parameters(u'foobarbaz')
       
   128             else:
       
   129                 with self.assertRaises(zstd.ZstdError):
       
   130                     zstd.get_frame_parameters(u'foobarbaz')
    71 
   131 
    72     def test_invalid_input_sizes(self):
   132     def test_invalid_input_sizes(self):
    73         with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'):
   133         with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'):
    74             zstd.get_frame_parameters(b'')
   134             zstd.get_frame_parameters(b'')
    75 
   135 
    80         with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'):
   140         with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'):
    81             zstd.get_frame_parameters(b'foobarbaz')
   141             zstd.get_frame_parameters(b'foobarbaz')
    82 
   142 
    83     def test_attributes(self):
   143     def test_attributes(self):
    84         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x00')
   144         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x00')
    85         self.assertEqual(params.content_size, 0)
   145         self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
    86         self.assertEqual(params.window_size, 1024)
   146         self.assertEqual(params.window_size, 1024)
    87         self.assertEqual(params.dict_id, 0)
   147         self.assertEqual(params.dict_id, 0)
    88         self.assertFalse(params.has_checksum)
   148         self.assertFalse(params.has_checksum)
    89 
   149 
    90         # Lowest 2 bits indicate a dictionary and length. Here, the dict id is 1 byte.
   150         # Lowest 2 bits indicate a dictionary and length. Here, the dict id is 1 byte.
    91         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x01\x00\xff')
   151         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x01\x00\xff')
    92         self.assertEqual(params.content_size, 0)
   152         self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
    93         self.assertEqual(params.window_size, 1024)
   153         self.assertEqual(params.window_size, 1024)
    94         self.assertEqual(params.dict_id, 255)
   154         self.assertEqual(params.dict_id, 255)
    95         self.assertFalse(params.has_checksum)
   155         self.assertFalse(params.has_checksum)
    96 
   156 
    97         # Lowest 3rd bit indicates if checksum is present.
   157         # Lowest 3rd bit indicates if checksum is present.
    98         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x04\x00')
   158         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x04\x00')
    99         self.assertEqual(params.content_size, 0)
   159         self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
   100         self.assertEqual(params.window_size, 1024)
   160         self.assertEqual(params.window_size, 1024)
   101         self.assertEqual(params.dict_id, 0)
   161         self.assertEqual(params.dict_id, 0)
   102         self.assertTrue(params.has_checksum)
   162         self.assertTrue(params.has_checksum)
   103 
   163 
   104         # Upper 2 bits indicate content size.
   164         # Upper 2 bits indicate content size.
   108         self.assertEqual(params.dict_id, 0)
   168         self.assertEqual(params.dict_id, 0)
   109         self.assertFalse(params.has_checksum)
   169         self.assertFalse(params.has_checksum)
   110 
   170 
   111         # Window descriptor is 2nd byte after frame header.
   171         # Window descriptor is 2nd byte after frame header.
   112         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x40')
   172         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x40')
   113         self.assertEqual(params.content_size, 0)
   173         self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
   114         self.assertEqual(params.window_size, 262144)
   174         self.assertEqual(params.window_size, 262144)
   115         self.assertEqual(params.dict_id, 0)
   175         self.assertEqual(params.dict_id, 0)
   116         self.assertFalse(params.has_checksum)
   176         self.assertFalse(params.has_checksum)
   117 
   177 
   118         # Set multiple things.
   178         # Set multiple things.
   119         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x45\x40\x0f\x10\x00')
   179         params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x45\x40\x0f\x10\x00')
   120         self.assertEqual(params.content_size, 272)
   180         self.assertEqual(params.content_size, 272)
   121         self.assertEqual(params.window_size, 262144)
   181         self.assertEqual(params.window_size, 262144)
   122         self.assertEqual(params.dict_id, 15)
   182         self.assertEqual(params.dict_id, 15)
   123         self.assertTrue(params.has_checksum)
   183         self.assertTrue(params.has_checksum)
       
   184 
       
   185     def test_input_types(self):
       
   186         v = zstd.FRAME_HEADER + b'\x00\x00'
       
   187 
       
   188         mutable_array = bytearray(len(v))
       
   189         mutable_array[:] = v
       
   190 
       
   191         sources = [
       
   192             memoryview(v),
       
   193             bytearray(v),
       
   194             mutable_array,
       
   195         ]
       
   196 
       
   197         for source in sources:
       
   198             params = zstd.get_frame_parameters(source)
       
   199             self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
       
   200             self.assertEqual(params.window_size, 1024)
       
   201             self.assertEqual(params.dict_id, 0)
       
   202             self.assertFalse(params.has_checksum)