Mercurial > public > mercurial-scm > hg
diff contrib/python-zstandard/tests/test_decompressor.py @ 42070:675775c33ab6
zstandard: vendor python-zstandard 0.11
The upstream source distribution from PyPI was extracted. Unwanted
files were removed.
The clang-format ignore list was updated to reflect the new source
of files.
The project contains a vendored copy of the zstd C library at version 1.3.8. The old
version was 1.3.6. This should result in some minor performance wins.
test-check-py3-compat.t was updated to reflect now-passing tests on
Python 3.8.
Some HTTP tests were updated to reflect new zstd compression output.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D6199
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Thu, 04 Apr 2019 17:34:43 -0700 |
parents | 73fef626dae3 |
children | de7838053207 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_decompressor.py Thu Apr 04 15:24:03 2019 -0700 +++ b/contrib/python-zstandard/tests/test_decompressor.py Thu Apr 04 17:34:43 2019 -0700 @@ -3,6 +3,7 @@ import random import struct import sys +import tempfile import unittest import zstandard as zstd @@ -10,6 +11,7 @@ from .common import ( generate_samples, make_cffi, + NonClosingBytesIO, OpCountingBytesIO, ) @@ -219,7 +221,7 @@ cctx = zstd.ZstdCompressor(write_content_size=False) frame = cctx.compress(source) - dctx = zstd.ZstdDecompressor(max_window_size=1) + dctx = zstd.ZstdDecompressor(max_window_size=2**zstd.WINDOWLOG_MIN) with self.assertRaisesRegexp( zstd.ZstdError, 'decompression error: Frame requires too much memory'): @@ -302,19 +304,16 @@ dctx = zstd.ZstdDecompressor() with dctx.stream_reader(b'foo') as reader: - with self.assertRaises(NotImplementedError): + with self.assertRaises(io.UnsupportedOperation): reader.readline() - with self.assertRaises(NotImplementedError): + with self.assertRaises(io.UnsupportedOperation): reader.readlines() - with self.assertRaises(NotImplementedError): - reader.readall() - - with self.assertRaises(NotImplementedError): + with self.assertRaises(io.UnsupportedOperation): iter(reader) - with self.assertRaises(NotImplementedError): + with self.assertRaises(io.UnsupportedOperation): next(reader) with self.assertRaises(io.UnsupportedOperation): @@ -347,15 +346,18 @@ with self.assertRaisesRegexp(ValueError, 'stream is closed'): reader.read(1) - def test_bad_read_size(self): + def test_read_sizes(self): + cctx = zstd.ZstdCompressor() + foo = cctx.compress(b'foo') + dctx = zstd.ZstdDecompressor() - with dctx.stream_reader(b'foo') as reader: - with self.assertRaisesRegexp(ValueError, 'cannot read negative or size 0 amounts'): - reader.read(-1) + with dctx.stream_reader(foo) as reader: + with self.assertRaisesRegexp(ValueError, 'cannot read negative amounts less than -1'): + reader.read(-2) - with self.assertRaisesRegexp(ValueError, 'cannot 
read negative or size 0 amounts'): - reader.read(0) + self.assertEqual(reader.read(0), b'') + self.assertEqual(reader.read(), b'foo') def test_read_buffer(self): cctx = zstd.ZstdCompressor() @@ -524,13 +526,243 @@ reader = dctx.stream_reader(source) with reader: - with self.assertRaises(TypeError): - reader.read() + reader.read(0) with reader: with self.assertRaisesRegexp(ValueError, 'stream is closed'): reader.read(100) + def test_partial_read(self): + # Inspired by https://github.com/indygreg/python-zstandard/issues/71. + buffer = io.BytesIO() + cctx = zstd.ZstdCompressor() + writer = cctx.stream_writer(buffer) + writer.write(bytearray(os.urandom(1000000))) + writer.flush(zstd.FLUSH_FRAME) + buffer.seek(0) + + dctx = zstd.ZstdDecompressor() + reader = dctx.stream_reader(buffer) + + while True: + chunk = reader.read(8192) + if not chunk: + break + + def test_read_multiple_frames(self): + cctx = zstd.ZstdCompressor() + source = io.BytesIO() + writer = cctx.stream_writer(source) + writer.write(b'foo') + writer.flush(zstd.FLUSH_FRAME) + writer.write(b'bar') + writer.flush(zstd.FLUSH_FRAME) + + dctx = zstd.ZstdDecompressor() + + reader = dctx.stream_reader(source.getvalue()) + self.assertEqual(reader.read(2), b'fo') + self.assertEqual(reader.read(2), b'o') + self.assertEqual(reader.read(2), b'ba') + self.assertEqual(reader.read(2), b'r') + + source.seek(0) + reader = dctx.stream_reader(source) + self.assertEqual(reader.read(2), b'fo') + self.assertEqual(reader.read(2), b'o') + self.assertEqual(reader.read(2), b'ba') + self.assertEqual(reader.read(2), b'r') + + reader = dctx.stream_reader(source.getvalue()) + self.assertEqual(reader.read(3), b'foo') + self.assertEqual(reader.read(3), b'bar') + + source.seek(0) + reader = dctx.stream_reader(source) + self.assertEqual(reader.read(3), b'foo') + self.assertEqual(reader.read(3), b'bar') + + reader = dctx.stream_reader(source.getvalue()) + self.assertEqual(reader.read(4), b'foo') + self.assertEqual(reader.read(4), b'bar') + 
+ source.seek(0) + reader = dctx.stream_reader(source) + self.assertEqual(reader.read(4), b'foo') + self.assertEqual(reader.read(4), b'bar') + + reader = dctx.stream_reader(source.getvalue()) + self.assertEqual(reader.read(128), b'foo') + self.assertEqual(reader.read(128), b'bar') + + source.seek(0) + reader = dctx.stream_reader(source) + self.assertEqual(reader.read(128), b'foo') + self.assertEqual(reader.read(128), b'bar') + + # Now tests for reads spanning frames. + reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) + self.assertEqual(reader.read(3), b'foo') + self.assertEqual(reader.read(3), b'bar') + + source.seek(0) + reader = dctx.stream_reader(source, read_across_frames=True) + self.assertEqual(reader.read(3), b'foo') + self.assertEqual(reader.read(3), b'bar') + + reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) + self.assertEqual(reader.read(6), b'foobar') + + source.seek(0) + reader = dctx.stream_reader(source, read_across_frames=True) + self.assertEqual(reader.read(6), b'foobar') + + reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) + self.assertEqual(reader.read(7), b'foobar') + + source.seek(0) + reader = dctx.stream_reader(source, read_across_frames=True) + self.assertEqual(reader.read(7), b'foobar') + + reader = dctx.stream_reader(source.getvalue(), read_across_frames=True) + self.assertEqual(reader.read(128), b'foobar') + + source.seek(0) + reader = dctx.stream_reader(source, read_across_frames=True) + self.assertEqual(reader.read(128), b'foobar') + + def test_readinto(self): + cctx = zstd.ZstdCompressor() + foo = cctx.compress(b'foo') + + dctx = zstd.ZstdDecompressor() + + # Attempting to readinto() a non-writable buffer fails. + # The exact exception varies based on the backend. + reader = dctx.stream_reader(foo) + with self.assertRaises(Exception): + reader.readinto(b'foobar') + + # readinto() with sufficiently large destination. 
+ b = bytearray(1024) + reader = dctx.stream_reader(foo) + self.assertEqual(reader.readinto(b), 3) + self.assertEqual(b[0:3], b'foo') + self.assertEqual(reader.readinto(b), 0) + self.assertEqual(b[0:3], b'foo') + + # readinto() with small reads. + b = bytearray(1024) + reader = dctx.stream_reader(foo, read_size=1) + self.assertEqual(reader.readinto(b), 3) + self.assertEqual(b[0:3], b'foo') + + # Too small destination buffer. + b = bytearray(2) + reader = dctx.stream_reader(foo) + self.assertEqual(reader.readinto(b), 2) + self.assertEqual(b[:], b'fo') + + def test_readinto1(self): + cctx = zstd.ZstdCompressor() + foo = cctx.compress(b'foo') + + dctx = zstd.ZstdDecompressor() + + reader = dctx.stream_reader(foo) + with self.assertRaises(Exception): + reader.readinto1(b'foobar') + + # Sufficiently large destination. + b = bytearray(1024) + reader = dctx.stream_reader(foo) + self.assertEqual(reader.readinto1(b), 3) + self.assertEqual(b[0:3], b'foo') + self.assertEqual(reader.readinto1(b), 0) + self.assertEqual(b[0:3], b'foo') + + # readinto() with small reads. + b = bytearray(1024) + reader = dctx.stream_reader(foo, read_size=1) + self.assertEqual(reader.readinto1(b), 3) + self.assertEqual(b[0:3], b'foo') + + # Too small destination buffer. 
+ b = bytearray(2) + reader = dctx.stream_reader(foo) + self.assertEqual(reader.readinto1(b), 2) + self.assertEqual(b[:], b'fo') + + def test_readall(self): + cctx = zstd.ZstdCompressor() + foo = cctx.compress(b'foo') + + dctx = zstd.ZstdDecompressor() + reader = dctx.stream_reader(foo) + + self.assertEqual(reader.readall(), b'foo') + + def test_read1(self): + cctx = zstd.ZstdCompressor() + foo = cctx.compress(b'foo') + + dctx = zstd.ZstdDecompressor() + + b = OpCountingBytesIO(foo) + reader = dctx.stream_reader(b) + + self.assertEqual(reader.read1(), b'foo') + self.assertEqual(b._read_count, 1) + + b = OpCountingBytesIO(foo) + reader = dctx.stream_reader(b) + + self.assertEqual(reader.read1(0), b'') + self.assertEqual(reader.read1(2), b'fo') + self.assertEqual(b._read_count, 1) + self.assertEqual(reader.read1(1), b'o') + self.assertEqual(b._read_count, 1) + self.assertEqual(reader.read1(1), b'') + self.assertEqual(b._read_count, 2) + + def test_read_lines(self): + cctx = zstd.ZstdCompressor() + source = b'\n'.join(('line %d' % i).encode('ascii') for i in range(1024)) + + frame = cctx.compress(source) + + dctx = zstd.ZstdDecompressor() + reader = dctx.stream_reader(frame) + tr = io.TextIOWrapper(reader, encoding='utf-8') + + lines = [] + for line in tr: + lines.append(line.encode('utf-8')) + + self.assertEqual(len(lines), 1024) + self.assertEqual(b''.join(lines), source) + + reader = dctx.stream_reader(frame) + tr = io.TextIOWrapper(reader, encoding='utf-8') + + lines = tr.readlines() + self.assertEqual(len(lines), 1024) + self.assertEqual(''.join(lines).encode('utf-8'), source) + + reader = dctx.stream_reader(frame) + tr = io.TextIOWrapper(reader, encoding='utf-8') + + lines = [] + while True: + line = tr.readline() + if not line: + break + + lines.append(line.encode('utf-8')) + + self.assertEqual(len(lines), 1024) + self.assertEqual(b''.join(lines), source) + @make_cffi class TestDecompressor_decompressobj(unittest.TestCase): @@ -540,6 +772,9 @@ dctx = 
zstd.ZstdDecompressor() dobj = dctx.decompressobj() self.assertEqual(dobj.decompress(data), b'foobar') + self.assertIsNone(dobj.flush()) + self.assertIsNone(dobj.flush(10)) + self.assertIsNone(dobj.flush(length=100)) def test_input_types(self): compressed = zstd.ZstdCompressor(level=1).compress(b'foo') @@ -557,7 +792,11 @@ for source in sources: dobj = dctx.decompressobj() + self.assertIsNone(dobj.flush()) + self.assertIsNone(dobj.flush(10)) + self.assertIsNone(dobj.flush(length=100)) self.assertEqual(dobj.decompress(source), b'foo') + self.assertIsNone(dobj.flush()) def test_reuse(self): data = zstd.ZstdCompressor(level=1).compress(b'foobar') @@ -568,6 +807,7 @@ with self.assertRaisesRegexp(zstd.ZstdError, 'cannot use a decompressobj'): dobj.decompress(data) + self.assertIsNone(dobj.flush()) def test_bad_write_size(self): dctx = zstd.ZstdDecompressor() @@ -585,16 +825,141 @@ dobj = dctx.decompressobj(write_size=i + 1) self.assertEqual(dobj.decompress(data), source) + def decompress_via_writer(data): buffer = io.BytesIO() dctx = zstd.ZstdDecompressor() - with dctx.stream_writer(buffer) as decompressor: - decompressor.write(data) + decompressor = dctx.stream_writer(buffer) + decompressor.write(data) + return buffer.getvalue() @make_cffi class TestDecompressor_stream_writer(unittest.TestCase): + def test_io_api(self): + buffer = io.BytesIO() + dctx = zstd.ZstdDecompressor() + writer = dctx.stream_writer(buffer) + + self.assertFalse(writer.closed) + self.assertFalse(writer.isatty()) + self.assertFalse(writer.readable()) + + with self.assertRaises(io.UnsupportedOperation): + writer.readline() + + with self.assertRaises(io.UnsupportedOperation): + writer.readline(42) + + with self.assertRaises(io.UnsupportedOperation): + writer.readline(size=42) + + with self.assertRaises(io.UnsupportedOperation): + writer.readlines() + + with self.assertRaises(io.UnsupportedOperation): + writer.readlines(42) + + with self.assertRaises(io.UnsupportedOperation): + 
writer.readlines(hint=42) + + with self.assertRaises(io.UnsupportedOperation): + writer.seek(0) + + with self.assertRaises(io.UnsupportedOperation): + writer.seek(10, os.SEEK_SET) + + self.assertFalse(writer.seekable()) + + with self.assertRaises(io.UnsupportedOperation): + writer.tell() + + with self.assertRaises(io.UnsupportedOperation): + writer.truncate() + + with self.assertRaises(io.UnsupportedOperation): + writer.truncate(42) + + with self.assertRaises(io.UnsupportedOperation): + writer.truncate(size=42) + + self.assertTrue(writer.writable()) + + with self.assertRaises(io.UnsupportedOperation): + writer.writelines([]) + + with self.assertRaises(io.UnsupportedOperation): + writer.read() + + with self.assertRaises(io.UnsupportedOperation): + writer.read(42) + + with self.assertRaises(io.UnsupportedOperation): + writer.read(size=42) + + with self.assertRaises(io.UnsupportedOperation): + writer.readall() + + with self.assertRaises(io.UnsupportedOperation): + writer.readinto(None) + + with self.assertRaises(io.UnsupportedOperation): + writer.fileno() + + def test_fileno_file(self): + with tempfile.TemporaryFile('wb') as tf: + dctx = zstd.ZstdDecompressor() + writer = dctx.stream_writer(tf) + + self.assertEqual(writer.fileno(), tf.fileno()) + + def test_close(self): + foo = zstd.ZstdCompressor().compress(b'foo') + + buffer = NonClosingBytesIO() + dctx = zstd.ZstdDecompressor() + writer = dctx.stream_writer(buffer) + + writer.write(foo) + self.assertFalse(writer.closed) + self.assertFalse(buffer.closed) + writer.close() + self.assertTrue(writer.closed) + self.assertTrue(buffer.closed) + + with self.assertRaisesRegexp(ValueError, 'stream is closed'): + writer.write(b'') + + with self.assertRaisesRegexp(ValueError, 'stream is closed'): + writer.flush() + + with self.assertRaisesRegexp(ValueError, 'stream is closed'): + with writer: + pass + + self.assertEqual(buffer.getvalue(), b'foo') + + # Context manager exit should close stream. 
+ buffer = NonClosingBytesIO() + writer = dctx.stream_writer(buffer) + + with writer: + writer.write(foo) + + self.assertTrue(writer.closed) + self.assertEqual(buffer.getvalue(), b'foo') + + def test_flush(self): + buffer = OpCountingBytesIO() + dctx = zstd.ZstdDecompressor() + writer = dctx.stream_writer(buffer) + + writer.flush() + self.assertEqual(buffer._flush_count, 1) + writer.flush() + self.assertEqual(buffer._flush_count, 2) + def test_empty_roundtrip(self): cctx = zstd.ZstdCompressor() empty = cctx.compress(b'') @@ -616,9 +981,21 @@ dctx = zstd.ZstdDecompressor() for source in sources: buffer = io.BytesIO() + + decompressor = dctx.stream_writer(buffer) + decompressor.write(source) + self.assertEqual(buffer.getvalue(), b'foo') + + buffer = NonClosingBytesIO() + with dctx.stream_writer(buffer) as decompressor: - decompressor.write(source) + self.assertEqual(decompressor.write(source), 3) + + self.assertEqual(buffer.getvalue(), b'foo') + buffer = io.BytesIO() + writer = dctx.stream_writer(buffer, write_return_read=True) + self.assertEqual(writer.write(source), len(source)) self.assertEqual(buffer.getvalue(), b'foo') def test_large_roundtrip(self): @@ -641,7 +1018,7 @@ cctx = zstd.ZstdCompressor() compressed = cctx.compress(orig) - buffer = io.BytesIO() + buffer = NonClosingBytesIO() dctx = zstd.ZstdDecompressor() with dctx.stream_writer(buffer) as decompressor: pos = 0 @@ -651,6 +1028,17 @@ pos += 8192 self.assertEqual(buffer.getvalue(), orig) + # Again with write_return_read=True + buffer = io.BytesIO() + writer = dctx.stream_writer(buffer, write_return_read=True) + pos = 0 + while pos < len(compressed): + pos2 = pos + 8192 + chunk = compressed[pos:pos2] + self.assertEqual(writer.write(chunk), len(chunk)) + pos += 8192 + self.assertEqual(buffer.getvalue(), orig) + def test_dictionary(self): samples = [] for i in range(128): @@ -661,7 +1049,7 @@ d = zstd.train_dictionary(8192, samples) orig = b'foobar' * 16384 - buffer = io.BytesIO() + buffer = 
NonClosingBytesIO() cctx = zstd.ZstdCompressor(dict_data=d) with cctx.stream_writer(buffer) as compressor: self.assertEqual(compressor.write(orig), 0) @@ -670,6 +1058,12 @@ buffer = io.BytesIO() dctx = zstd.ZstdDecompressor(dict_data=d) + decompressor = dctx.stream_writer(buffer) + self.assertEqual(decompressor.write(compressed), len(orig)) + self.assertEqual(buffer.getvalue(), orig) + + buffer = NonClosingBytesIO() + with dctx.stream_writer(buffer) as decompressor: self.assertEqual(decompressor.write(compressed), len(orig)) @@ -678,6 +1072,11 @@ def test_memory_size(self): dctx = zstd.ZstdDecompressor() buffer = io.BytesIO() + + decompressor = dctx.stream_writer(buffer) + size = decompressor.memory_size() + self.assertGreater(size, 100000) + with dctx.stream_writer(buffer) as decompressor: size = decompressor.memory_size() @@ -810,7 +1209,7 @@ @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') def test_large_input(self): bytes = list(struct.Struct('>B').pack(i) for i in range(256)) - compressed = io.BytesIO() + compressed = NonClosingBytesIO() input_size = 0 cctx = zstd.ZstdCompressor(level=1) with cctx.stream_writer(compressed) as compressor: @@ -823,7 +1222,7 @@ if have_compressed and have_raw: break - compressed.seek(0) + compressed = io.BytesIO(compressed.getvalue()) self.assertGreater(len(compressed.getvalue()), zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE) @@ -861,7 +1260,7 @@ source = io.BytesIO() - compressed = io.BytesIO() + compressed = NonClosingBytesIO() with cctx.stream_writer(compressed) as compressor: for i in range(256): chunk = b'\0' * 1024 @@ -874,7 +1273,7 @@ max_output_size=len(source.getvalue())) self.assertEqual(simple, source.getvalue()) - compressed.seek(0) + compressed = io.BytesIO(compressed.getvalue()) streamed = b''.join(dctx.read_to_iter(compressed)) self.assertEqual(streamed, source.getvalue()) @@ -1001,6 +1400,9 @@ def test_invalid_inputs(self): dctx = zstd.ZstdDecompressor() + if not hasattr(dctx, 
'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + with self.assertRaises(TypeError): dctx.multi_decompress_to_buffer(True) @@ -1020,6 +1422,10 @@ frames = [cctx.compress(d) for d in original] dctx = zstd.ZstdDecompressor() + + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + result = dctx.multi_decompress_to_buffer(frames) self.assertEqual(len(result), len(frames)) @@ -1041,6 +1447,10 @@ sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) dctx = zstd.ZstdDecompressor() + + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes) self.assertEqual(len(result), len(frames)) @@ -1057,6 +1467,9 @@ dctx = zstd.ZstdDecompressor() + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1])) b = zstd.BufferWithSegments(b''.join(frames), segments) @@ -1074,12 +1487,16 @@ frames = [cctx.compress(d) for d in original] sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + dctx = zstd.ZstdDecompressor() + + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + segments = struct.pack('=QQQQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1]), len(frames[0]) + len(frames[1]), len(frames[2])) b = zstd.BufferWithSegments(b''.join(frames), segments) - dctx = zstd.ZstdDecompressor() result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes) self.assertEqual(len(result), len(frames)) @@ -1099,10 +1516,14 @@ b'foo4' * 6, ] + if not hasattr(cctx, 'multi_compress_to_buffer'): + self.skipTest('multi_compress_to_buffer not available') + frames = cctx.multi_compress_to_buffer(original) # Check round trip. 
dctx = zstd.ZstdDecompressor() + decompressed = dctx.multi_decompress_to_buffer(frames, threads=3) self.assertEqual(len(decompressed), len(original)) @@ -1138,7 +1559,12 @@ frames = [cctx.compress(s) for s in generate_samples()] dctx = zstd.ZstdDecompressor(dict_data=d) + + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + result = dctx.multi_decompress_to_buffer(frames) + self.assertEqual([o.tobytes() for o in result], generate_samples()) def test_multiple_threads(self): @@ -1149,6 +1575,10 @@ frames.extend(cctx.compress(b'y' * 64) for i in range(256)) dctx = zstd.ZstdDecompressor() + + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + result = dctx.multi_decompress_to_buffer(frames, threads=-1) self.assertEqual(len(result), len(frames)) @@ -1164,6 +1594,9 @@ dctx = zstd.ZstdDecompressor() + if not hasattr(dctx, 'multi_decompress_to_buffer'): + self.skipTest('multi_decompress_to_buffer not available') + with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: (' 'Corrupted block|'