Mercurial > public > mercurial-scm > hg-stable
diff contrib/python-zstandard/tests/test_decompressor.py @ 31799:e0dc40530c5a
zstd: vendor python-zstandard 0.8.0
Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
Updates relevant to Mercurial include:
* Support for multi-threaded compression (we can use this for
bundle and wire protocol compression).
* APIs for batch compression and decompression operations using
multiple threads and optimal memory allocation mechanism. (Can
be useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
containing N discrete items of known lengths. This type can be
used for very efficient 0-copy data operations.
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 15:24:03 -0700 |
parents | c32454d69b85 |
children | b1fb341d8a61 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_decompressor.py Sat Apr 01 13:43:52 2017 -0700 +++ b/contrib/python-zstandard/tests/test_decompressor.py Sat Apr 01 15:24:03 2017 -0700 @@ -49,7 +49,7 @@ compressed = cctx.compress(b'foobar') dctx = zstd.ZstdDecompressor() - decompressed = dctx.decompress(compressed) + decompressed = dctx.decompress(compressed) self.assertEqual(decompressed, b'foobar') def test_max_output_size(self): @@ -293,7 +293,6 @@ c = s.pack(c) decompressor.write(c) - self.assertEqual(dest.getvalue(), b'foobarfoobar') self.assertEqual(dest._write_count, len(dest.getvalue())) @@ -575,3 +574,168 @@ dctx = zstd.ZstdDecompressor() decompressed = dctx.decompress_content_dict_chain(chain) self.assertEqual(decompressed, expected) + + +# TODO enable for CFFI +class TestDecompressor_multi_decompress_to_buffer(unittest.TestCase): + def test_invalid_inputs(self): + dctx = zstd.ZstdDecompressor() + + with self.assertRaises(TypeError): + dctx.multi_decompress_to_buffer(True) + + with self.assertRaises(TypeError): + dctx.multi_decompress_to_buffer((1, 2)) + + with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'): + dctx.multi_decompress_to_buffer([u'foo']) + + with self.assertRaisesRegexp(ValueError, 'could not determine decompressed size of item 0'): + dctx.multi_decompress_to_buffer([b'foobarbaz']) + + def test_list_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + original = [b'foo' * 4, b'bar' * 6] + frames = [cctx.compress(d) for d in original] + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(frames) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), sum(map(len, original))) + + for i, data in enumerate(original): + self.assertEqual(result[i].tobytes(), data) + + self.assertEqual(result[0].offset, 0) + self.assertEqual(len(result[0]), 12) + self.assertEqual(result[1].offset, 12) + self.assertEqual(len(result[1]), 18) + + def test_list_input_frame_sizes(self): + cctx = zstd.ZstdCompressor(write_content_size=False) + + original = [b'foo' * 4, b'bar' * 6, b'baz' * 8] + frames = [cctx.compress(d) for d in original] + sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), sum(map(len, original))) + + for i, data in enumerate(original): + self.assertEqual(result[i].tobytes(), data) + + def test_buffer_with_segments_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + original = [b'foo' * 4, b'bar' * 6] + frames = [cctx.compress(d) for d in original] + + dctx = zstd.ZstdDecompressor() + + segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1])) + b = zstd.BufferWithSegments(b''.join(frames), segments) + + result = dctx.multi_decompress_to_buffer(b) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result[0].offset, 0) + self.assertEqual(len(result[0]), 12) + self.assertEqual(result[1].offset, 12) + self.assertEqual(len(result[1]), 18) + + def test_buffer_with_segments_sizes(self): + cctx = zstd.ZstdCompressor(write_content_size=False) + original = [b'foo' * 4, b'bar' * 6, b'baz' * 8] + frames = [cctx.compress(d) for d in original] + sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + + segments = struct.pack('=QQQQQQ', 0, len(frames[0]), + len(frames[0]), len(frames[1]), + len(frames[0]) + len(frames[1]), len(frames[2])) + b = zstd.BufferWithSegments(b''.join(frames), segments) + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), sum(map(len, original))) + + for i, data in enumerate(original): + self.assertEqual(result[i].tobytes(), data) + + def test_buffer_with_segments_collection_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + original = [ + b'foo0' * 2, + b'foo1' * 3, + b'foo2' * 4, + b'foo3' * 5, + b'foo4' * 6, + ] + + frames = cctx.multi_compress_to_buffer(original) + + # Check round trip. + dctx = zstd.ZstdDecompressor() + decompressed = dctx.multi_decompress_to_buffer(frames, threads=3) + + self.assertEqual(len(decompressed), len(original)) + + for i, data in enumerate(original): + self.assertEqual(data, decompressed[i].tobytes()) + + # And a manual mode. + b = b''.join([frames[0].tobytes(), frames[1].tobytes()]) + b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ', + 0, len(frames[0]), + len(frames[0]), len(frames[1]))) + + b = b''.join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()]) + b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ', + 0, len(frames[2]), + len(frames[2]), len(frames[3]), + len(frames[2]) + len(frames[3]), len(frames[4]))) + + c = zstd.BufferWithSegmentsCollection(b1, b2) + + dctx = zstd.ZstdDecompressor() + decompressed = dctx.multi_decompress_to_buffer(c) + + self.assertEqual(len(decompressed), 5) + for i in range(5): + self.assertEqual(decompressed[i].tobytes(), original[i]) + + def test_multiple_threads(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + frames = [] + frames.extend(cctx.compress(b'x' * 64) for i in range(256)) + frames.extend(cctx.compress(b'y' * 64) for i in range(256)) + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(frames, threads=-1) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), 2 * 64 * 256) + self.assertEqual(result[0].tobytes(), b'x' * 64) + self.assertEqual(result[256].tobytes(), b'y' * 64) + + def test_item_failure(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + frames = [cctx.compress(b'x' * 128), cctx.compress(b'y' * 128)] + + frames[1] = frames[1] + b'extra' + + dctx = zstd.ZstdDecompressor() + + with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'): + dctx.multi_decompress_to_buffer(frames) + + with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'): + dctx.multi_decompress_to_buffer(frames, threads=2)