--- a/contrib/python-zstandard/zstd_cffi.py Tue Sep 25 20:55:03 2018 +0900
+++ b/contrib/python-zstandard/zstd_cffi.py Mon Oct 08 16:27:40 2018 -0700
@@ -40,6 +40,8 @@
'DECOMPRESSION_RECOMMENDED_INPUT_SIZE',
'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE',
'MAGIC_NUMBER',
+ 'BLOCKSIZELOG_MAX',
+ 'BLOCKSIZE_MAX',
'WINDOWLOG_MIN',
'WINDOWLOG_MAX',
'CHAINLOG_MIN',
@@ -52,6 +54,7 @@
'SEARCHLENGTH_MIN',
'SEARCHLENGTH_MAX',
'TARGETLENGTH_MIN',
+ 'TARGETLENGTH_MAX',
'LDM_MINMATCH_MIN',
'LDM_MINMATCH_MAX',
'LDM_BUCKETSIZELOG_MAX',
@@ -102,6 +105,8 @@
CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE)
+BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
+BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
@@ -114,6 +119,7 @@
SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
+TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX
@@ -191,7 +197,6 @@
(lib.ZSTD_p_nbWorkers, params.threads),
(lib.ZSTD_p_jobSize, params.job_size),
(lib.ZSTD_p_overlapSizeLog, params.overlap_size_log),
- (lib.ZSTD_p_compressLiterals, params.compress_literals),
(lib.ZSTD_p_forceMaxWindow, params.force_max_window),
(lib.ZSTD_p_enableLongDistanceMatching, params.enable_ldm),
(lib.ZSTD_p_ldmHashLog, params.ldm_hash_log),
@@ -224,9 +229,6 @@
if arg not in kwargs:
kwargs[arg] = getattr(params, attr)
- if 'compress_literals' not in kwargs:
- kwargs['compress_literals'] = 1 if level >= 0 else 0
-
return ZstdCompressionParameters(**kwargs)
def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0,
@@ -235,14 +237,11 @@
write_dict_id=0, job_size=0, overlap_size_log=0,
force_max_window=0, enable_ldm=0, ldm_hash_log=0,
ldm_min_match=0, ldm_bucket_size_log=0, ldm_hash_every_log=0,
- threads=0, compress_literals=None):
+ threads=0):
if threads < 0:
threads = _cpu_count()
- if compress_literals is None:
- compress_literals = compression_level >= 0
-
self.format = format
self.compression_level = compression_level
self.window_log = window_log
@@ -257,7 +256,6 @@
self.write_dict_id = write_dict_id
self.job_size = job_size
self.overlap_size_log = overlap_size_log
- self.compress_literals = compress_literals
self.force_max_window = force_max_window
self.enable_ldm = enable_ldm
self.ldm_hash_log = ldm_hash_log
@@ -411,13 +409,14 @@
raise ZstdError('zstd compress error: %s' %
_zstd_error(zresult))
- if not out_buffer.pos:
- break
+ if out_buffer.pos:
+ self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
+ total_write += out_buffer.pos
+ self._bytes_compressed += out_buffer.pos
+ out_buffer.pos = 0
- self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
- total_write += out_buffer.pos
- self._bytes_compressed += out_buffer.pos
- out_buffer.pos = 0
+ if not zresult:
+ break
return total_write
@@ -460,6 +459,14 @@
if self._finished:
raise ZstdError('compressor object already finished')
+ if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
+ z_flush_mode = lib.ZSTD_e_flush
+ elif flush_mode == COMPRESSOBJ_FLUSH_FINISH:
+ z_flush_mode = lib.ZSTD_e_end
+ self._finished = True
+ else:
+ raise ZstdError('unhandled flush mode')
+
assert self._out.pos == 0
in_buffer = ffi.new('ZSTD_inBuffer *')
@@ -467,35 +474,13 @@
in_buffer.size = 0
in_buffer.pos = 0
- if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
- zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
- self._out,
- in_buffer,
- lib.ZSTD_e_flush)
- if lib.ZSTD_isError(zresult):
- raise ZstdError('zstd compress error: %s' %
- _zstd_error(zresult))
-
- # Output buffer is guaranteed to hold full block.
- assert zresult == 0
-
- if self._out.pos:
- result = ffi.buffer(self._out.dst, self._out.pos)[:]
- self._out.pos = 0
- return result
- else:
- return b''
-
- assert flush_mode == COMPRESSOBJ_FLUSH_FINISH
- self._finished = True
-
chunks = []
while True:
zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
self._out,
in_buffer,
- lib.ZSTD_e_end)
+ z_flush_mode)
if lib.ZSTD_isError(zresult):
raise ZstdError('error ending compression stream: %s' %
_zstd_error(zresult))
@@ -510,11 +495,107 @@
return b''.join(chunks)
+class ZstdCompressionChunker(object):
+ def __init__(self, compressor, chunk_size):
+ self._compressor = compressor
+ self._out = ffi.new('ZSTD_outBuffer *')
+ self._dst_buffer = ffi.new('char[]', chunk_size)
+ self._out.dst = self._dst_buffer
+ self._out.size = chunk_size
+ self._out.pos = 0
+
+ self._in = ffi.new('ZSTD_inBuffer *')
+ self._in.src = ffi.NULL
+ self._in.size = 0
+ self._in.pos = 0
+ self._finished = False
+
+ def compress(self, data):
+ if self._finished:
+ raise ZstdError('cannot call compress() after compression finished')
+
+ if self._in.src != ffi.NULL:
+ raise ZstdError('cannot perform operation before consuming output '
+ 'from previous operation')
+
+ data_buffer = ffi.from_buffer(data)
+
+ if not len(data_buffer):
+ return
+
+ self._in.src = data_buffer
+ self._in.size = len(data_buffer)
+ self._in.pos = 0
+
+ while self._in.pos < self._in.size:
+ zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
+ self._out,
+ self._in,
+ lib.ZSTD_e_continue)
+
+ if self._in.pos == self._in.size:
+ self._in.src = ffi.NULL
+ self._in.size = 0
+ self._in.pos = 0
+
+ if lib.ZSTD_isError(zresult):
+ raise ZstdError('zstd compress error: %s' %
+ _zstd_error(zresult))
+
+ if self._out.pos == self._out.size:
+ yield ffi.buffer(self._out.dst, self._out.pos)[:]
+ self._out.pos = 0
+
+ def flush(self):
+ if self._finished:
+ raise ZstdError('cannot call flush() after compression finished')
+
+ if self._in.src != ffi.NULL:
+ raise ZstdError('cannot call flush() before consuming output from '
+ 'previous operation')
+
+ while True:
+ zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
+ self._out, self._in,
+ lib.ZSTD_e_flush)
+ if lib.ZSTD_isError(zresult):
+ raise ZstdError('zstd compress error: %s' % _zstd_error(zresult))
+
+ if self._out.pos:
+ yield ffi.buffer(self._out.dst, self._out.pos)[:]
+ self._out.pos = 0
+
+ if not zresult:
+ return
+
+ def finish(self):
+ if self._finished:
+ raise ZstdError('cannot call finish() after compression finished')
+
+ if self._in.src != ffi.NULL:
+ raise ZstdError('cannot call finish() before consuming output from '
+ 'previous operation')
+
+ while True:
+ zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
+ self._out, self._in,
+ lib.ZSTD_e_end)
+ if lib.ZSTD_isError(zresult):
+ raise ZstdError('zstd compress error: %s' % _zstd_error(zresult))
+
+ if self._out.pos:
+ yield ffi.buffer(self._out.dst, self._out.pos)[:]
+ self._out.pos = 0
+
+ if not zresult:
+ self._finished = True
+ return
+
+
class CompressionReader(object):
- def __init__(self, compressor, source, size, read_size):
+ def __init__(self, compressor, source, read_size):
self._compressor = compressor
self._source = source
- self._source_size = size
self._read_size = read_size
self._entered = False
self._closed = False
@@ -530,12 +611,6 @@
if self._entered:
raise ValueError('cannot __enter__ multiple times')
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._compressor._cctx,
- self._source_size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError('error setting source size: %s' %
- _zstd_error(zresult))
-
self._entered = True
return self
@@ -578,6 +653,7 @@
self._closed = True
return None
+ @property
def closed(self):
return self._closed
@@ -596,9 +672,6 @@
next = __next__
def read(self, size=-1):
- if not self._entered:
- raise ZstdError('read() must be called from an active context manager')
-
if self._closed:
raise ValueError('stream is closed')
@@ -759,16 +832,14 @@
self._dict_data = dict_data
# We defer setting up garbage collection until after calling
- # _ensure_cctx() to ensure the memory size estimate is more accurate.
+ # _setup_cctx() to ensure the memory size estimate is more accurate.
try:
- self._ensure_cctx()
+ self._setup_cctx()
finally:
self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx,
size=lib.ZSTD_sizeof_CCtx(cctx))
- def _ensure_cctx(self):
- lib.ZSTD_CCtx_reset(self._cctx)
-
+ def _setup_cctx(self):
zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx,
self._params)
if lib.ZSTD_isError(zresult):
@@ -793,7 +864,7 @@
return lib.ZSTD_sizeof_CCtx(self._cctx)
def compress(self, data):
- self._ensure_cctx()
+ lib.ZSTD_CCtx_reset(self._cctx)
data_buffer = ffi.from_buffer(data)
@@ -830,7 +901,7 @@
return ffi.buffer(out, out_buffer.pos)[:]
def compressobj(self, size=-1):
- self._ensure_cctx()
+ lib.ZSTD_CCtx_reset(self._cctx)
if size < 0:
size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -851,6 +922,19 @@
return cobj
+ def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
+ lib.ZSTD_CCtx_reset(self._cctx)
+
+ if size < 0:
+ size = lib.ZSTD_CONTENTSIZE_UNKNOWN
+
+ zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
+ if lib.ZSTD_isError(zresult):
+ raise ZstdError('error setting source size: %s' %
+ _zstd_error(zresult))
+
+ return ZstdCompressionChunker(self, chunk_size=chunk_size)
+
def copy_stream(self, ifh, ofh, size=-1,
read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
@@ -860,7 +944,7 @@
if not hasattr(ofh, 'write'):
raise ValueError('second argument must have a write() method')
- self._ensure_cctx()
+ lib.ZSTD_CCtx_reset(self._cctx)
if size < 0:
size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -927,7 +1011,7 @@
def stream_reader(self, source, size=-1,
read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE):
- self._ensure_cctx()
+ lib.ZSTD_CCtx_reset(self._cctx)
try:
size = len(source)
@@ -937,7 +1021,12 @@
if size < 0:
size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- return CompressionReader(self, source, size, read_size)
+ zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
+ if lib.ZSTD_isError(zresult):
+ raise ZstdError('error setting source size: %s' %
+ _zstd_error(zresult))
+
+ return CompressionReader(self, source, read_size)
def stream_writer(self, writer, size=-1,
write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
@@ -945,7 +1034,7 @@
if not hasattr(writer, 'write'):
raise ValueError('must pass an object with a write() method')
- self._ensure_cctx()
+ lib.ZSTD_CCtx_reset(self._cctx)
if size < 0:
size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -967,7 +1056,7 @@
raise ValueError('must pass an object with a read() method or '
'conforms to buffer protocol')
- self._ensure_cctx()
+ lib.ZSTD_CCtx_reset(self._cctx)
if size < 0:
size = lib.ZSTD_CONTENTSIZE_UNKNOWN
@@ -1267,7 +1356,7 @@
chunks = []
- while in_buffer.pos < in_buffer.size:
+ while True:
zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
out_buffer, in_buffer)
if lib.ZSTD_isError(zresult):
@@ -1280,7 +1369,12 @@
if out_buffer.pos:
chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
- out_buffer.pos = 0
+
+ if (zresult == 0 or
+ (in_buffer.pos == in_buffer.size and out_buffer.pos == 0)):
+ break
+
+ out_buffer.pos = 0
return b''.join(chunks)
@@ -1303,8 +1397,6 @@
if self._entered:
raise ValueError('cannot __enter__ multiple times')
- self._decompressor._ensure_dctx()
-
self._entered = True
return self
@@ -1347,6 +1439,7 @@
self._closed = True
return None
+ @property
def closed(self):
return self._closed
@@ -1364,10 +1457,7 @@
next = __next__
- def read(self, size=-1):
- if not self._entered:
- raise ZstdError('read() must be called from an active context manager')
-
+ def read(self, size):
if self._closed:
raise ValueError('stream is closed')
@@ -1442,10 +1532,6 @@
return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
def seek(self, pos, whence=os.SEEK_SET):
- if not self._entered:
- raise ZstdError('seek() must be called from an active context '
- 'manager')
-
if self._closed:
raise ValueError('stream is closed')