diff contrib/python-zstandard/tests/test_compressor.py @ 44232:5e84a96d865b

python-zstandard: blacken at 80 characters I made this change upstream and it will make it into the next release of python-zstandard. I figured I'd send it Mercurial's way because it will allow us to drop this directory from the black exclusion list. # skip-blame blackening Differential Revision: https://phab.mercurial-scm.org/D7937
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 22 Jan 2020 22:23:04 -0800
parents de7838053207
children 9db77d46de79
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor.py	Tue Jan 21 15:45:06 2020 -0800
+++ b/contrib/python-zstandard/tests/test_compressor.py	Wed Jan 22 22:23:04 2020 -0800
@@ -24,7 +24,9 @@
 
 
 def multithreaded_chunk_size(level, source_size=0):
-    params = zstd.ZstdCompressionParameters.from_level(level, source_size=source_size)
+    params = zstd.ZstdCompressionParameters.from_level(
+        level, source_size=source_size
+    )
 
     return 1 << (params.window_log + 2)
 
@@ -86,7 +88,9 @@
 
         # This matches the test for read_to_iter() below.
         cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
-        result = cctx.compress(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o")
+        result = cctx.compress(
+            b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b"o"
+        )
         self.assertEqual(
             result,
             b"\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00"
@@ -99,7 +103,9 @@
         result = cctx.compress(b"foo" * 256)
 
     def test_no_magic(self):
-        params = zstd.ZstdCompressionParameters.from_level(1, format=zstd.FORMAT_ZSTD1)
+        params = zstd.ZstdCompressionParameters.from_level(
+            1, format=zstd.FORMAT_ZSTD1
+        )
         cctx = zstd.ZstdCompressor(compression_params=params)
         magic = cctx.compress(b"foobar")
 
@@ -223,7 +229,8 @@
 
         self.assertEqual(
             result,
-            b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00" b"\x66\x6f\x6f",
+            b"\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00"
+            b"\x66\x6f\x6f",
         )
 
     def test_multithreaded_compression_params(self):
@@ -234,7 +241,9 @@
         params = zstd.get_frame_parameters(result)
         self.assertEqual(params.content_size, 3)
 
-        self.assertEqual(result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f")
+        self.assertEqual(
+            result, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f"
+        )
 
 
 @make_cffi
@@ -347,7 +356,9 @@
         )
         self.assertEqual(cobj.compress(b"bar"), b"")
         # 3 byte header plus content.
-        self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar")
+        self.assertEqual(
+            cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar"
+        )
         self.assertEqual(cobj.flush(), b"\x01\x00\x00")
 
     def test_flush_empty_block(self):
@@ -445,7 +456,9 @@
         self.assertEqual(int(r), 0)
         self.assertEqual(w, 9)
 
-        self.assertEqual(dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00")
+        self.assertEqual(
+            dest.getvalue(), b"\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00"
+        )
 
     def test_large_data(self):
         source = io.BytesIO()
@@ -478,7 +491,9 @@
         cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
         cctx.copy_stream(source, with_checksum)
 
-        self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4)
+        self.assertEqual(
+            len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
+        )
 
         no_params = zstd.get_frame_parameters(no_checksum.getvalue())
         with_params = zstd.get_frame_parameters(with_checksum.getvalue())
@@ -585,7 +600,9 @@
         cctx = zstd.ZstdCompressor()
 
         with cctx.stream_reader(b"foo") as reader:
-            with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"):
+            with self.assertRaisesRegex(
+                ValueError, "cannot __enter__ multiple times"
+            ):
                 with reader as reader2:
                     pass
 
@@ -744,7 +761,9 @@
         source = io.BytesIO(b"foobar")
 
         with cctx.stream_reader(source, size=2) as reader:
-            with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"):
+            with self.assertRaisesRegex(
+                zstd.ZstdError, "Src size is incorrect"
+            ):
                 reader.read(10)
 
         # Try another compression operation.
@@ -1126,7 +1145,9 @@
         self.assertFalse(no_params.has_checksum)
         self.assertTrue(with_params.has_checksum)
 
-        self.assertEqual(len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4)
+        self.assertEqual(
+            len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
+        )
 
     def test_write_content_size(self):
         no_size = NonClosingBytesIO()
@@ -1145,7 +1166,9 @@
 
         # Declaring size will write the header.
         with_size = NonClosingBytesIO()
-        with cctx.stream_writer(with_size, size=len(b"foobar" * 256)) as compressor:
+        with cctx.stream_writer(
+            with_size, size=len(b"foobar" * 256)
+        ) as compressor:
             self.assertEqual(compressor.write(b"foobar" * 256), 0)
 
         no_params = zstd.get_frame_parameters(no_size.getvalue())
@@ -1191,7 +1214,9 @@
         self.assertFalse(no_params.has_checksum)
         self.assertFalse(with_params.has_checksum)
 
-        self.assertEqual(len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4)
+        self.assertEqual(
+            len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4
+        )
 
     def test_memory_size(self):
         cctx = zstd.ZstdCompressor(level=3)
@@ -1337,7 +1362,9 @@
         for chunk in cctx.read_to_iter(b"foobar"):
             pass
 
-        with self.assertRaisesRegex(ValueError, "must pass an object with a read"):
+        with self.assertRaisesRegex(
+            ValueError, "must pass an object with a read"
+        ):
             for chunk in cctx.read_to_iter(True):
                 pass
 
@@ -1513,7 +1540,9 @@
 
         dctx = zstd.ZstdDecompressor()
 
-        self.assertEqual(dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24))
+        self.assertEqual(
+            dctx.decompress(b"".join(chunks)), (b"x" * 1000) + (b"y" * 24)
+        )
 
     def test_small_chunk_size(self):
         cctx = zstd.ZstdCompressor()
@@ -1533,7 +1562,8 @@
 
         dctx = zstd.ZstdDecompressor()
         self.assertEqual(
-            dctx.decompress(b"".join(chunks), max_output_size=10000), b"foo" * 1024
+            dctx.decompress(b"".join(chunks), max_output_size=10000),
+            b"foo" * 1024,
         )
 
     def test_input_types(self):
@@ -1602,7 +1632,8 @@
         list(chunker.finish())
 
         with self.assertRaisesRegex(
-            zstd.ZstdError, r"cannot call compress\(\) after compression finished"
+            zstd.ZstdError,
+            r"cannot call compress\(\) after compression finished",
         ):
             list(chunker.compress(b"foo"))
 
@@ -1644,7 +1675,9 @@
         with self.assertRaises(TypeError):
             cctx.multi_compress_to_buffer((1, 2))
 
-        with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"):
+        with self.assertRaisesRegex(
+            TypeError, "item 0 not a bytes like object"
+        ):
             cctx.multi_compress_to_buffer([u"foo"])
 
     def test_empty_input(self):