Mercurial > public > mercurial-scm > hg-stable
diff contrib/python-zstandard/tests/test_compressor.py @ 44232:5e84a96d865b
python-zstandard: blacken at 80 characters
I made this change upstream and it will make it into the next
release of python-zstandard. I figured I'd send it Mercurial's
way because it will allow us to drop this directory from the black
exclusion list.
# skip-blame blackening
Differential Revision: https://phab.mercurial-scm.org/D7937
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 22 Jan 2020 22:23:04 -0800 |
parents | de7838053207 |
children | 9db77d46de79 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor.py Tue Jan 21 15:45:06 2020 -0800 +++ b/contrib/python-zstandard/tests/test_compressor.py Wed Jan 22 22:23:04 2020 -0800 @@ -24,7 +24,9 @@ def multithreaded_chunk_size(level, source_size=0): - params = zstd.ZstdCompressionParameters.from_level(level, source_size=source_size) + params = zstd.ZstdCompressionParameters.from_level( + level, source_size=source_size + ) return 1 << (params.window_log + 2) @@ -86,7 +88,9 @@ # This matches the test for read_to_iter() below. cctx = zstd.ZstdCompressor(level=1, write_content_size=False) - result = cctx.compress(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o") + result = cctx.compress( + b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o" + ) self.assertEqual( result, b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00" @@ -99,7 +103,9 @@ result = cctx.compress(b"foo" * 256) def test_no_magic(self): - params = zstd.ZstdCompressionParameters.from_level(1, format=zstd.FORMAT_ZSTD1) + params = zstd.ZstdCompressionParameters.from_level( + 1, format=zstd.FORMAT_ZSTD1 + ) cctx = zstd.ZstdCompressor(compression_params=params) magic = cctx.compress(b"foobar") @@ -223,7 +229,8 @@ self.assertEqual( result, - b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" b"\x66\x6f\x6f", + b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" + b"\x66\x6f\x6f", ) def test_multithreaded_compression_params(self): @@ -234,7 +241,9 @@ params = zstd.get_frame_parameters(result) self.assertEqual(params.content_size, 3) - self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f") + self.assertEqual( + result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f" + ) @make_cffi @@ -347,7 +356,9 @@ ) self.assertEqual(cobj.compress(b"bar"), b"") # 3 byte header plus content. - self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar") + self.assertEqual( + cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar" + ) self.assertEqual(cobj.flush(), b"\x01\x00\x00") def test_flush_empty_block(self): @@ -445,7 +456,9 @@ self.assertEqual(int(r), 0) self.assertEqual(w, 9) - self.assertEqual(dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00") + self.assertEqual( + dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00" + ) def test_large_data(self): source = io.BytesIO() @@ -478,7 +491,9 @@ cctx = zstd.ZstdCompressor(level=1, write_checksum=True) cctx.copy_stream(source, with_checksum) - self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) + self.assertEqual( + len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4 + ) no_params = zstd.get_frame_parameters(no_checksum.getvalue()) with_params = zstd.get_frame_parameters(with_checksum.getvalue()) @@ -585,7 +600,9 @@ cctx = zstd.ZstdCompressor() with cctx.stream_reader(b"foo") as reader: - with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"): + with self.assertRaisesRegex( + ValueError, "cannot __enter__ multiple times" + ): with reader as reader2: pass @@ -744,7 +761,9 @@ source = io.BytesIO(b"foobar") with cctx.stream_reader(source, size=2) as reader: - with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"): + with self.assertRaisesRegex( + zstd.ZstdError, "Src size is incorrect" + ): reader.read(10) # Try another compression operation. @@ -1126,7 +1145,9 @@ self.assertFalse(no_params.has_checksum) self.assertTrue(with_params.has_checksum) - self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4) + self.assertEqual( + len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4 + ) def test_write_content_size(self): no_size = NonClosingBytesIO() @@ -1145,7 +1166,9 @@ # Declaring size will write the header. with_size = NonClosingBytesIO() - with cctx.stream_writer(with_size, size=len(b"foobar" * 256)) as compressor: + with cctx.stream_writer( + with_size, size=len(b"foobar" * 256) + ) as compressor: self.assertEqual(compressor.write(b"foobar" * 256), 0) no_params = zstd.get_frame_parameters(no_size.getvalue()) @@ -1191,7 +1214,9 @@ self.assertFalse(no_params.has_checksum) self.assertFalse(with_params.has_checksum) - self.assertEqual(len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4) + self.assertEqual( + len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4 + ) def test_memory_size(self): cctx = zstd.ZstdCompressor(level=3) @@ -1337,7 +1362,9 @@ for chunk in cctx.read_to_iter(b"foobar"): pass - with self.assertRaisesRegex(ValueError, "must pass an object with a read"): + with self.assertRaisesRegex( + ValueError, "must pass an object with a read" + ): for chunk in cctx.read_to_iter(True): pass @@ -1513,7 +1540,9 @@ dctx = zstd.ZstdDecompressor() - self.assertEqual(dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)) + self.assertEqual( + dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24) + ) def test_small_chunk_size(self): cctx = zstd.ZstdCompressor() @@ -1533,7 +1562,8 @@ dctx = zstd.ZstdDecompressor() self.assertEqual( - dctx.decompress(b"".join(chunks), max_output_size=10000), b"foo" * 1024 + dctx.decompress(b"".join(chunks), max_output_size=10000), + b"foo" * 1024, ) def test_input_types(self): @@ -1602,7 +1632,8 @@ list(chunker.finish()) with self.assertRaisesRegex( - zstd.ZstdError, r"cannot call compress\(\) after compression finished" + zstd.ZstdError, + r"cannot call compress\(\) after compression finished", ): list(chunker.compress(b"foo")) @@ -1644,7 +1675,9 @@ with self.assertRaises(TypeError): cctx.multi_compress_to_buffer((1, 2)) - with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"): + with self.assertRaisesRegex( + TypeError, "item 0 not a bytes like object" + ): cctx.multi_compress_to_buffer([u"foo"]) def test_empty_input(self):