contrib/python-zstandard/tests/test_decompressor.py
changeset 40121 73fef626dae3
parent 37495 b1fb341d8a61
child 42070 675775c33ab6
equal deleted inserted replaced
40120:89742f1fa6cb 40121:73fef626dae3
   291 @make_cffi
   291 @make_cffi
   292 class TestDecompressor_stream_reader(unittest.TestCase):
   292 class TestDecompressor_stream_reader(unittest.TestCase):
   293     def test_context_manager(self):
   293     def test_context_manager(self):
   294         dctx = zstd.ZstdDecompressor()
   294         dctx = zstd.ZstdDecompressor()
   295 
   295 
   296         reader = dctx.stream_reader(b'foo')
       
   297         with self.assertRaisesRegexp(zstd.ZstdError, 'read\(\) must be called from an active'):
       
   298             reader.read(1)
       
   299 
       
   300         with dctx.stream_reader(b'foo') as reader:
   296         with dctx.stream_reader(b'foo') as reader:
   301             with self.assertRaisesRegexp(ValueError, 'cannot __enter__ multiple times'):
   297             with self.assertRaisesRegexp(ValueError, 'cannot __enter__ multiple times'):
   302                 with reader as reader2:
   298                 with reader as reader2:
   303                     pass
   299                     pass
   304 
   300 
   329 
   325 
   330     def test_constant_methods(self):
   326     def test_constant_methods(self):
   331         dctx = zstd.ZstdDecompressor()
   327         dctx = zstd.ZstdDecompressor()
   332 
   328 
   333         with dctx.stream_reader(b'foo') as reader:
   329         with dctx.stream_reader(b'foo') as reader:
       
   330             self.assertFalse(reader.closed)
   334             self.assertTrue(reader.readable())
   331             self.assertTrue(reader.readable())
   335             self.assertFalse(reader.writable())
   332             self.assertFalse(reader.writable())
   336             self.assertTrue(reader.seekable())
   333             self.assertTrue(reader.seekable())
   337             self.assertFalse(reader.isatty())
   334             self.assertFalse(reader.isatty())
       
   335             self.assertFalse(reader.closed)
   338             self.assertIsNone(reader.flush())
   336             self.assertIsNone(reader.flush())
       
   337             self.assertFalse(reader.closed)
       
   338 
       
   339         self.assertTrue(reader.closed)
   339 
   340 
   340     def test_read_closed(self):
   341     def test_read_closed(self):
   341         dctx = zstd.ZstdDecompressor()
   342         dctx = zstd.ZstdDecompressor()
   342 
   343 
   343         with dctx.stream_reader(b'foo') as reader:
   344         with dctx.stream_reader(b'foo') as reader:
   344             reader.close()
   345             reader.close()
       
   346             self.assertTrue(reader.closed)
   345             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
   347             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
   346                 reader.read(1)
   348                 reader.read(1)
   347 
   349 
   348     def test_bad_read_size(self):
   350     def test_bad_read_size(self):
   349         dctx = zstd.ZstdDecompressor()
   351         dctx = zstd.ZstdDecompressor()
   370             result = reader.read(8192)
   372             result = reader.read(8192)
   371             self.assertEqual(result, source)
   373             self.assertEqual(result, source)
   372             self.assertEqual(reader.tell(), len(source))
   374             self.assertEqual(reader.tell(), len(source))
   373 
   375 
   374             # Read after EOF should return empty bytes.
   376             # Read after EOF should return empty bytes.
   375             self.assertEqual(reader.read(), b'')
   377             self.assertEqual(reader.read(1), b'')
   376             self.assertEqual(reader.tell(), len(result))
   378             self.assertEqual(reader.tell(), len(result))
   377 
   379 
   378         self.assertTrue(reader.closed())
   380         self.assertTrue(reader.closed)
   379 
   381 
   380     def test_read_buffer_small_chunks(self):
   382     def test_read_buffer_small_chunks(self):
   381         cctx = zstd.ZstdCompressor()
   383         cctx = zstd.ZstdCompressor()
   382         source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60])
   384         source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60])
   383         frame = cctx.compress(source)
   385         frame = cctx.compress(source)
   406             self.assertEqual(reader.tell(), 0)
   408             self.assertEqual(reader.tell(), 0)
   407 
   409 
   408             chunk = reader.read(8192)
   410             chunk = reader.read(8192)
   409             self.assertEqual(chunk, source)
   411             self.assertEqual(chunk, source)
   410             self.assertEqual(reader.tell(), len(source))
   412             self.assertEqual(reader.tell(), len(source))
   411             self.assertEqual(reader.read(), b'')
   413             self.assertEqual(reader.read(1), b'')
   412             self.assertEqual(reader.tell(), len(source))
   414             self.assertEqual(reader.tell(), len(source))
       
   415             self.assertFalse(reader.closed)
       
   416 
       
   417         self.assertTrue(reader.closed)
   413 
   418 
   414     def test_read_stream_small_chunks(self):
   419     def test_read_stream_small_chunks(self):
   415         cctx = zstd.ZstdCompressor()
   420         cctx = zstd.ZstdCompressor()
   416         source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60])
   421         source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60])
   417         frame = cctx.compress(source)
   422         frame = cctx.compress(source)
   438 
   443 
   439         with dctx.stream_reader(frame) as reader:
   444         with dctx.stream_reader(frame) as reader:
   440             while reader.read(16):
   445             while reader.read(16):
   441                 pass
   446                 pass
   442 
   447 
   443         with self.assertRaisesRegexp(zstd.ZstdError, 'read\(\) must be called from an active'):
   448         self.assertTrue(reader.closed)
       
   449 
       
   450         with self.assertRaisesRegexp(ValueError, 'stream is closed'):
   444             reader.read(10)
   451             reader.read(10)
   445 
   452 
   446     def test_illegal_seeks(self):
   453     def test_illegal_seeks(self):
   447         cctx = zstd.ZstdCompressor()
   454         cctx = zstd.ZstdCompressor()
   448         frame = cctx.compress(b'foo' * 60)
   455         frame = cctx.compress(b'foo' * 60)
   472             reader.close()
   479             reader.close()
   473 
   480 
   474             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
   481             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
   475                 reader.seek(4, os.SEEK_SET)
   482                 reader.seek(4, os.SEEK_SET)
   476 
   483 
   477         with self.assertRaisesRegexp(
   484         with self.assertRaisesRegexp(ValueError, 'stream is closed'):
   478             zstd.ZstdError, 'seek\(\) must be called from an active context'):
       
   479             reader.seek(0)
   485             reader.seek(0)
   480 
   486 
   481     def test_seek(self):
   487     def test_seek(self):
   482         source = b'foobar' * 60
   488         source = b'foobar' * 60
   483         cctx = zstd.ZstdCompressor()
   489         cctx = zstd.ZstdCompressor()
   489             reader.seek(3)
   495             reader.seek(3)
   490             self.assertEqual(reader.read(3), b'bar')
   496             self.assertEqual(reader.read(3), b'bar')
   491 
   497 
   492             reader.seek(4, os.SEEK_CUR)
   498             reader.seek(4, os.SEEK_CUR)
   493             self.assertEqual(reader.read(2), b'ar')
   499             self.assertEqual(reader.read(2), b'ar')
       
   500 
       
   501     def test_no_context_manager(self):
       
   502         source = b'foobar' * 60
       
   503         cctx = zstd.ZstdCompressor()
       
   504         frame = cctx.compress(source)
       
   505 
       
   506         dctx = zstd.ZstdDecompressor()
       
   507         reader = dctx.stream_reader(frame)
       
   508 
       
   509         self.assertEqual(reader.read(6), b'foobar')
       
   510         self.assertEqual(reader.read(18), b'foobar' * 3)
       
   511         self.assertFalse(reader.closed)
       
   512 
       
   513         # Calling close prevents subsequent use.
       
   514         reader.close()
       
   515         self.assertTrue(reader.closed)
       
   516 
       
   517         with self.assertRaisesRegexp(ValueError, 'stream is closed'):
       
   518             reader.read(6)
       
   519 
       
   520     def test_read_after_error(self):
       
   521         source = io.BytesIO(b'')
       
   522         dctx = zstd.ZstdDecompressor()
       
   523 
       
   524         reader = dctx.stream_reader(source)
       
   525 
       
   526         with reader:
       
   527             with self.assertRaises(TypeError):
       
   528                 reader.read()
       
   529 
       
   530         with reader:
       
   531             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
       
   532                 reader.read(100)
   494 
   533 
   495 
   534 
   496 @make_cffi
   535 @make_cffi
   497 class TestDecompressor_decompressobj(unittest.TestCase):
   536 class TestDecompressor_decompressobj(unittest.TestCase):
   498     def test_simple(self):
   537     def test_simple(self):