contrib/python-zstandard/zstd_cffi.py
changeset 40121 73fef626dae3
parent 37495 b1fb341d8a61
--- a/contrib/python-zstandard/zstd_cffi.py	Tue Sep 25 20:55:03 2018 +0900
+++ b/contrib/python-zstandard/zstd_cffi.py	Mon Oct 08 16:27:40 2018 -0700
@@ -40,6 +40,8 @@
     'DECOMPRESSION_RECOMMENDED_INPUT_SIZE',
     'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE',
     'MAGIC_NUMBER',
+    'BLOCKSIZELOG_MAX',
+    'BLOCKSIZE_MAX',
     'WINDOWLOG_MIN',
     'WINDOWLOG_MAX',
     'CHAINLOG_MIN',
@@ -52,6 +54,7 @@
     'SEARCHLENGTH_MIN',
     'SEARCHLENGTH_MAX',
     'TARGETLENGTH_MIN',
+    'TARGETLENGTH_MAX',
     'LDM_MINMATCH_MIN',
     'LDM_MINMATCH_MAX',
     'LDM_BUCKETSIZELOG_MAX',
@@ -102,6 +105,8 @@
 CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
 ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE)
 
+BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
+BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
 WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
 WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
 CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
@@ -114,6 +119,7 @@
 SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
 SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
 TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
+TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
 LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
 LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
 LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX
@@ -191,7 +197,6 @@
         (lib.ZSTD_p_nbWorkers, params.threads),
         (lib.ZSTD_p_jobSize, params.job_size),
         (lib.ZSTD_p_overlapSizeLog, params.overlap_size_log),
-        (lib.ZSTD_p_compressLiterals, params.compress_literals),
         (lib.ZSTD_p_forceMaxWindow, params.force_max_window),
         (lib.ZSTD_p_enableLongDistanceMatching, params.enable_ldm),
         (lib.ZSTD_p_ldmHashLog, params.ldm_hash_log),
@@ -224,9 +229,6 @@
             if arg not in kwargs:
                 kwargs[arg] = getattr(params, attr)
 
-        if 'compress_literals' not in kwargs:
-            kwargs['compress_literals'] = 1 if level >= 0 else 0
-
         return ZstdCompressionParameters(**kwargs)
 
     def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0,
@@ -235,14 +237,11 @@
                  write_dict_id=0, job_size=0, overlap_size_log=0,
                  force_max_window=0, enable_ldm=0, ldm_hash_log=0,
                  ldm_min_match=0, ldm_bucket_size_log=0, ldm_hash_every_log=0,
-                 threads=0, compress_literals=None):
+                 threads=0):
 
         if threads < 0:
             threads = _cpu_count()
 
-        if compress_literals is None:
-            compress_literals = compression_level >= 0
-
         self.format = format
         self.compression_level = compression_level
         self.window_log = window_log
@@ -257,7 +256,6 @@
         self.write_dict_id = write_dict_id
         self.job_size = job_size
         self.overlap_size_log = overlap_size_log
-        self.compress_literals = compress_literals
         self.force_max_window = force_max_window
         self.enable_ldm = enable_ldm
         self.ldm_hash_log = ldm_hash_log
@@ -411,13 +409,14 @@
                 raise ZstdError('zstd compress error: %s' %
                                 _zstd_error(zresult))
 
-            if not out_buffer.pos:
-                break
+            if out_buffer.pos:
+                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
+                total_write += out_buffer.pos
+                self._bytes_compressed += out_buffer.pos
+                out_buffer.pos = 0
 
-            self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
-            total_write += out_buffer.pos
-            self._bytes_compressed += out_buffer.pos
-            out_buffer.pos = 0
+            if not zresult:
+                break
 
         return total_write
 
@@ -460,6 +459,14 @@
         if self._finished:
             raise ZstdError('compressor object already finished')
 
+        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
+            z_flush_mode = lib.ZSTD_e_flush
+        elif flush_mode == COMPRESSOBJ_FLUSH_FINISH:
+            z_flush_mode = lib.ZSTD_e_end
+            self._finished = True
+        else:
+            raise ZstdError('unhandled flush mode')
+
         assert self._out.pos == 0
 
         in_buffer = ffi.new('ZSTD_inBuffer *')
@@ -467,35 +474,13 @@
         in_buffer.size = 0
         in_buffer.pos = 0
 
-        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
-            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
-                                                self._out,
-                                                in_buffer,
-                                                lib.ZSTD_e_flush)
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('zstd compress error: %s' %
-                                _zstd_error(zresult))
-
-            # Output buffer is guaranteed to hold full block.
-            assert zresult == 0
-
-            if self._out.pos:
-                result = ffi.buffer(self._out.dst, self._out.pos)[:]
-                self._out.pos = 0
-                return result
-            else:
-                return b''
-
-        assert flush_mode == COMPRESSOBJ_FLUSH_FINISH
-        self._finished = True
-
         chunks = []
 
         while True:
             zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                 self._out,
                                                 in_buffer,
-                                                lib.ZSTD_e_end)
+                                                z_flush_mode)
             if lib.ZSTD_isError(zresult):
                 raise ZstdError('error ending compression stream: %s' %
                                 _zstd_error(zresult))
@@ -510,11 +495,107 @@
         return b''.join(chunks)
 
 
+class ZstdCompressionChunker(object):
+    def __init__(self, compressor, chunk_size):
+        self._compressor = compressor
+        self._out = ffi.new('ZSTD_outBuffer *')
+        self._dst_buffer = ffi.new('char[]', chunk_size)
+        self._out.dst = self._dst_buffer
+        self._out.size = chunk_size
+        self._out.pos = 0
+
+        self._in = ffi.new('ZSTD_inBuffer *')
+        self._in.src = ffi.NULL
+        self._in.size = 0
+        self._in.pos = 0
+        self._finished = False
+
+    def compress(self, data):
+        if self._finished:
+            raise ZstdError('cannot call compress() after compression finished')
+
+        if self._in.src != ffi.NULL:
+            raise ZstdError('cannot perform operation before consuming output '
+                            'from previous operation')
+
+        data_buffer = ffi.from_buffer(data)
+
+        if not len(data_buffer):
+            return
+
+        self._in.src = data_buffer
+        self._in.size = len(data_buffer)
+        self._in.pos = 0
+
+        while self._in.pos < self._in.size:
+            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
+                                                self._out,
+                                                self._in,
+                                                lib.ZSTD_e_continue)
+
+            if self._in.pos == self._in.size:
+                self._in.src = ffi.NULL
+                self._in.size = 0
+                self._in.pos = 0
+
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('zstd compress error: %s' %
+                                _zstd_error(zresult))
+
+            if self._out.pos == self._out.size:
+                yield ffi.buffer(self._out.dst, self._out.pos)[:]
+                self._out.pos = 0
+
+    def flush(self):
+        if self._finished:
+            raise ZstdError('cannot call flush() after compression finished')
+
+        if self._in.src != ffi.NULL:
+            raise ZstdError('cannot call flush() before consuming output from '
+                            'previous operation')
+
+        while True:
+            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
+                                                self._out, self._in,
+                                                lib.ZSTD_e_flush)
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('zstd compress error: %s' % _zstd_error(zresult))
+
+            if self._out.pos:
+                yield ffi.buffer(self._out.dst, self._out.pos)[:]
+                self._out.pos = 0
+
+            if not zresult:
+                return
+
+    def finish(self):
+        if self._finished:
+            raise ZstdError('cannot call finish() after compression finished')
+
+        if self._in.src != ffi.NULL:
+            raise ZstdError('cannot call finish() before consuming output from '
+                            'previous operation')
+
+        while True:
+            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
+                                                self._out, self._in,
+                                                lib.ZSTD_e_end)
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('zstd compress error: %s' % _zstd_error(zresult))
+
+            if self._out.pos:
+                yield ffi.buffer(self._out.dst, self._out.pos)[:]
+                self._out.pos = 0
+
+            if not zresult:
+                self._finished = True
+                return
+
+
 class CompressionReader(object):
-    def __init__(self, compressor, source, size, read_size):
+    def __init__(self, compressor, source, read_size):
         self._compressor = compressor
         self._source = source
-        self._source_size = size
         self._read_size = read_size
         self._entered = False
         self._closed = False
@@ -530,12 +611,6 @@
         if self._entered:
             raise ValueError('cannot __enter__ multiple times')
 
-        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._compressor._cctx,
-                                                  self._source_size)
-        if lib.ZSTD_isError(zresult):
-            raise ZstdError('error setting source size: %s' %
-                            _zstd_error(zresult))
-
         self._entered = True
         return self
 
@@ -578,6 +653,7 @@
         self._closed = True
         return None
 
+    @property
     def closed(self):
         return self._closed
 
@@ -596,9 +672,6 @@
     next = __next__
 
     def read(self, size=-1):
-        if not self._entered:
-            raise ZstdError('read() must be called from an active context manager')
-
         if self._closed:
             raise ValueError('stream is closed')
 
@@ -759,16 +832,14 @@
         self._dict_data = dict_data
 
         # We defer setting up garbage collection until after calling
-        # _ensure_cctx() to ensure the memory size estimate is more accurate.
+        # _setup_cctx() to ensure the memory size estimate is more accurate.
         try:
-            self._ensure_cctx()
+            self._setup_cctx()
         finally:
             self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx,
                                 size=lib.ZSTD_sizeof_CCtx(cctx))
 
-    def _ensure_cctx(self):
-        lib.ZSTD_CCtx_reset(self._cctx)
-
+    def _setup_cctx(self):
         zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx,
                                                              self._params)
         if lib.ZSTD_isError(zresult):
@@ -793,7 +864,7 @@
         return lib.ZSTD_sizeof_CCtx(self._cctx)
 
     def compress(self, data):
-        self._ensure_cctx()
+        lib.ZSTD_CCtx_reset(self._cctx)
 
         data_buffer = ffi.from_buffer(data)
 
@@ -830,7 +901,7 @@
         return ffi.buffer(out, out_buffer.pos)[:]
 
     def compressobj(self, size=-1):
-        self._ensure_cctx()
+        lib.ZSTD_CCtx_reset(self._cctx)
 
         if size < 0:
             size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -851,6 +922,19 @@
 
         return cobj
 
+    def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
+        lib.ZSTD_CCtx_reset(self._cctx)
+
+        if size < 0:
+            size = lib.ZSTD_CONTENTSIZE_UNKNOWN
+
+        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
+        if lib.ZSTD_isError(zresult):
+            raise ZstdError('error setting source size: %s' %
+                            _zstd_error(zresult))
+
+        return ZstdCompressionChunker(self, chunk_size=chunk_size)
+
     def copy_stream(self, ifh, ofh, size=-1,
                     read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                     write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
@@ -860,7 +944,7 @@
         if not hasattr(ofh, 'write'):
             raise ValueError('second argument must have a write() method')
 
-        self._ensure_cctx()
+        lib.ZSTD_CCtx_reset(self._cctx)
 
         if size < 0:
             size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -927,7 +1011,7 @@
 
     def stream_reader(self, source, size=-1,
                       read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE):
-        self._ensure_cctx()
+        lib.ZSTD_CCtx_reset(self._cctx)
 
         try:
             size = len(source)
@@ -937,7 +1021,12 @@
         if size < 0:
             size = lib.ZSTD_CONTENTSIZE_UNKNOWN
 
-        return CompressionReader(self, source, size, read_size)
+        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
+        if lib.ZSTD_isError(zresult):
+            raise ZstdError('error setting source size: %s' %
+                            _zstd_error(zresult))
+
+        return CompressionReader(self, source, read_size)
 
     def stream_writer(self, writer, size=-1,
                  write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
@@ -945,7 +1034,7 @@
         if not hasattr(writer, 'write'):
             raise ValueError('must pass an object with a write() method')
 
-        self._ensure_cctx()
+        lib.ZSTD_CCtx_reset(self._cctx)
 
         if size < 0:
             size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -967,7 +1056,7 @@
             raise ValueError('must pass an object with a read() method or '
                              'conforms to buffer protocol')
 
-        self._ensure_cctx()
+        lib.ZSTD_CCtx_reset(self._cctx)
 
         if size < 0:
             size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -1267,7 +1356,7 @@
 
         chunks = []
 
-        while in_buffer.pos < in_buffer.size:
+        while True:
             zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
                                                   out_buffer, in_buffer)
             if lib.ZSTD_isError(zresult):
@@ -1280,7 +1369,12 @@
 
             if out_buffer.pos:
                 chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
-                out_buffer.pos = 0
+
+            if (zresult == 0 or
+                    (in_buffer.pos == in_buffer.size and out_buffer.pos == 0)):
+                break
+
+            out_buffer.pos = 0
 
         return b''.join(chunks)
 
@@ -1303,8 +1397,6 @@
         if self._entered:
             raise ValueError('cannot __enter__ multiple times')
 
-        self._decompressor._ensure_dctx()
-
         self._entered = True
         return self
 
@@ -1347,6 +1439,7 @@
         self._closed = True
         return None
 
+    @property
     def closed(self):
         return self._closed
 
@@ -1364,10 +1457,7 @@
 
     next = __next__
 
-    def read(self, size=-1):
-        if not self._entered:
-            raise ZstdError('read() must be called from an active context manager')
-
+    def read(self, size):
         if self._closed:
             raise ValueError('stream is closed')
 
@@ -1442,10 +1532,6 @@
         return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
 
     def seek(self, pos, whence=os.SEEK_SET):
-        if not self._entered:
-            raise ZstdError('seek() must be called from an active context '
-                            'manager')
-
         if self._closed:
             raise ValueError('stream is closed')