diff contrib/python-zstandard/tests/test_decompressor.py @ 43994:de7838053207
zstandard: vendor python-zstandard 0.13.0
Version 0.13.0 of the package was just released. It contains an
upgraded zstd C library, which can yield some performance wins;
official support for Python 3.8; and a code base reformatted with
black.
There were no meaningful code or functionality changes in this
release of python-zstandard: just reformatting and an upgraded
zstd library version. As a result, the diff appears much larger
than it really is.
Files were added without modifications.
The clang-format-ignorelist file was updated to reflect a new
header file in the zstd distribution.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D7770
author    Gregory Szorc <gregory.szorc@gmail.com>
date      Sat, 28 Dec 2019 09:55:45 -0800
parents   675775c33ab6
children  5e84a96d865b
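Most of the churn in the hunks below is mechanical. The black reformatting switches string literals to double quotes, rewraps long calls, and spaces out operators such as `2 ** 62`, while the test classes move from `unittest.TestCase` to a shared `TestCase` helper so they can call `assertRaisesRegex` instead of the deprecated `assertRaisesRegexp`. That helper lives in the package's test support module and is not part of this diff; the sketch below is only an assumption of what such a compatibility shim minimally looks like.

```python
import unittest


class TestCase(unittest.TestCase):
    """Hypothetical compatibility base class; the real helper ships in
    python-zstandard's test support module, which this diff does not show."""

    if not hasattr(unittest.TestCase, "assertRaisesRegex"):
        # Python 2's unittest only provides the old spelling; alias it so
        # tests can uniformly call assertRaisesRegex().
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
```

None of this changes what the tests assert; it only normalizes how they are written.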
--- a/contrib/python-zstandard/tests/test_decompressor.py Fri Dec 27 18:54:57 2019 -0500 +++ b/contrib/python-zstandard/tests/test_decompressor.py Sat Dec 28 09:55:45 2019 -0800 @@ -13,6 +13,7 @@ make_cffi, NonClosingBytesIO, OpCountingBytesIO, + TestCase, ) @@ -23,62 +24,67 @@ @make_cffi -class TestFrameHeaderSize(unittest.TestCase): +class TestFrameHeaderSize(TestCase): def test_empty(self): - with self.assertRaisesRegexp( - zstd.ZstdError, 'could not determine frame header size: Src size ' - 'is incorrect'): - zstd.frame_header_size(b'') + with self.assertRaisesRegex( + zstd.ZstdError, + "could not determine frame header size: Src size " "is incorrect", + ): + zstd.frame_header_size(b"") def test_too_small(self): - with self.assertRaisesRegexp( - zstd.ZstdError, 'could not determine frame header size: Src size ' - 'is incorrect'): - zstd.frame_header_size(b'foob') + with self.assertRaisesRegex( + zstd.ZstdError, + "could not determine frame header size: Src size " "is incorrect", + ): + zstd.frame_header_size(b"foob") def test_basic(self): # It doesn't matter that it isn't a valid frame. - self.assertEqual(zstd.frame_header_size(b'long enough but no magic'), 6) + self.assertEqual(zstd.frame_header_size(b"long enough but no magic"), 6) @make_cffi -class TestFrameContentSize(unittest.TestCase): +class TestFrameContentSize(TestCase): def test_empty(self): - with self.assertRaisesRegexp(zstd.ZstdError, - 'error when determining content size'): - zstd.frame_content_size(b'') + with self.assertRaisesRegex( + zstd.ZstdError, "error when determining content size" + ): + zstd.frame_content_size(b"") def test_too_small(self): - with self.assertRaisesRegexp(zstd.ZstdError, - 'error when determining content size'): - zstd.frame_content_size(b'foob') + with self.assertRaisesRegex( + zstd.ZstdError, "error when determining content size" + ): + zstd.frame_content_size(b"foob") def test_bad_frame(self): - with self.assertRaisesRegexp(zstd.ZstdError, - 'error when determining content size'): - zstd.frame_content_size(b'invalid frame header') + with self.assertRaisesRegex( + zstd.ZstdError, "error when determining content size" + ): + zstd.frame_content_size(b"invalid frame header") def test_unknown(self): cctx = zstd.ZstdCompressor(write_content_size=False) - frame = cctx.compress(b'foobar') + frame = cctx.compress(b"foobar") self.assertEqual(zstd.frame_content_size(frame), -1) def test_empty(self): cctx = zstd.ZstdCompressor() - frame = cctx.compress(b'') + frame = cctx.compress(b"") self.assertEqual(zstd.frame_content_size(frame), 0) def test_basic(self): cctx = zstd.ZstdCompressor() - frame = cctx.compress(b'foobar') + frame = cctx.compress(b"foobar") self.assertEqual(zstd.frame_content_size(frame), 6) @make_cffi -class TestDecompressor(unittest.TestCase): +class TestDecompressor(TestCase): def test_memory_size(self): dctx = zstd.ZstdDecompressor() @@ -86,22 +92,26 @@ @make_cffi -class TestDecompressor_decompress(unittest.TestCase): +class TestDecompressor_decompress(TestCase): def test_empty_input(self): dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp(zstd.ZstdError, 'error determining content size from frame header'): - dctx.decompress(b'') + with self.assertRaisesRegex( + zstd.ZstdError, "error determining content size from frame header" + ): + dctx.decompress(b"") def test_invalid_input(self): dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp(zstd.ZstdError, 'error determining content size from frame header'): - dctx.decompress(b'foobar') + with self.assertRaisesRegex( + 
zstd.ZstdError, "error determining content size from frame header" + ): + dctx.decompress(b"foobar") def test_input_types(self): cctx = zstd.ZstdCompressor(level=1) - compressed = cctx.compress(b'foo') + compressed = cctx.compress(b"foo") mutable_array = bytearray(len(compressed)) mutable_array[:] = compressed @@ -114,36 +124,38 @@ dctx = zstd.ZstdDecompressor() for source in sources: - self.assertEqual(dctx.decompress(source), b'foo') + self.assertEqual(dctx.decompress(source), b"foo") def test_no_content_size_in_frame(self): cctx = zstd.ZstdCompressor(write_content_size=False) - compressed = cctx.compress(b'foobar') + compressed = cctx.compress(b"foobar") dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp(zstd.ZstdError, 'could not determine content size in frame header'): + with self.assertRaisesRegex( + zstd.ZstdError, "could not determine content size in frame header" + ): dctx.decompress(compressed) def test_content_size_present(self): cctx = zstd.ZstdCompressor() - compressed = cctx.compress(b'foobar') + compressed = cctx.compress(b"foobar") dctx = zstd.ZstdDecompressor() decompressed = dctx.decompress(compressed) - self.assertEqual(decompressed, b'foobar') + self.assertEqual(decompressed, b"foobar") def test_empty_roundtrip(self): cctx = zstd.ZstdCompressor() - compressed = cctx.compress(b'') + compressed = cctx.compress(b"") dctx = zstd.ZstdDecompressor() decompressed = dctx.decompress(compressed) - self.assertEqual(decompressed, b'') + self.assertEqual(decompressed, b"") def test_max_output_size(self): cctx = zstd.ZstdCompressor(write_content_size=False) - source = b'foobar' * 256 + source = b"foobar" * 256 compressed = cctx.compress(source) dctx = zstd.ZstdDecompressor() @@ -152,8 +164,9 @@ self.assertEqual(decompressed, source) # Input size - 1 fails - with self.assertRaisesRegexp(zstd.ZstdError, - 'decompression error: did not decompress full frame'): + with self.assertRaisesRegex( + zstd.ZstdError, "decompression error: did not decompress full frame" + ): dctx.decompress(compressed, max_output_size=len(source) - 1) # Input size + 1 works @@ -166,24 +179,24 @@ def test_stupidly_large_output_buffer(self): cctx = zstd.ZstdCompressor(write_content_size=False) - compressed = cctx.compress(b'foobar' * 256) + compressed = cctx.compress(b"foobar" * 256) dctx = zstd.ZstdDecompressor() # Will get OverflowError on some Python distributions that can't # handle really large integers. 
with self.assertRaises((MemoryError, OverflowError)): - dctx.decompress(compressed, max_output_size=2**62) + dctx.decompress(compressed, max_output_size=2 ** 62) def test_dictionary(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(8192, samples) - orig = b'foobar' * 16384 + orig = b"foobar" * 16384 cctx = zstd.ZstdCompressor(level=1, dict_data=d) compressed = cctx.compress(orig) @@ -195,13 +208,13 @@ def test_dictionary_multiple(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(8192, samples) - sources = (b'foobar' * 8192, b'foo' * 8192, b'bar' * 8192) + sources = (b"foobar" * 8192, b"foo" * 8192, b"bar" * 8192) compressed = [] cctx = zstd.ZstdCompressor(level=1, dict_data=d) for source in sources: @@ -213,7 +226,7 @@ self.assertEqual(decompressed, sources[i]) def test_max_window_size(self): - with open(__file__, 'rb') as fh: + with open(__file__, "rb") as fh: source = fh.read() # If we write a content size, the decompressor engages single pass @@ -221,15 +234,16 @@ cctx = zstd.ZstdCompressor(write_content_size=False) frame = cctx.compress(source) - dctx = zstd.ZstdDecompressor(max_window_size=2**zstd.WINDOWLOG_MIN) + dctx = zstd.ZstdDecompressor(max_window_size=2 ** zstd.WINDOWLOG_MIN) - with self.assertRaisesRegexp( - zstd.ZstdError, 'decompression error: Frame requires too much memory'): + with self.assertRaisesRegex( + zstd.ZstdError, "decompression error: Frame requires too much memory" + ): dctx.decompress(frame, max_output_size=len(source)) @make_cffi -class TestDecompressor_copy_stream(unittest.TestCase): +class TestDecompressor_copy_stream(TestCase): def test_no_read(self): source = object() dest = io.BytesIO() @@ -256,12 +270,12 @@ self.assertEqual(r, 0) self.assertEqual(w, 0) - self.assertEqual(dest.getvalue(), b'') + self.assertEqual(dest.getvalue(), b"") def test_large_data(self): source = io.BytesIO() for i in range(255): - source.write(struct.Struct('>B').pack(i) * 16384) + source.write(struct.Struct(">B").pack(i) * 16384) source.seek(0) compressed = io.BytesIO() @@ -277,33 +291,32 @@ self.assertEqual(w, len(source.getvalue())) def test_read_write_size(self): - source = OpCountingBytesIO(zstd.ZstdCompressor().compress( - b'foobarfoobar')) + source = OpCountingBytesIO(zstd.ZstdCompressor().compress(b"foobarfoobar")) dest = OpCountingBytesIO() dctx = zstd.ZstdDecompressor() r, w = dctx.copy_stream(source, dest, read_size=1, write_size=1) self.assertEqual(r, len(source.getvalue())) - self.assertEqual(w, len(b'foobarfoobar')) + self.assertEqual(w, len(b"foobarfoobar")) self.assertEqual(source._read_count, len(source.getvalue()) + 1) self.assertEqual(dest._write_count, len(dest.getvalue())) @make_cffi -class TestDecompressor_stream_reader(unittest.TestCase): +class TestDecompressor_stream_reader(TestCase): def test_context_manager(self): dctx = zstd.ZstdDecompressor() - with dctx.stream_reader(b'foo') as reader: - with self.assertRaisesRegexp(ValueError, 'cannot __enter__ multiple times'): + with dctx.stream_reader(b"foo") as reader: + with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"): with reader as reader2: pass def test_not_implemented(self): dctx = 
zstd.ZstdDecompressor() - with dctx.stream_reader(b'foo') as reader: + with dctx.stream_reader(b"foo") as reader: with self.assertRaises(io.UnsupportedOperation): reader.readline() @@ -317,7 +330,7 @@ next(reader) with self.assertRaises(io.UnsupportedOperation): - reader.write(b'foo') + reader.write(b"foo") with self.assertRaises(io.UnsupportedOperation): reader.writelines([]) @@ -325,7 +338,7 @@ def test_constant_methods(self): dctx = zstd.ZstdDecompressor() - with dctx.stream_reader(b'foo') as reader: + with dctx.stream_reader(b"foo") as reader: self.assertFalse(reader.closed) self.assertTrue(reader.readable()) self.assertFalse(reader.writable()) @@ -340,29 +353,31 @@ def test_read_closed(self): dctx = zstd.ZstdDecompressor() - with dctx.stream_reader(b'foo') as reader: + with dctx.stream_reader(b"foo") as reader: reader.close() self.assertTrue(reader.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(1) def test_read_sizes(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") dctx = zstd.ZstdDecompressor() with dctx.stream_reader(foo) as reader: - with self.assertRaisesRegexp(ValueError, 'cannot read negative amounts less than -1'): + with self.assertRaisesRegex( + ValueError, "cannot read negative amounts less than -1" + ): reader.read(-2) - self.assertEqual(reader.read(0), b'') - self.assertEqual(reader.read(), b'foo') + self.assertEqual(reader.read(0), b"") + self.assertEqual(reader.read(), b"foo") def test_read_buffer(self): cctx = zstd.ZstdCompressor() - source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60]) + source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60]) frame = cctx.compress(source) dctx = zstd.ZstdDecompressor() @@ -376,14 +391,14 @@ self.assertEqual(reader.tell(), len(source)) # Read after EOF should return empty bytes. 
- self.assertEqual(reader.read(1), b'') + self.assertEqual(reader.read(1), b"") self.assertEqual(reader.tell(), len(result)) self.assertTrue(reader.closed) def test_read_buffer_small_chunks(self): cctx = zstd.ZstdCompressor() - source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60]) + source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60]) frame = cctx.compress(source) dctx = zstd.ZstdDecompressor() @@ -398,11 +413,11 @@ chunks.append(chunk) self.assertEqual(reader.tell(), sum(map(len, chunks))) - self.assertEqual(b''.join(chunks), source) + self.assertEqual(b"".join(chunks), source) def test_read_stream(self): cctx = zstd.ZstdCompressor() - source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60]) + source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60]) frame = cctx.compress(source) dctx = zstd.ZstdDecompressor() @@ -412,7 +427,7 @@ chunk = reader.read(8192) self.assertEqual(chunk, source) self.assertEqual(reader.tell(), len(source)) - self.assertEqual(reader.read(1), b'') + self.assertEqual(reader.read(1), b"") self.assertEqual(reader.tell(), len(source)) self.assertFalse(reader.closed) @@ -420,7 +435,7 @@ def test_read_stream_small_chunks(self): cctx = zstd.ZstdCompressor() - source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60]) + source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60]) frame = cctx.compress(source) dctx = zstd.ZstdDecompressor() @@ -435,11 +450,11 @@ chunks.append(chunk) self.assertEqual(reader.tell(), sum(map(len, chunks))) - self.assertEqual(b''.join(chunks), source) + self.assertEqual(b"".join(chunks), source) def test_read_after_exit(self): cctx = zstd.ZstdCompressor() - frame = cctx.compress(b'foo' * 60) + frame = cctx.compress(b"foo" * 60) dctx = zstd.ZstdDecompressor() @@ -449,45 +464,46 @@ self.assertTrue(reader.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(10) def test_illegal_seeks(self): cctx = zstd.ZstdCompressor() - frame = cctx.compress(b'foo' * 60) + frame = cctx.compress(b"foo" * 60) dctx = zstd.ZstdDecompressor() with dctx.stream_reader(frame) as reader: - with self.assertRaisesRegexp(ValueError, - 'cannot seek to negative position'): + with self.assertRaisesRegex(ValueError, "cannot seek to negative position"): reader.seek(-1, os.SEEK_SET) reader.read(1) - with self.assertRaisesRegexp( - ValueError, 'cannot seek zstd decompression stream backwards'): + with self.assertRaisesRegex( + ValueError, "cannot seek zstd decompression stream backwards" + ): reader.seek(0, os.SEEK_SET) - with self.assertRaisesRegexp( - ValueError, 'cannot seek zstd decompression stream backwards'): + with self.assertRaisesRegex( + ValueError, "cannot seek zstd decompression stream backwards" + ): reader.seek(-1, os.SEEK_CUR) - with self.assertRaisesRegexp( - ValueError, - 'zstd decompression streams cannot be seeked with SEEK_END'): + with self.assertRaisesRegex( + ValueError, "zstd decompression streams cannot be seeked with SEEK_END" + ): reader.seek(0, os.SEEK_END) reader.close() - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.seek(4, os.SEEK_SET) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.seek(0) def test_seek(self): - source = b'foobar' * 60 + source = b"foobar" * 60 cctx = zstd.ZstdCompressor() frame = cctx.compress(source) @@ -495,32 +511,32 @@ with dctx.stream_reader(frame) as 
reader: reader.seek(3) - self.assertEqual(reader.read(3), b'bar') + self.assertEqual(reader.read(3), b"bar") reader.seek(4, os.SEEK_CUR) - self.assertEqual(reader.read(2), b'ar') + self.assertEqual(reader.read(2), b"ar") def test_no_context_manager(self): - source = b'foobar' * 60 + source = b"foobar" * 60 cctx = zstd.ZstdCompressor() frame = cctx.compress(source) dctx = zstd.ZstdDecompressor() reader = dctx.stream_reader(frame) - self.assertEqual(reader.read(6), b'foobar') - self.assertEqual(reader.read(18), b'foobar' * 3) + self.assertEqual(reader.read(6), b"foobar") + self.assertEqual(reader.read(18), b"foobar" * 3) self.assertFalse(reader.closed) # Calling close prevents subsequent use. reader.close() self.assertTrue(reader.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(6) def test_read_after_error(self): - source = io.BytesIO(b'') + source = io.BytesIO(b"") dctx = zstd.ZstdDecompressor() reader = dctx.stream_reader(source) @@ -529,7 +545,7 @@ reader.read(0) with reader: - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): reader.read(100) def test_partial_read(self): @@ -553,87 +569,87 @@ cctx = zstd.ZstdCompressor() source = io.BytesIO() writer = cctx.stream_writer(source) - writer.write(b'foo') + writer.write(b"foo") writer.flush(zstd.FLUSH_FRAME) - writer.write(b'bar') + writer.write(b"bar") writer.flush(zstd.FLUSH_FRAME) dctx = zstd.ZstdDecompressor() reader = dctx.stream_reader(source.getvalue()) - self.assertEqual(reader.read(2), b'fo') - self.assertEqual(reader.read(2), b'o') - self.assertEqual(reader.read(2), b'ba') - self.assertEqual(reader.read(2), b'r') + self.assertEqual(reader.read(2), b"fo") + self.assertEqual(reader.read(2), b"o") + self.assertEqual(reader.read(2), b"ba") + self.assertEqual(reader.read(2), b"r") source.seek(0) reader = dctx.stream_reader(source) - self.assertEqual(reader.read(2), b'fo') - self.assertEqual(reader.read(2), b'o') - self.assertEqual(reader.read(2), b'ba') - self.assertEqual(reader.read(2), b'r') + self.assertEqual(reader.read(2), b"fo") + self.assertEqual(reader.read(2), b"o") + self.assertEqual(reader.read(2), b"ba") + self.assertEqual(reader.read(2), b"r") reader = dctx.stream_reader(source.getvalue()) - self.assertEqual(reader.read(3), b'foo') - self.assertEqual(reader.read(3), b'bar') + self.assertEqual(reader.read(3), b"foo") + self.assertEqual(reader.read(3), b"bar") source.seek(0) reader = dctx.stream_reader(source) - self.assertEqual(reader.read(3), b'foo') - self.assertEqual(reader.read(3), b'bar') + self.assertEqual(reader.read(3), b"foo") + self.assertEqual(reader.read(3), b"bar") reader = dctx.stream_reader(source.getvalue()) - self.assertEqual(reader.read(4), b'foo') - self.assertEqual(reader.read(4), b'bar') + self.assertEqual(reader.read(4), b"foo") + self.assertEqual(reader.read(4), b"bar") source.seek(0) reader = dctx.stream_reader(source) - self.assertEqual(reader.read(4), b'foo') - self.assertEqual(reader.read(4), b'bar') + self.assertEqual(reader.read(4), b"foo") + self.assertEqual(reader.read(4), b"bar") reader = dctx.stream_reader(source.getvalue()) - self.assertEqual(reader.read(128), b'foo') - self.assertEqual(reader.read(128), b'bar') + self.assertEqual(reader.read(128), b"foo") + self.assertEqual(reader.read(128), b"bar") source.seek(0) reader = dctx.stream_reader(source) - self.assertEqual(reader.read(128), b'foo') - 
self.assertEqual(reader.read(128), b'bar') + self.assertEqual(reader.read(128), b"foo") + self.assertEqual(reader.read(128), b"bar") # Now tests for reads spanning frames. reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) - self.assertEqual(reader.read(3), b'foo') - self.assertEqual(reader.read(3), b'bar') + self.assertEqual(reader.read(3), b"foo") + self.assertEqual(reader.read(3), b"bar") source.seek(0) reader = dctx.stream_reader(source, read_across_frames=True) - self.assertEqual(reader.read(3), b'foo') - self.assertEqual(reader.read(3), b'bar') + self.assertEqual(reader.read(3), b"foo") + self.assertEqual(reader.read(3), b"bar") reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) - self.assertEqual(reader.read(6), b'foobar') + self.assertEqual(reader.read(6), b"foobar") source.seek(0) reader = dctx.stream_reader(source, read_across_frames=True) - self.assertEqual(reader.read(6), b'foobar') + self.assertEqual(reader.read(6), b"foobar") reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) - self.assertEqual(reader.read(7), b'foobar') + self.assertEqual(reader.read(7), b"foobar") source.seek(0) reader = dctx.stream_reader(source, read_across_frames=True) - self.assertEqual(reader.read(7), b'foobar') + self.assertEqual(reader.read(7), b"foobar") reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) - self.assertEqual(reader.read(128), b'foobar') + self.assertEqual(reader.read(128), b"foobar") source.seek(0) reader = dctx.stream_reader(source, read_across_frames=True) - self.assertEqual(reader.read(128), b'foobar') + self.assertEqual(reader.read(128), b"foobar") def test_readinto(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") dctx = zstd.ZstdDecompressor() @@ -641,116 +657,116 @@ # The exact exception varies based on the backend. reader = dctx.stream_reader(foo) with self.assertRaises(Exception): - reader.readinto(b'foobar') + reader.readinto(b"foobar") # readinto() with sufficiently large destination. b = bytearray(1024) reader = dctx.stream_reader(foo) self.assertEqual(reader.readinto(b), 3) - self.assertEqual(b[0:3], b'foo') + self.assertEqual(b[0:3], b"foo") self.assertEqual(reader.readinto(b), 0) - self.assertEqual(b[0:3], b'foo') + self.assertEqual(b[0:3], b"foo") # readinto() with small reads. b = bytearray(1024) reader = dctx.stream_reader(foo, read_size=1) self.assertEqual(reader.readinto(b), 3) - self.assertEqual(b[0:3], b'foo') + self.assertEqual(b[0:3], b"foo") # Too small destination buffer. b = bytearray(2) reader = dctx.stream_reader(foo) self.assertEqual(reader.readinto(b), 2) - self.assertEqual(b[:], b'fo') + self.assertEqual(b[:], b"fo") def test_readinto1(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") dctx = zstd.ZstdDecompressor() reader = dctx.stream_reader(foo) with self.assertRaises(Exception): - reader.readinto1(b'foobar') + reader.readinto1(b"foobar") # Sufficiently large destination. b = bytearray(1024) reader = dctx.stream_reader(foo) self.assertEqual(reader.readinto1(b), 3) - self.assertEqual(b[0:3], b'foo') + self.assertEqual(b[0:3], b"foo") self.assertEqual(reader.readinto1(b), 0) - self.assertEqual(b[0:3], b'foo') + self.assertEqual(b[0:3], b"foo") # readinto() with small reads. b = bytearray(1024) reader = dctx.stream_reader(foo, read_size=1) self.assertEqual(reader.readinto1(b), 3) - self.assertEqual(b[0:3], b'foo') + self.assertEqual(b[0:3], b"foo") # Too small destination buffer. 
b = bytearray(2) reader = dctx.stream_reader(foo) self.assertEqual(reader.readinto1(b), 2) - self.assertEqual(b[:], b'fo') + self.assertEqual(b[:], b"fo") def test_readall(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") dctx = zstd.ZstdDecompressor() reader = dctx.stream_reader(foo) - self.assertEqual(reader.readall(), b'foo') + self.assertEqual(reader.readall(), b"foo") def test_read1(self): cctx = zstd.ZstdCompressor() - foo = cctx.compress(b'foo') + foo = cctx.compress(b"foo") dctx = zstd.ZstdDecompressor() b = OpCountingBytesIO(foo) reader = dctx.stream_reader(b) - self.assertEqual(reader.read1(), b'foo') + self.assertEqual(reader.read1(), b"foo") self.assertEqual(b._read_count, 1) b = OpCountingBytesIO(foo) reader = dctx.stream_reader(b) - self.assertEqual(reader.read1(0), b'') - self.assertEqual(reader.read1(2), b'fo') + self.assertEqual(reader.read1(0), b"") + self.assertEqual(reader.read1(2), b"fo") self.assertEqual(b._read_count, 1) - self.assertEqual(reader.read1(1), b'o') + self.assertEqual(reader.read1(1), b"o") self.assertEqual(b._read_count, 1) - self.assertEqual(reader.read1(1), b'') + self.assertEqual(reader.read1(1), b"") self.assertEqual(b._read_count, 2) def test_read_lines(self): cctx = zstd.ZstdCompressor() - source = b'\n'.join(('line %d' % i).encode('ascii') for i in range(1024)) + source = b"\n".join(("line %d" % i).encode("ascii") for i in range(1024)) frame = cctx.compress(source) dctx = zstd.ZstdDecompressor() reader = dctx.stream_reader(frame) - tr = io.TextIOWrapper(reader, encoding='utf-8') + tr = io.TextIOWrapper(reader, encoding="utf-8") lines = [] for line in tr: - lines.append(line.encode('utf-8')) + lines.append(line.encode("utf-8")) self.assertEqual(len(lines), 1024) - self.assertEqual(b''.join(lines), source) + self.assertEqual(b"".join(lines), source) reader = dctx.stream_reader(frame) - tr = io.TextIOWrapper(reader, encoding='utf-8') + tr = io.TextIOWrapper(reader, encoding="utf-8") lines = tr.readlines() self.assertEqual(len(lines), 1024) - self.assertEqual(''.join(lines).encode('utf-8'), source) + self.assertEqual("".join(lines).encode("utf-8"), source) reader = dctx.stream_reader(frame) - tr = io.TextIOWrapper(reader, encoding='utf-8') + tr = io.TextIOWrapper(reader, encoding="utf-8") lines = [] while True: @@ -758,26 +774,26 @@ if not line: break - lines.append(line.encode('utf-8')) + lines.append(line.encode("utf-8")) self.assertEqual(len(lines), 1024) - self.assertEqual(b''.join(lines), source) + self.assertEqual(b"".join(lines), source) @make_cffi -class TestDecompressor_decompressobj(unittest.TestCase): +class TestDecompressor_decompressobj(TestCase): def test_simple(self): - data = zstd.ZstdCompressor(level=1).compress(b'foobar') + data = zstd.ZstdCompressor(level=1).compress(b"foobar") dctx = zstd.ZstdDecompressor() dobj = dctx.decompressobj() - self.assertEqual(dobj.decompress(data), b'foobar') + self.assertEqual(dobj.decompress(data), b"foobar") self.assertIsNone(dobj.flush()) self.assertIsNone(dobj.flush(10)) self.assertIsNone(dobj.flush(length=100)) def test_input_types(self): - compressed = zstd.ZstdCompressor(level=1).compress(b'foo') + compressed = zstd.ZstdCompressor(level=1).compress(b"foo") dctx = zstd.ZstdDecompressor() @@ -795,28 +811,28 @@ self.assertIsNone(dobj.flush()) self.assertIsNone(dobj.flush(10)) self.assertIsNone(dobj.flush(length=100)) - self.assertEqual(dobj.decompress(source), b'foo') + self.assertEqual(dobj.decompress(source), b"foo") self.assertIsNone(dobj.flush()) 
def test_reuse(self): - data = zstd.ZstdCompressor(level=1).compress(b'foobar') + data = zstd.ZstdCompressor(level=1).compress(b"foobar") dctx = zstd.ZstdDecompressor() dobj = dctx.decompressobj() dobj.decompress(data) - with self.assertRaisesRegexp(zstd.ZstdError, 'cannot use a decompressobj'): + with self.assertRaisesRegex(zstd.ZstdError, "cannot use a decompressobj"): dobj.decompress(data) self.assertIsNone(dobj.flush()) def test_bad_write_size(self): dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp(ValueError, 'write_size must be positive'): + with self.assertRaisesRegex(ValueError, "write_size must be positive"): dctx.decompressobj(write_size=0) def test_write_size(self): - source = b'foo' * 64 + b'bar' * 128 + source = b"foo" * 64 + b"bar" * 128 data = zstd.ZstdCompressor(level=1).compress(source) dctx = zstd.ZstdDecompressor() @@ -836,7 +852,7 @@ @make_cffi -class TestDecompressor_stream_writer(unittest.TestCase): +class TestDecompressor_stream_writer(TestCase): def test_io_api(self): buffer = io.BytesIO() dctx = zstd.ZstdDecompressor() @@ -908,14 +924,14 @@ writer.fileno() def test_fileno_file(self): - with tempfile.TemporaryFile('wb') as tf: + with tempfile.TemporaryFile("wb") as tf: dctx = zstd.ZstdDecompressor() writer = dctx.stream_writer(tf) self.assertEqual(writer.fileno(), tf.fileno()) def test_close(self): - foo = zstd.ZstdCompressor().compress(b'foo') + foo = zstd.ZstdCompressor().compress(b"foo") buffer = NonClosingBytesIO() dctx = zstd.ZstdDecompressor() @@ -928,17 +944,17 @@ self.assertTrue(writer.closed) self.assertTrue(buffer.closed) - with self.assertRaisesRegexp(ValueError, 'stream is closed'): - writer.write(b'') + with self.assertRaisesRegex(ValueError, "stream is closed"): + writer.write(b"") - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): writer.flush() - with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with self.assertRaisesRegex(ValueError, "stream is closed"): with writer: pass - self.assertEqual(buffer.getvalue(), b'foo') + self.assertEqual(buffer.getvalue(), b"foo") # Context manager exit should close stream. 
buffer = NonClosingBytesIO() @@ -948,7 +964,7 @@ writer.write(foo) self.assertTrue(writer.closed) - self.assertEqual(buffer.getvalue(), b'foo') + self.assertEqual(buffer.getvalue(), b"foo") def test_flush(self): buffer = OpCountingBytesIO() @@ -962,12 +978,12 @@ def test_empty_roundtrip(self): cctx = zstd.ZstdCompressor() - empty = cctx.compress(b'') - self.assertEqual(decompress_via_writer(empty), b'') + empty = cctx.compress(b"") + self.assertEqual(decompress_via_writer(empty), b"") def test_input_types(self): cctx = zstd.ZstdCompressor(level=1) - compressed = cctx.compress(b'foo') + compressed = cctx.compress(b"foo") mutable_array = bytearray(len(compressed)) mutable_array[:] = compressed @@ -984,25 +1000,25 @@ decompressor = dctx.stream_writer(buffer) decompressor.write(source) - self.assertEqual(buffer.getvalue(), b'foo') + self.assertEqual(buffer.getvalue(), b"foo") buffer = NonClosingBytesIO() with dctx.stream_writer(buffer) as decompressor: self.assertEqual(decompressor.write(source), 3) - self.assertEqual(buffer.getvalue(), b'foo') + self.assertEqual(buffer.getvalue(), b"foo") buffer = io.BytesIO() writer = dctx.stream_writer(buffer, write_return_read=True) self.assertEqual(writer.write(source), len(source)) - self.assertEqual(buffer.getvalue(), b'foo') + self.assertEqual(buffer.getvalue(), b"foo") def test_large_roundtrip(self): chunks = [] for i in range(255): - chunks.append(struct.Struct('>B').pack(i) * 16384) - orig = b''.join(chunks) + chunks.append(struct.Struct(">B").pack(i) * 16384) + orig = b"".join(chunks) cctx = zstd.ZstdCompressor() compressed = cctx.compress(orig) @@ -1012,9 +1028,9 @@ chunks = [] for i in range(255): for j in range(255): - chunks.append(struct.Struct('>B').pack(j) * i) + chunks.append(struct.Struct(">B").pack(j) * i) - orig = b''.join(chunks) + orig = b"".join(chunks) cctx = zstd.ZstdCompressor() compressed = cctx.compress(orig) @@ -1042,13 +1058,13 @@ def test_dictionary(self): samples = [] for i in range(128): - samples.append(b'foo' * 64) - samples.append(b'bar' * 64) - samples.append(b'foobar' * 64) + samples.append(b"foo" * 64) + samples.append(b"bar" * 64) + samples.append(b"foobar" * 64) d = zstd.train_dictionary(8192, samples) - orig = b'foobar' * 16384 + orig = b"foobar" * 16384 buffer = NonClosingBytesIO() cctx = zstd.ZstdCompressor(dict_data=d) with cctx.stream_writer(buffer) as compressor: @@ -1083,22 +1099,22 @@ self.assertGreater(size, 100000) def test_write_size(self): - source = zstd.ZstdCompressor().compress(b'foobarfoobar') + source = zstd.ZstdCompressor().compress(b"foobarfoobar") dest = OpCountingBytesIO() dctx = zstd.ZstdDecompressor() with dctx.stream_writer(dest, write_size=1) as decompressor: - s = struct.Struct('>B') + s = struct.Struct(">B") for c in source: if not isinstance(c, str): c = s.pack(c) decompressor.write(c) - self.assertEqual(dest.getvalue(), b'foobarfoobar') + self.assertEqual(dest.getvalue(), b"foobarfoobar") self.assertEqual(dest._write_count, len(dest.getvalue())) @make_cffi -class TestDecompressor_read_to_iter(unittest.TestCase): +class TestDecompressor_read_to_iter(TestCase): def test_type_validation(self): dctx = zstd.ZstdDecompressor() @@ -1106,10 +1122,10 @@ dctx.read_to_iter(io.BytesIO()) # Buffer protocol works. 
- dctx.read_to_iter(b'foobar') + dctx.read_to_iter(b"foobar") - with self.assertRaisesRegexp(ValueError, 'must pass an object with a read'): - b''.join(dctx.read_to_iter(True)) + with self.assertRaisesRegex(ValueError, "must pass an object with a read"): + b"".join(dctx.read_to_iter(True)) def test_empty_input(self): dctx = zstd.ZstdDecompressor() @@ -1120,25 +1136,25 @@ with self.assertRaises(StopIteration): next(it) - it = dctx.read_to_iter(b'') + it = dctx.read_to_iter(b"") with self.assertRaises(StopIteration): next(it) def test_invalid_input(self): dctx = zstd.ZstdDecompressor() - source = io.BytesIO(b'foobar') + source = io.BytesIO(b"foobar") it = dctx.read_to_iter(source) - with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'): + with self.assertRaisesRegex(zstd.ZstdError, "Unknown frame descriptor"): next(it) - it = dctx.read_to_iter(b'foobar') - with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'): + it = dctx.read_to_iter(b"foobar") + with self.assertRaisesRegex(zstd.ZstdError, "Unknown frame descriptor"): next(it) def test_empty_roundtrip(self): cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - empty = cctx.compress(b'') + empty = cctx.compress(b"") source = io.BytesIO(empty) source.seek(0) @@ -1157,24 +1173,28 @@ def test_skip_bytes_too_large(self): dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp(ValueError, 'skip_bytes must be smaller than read_size'): - b''.join(dctx.read_to_iter(b'', skip_bytes=1, read_size=1)) + with self.assertRaisesRegex( + ValueError, "skip_bytes must be smaller than read_size" + ): + b"".join(dctx.read_to_iter(b"", skip_bytes=1, read_size=1)) - with self.assertRaisesRegexp(ValueError, 'skip_bytes larger than first input chunk'): - b''.join(dctx.read_to_iter(b'foobar', skip_bytes=10)) + with self.assertRaisesRegex( + ValueError, "skip_bytes larger than first input chunk" + ): + b"".join(dctx.read_to_iter(b"foobar", skip_bytes=10)) def test_skip_bytes(self): cctx = zstd.ZstdCompressor(write_content_size=False) - compressed = cctx.compress(b'foobar') + compressed = cctx.compress(b"foobar") dctx = zstd.ZstdDecompressor() - output = b''.join(dctx.read_to_iter(b'hdr' + compressed, skip_bytes=3)) - self.assertEqual(output, b'foobar') + output = b"".join(dctx.read_to_iter(b"hdr" + compressed, skip_bytes=3)) + self.assertEqual(output, b"foobar") def test_large_output(self): source = io.BytesIO() - source.write(b'f' * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) - source.write(b'o') + source.write(b"f" * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) + source.write(b"o") source.seek(0) cctx = zstd.ZstdCompressor(level=1) @@ -1191,7 +1211,7 @@ with self.assertRaises(StopIteration): next(it) - decompressed = b''.join(chunks) + decompressed = b"".join(chunks) self.assertEqual(decompressed, source.getvalue()) # And again with buffer protocol. 
@@ -1203,12 +1223,12 @@ with self.assertRaises(StopIteration): next(it) - decompressed = b''.join(chunks) + decompressed = b"".join(chunks) self.assertEqual(decompressed, source.getvalue()) - @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') + @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") def test_large_input(self): - bytes = list(struct.Struct('>B').pack(i) for i in range(256)) + bytes = list(struct.Struct(">B").pack(i) for i in range(256)) compressed = NonClosingBytesIO() input_size = 0 cctx = zstd.ZstdCompressor(level=1) @@ -1217,14 +1237,18 @@ compressor.write(random.choice(bytes)) input_size += 1 - have_compressed = len(compressed.getvalue()) > zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE + have_compressed = ( + len(compressed.getvalue()) + > zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE + ) have_raw = input_size > zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE * 2 if have_compressed and have_raw: break compressed = io.BytesIO(compressed.getvalue()) - self.assertGreater(len(compressed.getvalue()), - zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE) + self.assertGreater( + len(compressed.getvalue()), zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE + ) dctx = zstd.ZstdDecompressor() it = dctx.read_to_iter(compressed) @@ -1237,7 +1261,7 @@ with self.assertRaises(StopIteration): next(it) - decompressed = b''.join(chunks) + decompressed = b"".join(chunks) self.assertEqual(len(decompressed), input_size) # And again with buffer protocol. @@ -1251,7 +1275,7 @@ with self.assertRaises(StopIteration): next(it) - decompressed = b''.join(chunks) + decompressed = b"".join(chunks) self.assertEqual(len(decompressed), input_size) def test_interesting(self): @@ -1263,22 +1287,23 @@ compressed = NonClosingBytesIO() with cctx.stream_writer(compressed) as compressor: for i in range(256): - chunk = b'\0' * 1024 + chunk = b"\0" * 1024 compressor.write(chunk) source.write(chunk) dctx = zstd.ZstdDecompressor() - simple = dctx.decompress(compressed.getvalue(), - max_output_size=len(source.getvalue())) + simple = dctx.decompress( + compressed.getvalue(), max_output_size=len(source.getvalue()) + ) self.assertEqual(simple, source.getvalue()) compressed = io.BytesIO(compressed.getvalue()) - streamed = b''.join(dctx.read_to_iter(compressed)) + streamed = b"".join(dctx.read_to_iter(compressed)) self.assertEqual(streamed, source.getvalue()) def test_read_write_size(self): - source = OpCountingBytesIO(zstd.ZstdCompressor().compress(b'foobarfoobar')) + source = OpCountingBytesIO(zstd.ZstdCompressor().compress(b"foobarfoobar")) dctx = zstd.ZstdDecompressor() for chunk in dctx.read_to_iter(source, read_size=1, write_size=1): self.assertEqual(len(chunk), 1) @@ -1287,97 +1312,110 @@ def test_magic_less(self): params = zstd.CompressionParameters.from_level( - 1, format=zstd.FORMAT_ZSTD1_MAGICLESS) + 1, format=zstd.FORMAT_ZSTD1_MAGICLESS + ) cctx = zstd.ZstdCompressor(compression_params=params) - frame = cctx.compress(b'foobar') + frame = cctx.compress(b"foobar") - self.assertNotEqual(frame[0:4], b'\x28\xb5\x2f\xfd') + self.assertNotEqual(frame[0:4], b"\x28\xb5\x2f\xfd") dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp( - zstd.ZstdError, 'error determining content size from frame header'): + with self.assertRaisesRegex( + zstd.ZstdError, "error determining content size from frame header" + ): dctx.decompress(frame) dctx = zstd.ZstdDecompressor(format=zstd.FORMAT_ZSTD1_MAGICLESS) - res = b''.join(dctx.read_to_iter(frame)) - self.assertEqual(res, b'foobar') + res = 
b"".join(dctx.read_to_iter(frame)) + self.assertEqual(res, b"foobar") @make_cffi -class TestDecompressor_content_dict_chain(unittest.TestCase): +class TestDecompressor_content_dict_chain(TestCase): def test_bad_inputs_simple(self): dctx = zstd.ZstdDecompressor() with self.assertRaises(TypeError): - dctx.decompress_content_dict_chain(b'foo') + dctx.decompress_content_dict_chain(b"foo") with self.assertRaises(TypeError): - dctx.decompress_content_dict_chain((b'foo', b'bar')) + dctx.decompress_content_dict_chain((b"foo", b"bar")) - with self.assertRaisesRegexp(ValueError, 'empty input chain'): + with self.assertRaisesRegex(ValueError, "empty input chain"): dctx.decompress_content_dict_chain([]) - with self.assertRaisesRegexp(ValueError, 'chunk 0 must be bytes'): - dctx.decompress_content_dict_chain([u'foo']) + with self.assertRaisesRegex(ValueError, "chunk 0 must be bytes"): + dctx.decompress_content_dict_chain([u"foo"]) - with self.assertRaisesRegexp(ValueError, 'chunk 0 must be bytes'): + with self.assertRaisesRegex(ValueError, "chunk 0 must be bytes"): dctx.decompress_content_dict_chain([True]) - with self.assertRaisesRegexp(ValueError, 'chunk 0 is too small to contain a zstd frame'): + with self.assertRaisesRegex( + ValueError, "chunk 0 is too small to contain a zstd frame" + ): dctx.decompress_content_dict_chain([zstd.FRAME_HEADER]) - with self.assertRaisesRegexp(ValueError, 'chunk 0 is not a valid zstd frame'): - dctx.decompress_content_dict_chain([b'foo' * 8]) + with self.assertRaisesRegex(ValueError, "chunk 0 is not a valid zstd frame"): + dctx.decompress_content_dict_chain([b"foo" * 8]) - no_size = zstd.ZstdCompressor(write_content_size=False).compress(b'foo' * 64) + no_size = zstd.ZstdCompressor(write_content_size=False).compress(b"foo" * 64) - with self.assertRaisesRegexp(ValueError, 'chunk 0 missing content size in frame'): + with self.assertRaisesRegex( + ValueError, "chunk 0 missing content size in frame" + ): dctx.decompress_content_dict_chain([no_size]) # Corrupt first frame. 
- frame = zstd.ZstdCompressor().compress(b'foo' * 64) + frame = zstd.ZstdCompressor().compress(b"foo" * 64) frame = frame[0:12] + frame[15:] - with self.assertRaisesRegexp(zstd.ZstdError, - 'chunk 0 did not decompress full frame'): + with self.assertRaisesRegex( + zstd.ZstdError, "chunk 0 did not decompress full frame" + ): dctx.decompress_content_dict_chain([frame]) def test_bad_subsequent_input(self): - initial = zstd.ZstdCompressor().compress(b'foo' * 64) + initial = zstd.ZstdCompressor().compress(b"foo" * 64) dctx = zstd.ZstdDecompressor() - with self.assertRaisesRegexp(ValueError, 'chunk 1 must be bytes'): - dctx.decompress_content_dict_chain([initial, u'foo']) + with self.assertRaisesRegex(ValueError, "chunk 1 must be bytes"): + dctx.decompress_content_dict_chain([initial, u"foo"]) - with self.assertRaisesRegexp(ValueError, 'chunk 1 must be bytes'): + with self.assertRaisesRegex(ValueError, "chunk 1 must be bytes"): dctx.decompress_content_dict_chain([initial, None]) - with self.assertRaisesRegexp(ValueError, 'chunk 1 is too small to contain a zstd frame'): + with self.assertRaisesRegex( + ValueError, "chunk 1 is too small to contain a zstd frame" + ): dctx.decompress_content_dict_chain([initial, zstd.FRAME_HEADER]) - with self.assertRaisesRegexp(ValueError, 'chunk 1 is not a valid zstd frame'): - dctx.decompress_content_dict_chain([initial, b'foo' * 8]) + with self.assertRaisesRegex(ValueError, "chunk 1 is not a valid zstd frame"): + dctx.decompress_content_dict_chain([initial, b"foo" * 8]) - no_size = zstd.ZstdCompressor(write_content_size=False).compress(b'foo' * 64) + no_size = zstd.ZstdCompressor(write_content_size=False).compress(b"foo" * 64) - with self.assertRaisesRegexp(ValueError, 'chunk 1 missing content size in frame'): + with self.assertRaisesRegex( + ValueError, "chunk 1 missing content size in frame" + ): dctx.decompress_content_dict_chain([initial, no_size]) # Corrupt second frame. 
- cctx = zstd.ZstdCompressor(dict_data=zstd.ZstdCompressionDict(b'foo' * 64)) - frame = cctx.compress(b'bar' * 64) + cctx = zstd.ZstdCompressor(dict_data=zstd.ZstdCompressionDict(b"foo" * 64)) + frame = cctx.compress(b"bar" * 64) frame = frame[0:12] + frame[15:] - with self.assertRaisesRegexp(zstd.ZstdError, 'chunk 1 did not decompress full frame'): + with self.assertRaisesRegex( + zstd.ZstdError, "chunk 1 did not decompress full frame" + ): dctx.decompress_content_dict_chain([initial, frame]) def test_simple(self): original = [ - b'foo' * 64, - b'foobar' * 64, - b'baz' * 64, - b'foobaz' * 64, - b'foobarbaz' * 64, + b"foo" * 64, + b"foobar" * 64, + b"baz" * 64, + b"foobaz" * 64, + b"foobarbaz" * 64, ] chunks = [] @@ -1396,12 +1434,12 @@ # TODO enable for CFFI -class TestDecompressor_multi_decompress_to_buffer(unittest.TestCase): +class TestDecompressor_multi_decompress_to_buffer(TestCase): def test_invalid_inputs(self): dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") with self.assertRaises(TypeError): dctx.multi_decompress_to_buffer(True) @@ -1409,22 +1447,24 @@ with self.assertRaises(TypeError): dctx.multi_decompress_to_buffer((1, 2)) - with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'): - dctx.multi_decompress_to_buffer([u'foo']) + with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"): + dctx.multi_decompress_to_buffer([u"foo"]) - with self.assertRaisesRegexp(ValueError, 'could not determine decompressed size of item 0'): - dctx.multi_decompress_to_buffer([b'foobarbaz']) + with self.assertRaisesRegex( + ValueError, "could not determine decompressed size of item 0" + ): + dctx.multi_decompress_to_buffer([b"foobarbaz"]) def test_list_input(self): cctx = zstd.ZstdCompressor() - original = [b'foo' * 4, b'bar' * 6] + original = [b"foo" * 4, b"bar" * 6] frames = [cctx.compress(d) for d in original] dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") result = dctx.multi_decompress_to_buffer(frames) @@ -1442,14 +1482,14 @@ def test_list_input_frame_sizes(self): cctx = zstd.ZstdCompressor() - original = [b'foo' * 4, b'bar' * 6, b'baz' * 8] + original = [b"foo" * 4, b"bar" * 6, b"baz" * 8] frames = [cctx.compress(d) for d in original] - sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + sizes = struct.pack("=" + "Q" * len(original), *map(len, original)) dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes) @@ -1462,16 +1502,18 @@ def test_buffer_with_segments_input(self): cctx = zstd.ZstdCompressor() - original = [b'foo' * 4, b'bar' * 6] + original = [b"foo" * 4, b"bar" * 6] frames = [cctx.compress(d) for d in original] dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + 
self.skipTest("multi_decompress_to_buffer not available") - segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1])) - b = zstd.BufferWithSegments(b''.join(frames), segments) + segments = struct.pack( + "=QQQQ", 0, len(frames[0]), len(frames[0]), len(frames[1]) + ) + b = zstd.BufferWithSegments(b"".join(frames), segments) result = dctx.multi_decompress_to_buffer(b) @@ -1483,19 +1525,25 @@ def test_buffer_with_segments_sizes(self): cctx = zstd.ZstdCompressor(write_content_size=False) - original = [b'foo' * 4, b'bar' * 6, b'baz' * 8] + original = [b"foo" * 4, b"bar" * 6, b"baz" * 8] frames = [cctx.compress(d) for d in original] - sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + sizes = struct.pack("=" + "Q" * len(original), *map(len, original)) dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") - segments = struct.pack('=QQQQQQ', 0, len(frames[0]), - len(frames[0]), len(frames[1]), - len(frames[0]) + len(frames[1]), len(frames[2])) - b = zstd.BufferWithSegments(b''.join(frames), segments) + segments = struct.pack( + "=QQQQQQ", + 0, + len(frames[0]), + len(frames[0]), + len(frames[1]), + len(frames[0]) + len(frames[1]), + len(frames[2]), + ) + b = zstd.BufferWithSegments(b"".join(frames), segments) result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes) @@ -1509,15 +1557,15 @@ cctx = zstd.ZstdCompressor() original = [ - b'foo0' * 2, - b'foo1' * 3, - b'foo2' * 4, - b'foo3' * 5, - b'foo4' * 6, + b"foo0" * 2, + b"foo1" * 3, + b"foo2" * 4, + b"foo3" * 5, + b"foo4" * 6, ] - if not hasattr(cctx, 'multi_compress_to_buffer'): - self.skipTest('multi_compress_to_buffer not available') + if not hasattr(cctx, "multi_compress_to_buffer"): + self.skipTest("multi_compress_to_buffer not available") frames = cctx.multi_compress_to_buffer(original) @@ -1532,16 +1580,24 @@ self.assertEqual(data, decompressed[i].tobytes()) # And a manual mode. 
- b = b''.join([frames[0].tobytes(), frames[1].tobytes()]) - b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ', - 0, len(frames[0]), - len(frames[0]), len(frames[1]))) + b = b"".join([frames[0].tobytes(), frames[1].tobytes()]) + b1 = zstd.BufferWithSegments( + b, struct.pack("=QQQQ", 0, len(frames[0]), len(frames[0]), len(frames[1])) + ) - b = b''.join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()]) - b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ', - 0, len(frames[2]), - len(frames[2]), len(frames[3]), - len(frames[2]) + len(frames[3]), len(frames[4]))) + b = b"".join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()]) + b2 = zstd.BufferWithSegments( + b, + struct.pack( + "=QQQQQQ", + 0, + len(frames[2]), + len(frames[2]), + len(frames[3]), + len(frames[2]) + len(frames[3]), + len(frames[4]), + ), + ) c = zstd.BufferWithSegmentsCollection(b1, b2) @@ -1560,8 +1616,8 @@ dctx = zstd.ZstdDecompressor(dict_data=d) - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") result = dctx.multi_decompress_to_buffer(frames) @@ -1571,41 +1627,44 @@ cctx = zstd.ZstdCompressor() frames = [] - frames.extend(cctx.compress(b'x' * 64) for i in range(256)) - frames.extend(cctx.compress(b'y' * 64) for i in range(256)) + frames.extend(cctx.compress(b"x" * 64) for i in range(256)) + frames.extend(cctx.compress(b"y" * 64) for i in range(256)) dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") result = dctx.multi_decompress_to_buffer(frames, threads=-1) self.assertEqual(len(result), len(frames)) self.assertEqual(result.size(), 2 * 64 * 256) - self.assertEqual(result[0].tobytes(), b'x' * 64) - self.assertEqual(result[256].tobytes(), b'y' * 64) + self.assertEqual(result[0].tobytes(), b"x" * 64) + self.assertEqual(result[256].tobytes(), b"y" * 64) def test_item_failure(self): cctx = zstd.ZstdCompressor() - frames = [cctx.compress(b'x' * 128), cctx.compress(b'y' * 128)] + frames = [cctx.compress(b"x" * 128), cctx.compress(b"y" * 128)] - frames[1] = frames[1][0:15] + b'extra' + frames[1][15:] + frames[1] = frames[1][0:15] + b"extra" + frames[1][15:] dctx = zstd.ZstdDecompressor() - if not hasattr(dctx, 'multi_decompress_to_buffer'): - self.skipTest('multi_decompress_to_buffer not available') + if not hasattr(dctx, "multi_decompress_to_buffer"): + self.skipTest("multi_decompress_to_buffer not available") - with self.assertRaisesRegexp(zstd.ZstdError, - 'error decompressing item 1: (' - 'Corrupted block|' - 'Destination buffer is too small)'): + with self.assertRaisesRegex( + zstd.ZstdError, + "error decompressing item 1: (" + "Corrupted block|" + "Destination buffer is too small)", + ): dctx.multi_decompress_to_buffer(frames) - with self.assertRaisesRegexp(zstd.ZstdError, - 'error decompressing item 1: (' - 'Corrupted block|' - 'Destination buffer is too small)'): + with self.assertRaisesRegex( + zstd.ZstdError, + "error decompressing item 1: (" + "Corrupted block|" + "Destination buffer is too small)", + ): dctx.multi_decompress_to_buffer(frames, threads=2) -
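For orientation, the APIs these tests exercise compose into a short round trip. The sketch below uses only calls that appear in the diff above and assumes the module is importable as `zstd`, as it is in the vendored tests (the upstream package is normally imported as `zstandard`).

```python
import io

import zstd  # name used by the vendored tests; upstream it is `zstandard`

# One-shot round trip: compress a payload, then decompress it. The default
# compressor writes the content size into the frame header, which is what
# lets ZstdDecompressor.decompress() size its output buffer.
cctx = zstd.ZstdCompressor(level=1)
frame = cctx.compress(b"foobar" * 256)

dctx = zstd.ZstdDecompressor()
assert dctx.decompress(frame) == b"foobar" * 256

# Streaming decompression over a file-like object, as exercised by the
# TestDecompressor_stream_reader cases.
with dctx.stream_reader(io.BytesIO(frame)) as reader:
    assert reader.read(6) == b"foobar"
```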