Mercurial > public > mercurial-scm > hg-stable
diff contrib/python-zstandard/tests/test_compressor.py @ 43999:de7838053207
zstandard: vendor python-zstandard 0.13.0
Version 0.13.0 of the package was just released. It contains
an upgraded zstd C library which can result in some performance
wins, official support for Python 3.8, and a blackened code base.
There were no meaningful code or functionality changes in this
release of python-zstandard: just reformatting and an upgraded
zstd library version. So the diff seems much larger than what it
is.
Files were added without modifications.
The clang-format-ignorelist file was updated to reflect a new
header file in the zstd distribution.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D7770
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 28 Dec 2019 09:55:45 -0800 |
parents | 69de49c4e39c |
children | 5e84a96d865b |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor.py Fri Dec 27 18:54:57 2019 -0500 +++ b/contrib/python-zstandard/tests/test_compressor.py Sat Dec 28 09:55:45 2019 -0800 @@ -13,6 +13,7 @@ make_cffi, NonClosingBytesIO, OpCountingBytesIO, + TestCase, ) @@ -23,14 +24,13 @@ def multithreaded_chunk_size(level, source_size=0): - params = zstd.ZstdCompressionParameters.from_level(level, - source_size=source_size) + params = zstd.ZstdCompressionParameters.from_level(level, source_size=source_size) return 1 << (params.window_log + 2) @make_cffi -class TestCompressor(unittest.TestCase): +class TestCompressor(TestCase): def test_level_bounds(self): with self.assertRaises(ValueError): zstd.ZstdCompressor(level=23) @@ -41,11 +41,11 @@ @make_cffi -class TestCompressor_compress(unittest.TestCase): +class TestCompressor_compress(TestCase): def test_compress_empty(self): cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - result = cctx.compress(b'') - self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') + result = cctx.compress(b"") + self.assertEqual(result, b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") params = zstd.get_frame_parameters(result) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) self.assertEqual(params.window_size, 524288) @@ -53,21 +53,21 @@ self.assertFalse(params.has_checksum, 0) cctx = zstd.ZstdCompressor() - result = cctx.compress(b'') - self.assertEqual(result, b'\x28\xb5\x2f\xfd\x20\x00\x01\x00\x00') + result = cctx.compress(b"") + self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x00\x01\x00\x00") params = zstd.get_frame_parameters(result) self.assertEqual(params.content_size, 0) def test_input_types(self): cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - expected = b'\x28\xb5\x2f\xfd\x00\x00\x19\x00\x00\x66\x6f\x6f' + expected = b"\x28\xb5\x2f\xfd\x00\x00\x19\x00\x00\x66\x6f\x6f" mutable_array = bytearray(3) - mutable_array[:] = b'foo' + mutable_array[:] = b"foo" sources = [ - memoryview(b'foo'), - bytearray(b'foo'), + memoryview(b"foo"), + bytearray(b"foo"), mutable_array, ] @@ -77,43 +77,46 @@ def test_compress_large(self): chunks = [] for i in range(255): - chunks.append(struct.Struct('>B').pack(i) * 16384) + chunks.append(struct.Struct(">B").pack(i) * 16384) cctx = zstd.ZstdCompressor(level=3, write_content_size=False) - result = cctx.compress(b''.join(chunks)) + result = cctx.compress(b"".join(chunks)) self.assertEqual(len(result), 999) - self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd') + self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd") # This matches the test for read_to_iter() below. cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - result = cctx.compress(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b'o') - self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00' - b'\x10\x66\x66\x01\x00\xfb\xff\x39\xc0' - b'\x02\x09\x00\x00\x6f') + result = cctx.compress(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o") + self.assertEqual( + result, + b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00" + b"\x10\x66\x66\x01\x00\xfb\xff\x39\xc0" + b"\x02\x09\x00\x00\x6f", + ) def test_negative_level(self): cctx = zstd.ZstdCompressor(level=-4) - result = cctx.compress(b'foo' * 256) + result = cctx.compress(b"foo" * 256) def test_no_magic(self): - params = zstd.ZstdCompressionParameters.from_level( - 1, format=zstd.FORMAT_ZSTD1) + params = zstd.ZstdCompressionParameters.from_level(1, format=zstd.FORMAT_ZSTD1) cctx = zstd.ZstdCompressor(compression_params=params) - magic = cctx.compress(b'foobar') + magic = cctx.compress(b"foobar") params = zstd.ZstdCompressionParameters.from_level( - 1, format=zstd.FORMAT_ZSTD1_MAGICLESS) + 1, format=zstd.FORMAT_ZSTD1_MAGICLESS + ) cctx = zstd.ZstdCompressor(compression_params=params) - no_magic = cctx.compress(b'foobar') + no_magic = cctx.compress(b"foobar") - self.assertEqual(magic[0:4], b'\x28\xb5\x2f\xfd') + self.assertEqual(magic[0:4], b"\x28\xb5\x2f\xfd") self.assertEqual(magic[4:], no_magic) def test_write_checksum(self): cctx = zstd.ZstdCompressor(level=1) - no_checksum = cctx.compress(b'foobar') + no_checksum = cctx.compress(b"foobar") cctx = zstd.ZstdCompressor(level=1, write_checksum=True) - with_checksum = cctx.compress(b'foobar') + with_checksum = cctx.compress(b"foobar") self.assertEqual(len(with_checksum), len(no_checksum) + 4) @@ -125,9 +128,9 @@ def test_write_content_size(self): cctx = zstd.ZstdCompressor(level=1) - with_size = cctx.compress(b'foobar' * 256) + with_size = cctx.compress(b"foobar" * 256) cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - no_size = cctx.compress(b'foobar' * 256) + no_size = cctx.compress(b"foobar" * 256) self.assertEqual(len(with_size), len(no_size) + 1) @@ -139,17 +142,17 @@ def test_no_dict_id(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(1024, samples) cctx = zstd.ZstdCompressor(level=1, dict_data=d) - with_dict_id = cctx.compress(b'foobarfoobar') + with_dict_id = cctx.compress(b"foobarfoobar") cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False) - no_dict_id = cctx.compress(b'foobarfoobar') + no_dict_id = cctx.compress(b"foobarfoobar") self.assertEqual(len(with_dict_id), len(no_dict_id) + 4) @@ -161,23 +164,23 @@ def test_compress_dict_multiple(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(8192, samples) cctx = zstd.ZstdCompressor(level=1, dict_data=d) for i in range(32): - cctx.compress(b'foo bar foobar foo bar foobar') + cctx.compress(b"foo bar foobar foo bar foobar") def test_dict_precompute(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(8192, samples) d.precompute_compress(level=1) @@ -185,11 +188,11 @@ cctx = zstd.ZstdCompressor(level=1, dict_data=d) for i in range(32): - cctx.compress(b'foo bar foobar foo bar foobar') + cctx.compress(b"foo bar foobar foo bar foobar") def test_multithreaded(self): chunk_size = multithreaded_chunk_size(1) - source = b''.join([b'x' * chunk_size, b'y' * chunk_size]) + source = b"".join([b"x" * chunk_size, b"y" * chunk_size]) cctx = zstd.ZstdCompressor(level=1, threads=2) compressed = cctx.compress(source) @@ -205,73 +208,72 @@ def test_multithreaded_dict(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(1024, samples) cctx = zstd.ZstdCompressor(dict_data=d, threads=2) - result = cctx.compress(b'foo') - params = zstd.get_frame_parameters(result); - self.assertEqual(params.content_size, 3); + result = cctx.compress(b"foo") + params = zstd.get_frame_parameters(result) + self.assertEqual(params.content_size, 3) self.assertEqual(params.dict_id, d.dict_id()) - self.assertEqual(result, - b'\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00' - b'\x66\x6f\x6f') + self.assertEqual( + result, + b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" b"\x66\x6f\x6f", + ) def test_multithreaded_compression_params(self): params = zstd.ZstdCompressionParameters.from_level(0, threads=2) cctx = zstd.ZstdCompressor(compression_params=params) - result = cctx.compress(b'foo') - params = zstd.get_frame_parameters(result); - self.assertEqual(params.content_size, 3); + result = cctx.compress(b"foo") + params = zstd.get_frame_parameters(result) + self.assertEqual(params.content_size, 3) - self.assertEqual(result, - b'\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f') + self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f") @make_cffi -class TestCompressor_compressobj(unittest.TestCase): +class TestCompressor_compressobj(TestCase): def test_compressobj_empty(self): cctx = zstd.ZstdCompressor(level=1, write_content_size=False) cobj = cctx.compressobj() - self.assertEqual(cobj.compress(b''), b'') - self.assertEqual(cobj.flush(), - b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') + self.assertEqual(cobj.compress(b""), b"") + self.assertEqual(cobj.flush(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") def test_input_types(self): - expected = b'\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f' + expected = b"\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f" cctx = zstd.ZstdCompressor(level=1, write_content_size=False) mutable_array = bytearray(3) - mutable_array[:] = b'foo' + mutable_array[:] = b"foo" sources = [ - memoryview(b'foo'), - bytearray(b'foo'), + memoryview(b"foo"), + bytearray(b"foo"), mutable_array, ] for source in sources: cobj = cctx.compressobj() - self.assertEqual(cobj.compress(source), b'') + self.assertEqual(cobj.compress(source), b"") self.assertEqual(cobj.flush(), expected) def test_compressobj_large(self): chunks = [] for i in range(255): - chunks.append(struct.Struct('>B').pack(i) * 16384) + chunks.append(struct.Struct(">B").pack(i) * 16384) cctx = zstd.ZstdCompressor(level=3) cobj = cctx.compressobj() - result = cobj.compress(b''.join(chunks)) + cobj.flush() + result = cobj.compress(b"".join(chunks)) + cobj.flush() self.assertEqual(len(result), 999) - self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd') + self.assertEqual(result[0:4], b"\x28\xb5\x2f\xfd") params = zstd.get_frame_parameters(result) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) @@ -282,10 +284,10 @@ def test_write_checksum(self): cctx = zstd.ZstdCompressor(level=1) cobj = cctx.compressobj() - no_checksum = cobj.compress(b'foobar') + cobj.flush() + no_checksum = cobj.compress(b"foobar") + cobj.flush() cctx = zstd.ZstdCompressor(level=1, write_checksum=True) cobj = cctx.compressobj() - with_checksum = cobj.compress(b'foobar') + cobj.flush() + with_checksum = cobj.compress(b"foobar") + cobj.flush() no_params = zstd.get_frame_parameters(no_checksum) with_params = zstd.get_frame_parameters(with_checksum) @@ -300,11 +302,11 @@ def test_write_content_size(self): cctx = zstd.ZstdCompressor(level=1) - cobj = cctx.compressobj(size=len(b'foobar' * 256)) - with_size = cobj.compress(b'foobar' * 256) + cobj.flush() + cobj = cctx.compressobj(size=len(b"foobar" * 256)) + with_size = cobj.compress(b"foobar" * 256) + cobj.flush() cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - cobj = cctx.compressobj(size=len(b'foobar' * 256)) - no_size = cobj.compress(b'foobar' * 256) + cobj.flush() + cobj = cctx.compressobj(size=len(b"foobar" * 256)) + no_size = cobj.compress(b"foobar" * 256) + cobj.flush() no_params = zstd.get_frame_parameters(no_size) with_params = zstd.get_frame_parameters(with_size) @@ -321,48 +323,53 @@ cctx = zstd.ZstdCompressor() cobj = cctx.compressobj() - cobj.compress(b'foo') + cobj.compress(b"foo") cobj.flush() - with self.assertRaisesRegexp(zstd.ZstdError, r'cannot call compress\(\) after compressor'): - cobj.compress(b'foo') + with self.assertRaisesRegex( + zstd.ZstdError, r"cannot call compress\(\) after compressor" + ): + cobj.compress(b"foo") - with self.assertRaisesRegexp(zstd.ZstdError, 'compressor object already finished'): + with self.assertRaisesRegex( + zstd.ZstdError, "compressor object already finished" + ): cobj.flush() def test_flush_block_repeated(self): cctx = zstd.ZstdCompressor(level=1) cobj = cctx.compressobj() - self.assertEqual(cobj.compress(b'foo'), b'') - self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), - b'\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo') - self.assertEqual(cobj.compress(b'bar'), b'') + self.assertEqual(cobj.compress(b"foo"), b"") + self.assertEqual( + cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), + b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo", + ) + self.assertEqual(cobj.compress(b"bar"), b"") # 3 byte header plus content. - self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), - b'\x18\x00\x00bar') - self.assertEqual(cobj.flush(), b'\x01\x00\x00') + self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar") + self.assertEqual(cobj.flush(), b"\x01\x00\x00") def test_flush_empty_block(self): cctx = zstd.ZstdCompressor(write_checksum=True) cobj = cctx.compressobj() - cobj.compress(b'foobar') + cobj.compress(b"foobar") cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK) # No-op if no block is active (this is internal to zstd). - self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b'') + self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"") trailing = cobj.flush() # 3 bytes block header + 4 bytes frame checksum self.assertEqual(len(trailing), 7) header = trailing[0:3] - self.assertEqual(header, b'\x01\x00\x00') + self.assertEqual(header, b"\x01\x00\x00") def test_multithreaded(self): source = io.BytesIO() - source.write(b'a' * 1048576) - source.write(b'b' * 1048576) - source.write(b'c' * 1048576) + source.write(b"a" * 1048576) + source.write(b"b" * 1048576) + source.write(b"c" * 1048576) source.seek(0) cctx = zstd.ZstdCompressor(level=1, threads=2) @@ -378,9 +385,9 @@ chunks.append(cobj.flush()) - compressed = b''.join(chunks) + compressed = b"".join(chunks) - self.assertEqual(len(compressed), 295) + self.assertEqual(len(compressed), 119) def test_frame_progression(self): cctx = zstd.ZstdCompressor() @@ -389,7 +396,7 @@ cobj = cctx.compressobj() - cobj.compress(b'foobar') + cobj.compress(b"foobar") self.assertEqual(cctx.frame_progression(), (6, 0, 0)) cobj.flush() @@ -399,20 +406,20 @@ cctx = zstd.ZstdCompressor() cobj = cctx.compressobj(size=2) - with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'): - cobj.compress(b'foo') + with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): + cobj.compress(b"foo") # Try another operation on this instance. - with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'): - cobj.compress(b'aa') + with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): + cobj.compress(b"aa") # Try another operation on the compressor. cctx.compressobj(size=4) - cctx.compress(b'foobar') + cctx.compress(b"foobar") @make_cffi -class TestCompressor_copy_stream(unittest.TestCase): +class TestCompressor_copy_stream(TestCase): def test_no_read(self): source = object() dest = io.BytesIO() @@ -438,13 +445,12 @@ self.assertEqual(int(r), 0) self.assertEqual(w, 9) - self.assertEqual(dest.getvalue(), - b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') + self.assertEqual(dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") def test_large_data(self): source = io.BytesIO() for i in range(255): - source.write(struct.Struct('>B').pack(i) * 16384) + source.write(struct.Struct(">B").pack(i) * 16384) source.seek(0) dest = io.BytesIO() @@ -461,7 +467,7 @@ self.assertFalse(params.has_checksum) def test_write_checksum(self): - source = io.BytesIO(b'foobar') + source = io.BytesIO(b"foobar") no_checksum = io.BytesIO() cctx = zstd.ZstdCompressor(level=1) @@ -472,8 +478,7 @@ cctx = zstd.ZstdCompressor(level=1, write_checksum=True) cctx.copy_stream(source, with_checksum) - self.assertEqual(len(with_checksum.getvalue()), - len(no_checksum.getvalue()) + 4) + self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) no_params = zstd.get_frame_parameters(no_checksum.getvalue()) with_params = zstd.get_frame_parameters(with_checksum.getvalue()) @@ -485,7 +490,7 @@ self.assertTrue(with_params.has_checksum) def test_write_content_size(self): - source = io.BytesIO(b'foobar' * 256) + source = io.BytesIO(b"foobar" * 256) no_size = io.BytesIO() cctx = zstd.ZstdCompressor(level=1, write_content_size=False) @@ -497,16 +502,14 @@ cctx.copy_stream(source, with_size) # Source content size is unknown, so no content size written. - self.assertEqual(len(with_size.getvalue()), - len(no_size.getvalue())) + self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue())) source.seek(0) with_size = io.BytesIO() cctx.copy_stream(source, with_size, size=len(source.getvalue())) # We specified source size, so content size header is present. - self.assertEqual(len(with_size.getvalue()), - len(no_size.getvalue()) + 1) + self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()) + 1) no_params = zstd.get_frame_parameters(no_size.getvalue()) with_params = zstd.get_frame_parameters(with_size.getvalue()) @@ -518,7 +521,7 @@ self.assertFalse(with_params.has_checksum) def test_read_write_size(self): - source = OpCountingBytesIO(b'foobarfoobar') + source = OpCountingBytesIO(b"foobarfoobar") dest = OpCountingBytesIO() cctx = zstd.ZstdCompressor() r, w = cctx.copy_stream(source, dest, read_size=1, write_size=1) @@ -530,16 +533,16 @@ def test_multithreaded(self): source = io.BytesIO() - source.write(b'a' * 1048576) - source.write(b'b' * 1048576) - source.write(b'c' * 1048576) + source.write(b"a" * 1048576) + source.write(b"b" * 1048576) + source.write(b"c" * 1048576) source.seek(0) dest = io.BytesIO() cctx = zstd.ZstdCompressor(threads=2, write_content_size=False) r, w = cctx.copy_stream(source, dest) self.assertEqual(r, 3145728) - self.assertEqual(w, 295) + self.assertEqual(w, 111) params = zstd.get_frame_parameters(dest.getvalue()) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) @@ -559,15 +562,15 @@ def test_bad_size(self): source = io.BytesIO() - source.write(b'a' * 32768) - source.write(b'b' * 32768) + source.write(b"a" * 32768) + source.write(b"b" * 32768) source.seek(0) dest = io.BytesIO() cctx = zstd.ZstdCompressor() - with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'): + with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): cctx.copy_stream(source, dest, size=42) # Try another operation on this compressor. @@ -577,31 +580,31 @@ @make_cffi -class TestCompressor_stream_reader(unittest.TestCase): +class TestCompressor_stream_reader(TestCase): def test_context_manager(self): cctx = zstd.ZstdCompressor() - with cctx.stream_reader(b'foo') as reader: - with self.assertRaisesRegexp(ValueError, 'cannot __enter__ multiple times'): + with cctx.stream_reader(b"foo") as reader: + with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"): with reader as reader2: pass def test_no_context_manager(self): cctx = zstd.ZstdCompressor() - reader = cctx.stream_reader(b'foo') + reader = cctx.stream_reader(b"foo") reader.read(4) self.assertFalse(reader.closed) reader.close() self.assertTrue(reader.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(1) def test_not_implemented(self): cctx = zstd.ZstdCompressor() - with cctx.stream_reader(b'foo' * 60) as reader: + with cctx.stream_reader(b"foo" * 60) as reader: with self.assertRaises(io.UnsupportedOperation): reader.readline() @@ -618,12 +621,12 @@ reader.writelines([]) with self.assertRaises(OSError): - reader.write(b'foo') + reader.write(b"foo") def test_constant_methods(self): cctx = zstd.ZstdCompressor() - with cctx.stream_reader(b'boo') as reader: + with cctx.stream_reader(b"boo") as reader: self.assertTrue(reader.readable()) self.assertFalse(reader.writable()) self.assertFalse(reader.seekable()) @@ -637,27 +640,29 @@ def test_read_closed(self): cctx = zstd.ZstdCompressor() - with cctx.stream_reader(b'foo' * 60) as reader: + with cctx.stream_reader(b"foo" * 60) as reader: reader.close() self.assertTrue(reader.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(10) def test_read_sizes(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") - with cctx.stream_reader(b'foo') as reader: - with self.assertRaisesRegexp(ValueError, 'cannot read negative amounts less than -1'): + with cctx.stream_reader(b"foo") as reader: + with self.assertRaisesRegex( + ValueError, "cannot read negative amounts less than -1" + ): reader.read(-2) - self.assertEqual(reader.read(0), b'') + self.assertEqual(reader.read(0), b"") self.assertEqual(reader.read(), foo) def test_read_buffer(self): cctx = zstd.ZstdCompressor() - source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60]) + source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60]) frame = cctx.compress(source) with cctx.stream_reader(source) as reader: @@ -667,13 +672,13 @@ result = reader.read(8192) self.assertEqual(result, frame) self.assertEqual(reader.tell(), len(result)) - self.assertEqual(reader.read(), b'') + self.assertEqual(reader.read(), b"") self.assertEqual(reader.tell(), len(result)) def test_read_buffer_small_chunks(self): cctx = zstd.ZstdCompressor() - source = b'foo' * 60 + source = b"foo" * 60 chunks = [] with cctx.stream_reader(source) as reader: @@ -687,12 +692,12 @@ chunks.append(chunk) self.assertEqual(reader.tell(), sum(map(len, chunks))) - self.assertEqual(b''.join(chunks), cctx.compress(source)) + self.assertEqual(b"".join(chunks), cctx.compress(source)) def test_read_stream(self): cctx = zstd.ZstdCompressor() - source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60]) + source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60]) frame = cctx.compress(source) with cctx.stream_reader(io.BytesIO(source), size=len(source)) as reader: @@ -701,13 +706,13 @@ chunk = reader.read(8192) self.assertEqual(chunk, frame) self.assertEqual(reader.tell(), len(chunk)) - self.assertEqual(reader.read(), b'') + self.assertEqual(reader.read(), b"") self.assertEqual(reader.tell(), len(chunk)) def test_read_stream_small_chunks(self): cctx = zstd.ZstdCompressor() - source = b'foo' * 60 + source = b"foo" * 60 chunks = [] with cctx.stream_reader(io.BytesIO(source), size=len(source)) as reader: @@ -721,25 +726,25 @@ chunks.append(chunk) self.assertEqual(reader.tell(), sum(map(len, chunks))) - self.assertEqual(b''.join(chunks), cctx.compress(source)) + self.assertEqual(b"".join(chunks), cctx.compress(source)) def test_read_after_exit(self): cctx = zstd.ZstdCompressor() - with cctx.stream_reader(b'foo' * 60) as reader: + with cctx.stream_reader(b"foo" * 60) as reader: while reader.read(8192): pass - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(10) def test_bad_size(self): cctx = zstd.ZstdCompressor() - source = io.BytesIO(b'foobar') + source = io.BytesIO(b"foobar") with cctx.stream_reader(source, size=2) as reader: - with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'): + with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): reader.read(10) # Try another compression operation. @@ -748,36 +753,36 @@ def test_readall(self): cctx = zstd.ZstdCompressor() - frame = cctx.compress(b'foo' * 1024) + frame = cctx.compress(b"foo" * 1024) - reader = cctx.stream_reader(b'foo' * 1024) + reader = cctx.stream_reader(b"foo" * 1024) self.assertEqual(reader.readall(), frame) def test_readinto(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") - reader = cctx.stream_reader(b'foo') + reader = cctx.stream_reader(b"foo") with self.assertRaises(Exception): - reader.readinto(b'foobar') + reader.readinto(b"foobar") # readinto() with sufficiently large destination. b = bytearray(1024) - reader = cctx.stream_reader(b'foo') + reader = cctx.stream_reader(b"foo") self.assertEqual(reader.readinto(b), len(foo)) - self.assertEqual(b[0:len(foo)], foo) + self.assertEqual(b[0 : len(foo)], foo) self.assertEqual(reader.readinto(b), 0) - self.assertEqual(b[0:len(foo)], foo) + self.assertEqual(b[0 : len(foo)], foo) # readinto() with small reads. b = bytearray(1024) - reader = cctx.stream_reader(b'foo', read_size=1) + reader = cctx.stream_reader(b"foo", read_size=1) self.assertEqual(reader.readinto(b), len(foo)) - self.assertEqual(b[0:len(foo)], foo) + self.assertEqual(b[0 : len(foo)], foo) # Too small destination buffer. b = bytearray(2) - reader = cctx.stream_reader(b'foo') + reader = cctx.stream_reader(b"foo") self.assertEqual(reader.readinto(b), 2) self.assertEqual(b[:], foo[0:2]) self.assertEqual(reader.readinto(b), 2) @@ -787,41 +792,41 @@ def test_readinto1(self): cctx = zstd.ZstdCompressor() - foo = b''.join(cctx.read_to_iter(io.BytesIO(b'foo'))) + foo = b"".join(cctx.read_to_iter(io.BytesIO(b"foo"))) - reader = cctx.stream_reader(b'foo') + reader = cctx.stream_reader(b"foo") with self.assertRaises(Exception): - reader.readinto1(b'foobar') + reader.readinto1(b"foobar") b = bytearray(1024) - source = OpCountingBytesIO(b'foo') + source = OpCountingBytesIO(b"foo") reader = cctx.stream_reader(source) self.assertEqual(reader.readinto1(b), len(foo)) - self.assertEqual(b[0:len(foo)], foo) + self.assertEqual(b[0 : len(foo)], foo) self.assertEqual(source._read_count, 2) # readinto1() with small reads. b = bytearray(1024) - source = OpCountingBytesIO(b'foo') + source = OpCountingBytesIO(b"foo") reader = cctx.stream_reader(source, read_size=1) self.assertEqual(reader.readinto1(b), len(foo)) - self.assertEqual(b[0:len(foo)], foo) + self.assertEqual(b[0 : len(foo)], foo) self.assertEqual(source._read_count, 4) def test_read1(self): cctx = zstd.ZstdCompressor() - foo = b''.join(cctx.read_to_iter(io.BytesIO(b'foo'))) + foo = b"".join(cctx.read_to_iter(io.BytesIO(b"foo"))) - b = OpCountingBytesIO(b'foo') + b = OpCountingBytesIO(b"foo") reader = cctx.stream_reader(b) self.assertEqual(reader.read1(), foo) self.assertEqual(b._read_count, 2) - b = OpCountingBytesIO(b'foo') + b = OpCountingBytesIO(b"foo") reader = cctx.stream_reader(b) - self.assertEqual(reader.read1(0), b'') + self.assertEqual(reader.read1(0), b"") self.assertEqual(reader.read1(2), foo[0:2]) self.assertEqual(b._read_count, 2) self.assertEqual(reader.read1(2), foo[2:4]) @@ -829,7 +834,7 @@ @make_cffi -class TestCompressor_stream_writer(unittest.TestCase): +class TestCompressor_stream_writer(TestCase): def test_io_api(self): buffer = io.BytesIO() cctx = zstd.ZstdCompressor() @@ -899,7 +904,7 @@ self.assertFalse(writer.closed) def test_fileno_file(self): - with tempfile.TemporaryFile('wb') as tf: + with tempfile.TemporaryFile("wb") as tf: cctx = zstd.ZstdCompressor() writer = cctx.stream_writer(tf) @@ -910,33 +915,35 @@ cctx = zstd.ZstdCompressor(level=1) writer = cctx.stream_writer(buffer) - writer.write(b'foo' * 1024) + writer.write(b"foo" * 1024) self.assertFalse(writer.closed) self.assertFalse(buffer.closed) writer.close() self.assertTrue(writer.closed) self.assertTrue(buffer.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): - writer.write(b'foo') + with self.assertRaisesRegex(ValueError, "stream is closed"): + writer.write(b"foo") - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): writer.flush() - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): with writer: pass - self.assertEqual(buffer.getvalue(), - b'\x28\xb5\x2f\xfd\x00\x48\x55\x00\x00\x18\x66\x6f' - b'\x6f\x01\x00\xfa\xd3\x77\x43') + self.assertEqual( + buffer.getvalue(), + b"\x28\xb5\x2f\xfd\x00\x48\x55\x00\x00\x18\x66\x6f" + b"\x6f\x01\x00\xfa\xd3\x77\x43", + ) # Context manager exit should close stream. buffer = io.BytesIO() writer = cctx.stream_writer(buffer) with writer: - writer.write(b'foo') + writer.write(b"foo") self.assertTrue(writer.closed) @@ -944,10 +951,10 @@ buffer = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=1, write_content_size=False) with cctx.stream_writer(buffer) as compressor: - compressor.write(b'') + compressor.write(b"") result = buffer.getvalue() - self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') + self.assertEqual(result, b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") params = zstd.get_frame_parameters(result) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) @@ -958,11 +965,11 @@ # Test without context manager. buffer = io.BytesIO() compressor = cctx.stream_writer(buffer) - self.assertEqual(compressor.write(b''), 0) - self.assertEqual(buffer.getvalue(), b'') + self.assertEqual(compressor.write(b""), 0) + self.assertEqual(buffer.getvalue(), b"") self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 9) result = buffer.getvalue() - self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') + self.assertEqual(result, b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") params = zstd.get_frame_parameters(result) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) @@ -972,18 +979,18 @@ # Test write_return_read=True compressor = cctx.stream_writer(buffer, write_return_read=True) - self.assertEqual(compressor.write(b''), 0) + self.assertEqual(compressor.write(b""), 0) def test_input_types(self): - expected = b'\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f' + expected = b"\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f" cctx = zstd.ZstdCompressor(level=1) mutable_array = bytearray(3) - mutable_array[:] = b'foo' + mutable_array[:] = b"foo" sources = [ - memoryview(b'foo'), - bytearray(b'foo'), + memoryview(b"foo"), + bytearray(b"foo"), mutable_array, ] @@ -1001,51 +1008,55 @@ buffer = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=5) with cctx.stream_writer(buffer) as compressor: - self.assertEqual(compressor.write(b'foo'), 0) - self.assertEqual(compressor.write(b'bar'), 0) - self.assertEqual(compressor.write(b'x' * 8192), 0) + self.assertEqual(compressor.write(b"foo"), 0) + self.assertEqual(compressor.write(b"bar"), 0) + self.assertEqual(compressor.write(b"x" * 8192), 0) result = buffer.getvalue() - self.assertEqual(result, - b'\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f' - b'\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23') + self.assertEqual( + result, + b"\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f" + b"\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23", + ) # Test without context manager. buffer = io.BytesIO() compressor = cctx.stream_writer(buffer) - self.assertEqual(compressor.write(b'foo'), 0) - self.assertEqual(compressor.write(b'bar'), 0) - self.assertEqual(compressor.write(b'x' * 8192), 0) + self.assertEqual(compressor.write(b"foo"), 0) + self.assertEqual(compressor.write(b"bar"), 0) + self.assertEqual(compressor.write(b"x" * 8192), 0) self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 23) result = buffer.getvalue() - self.assertEqual(result, - b'\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f' - b'\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23') + self.assertEqual( + result, + b"\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f" + b"\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23", + ) # Test with write_return_read=True. compressor = cctx.stream_writer(buffer, write_return_read=True) - self.assertEqual(compressor.write(b'foo'), 3) - self.assertEqual(compressor.write(b'barbiz'), 6) - self.assertEqual(compressor.write(b'x' * 8192), 8192) + self.assertEqual(compressor.write(b"foo"), 3) + self.assertEqual(compressor.write(b"barbiz"), 6) + self.assertEqual(compressor.write(b"x" * 8192), 8192) def test_dictionary(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(8192, samples) h = hashlib.sha1(d.as_bytes()).hexdigest() - self.assertEqual(h, '7a2e59a876db958f74257141045af8f912e00d4e') + self.assertEqual(h, "7a2e59a876db958f74257141045af8f912e00d4e") buffer = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=9, dict_data=d) with cctx.stream_writer(buffer) as compressor: - self.assertEqual(compressor.write(b'foo'), 0) - self.assertEqual(compressor.write(b'bar'), 0) - self.assertEqual(compressor.write(b'foo' * 16384), 0) + self.assertEqual(compressor.write(b"foo"), 0) + self.assertEqual(compressor.write(b"bar"), 0) + self.assertEqual(compressor.write(b"foo" * 16384), 0) compressed = buffer.getvalue() @@ -1056,14 +1067,15 @@ self.assertFalse(params.has_checksum) h = hashlib.sha1(compressed).hexdigest() - self.assertEqual(h, '0a7c05635061f58039727cdbe76388c6f4cfef06') + self.assertEqual(h, "0a7c05635061f58039727cdbe76388c6f4cfef06") - source = b'foo' + b'bar' + (b'foo' * 16384) + source = b"foo" + b"bar" + (b"foo" * 16384) dctx = zstd.ZstdDecompressor(dict_data=d) - self.assertEqual(dctx.decompress(compressed, max_output_size=len(source)), - source) + self.assertEqual( + dctx.decompress(compressed, max_output_size=len(source)), source + ) def test_compression_params(self): params = zstd.ZstdCompressionParameters( @@ -1073,14 +1085,15 @@ min_match=5, search_log=4, target_length=10, - strategy=zstd.STRATEGY_FAST) + strategy=zstd.STRATEGY_FAST, + ) buffer = NonClosingBytesIO() cctx = zstd.ZstdCompressor(compression_params=params) with cctx.stream_writer(buffer) as compressor: - self.assertEqual(compressor.write(b'foo'), 0) - self.assertEqual(compressor.write(b'bar'), 0) - self.assertEqual(compressor.write(b'foobar' * 16384), 0) + self.assertEqual(compressor.write(b"foo"), 0) + self.assertEqual(compressor.write(b"bar"), 0) + self.assertEqual(compressor.write(b"foobar" * 16384), 0) compressed = buffer.getvalue() @@ -1091,18 +1104,18 @@ self.assertFalse(params.has_checksum) h = hashlib.sha1(compressed).hexdigest() - self.assertEqual(h, 'dd4bb7d37c1a0235b38a2f6b462814376843ef0b') + self.assertEqual(h, "dd4bb7d37c1a0235b38a2f6b462814376843ef0b") def test_write_checksum(self): no_checksum = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=1) with cctx.stream_writer(no_checksum) as compressor: - self.assertEqual(compressor.write(b'foobar'), 0) + self.assertEqual(compressor.write(b"foobar"), 0) with_checksum = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=1, write_checksum=True) with cctx.stream_writer(with_checksum) as compressor: - self.assertEqual(compressor.write(b'foobar'), 0) + self.assertEqual(compressor.write(b"foobar"), 0) no_params = zstd.get_frame_parameters(no_checksum.getvalue()) with_params = zstd.get_frame_parameters(with_checksum.getvalue()) @@ -1113,29 +1126,27 @@ self.assertFalse(no_params.has_checksum) self.assertTrue(with_params.has_checksum) - self.assertEqual(len(with_checksum.getvalue()), - len(no_checksum.getvalue()) + 4) + self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) def test_write_content_size(self): no_size = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=1, write_content_size=False) with cctx.stream_writer(no_size) as compressor: - self.assertEqual(compressor.write(b'foobar' * 256), 0) + self.assertEqual(compressor.write(b"foobar" * 256), 0) with_size = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=1) with cctx.stream_writer(with_size) as compressor: - self.assertEqual(compressor.write(b'foobar' * 256), 0) + self.assertEqual(compressor.write(b"foobar" * 256), 0) # Source size is not known in streaming mode, so header not # written. - self.assertEqual(len(with_size.getvalue()), - len(no_size.getvalue())) + self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue())) # Declaring size will write the header. with_size = NonClosingBytesIO() - with cctx.stream_writer(with_size, size=len(b'foobar' * 256)) as compressor: - self.assertEqual(compressor.write(b'foobar' * 256), 0) + with cctx.stream_writer(with_size, size=len(b"foobar" * 256)) as compressor: + self.assertEqual(compressor.write(b"foobar" * 256), 0) no_params = zstd.get_frame_parameters(no_size.getvalue()) with_params = zstd.get_frame_parameters(with_size.getvalue()) @@ -1146,31 +1157,30 @@ self.assertFalse(no_params.has_checksum) self.assertFalse(with_params.has_checksum) - self.assertEqual(len(with_size.getvalue()), - len(no_size.getvalue()) + 1) + self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()) + 1) def test_no_dict_id(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(1024, samples) with_dict_id = NonClosingBytesIO() cctx = zstd.ZstdCompressor(level=1, dict_data=d) with cctx.stream_writer(with_dict_id) as compressor: - self.assertEqual(compressor.write(b'foobarfoobar'), 0) + self.assertEqual(compressor.write(b"foobarfoobar"), 0) - self.assertEqual(with_dict_id.getvalue()[4:5], b'\x03') + self.assertEqual(with_dict_id.getvalue()[4:5], b"\x03") cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False) no_dict_id = NonClosingBytesIO() with cctx.stream_writer(no_dict_id) as compressor: - self.assertEqual(compressor.write(b'foobarfoobar'), 0) + self.assertEqual(compressor.write(b"foobarfoobar"), 0) - self.assertEqual(no_dict_id.getvalue()[4:5], b'\x00') + self.assertEqual(no_dict_id.getvalue()[4:5], b"\x00") no_params = zstd.get_frame_parameters(no_dict_id.getvalue()) with_params = zstd.get_frame_parameters(with_dict_id.getvalue()) @@ -1181,14 +1191,13 @@ self.assertFalse(no_params.has_checksum) self.assertFalse(with_params.has_checksum) - self.assertEqual(len(with_dict_id.getvalue()), - len(no_dict_id.getvalue()) + 4) + self.assertEqual(len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4) def test_memory_size(self): cctx = zstd.ZstdCompressor(level=3) buffer = io.BytesIO() with cctx.stream_writer(buffer) as compressor: - compressor.write(b'foo') + compressor.write(b"foo") size = compressor.memory_size() self.assertGreater(size, 100000) @@ -1197,9 +1206,9 @@ cctx = zstd.ZstdCompressor(level=3) dest = OpCountingBytesIO() with cctx.stream_writer(dest, write_size=1) as compressor: - self.assertEqual(compressor.write(b'foo'), 0) - self.assertEqual(compressor.write(b'bar'), 0) - self.assertEqual(compressor.write(b'foobar'), 0) + self.assertEqual(compressor.write(b"foo"), 0) + self.assertEqual(compressor.write(b"bar"), 0) + self.assertEqual(compressor.write(b"foobar"), 0) self.assertEqual(len(dest.getvalue()), dest._write_count) @@ -1207,15 +1216,15 @@ cctx = zstd.ZstdCompressor(level=3) dest = OpCountingBytesIO() with cctx.stream_writer(dest) as compressor: - self.assertEqual(compressor.write(b'foo'), 0) + self.assertEqual(compressor.write(b"foo"), 0) self.assertEqual(dest._write_count, 0) self.assertEqual(compressor.flush(), 12) self.assertEqual(dest._write_count, 1) - self.assertEqual(compressor.write(b'bar'), 0) + self.assertEqual(compressor.write(b"bar"), 0) self.assertEqual(dest._write_count, 1) self.assertEqual(compressor.flush(), 6) self.assertEqual(dest._write_count, 2) - self.assertEqual(compressor.write(b'baz'), 0) + self.assertEqual(compressor.write(b"baz"), 0) self.assertEqual(dest._write_count, 3) @@ -1223,7 +1232,7 @@ cctx = zstd.ZstdCompressor(level=3, write_checksum=True) dest = OpCountingBytesIO() with cctx.stream_writer(dest) as compressor: - self.assertEqual(compressor.write(b'foobar' * 8192), 0) + self.assertEqual(compressor.write(b"foobar" * 8192), 0) count = dest._write_count offset = dest.tell() self.assertEqual(compressor.flush(), 23) @@ -1238,41 +1247,43 @@ self.assertEqual(len(trailing), 7) header = trailing[0:3] - self.assertEqual(header, b'\x01\x00\x00') + self.assertEqual(header, b"\x01\x00\x00") def test_flush_frame(self): cctx = zstd.ZstdCompressor(level=3) dest = OpCountingBytesIO() with cctx.stream_writer(dest) as compressor: - self.assertEqual(compressor.write(b'foobar' * 8192), 0) + self.assertEqual(compressor.write(b"foobar" * 8192), 0) self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 23) - compressor.write(b'biz' * 16384) + compressor.write(b"biz" * 16384) - self.assertEqual(dest.getvalue(), - # Frame 1. - b'\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x30\x66\x6f\x6f' - b'\x62\x61\x72\x01\x00\xf7\xbf\xe8\xa5\x08' - # Frame 2. - b'\x28\xb5\x2f\xfd\x00\x58\x5d\x00\x00\x18\x62\x69\x7a' - b'\x01\x00\xfa\x3f\x75\x37\x04') + self.assertEqual( + dest.getvalue(), + # Frame 1. + b"\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x30\x66\x6f\x6f" + b"\x62\x61\x72\x01\x00\xf7\xbf\xe8\xa5\x08" + # Frame 2. + b"\x28\xb5\x2f\xfd\x00\x58\x5d\x00\x00\x18\x62\x69\x7a" + b"\x01\x00\xfa\x3f\x75\x37\x04", + ) def test_bad_flush_mode(self): cctx = zstd.ZstdCompressor() dest = io.BytesIO() with cctx.stream_writer(dest) as compressor: - with self.assertRaisesRegexp(ValueError, 'unknown flush_mode: 42'): + with self.assertRaisesRegex(ValueError, "unknown flush_mode: 42"): compressor.flush(flush_mode=42) def test_multithreaded(self): dest = NonClosingBytesIO() cctx = zstd.ZstdCompressor(threads=2) with cctx.stream_writer(dest) as compressor: - compressor.write(b'a' * 1048576) - compressor.write(b'b' * 1048576) - compressor.write(b'c' * 1048576) + compressor.write(b"a" * 1048576) + compressor.write(b"b" * 1048576) + compressor.write(b"c" * 1048576) - self.assertEqual(len(dest.getvalue()), 295) + self.assertEqual(len(dest.getvalue()), 111) def test_tell(self): dest = io.BytesIO() @@ -1281,7 +1292,7 @@ self.assertEqual(compressor.tell(), 0) for i in range(256): - compressor.write(b'foo' * (i + 1)) + compressor.write(b"foo" * (i + 1)) self.assertEqual(compressor.tell(), dest.tell()) def test_bad_size(self): @@ -1289,9 +1300,9 @@ dest = io.BytesIO() - with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'): + with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): with cctx.stream_writer(dest, size=2) as compressor: - compressor.write(b'foo') + compressor.write(b"foo") # Test another operation. with cctx.stream_writer(dest, size=42): @@ -1301,20 +1312,20 @@ dest = NonClosingBytesIO() cctx = zstd.ZstdCompressor() with cctx.stream_writer(dest) as compressor: - with tarfile.open('tf', mode='w|', fileobj=compressor) as tf: - tf.add(__file__, 'test_compressor.py') + with tarfile.open("tf", mode="w|", fileobj=compressor) as tf: + tf.add(__file__, "test_compressor.py") dest = io.BytesIO(dest.getvalue()) dctx = zstd.ZstdDecompressor() with dctx.stream_reader(dest) as reader: - with tarfile.open(mode='r|', fileobj=reader) as tf: + with tarfile.open(mode="r|", fileobj=reader) as tf: for member in tf: - self.assertEqual(member.name, 'test_compressor.py') + self.assertEqual(member.name, "test_compressor.py") @make_cffi -class TestCompressor_read_to_iter(unittest.TestCase): +class TestCompressor_read_to_iter(TestCase): def test_type_validation(self): cctx = zstd.ZstdCompressor() @@ -1323,10 +1334,10 @@ pass # Buffer protocol works. - for chunk in cctx.read_to_iter(b'foobar'): + for chunk in cctx.read_to_iter(b"foobar"): pass - with self.assertRaisesRegexp(ValueError, 'must pass an object with a read'): + with self.assertRaisesRegex(ValueError, "must pass an object with a read"): for chunk in cctx.read_to_iter(True): pass @@ -1337,22 +1348,22 @@ it = cctx.read_to_iter(source) chunks = list(it) self.assertEqual(len(chunks), 1) - compressed = b''.join(chunks) - self.assertEqual(compressed, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') + compressed = b"".join(chunks) + self.assertEqual(compressed, b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") # And again with the buffer protocol. - it = cctx.read_to_iter(b'') + it = cctx.read_to_iter(b"") chunks = list(it) self.assertEqual(len(chunks), 1) - compressed2 = b''.join(chunks) + compressed2 = b"".join(chunks) self.assertEqual(compressed2, compressed) def test_read_large(self): cctx = zstd.ZstdCompressor(level=1, write_content_size=False) source = io.BytesIO() - source.write(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE) - source.write(b'o') + source.write(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE) + source.write(b"o") source.seek(0) # Creating an iterator should not perform any compression until @@ -1380,9 +1391,9 @@ next(it) # We should get the same output as the one-shot compression mechanism. - self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue())) + self.assertEqual(b"".join(chunks), cctx.compress(source.getvalue())) - params = zstd.get_frame_parameters(b''.join(chunks)) + params = zstd.get_frame_parameters(b"".join(chunks)) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) self.assertEqual(params.window_size, 262144) self.assertEqual(params.dict_id, 0) @@ -1393,16 +1404,16 @@ chunks = list(it) self.assertEqual(len(chunks), 2) - params = zstd.get_frame_parameters(b''.join(chunks)) + params = zstd.get_frame_parameters(b"".join(chunks)) self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN) - #self.assertEqual(params.window_size, 262144) + # self.assertEqual(params.window_size, 262144) self.assertEqual(params.dict_id, 0) self.assertFalse(params.has_checksum) - self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue())) + self.assertEqual(b"".join(chunks), cctx.compress(source.getvalue())) def test_read_write_size(self): - source = OpCountingBytesIO(b'foobarfoobar') + source = OpCountingBytesIO(b"foobarfoobar") cctx = zstd.ZstdCompressor(level=3) for chunk in cctx.read_to_iter(source, read_size=1, write_size=1): self.assertEqual(len(chunk), 1) @@ -1411,42 +1422,42 @@ def test_multithreaded(self): source = io.BytesIO() - source.write(b'a' * 1048576) - source.write(b'b' * 1048576) - source.write(b'c' * 1048576) + source.write(b"a" * 1048576) + source.write(b"b" * 1048576) + source.write(b"c" * 1048576) source.seek(0) cctx = zstd.ZstdCompressor(threads=2) - compressed = b''.join(cctx.read_to_iter(source)) - self.assertEqual(len(compressed), 295) + compressed = b"".join(cctx.read_to_iter(source)) + self.assertEqual(len(compressed), 111) def test_bad_size(self): cctx = zstd.ZstdCompressor() - source = io.BytesIO(b'a' * 42) + source = io.BytesIO(b"a" * 42) - with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'): - b''.join(cctx.read_to_iter(source, size=2)) + with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): + b"".join(cctx.read_to_iter(source, size=2)) # Test another operation on errored compressor. - b''.join(cctx.read_to_iter(source)) + b"".join(cctx.read_to_iter(source)) @make_cffi -class TestCompressor_chunker(unittest.TestCase): +class TestCompressor_chunker(TestCase): def test_empty(self): cctx = zstd.ZstdCompressor(write_content_size=False) chunker = cctx.chunker() - it = chunker.compress(b'') + it = chunker.compress(b"") with self.assertRaises(StopIteration): next(it) it = chunker.finish() - self.assertEqual(next(it), b'\x28\xb5\x2f\xfd\x00\x58\x01\x00\x00') + self.assertEqual(next(it), b"\x28\xb5\x2f\xfd\x00\x58\x01\x00\x00") with self.assertRaises(StopIteration): next(it) @@ -1455,21 +1466,23 @@ cctx = zstd.ZstdCompressor() chunker = cctx.chunker() - it = chunker.compress(b'foobar') + it = chunker.compress(b"foobar") with self.assertRaises(StopIteration): next(it) - it = chunker.compress(b'baz' * 30) + it = chunker.compress(b"baz" * 30) with self.assertRaises(StopIteration): next(it) it = chunker.finish() - self.assertEqual(next(it), - b'\x28\xb5\x2f\xfd\x00\x58\x7d\x00\x00\x48\x66\x6f' - b'\x6f\x62\x61\x72\x62\x61\x7a\x01\x00\xe4\xe4\x8e') + self.assertEqual( + next(it), + b"\x28\xb5\x2f\xfd\x00\x58\x7d\x00\x00\x48\x66\x6f" + b"\x6f\x62\x61\x72\x62\x61\x7a\x01\x00\xe4\xe4\x8e", + ) with self.assertRaises(StopIteration): next(it) @@ -1478,57 +1491,60 @@ cctx = zstd.ZstdCompressor() chunker = cctx.chunker(size=1024) - it = chunker.compress(b'x' * 1000) + it = chunker.compress(b"x" * 1000) with self.assertRaises(StopIteration): next(it) - it = chunker.compress(b'y' * 24) + it = chunker.compress(b"y" * 24) with self.assertRaises(StopIteration): next(it) chunks = list(chunker.finish()) - self.assertEqual(chunks, [ - b'\x28\xb5\x2f\xfd\x60\x00\x03\x65\x00\x00\x18\x78\x78\x79\x02\x00' - b'\xa0\x16\xe3\x2b\x80\x05' - ]) + self.assertEqual( + chunks, + [ + b"\x28\xb5\x2f\xfd\x60\x00\x03\x65\x00\x00\x18\x78\x78\x79\x02\x00" + b"\xa0\x16\xe3\x2b\x80\x05" + ], + ) dctx = zstd.ZstdDecompressor() - self.assertEqual(dctx.decompress(b''.join(chunks)), - (b'x' * 1000) + (b'y' * 24)) + self.assertEqual(dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)) def test_small_chunk_size(self): cctx = zstd.ZstdCompressor() chunker = cctx.chunker(chunk_size=1) - chunks = list(chunker.compress(b'foo' * 1024)) + chunks = list(chunker.compress(b"foo" * 1024)) self.assertEqual(chunks, []) chunks = list(chunker.finish()) self.assertTrue(all(len(chunk) == 1 for chunk in chunks)) self.assertEqual( - b''.join(chunks), - b'\x28\xb5\x2f\xfd\x00\x58\x55\x00\x00\x18\x66\x6f\x6f\x01\x00' - b'\xfa\xd3\x77\x43') + b"".join(chunks), + b"\x28\xb5\x2f\xfd\x00\x58\x55\x00\x00\x18\x66\x6f\x6f\x01\x00" + b"\xfa\xd3\x77\x43", + ) dctx = zstd.ZstdDecompressor() - self.assertEqual(dctx.decompress(b''.join(chunks), - max_output_size=10000), - b'foo' * 1024) + self.assertEqual( + dctx.decompress(b"".join(chunks), max_output_size=10000), b"foo" * 1024 + ) def test_input_types(self): cctx = zstd.ZstdCompressor() mutable_array = bytearray(3) - mutable_array[:] = b'foo' + mutable_array[:] = b"foo" sources = [ - memoryview(b'foo'), - bytearray(b'foo'), + memoryview(b"foo"), + bytearray(b"foo"), mutable_array, ] @@ -1536,28 +1552,32 @@ chunker = cctx.chunker() self.assertEqual(list(chunker.compress(source)), []) - self.assertEqual(list(chunker.finish()), [ - b'\x28\xb5\x2f\xfd\x00\x58\x19\x00\x00\x66\x6f\x6f' - ]) + self.assertEqual( + list(chunker.finish()), + [b"\x28\xb5\x2f\xfd\x00\x58\x19\x00\x00\x66\x6f\x6f"], + ) def test_flush(self): cctx = zstd.ZstdCompressor() chunker = cctx.chunker() - self.assertEqual(list(chunker.compress(b'foo' * 1024)), []) - self.assertEqual(list(chunker.compress(b'bar' * 1024)), []) + self.assertEqual(list(chunker.compress(b"foo" * 1024)), []) + self.assertEqual(list(chunker.compress(b"bar" * 1024)), []) chunks1 = list(chunker.flush()) - self.assertEqual(chunks1, [ - b'\x28\xb5\x2f\xfd\x00\x58\x8c\x00\x00\x30\x66\x6f\x6f\x62\x61\x72' - b'\x02\x00\xfa\x03\xfe\xd0\x9f\xbe\x1b\x02' - ]) + self.assertEqual( + chunks1, + [ + b"\x28\xb5\x2f\xfd\x00\x58\x8c\x00\x00\x30\x66\x6f\x6f\x62\x61\x72" + b"\x02\x00\xfa\x03\xfe\xd0\x9f\xbe\x1b\x02" + ], + ) self.assertEqual(list(chunker.flush()), []) self.assertEqual(list(chunker.flush()), []) - self.assertEqual(list(chunker.compress(b'baz' * 1024)), []) + self.assertEqual(list(chunker.compress(b"baz" * 1024)), []) chunks2 = list(chunker.flush()) self.assertEqual(len(chunks2), 1) @@ -1567,53 +1587,56 @@ dctx = zstd.ZstdDecompressor() - self.assertEqual(dctx.decompress(b''.join(chunks1 + chunks2 + chunks3), - max_output_size=10000), - (b'foo' * 1024) + (b'bar' * 1024) + (b'baz' * 1024)) + self.assertEqual( + dctx.decompress( + b"".join(chunks1 + chunks2 + chunks3), max_output_size=10000 + ), + (b"foo" * 1024) + (b"bar" * 1024) + (b"baz" * 1024), + ) def test_compress_after_finish(self): cctx = zstd.ZstdCompressor() chunker = cctx.chunker() - list(chunker.compress(b'foo')) + list(chunker.compress(b"foo")) list(chunker.finish()) - with self.assertRaisesRegexp( - zstd.ZstdError, - r'cannot call compress\(\) after compression finished'): - list(chunker.compress(b'foo')) + with self.assertRaisesRegex( + zstd.ZstdError, r"cannot call compress\(\) after compression finished" + ): + list(chunker.compress(b"foo")) def test_flush_after_finish(self): cctx = zstd.ZstdCompressor() chunker = cctx.chunker() - list(chunker.compress(b'foo')) + list(chunker.compress(b"foo")) list(chunker.finish()) - with self.assertRaisesRegexp( - zstd.ZstdError, - r'cannot call flush\(\) after compression finished'): + with self.assertRaisesRegex( + zstd.ZstdError, r"cannot call flush\(\) after compression finished" + ): list(chunker.flush()) def test_finish_after_finish(self): cctx = zstd.ZstdCompressor() chunker = cctx.chunker() - list(chunker.compress(b'foo')) + list(chunker.compress(b"foo")) list(chunker.finish()) - with self.assertRaisesRegexp( - zstd.ZstdError, - r'cannot call finish\(\) after compression finished'): + with self.assertRaisesRegex( + zstd.ZstdError, r"cannot call finish\(\) after compression finished" + ): list(chunker.finish()) -class TestCompressor_multi_compress_to_buffer(unittest.TestCase): +class TestCompressor_multi_compress_to_buffer(TestCase): def test_invalid_inputs(self): cctx = zstd.ZstdCompressor() - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") with self.assertRaises(TypeError): cctx.multi_compress_to_buffer(True) @@ -1621,28 +1644,28 @@ with self.assertRaises(TypeError): cctx.multi_compress_to_buffer((1, 2)) - with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'): - cctx.multi_compress_to_buffer([u'foo']) + with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"): + cctx.multi_compress_to_buffer([u"foo"]) def test_empty_input(self): cctx = zstd.ZstdCompressor() - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") - with self.assertRaisesRegexp(ValueError, 'no source elements found'): + with self.assertRaisesRegex(ValueError, "no source elements found"): cctx.multi_compress_to_buffer([]) - with self.assertRaisesRegexp(ValueError, 'source elements are empty'): - cctx.multi_compress_to_buffer([b'', b'', b'']) + with self.assertRaisesRegex(ValueError, "source elements are empty"): + cctx.multi_compress_to_buffer([b"", b"", b""]) def test_list_input(self): cctx = zstd.ZstdCompressor(write_checksum=True) - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") - original = [b'foo' * 12, b'bar' * 6] + original = [b"foo" * 12, b"bar" * 6] frames = [cctx.compress(c) for c in original] b = cctx.multi_compress_to_buffer(original) @@ -1657,15 +1680,16 @@ def test_buffer_with_segments_input(self): cctx = zstd.ZstdCompressor(write_checksum=True) - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") - original = [b'foo' * 4, b'bar' * 6] + original = [b"foo" * 4, b"bar" * 6] frames = [cctx.compress(c) for c in original] - offsets = struct.pack('=QQQQ', 0, len(original[0]), - len(original[0]), len(original[1])) - segments = zstd.BufferWithSegments(b''.join(original), offsets) + offsets = struct.pack( + "=QQQQ", 0, len(original[0]), len(original[0]), len(original[1]) + ) + segments = zstd.BufferWithSegments(b"".join(original), offsets) result = cctx.multi_compress_to_buffer(segments) @@ -1678,28 +1702,39 @@ def test_buffer_with_segments_collection_input(self): cctx = zstd.ZstdCompressor(write_checksum=True) - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") original = [ - b'foo1', - b'foo2' * 2, - b'foo3' * 3, - b'foo4' * 4, - b'foo5' * 5, + b"foo1", + b"foo2" * 2, + b"foo3" * 3, + b"foo4" * 4, + b"foo5" * 5, ] frames = [cctx.compress(c) for c in original] - b = b''.join([original[0], original[1]]) - b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ', - 0, len(original[0]), - len(original[0]), len(original[1]))) - b = b''.join([original[2], original[3], original[4]]) - b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ', - 0, len(original[2]), - len(original[2]), len(original[3]), - len(original[2]) + len(original[3]), len(original[4]))) + b = b"".join([original[0], original[1]]) + b1 = zstd.BufferWithSegments( + b, + struct.pack( + "=QQQQ", 0, len(original[0]), len(original[0]), len(original[1]) + ), + ) + b = b"".join([original[2], original[3], original[4]]) + b2 = zstd.BufferWithSegments( + b, + struct.pack( + "=QQQQQQ", + 0, + len(original[2]), + len(original[2]), + len(original[3]), + len(original[2]) + len(original[3]), + len(original[4]), + ), + ) c = zstd.BufferWithSegmentsCollection(b1, b2) @@ -1714,16 +1749,16 @@ # threads argument will cause multi-threaded ZSTD APIs to be used, which will # make output different. refcctx = zstd.ZstdCompressor(write_checksum=True) - reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)] + reference = [refcctx.compress(b"x" * 64), refcctx.compress(b"y" * 64)] cctx = zstd.ZstdCompressor(write_checksum=True) - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") frames = [] - frames.extend(b'x' * 64 for i in range(256)) - frames.extend(b'y' * 64 for i in range(256)) + frames.extend(b"x" * 64 for i in range(256)) + frames.extend(b"y" * 64 for i in range(256)) result = cctx.multi_compress_to_buffer(frames, threads=-1)