diff contrib/python-zstandard/zstandard/cffi.py @ 42070:675775c33ab6
zstandard: vendor python-zstandard 0.11
The upstream source distribution from PyPI was extracted. Unwanted
files were removed.
The clang-format ignore list was updated to reflect the new source
of files.
The project contains a vendored copy of zstandard 1.3.8. The old
version was 1.3.6. This should result in some minor performance wins.
test-check-py3-compat.t was updated to reflect now-passing tests on
Python 3.8.
Some HTTP tests were updated to reflect new zstd compression output.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D6199
author    Gregory Szorc <gregory.szorc@gmail.com>
date      Thu, 04 Apr 2019 17:34:43 -0700
parents   contrib/python-zstandard/zstd_cffi.py@73fef626dae3
children  69de49c4e39c
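
For orientation before reading the diff: the vendored module exposes the same one-shot API as the C extension it mirrors. Below is a minimal round-trip sketch, assuming the package is importable as `zstandard` (true for the upstream PyPI distribution; within Mercurial the module lives under contrib/ and is reached through Mercurial's own import machinery). `ZstdCompressor`, `ZstdDecompressor`, and their defaults all appear in the diff below.

    import zstandard as zstd

    data = b'data to compress' * 1024

    # One-shot compression; level 3 is the library default.
    cctx = zstd.ZstdCompressor(level=3)
    frame = cctx.compress(data)

    # The frame records the content size by default (write_content_size=1),
    # so one-shot decompression needs no explicit max_output_size.
    dctx = zstd.ZstdDecompressor()
    assert dctx.decompress(frame) == data
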
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/contrib/python-zstandard/zstandard/cffi.py Thu Apr 04 17:34:43 2019 -0700 @@ -0,0 +1,2515 @@ +# Copyright (c) 2016-present, Gregory Szorc +# All rights reserved. +# +# This software may be modified and distributed under the terms +# of the BSD license. See the LICENSE file for details. + +"""Python interface to the Zstandard (zstd) compression library.""" + +from __future__ import absolute_import, unicode_literals + +# This should match what the C extension exports. +__all__ = [ + #'BufferSegment', + #'BufferSegments', + #'BufferWithSegments', + #'BufferWithSegmentsCollection', + 'CompressionParameters', + 'ZstdCompressionDict', + 'ZstdCompressionParameters', + 'ZstdCompressor', + 'ZstdError', + 'ZstdDecompressor', + 'FrameParameters', + 'estimate_decompression_context_size', + 'frame_content_size', + 'frame_header_size', + 'get_frame_parameters', + 'train_dictionary', + + # Constants. + 'FLUSH_BLOCK', + 'FLUSH_FRAME', + 'COMPRESSOBJ_FLUSH_FINISH', + 'COMPRESSOBJ_FLUSH_BLOCK', + 'ZSTD_VERSION', + 'FRAME_HEADER', + 'CONTENTSIZE_UNKNOWN', + 'CONTENTSIZE_ERROR', + 'MAX_COMPRESSION_LEVEL', + 'COMPRESSION_RECOMMENDED_INPUT_SIZE', + 'COMPRESSION_RECOMMENDED_OUTPUT_SIZE', + 'DECOMPRESSION_RECOMMENDED_INPUT_SIZE', + 'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE', + 'MAGIC_NUMBER', + 'BLOCKSIZELOG_MAX', + 'BLOCKSIZE_MAX', + 'WINDOWLOG_MIN', + 'WINDOWLOG_MAX', + 'CHAINLOG_MIN', + 'CHAINLOG_MAX', + 'HASHLOG_MIN', + 'HASHLOG_MAX', + 'HASHLOG3_MAX', + 'MINMATCH_MIN', + 'MINMATCH_MAX', + 'SEARCHLOG_MIN', + 'SEARCHLOG_MAX', + 'SEARCHLENGTH_MIN', + 'SEARCHLENGTH_MAX', + 'TARGETLENGTH_MIN', + 'TARGETLENGTH_MAX', + 'LDM_MINMATCH_MIN', + 'LDM_MINMATCH_MAX', + 'LDM_BUCKETSIZELOG_MAX', + 'STRATEGY_FAST', + 'STRATEGY_DFAST', + 'STRATEGY_GREEDY', + 'STRATEGY_LAZY', + 'STRATEGY_LAZY2', + 'STRATEGY_BTLAZY2', + 'STRATEGY_BTOPT', + 'STRATEGY_BTULTRA', + 'STRATEGY_BTULTRA2', + 'DICT_TYPE_AUTO', + 'DICT_TYPE_RAWCONTENT', + 'DICT_TYPE_FULLDICT', + 'FORMAT_ZSTD1', + 'FORMAT_ZSTD1_MAGICLESS', +] + +import io +import os +import sys + +from _zstd_cffi import ( + ffi, + lib, +) + +if sys.version_info[0] == 2: + bytes_type = str + int_type = long +else: + bytes_type = bytes + int_type = int + + +COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() +COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() +DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() +DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() + +new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) + + +MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() +MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER +FRAME_HEADER = b'\x28\xb5\x2f\xfd' +CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN +CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR +ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE) + +BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX +BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX +WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN +WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX +CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN +CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX +HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN +HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX +HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX +MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN +MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX +SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN +SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX +SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN +SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX +TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN +TARGETLENGTH_MAX = 
lib.ZSTD_TARGETLENGTH_MAX +LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN +LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX +LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX + +STRATEGY_FAST = lib.ZSTD_fast +STRATEGY_DFAST = lib.ZSTD_dfast +STRATEGY_GREEDY = lib.ZSTD_greedy +STRATEGY_LAZY = lib.ZSTD_lazy +STRATEGY_LAZY2 = lib.ZSTD_lazy2 +STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 +STRATEGY_BTOPT = lib.ZSTD_btopt +STRATEGY_BTULTRA = lib.ZSTD_btultra +STRATEGY_BTULTRA2 = lib.ZSTD_btultra2 + +DICT_TYPE_AUTO = lib.ZSTD_dct_auto +DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent +DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict + +FORMAT_ZSTD1 = lib.ZSTD_f_zstd1 +FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless + +FLUSH_BLOCK = 0 +FLUSH_FRAME = 1 + +COMPRESSOBJ_FLUSH_FINISH = 0 +COMPRESSOBJ_FLUSH_BLOCK = 1 + + +def _cpu_count(): + # os.cpu_count() was introducd in Python 3.4. + try: + return os.cpu_count() or 0 + except AttributeError: + pass + + # Linux. + try: + if sys.version_info[0] == 2: + return os.sysconf(b'SC_NPROCESSORS_ONLN') + else: + return os.sysconf(u'SC_NPROCESSORS_ONLN') + except (AttributeError, ValueError): + pass + + # TODO implement on other platforms. + return 0 + + +class ZstdError(Exception): + pass + + +def _zstd_error(zresult): + # Resolves to bytes on Python 2 and 3. We use the string for formatting + # into error messages, which will be literal unicode. So convert it to + # unicode. + return ffi.string(lib.ZSTD_getErrorName(zresult)).decode('utf-8') + +def _make_cctx_params(params): + res = lib.ZSTD_createCCtxParams() + if res == ffi.NULL: + raise MemoryError() + + res = ffi.gc(res, lib.ZSTD_freeCCtxParams) + + attrs = [ + (lib.ZSTD_c_format, params.format), + (lib.ZSTD_c_compressionLevel, params.compression_level), + (lib.ZSTD_c_windowLog, params.window_log), + (lib.ZSTD_c_hashLog, params.hash_log), + (lib.ZSTD_c_chainLog, params.chain_log), + (lib.ZSTD_c_searchLog, params.search_log), + (lib.ZSTD_c_minMatch, params.min_match), + (lib.ZSTD_c_targetLength, params.target_length), + (lib.ZSTD_c_strategy, params.compression_strategy), + (lib.ZSTD_c_contentSizeFlag, params.write_content_size), + (lib.ZSTD_c_checksumFlag, params.write_checksum), + (lib.ZSTD_c_dictIDFlag, params.write_dict_id), + (lib.ZSTD_c_nbWorkers, params.threads), + (lib.ZSTD_c_jobSize, params.job_size), + (lib.ZSTD_c_overlapLog, params.overlap_log), + (lib.ZSTD_c_forceMaxWindow, params.force_max_window), + (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm), + (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log), + (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match), + (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log), + (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log), + ] + + for param, value in attrs: + _set_compression_parameter(res, param, value) + + return res + +class ZstdCompressionParameters(object): + @staticmethod + def from_level(level, source_size=0, dict_size=0, **kwargs): + params = lib.ZSTD_getCParams(level, source_size, dict_size) + + args = { + 'window_log': 'windowLog', + 'chain_log': 'chainLog', + 'hash_log': 'hashLog', + 'search_log': 'searchLog', + 'min_match': 'minMatch', + 'target_length': 'targetLength', + 'compression_strategy': 'strategy', + } + + for arg, attr in args.items(): + if arg not in kwargs: + kwargs[arg] = getattr(params, attr) + + return ZstdCompressionParameters(**kwargs) + + def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0, + chain_log=0, search_log=0, min_match=0, target_length=0, + strategy=-1, compression_strategy=-1, + 
write_content_size=1, write_checksum=0, + write_dict_id=0, job_size=0, overlap_log=-1, + overlap_size_log=-1, force_max_window=0, enable_ldm=0, + ldm_hash_log=0, ldm_min_match=0, ldm_bucket_size_log=0, + ldm_hash_rate_log=-1, ldm_hash_every_log=-1, threads=0): + + params = lib.ZSTD_createCCtxParams() + if params == ffi.NULL: + raise MemoryError() + + params = ffi.gc(params, lib.ZSTD_freeCCtxParams) + + self._params = params + + if threads < 0: + threads = _cpu_count() + + # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog + # because setting ZSTD_c_nbWorkers resets the other parameters. + _set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads) + + _set_compression_parameter(params, lib.ZSTD_c_format, format) + _set_compression_parameter(params, lib.ZSTD_c_compressionLevel, compression_level) + _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log) + _set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log) + _set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log) + _set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log) + _set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match) + _set_compression_parameter(params, lib.ZSTD_c_targetLength, target_length) + + if strategy != -1 and compression_strategy != -1: + raise ValueError('cannot specify both compression_strategy and strategy') + + if compression_strategy != -1: + strategy = compression_strategy + elif strategy == -1: + strategy = 0 + + _set_compression_parameter(params, lib.ZSTD_c_strategy, strategy) + _set_compression_parameter(params, lib.ZSTD_c_contentSizeFlag, write_content_size) + _set_compression_parameter(params, lib.ZSTD_c_checksumFlag, write_checksum) + _set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id) + _set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size) + + if overlap_log != -1 and overlap_size_log != -1: + raise ValueError('cannot specify both overlap_log and overlap_size_log') + + if overlap_size_log != -1: + overlap_log = overlap_size_log + elif overlap_log == -1: + overlap_log = 0 + + _set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log) + _set_compression_parameter(params, lib.ZSTD_c_forceMaxWindow, force_max_window) + _set_compression_parameter(params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm) + _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log) + _set_compression_parameter(params, lib.ZSTD_c_ldmMinMatch, ldm_min_match) + _set_compression_parameter(params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log) + + if ldm_hash_rate_log != -1 and ldm_hash_every_log != -1: + raise ValueError('cannot specify both ldm_hash_rate_log and ldm_hash_every_log') + + if ldm_hash_every_log != -1: + ldm_hash_rate_log = ldm_hash_every_log + elif ldm_hash_rate_log == -1: + ldm_hash_rate_log = 0 + + _set_compression_parameter(params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log) + + @property + def format(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_format) + + @property + def compression_level(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_compressionLevel) + + @property + def window_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog) + + @property + def hash_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog) + + @property + def chain_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog) + + @property + def search_log(self): + return 
_get_compression_parameter(self._params, lib.ZSTD_c_searchLog) + + @property + def min_match(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch) + + @property + def target_length(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength) + + @property + def compression_strategy(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_strategy) + + @property + def write_content_size(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_contentSizeFlag) + + @property + def write_checksum(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag) + + @property + def write_dict_id(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag) + + @property + def job_size(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize) + + @property + def overlap_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog) + + @property + def overlap_size_log(self): + return self.overlap_log + + @property + def force_max_window(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_forceMaxWindow) + + @property + def enable_ldm(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_enableLongDistanceMatching) + + @property + def ldm_hash_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog) + + @property + def ldm_min_match(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch) + + @property + def ldm_bucket_size_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmBucketSizeLog) + + @property + def ldm_hash_rate_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashRateLog) + + @property + def ldm_hash_every_log(self): + return self.ldm_hash_rate_log + + @property + def threads(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers) + + def estimated_compression_context_size(self): + return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params) + +CompressionParameters = ZstdCompressionParameters + +def estimate_decompression_context_size(): + return lib.ZSTD_estimateDCtxSize() + + +def _set_compression_parameter(params, param, value): + zresult = lib.ZSTD_CCtxParam_setParameter(params, param, value) + if lib.ZSTD_isError(zresult): + raise ZstdError('unable to set compression context parameter: %s' % + _zstd_error(zresult)) + + +def _get_compression_parameter(params, param): + result = ffi.new('int *') + + zresult = lib.ZSTD_CCtxParam_getParameter(params, param, result) + if lib.ZSTD_isError(zresult): + raise ZstdError('unable to get compression context parameter: %s' % + _zstd_error(zresult)) + + return result[0] + + +class ZstdCompressionWriter(object): + def __init__(self, compressor, writer, source_size, write_size, + write_return_read): + self._compressor = compressor + self._writer = writer + self._write_size = write_size + self._write_return_read = bool(write_return_read) + self._entered = False + self._closed = False + self._bytes_compressed = 0 + + self._dst_buffer = ffi.new('char[]', write_size) + self._out_buffer = ffi.new('ZSTD_outBuffer *') + self._out_buffer.dst = self._dst_buffer + self._out_buffer.size = len(self._dst_buffer) + self._out_buffer.pos = 0 + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, + source_size) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + def __enter__(self): + if self._closed: + 
raise ValueError('stream is closed') + + if self._entered: + raise ZstdError('cannot __enter__ multiple times') + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + + if not exc_type and not exc_value and not exc_tb: + self.close() + + self._compressor = None + + return False + + def memory_size(self): + return lib.ZSTD_sizeof_CCtx(self._compressor._cctx) + + def fileno(self): + f = getattr(self._writer, 'fileno', None) + if f: + return f() + else: + raise OSError('fileno not available on underlying writer') + + def close(self): + if self._closed: + return + + try: + self.flush(FLUSH_FRAME) + finally: + self._closed = True + + # Call close() on underlying stream as well. + f = getattr(self._writer, 'close', None) + if f: + f() + + @property + def closed(self): + return self._closed + + def isatty(self): + return False + + def readable(self): + return False + + def readline(self, size=-1): + raise io.UnsupportedOperation() + + def readlines(self, hint=-1): + raise io.UnsupportedOperation() + + def seek(self, offset, whence=None): + raise io.UnsupportedOperation() + + def seekable(self): + return False + + def truncate(self, size=None): + raise io.UnsupportedOperation() + + def writable(self): + return True + + def writelines(self, lines): + raise NotImplementedError('writelines() is not yet implemented') + + def read(self, size=-1): + raise io.UnsupportedOperation() + + def readall(self): + raise io.UnsupportedOperation() + + def readinto(self, b): + raise io.UnsupportedOperation() + + def write(self, data): + if self._closed: + raise ValueError('stream is closed') + + total_write = 0 + + data_buffer = ffi.from_buffer(data) + + in_buffer = ffi.new('ZSTD_inBuffer *') + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + out_buffer = self._out_buffer + out_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, in_buffer, + lib.ZSTD_e_continue) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % + _zstd_error(zresult)) + + if out_buffer.pos: + self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) + total_write += out_buffer.pos + self._bytes_compressed += out_buffer.pos + out_buffer.pos = 0 + + if self._write_return_read: + return in_buffer.pos + else: + return total_write + + def flush(self, flush_mode=FLUSH_BLOCK): + if flush_mode == FLUSH_BLOCK: + flush = lib.ZSTD_e_flush + elif flush_mode == FLUSH_FRAME: + flush = lib.ZSTD_e_end + else: + raise ValueError('unknown flush_mode: %r' % flush_mode) + + if self._closed: + raise ValueError('stream is closed') + + total_write = 0 + + out_buffer = self._out_buffer + out_buffer.pos = 0 + + in_buffer = ffi.new('ZSTD_inBuffer *') + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + while True: + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, in_buffer, + flush) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % + _zstd_error(zresult)) + + if out_buffer.pos: + self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) + total_write += out_buffer.pos + self._bytes_compressed += out_buffer.pos + out_buffer.pos = 0 + + if not zresult: + break + + return total_write + + def tell(self): + return self._bytes_compressed + + +class ZstdCompressionObj(object): + def compress(self, data): + if self._finished: + raise ZstdError('cannot call compress() after compressor 
finished') + + data_buffer = ffi.from_buffer(data) + source = ffi.new('ZSTD_inBuffer *') + source.src = data_buffer + source.size = len(data_buffer) + source.pos = 0 + + chunks = [] + + while source.pos < len(data): + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + self._out, + source, + lib.ZSTD_e_continue) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % + _zstd_error(zresult)) + + if self._out.pos: + chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) + self._out.pos = 0 + + return b''.join(chunks) + + def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): + if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK): + raise ValueError('flush mode not recognized') + + if self._finished: + raise ZstdError('compressor object already finished') + + if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: + z_flush_mode = lib.ZSTD_e_flush + elif flush_mode == COMPRESSOBJ_FLUSH_FINISH: + z_flush_mode = lib.ZSTD_e_end + self._finished = True + else: + raise ZstdError('unhandled flush mode') + + assert self._out.pos == 0 + + in_buffer = ffi.new('ZSTD_inBuffer *') + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + chunks = [] + + while True: + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + self._out, + in_buffer, + z_flush_mode) + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s' % + _zstd_error(zresult)) + + if self._out.pos: + chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) + self._out.pos = 0 + + if not zresult: + break + + return b''.join(chunks) + + +class ZstdCompressionChunker(object): + def __init__(self, compressor, chunk_size): + self._compressor = compressor + self._out = ffi.new('ZSTD_outBuffer *') + self._dst_buffer = ffi.new('char[]', chunk_size) + self._out.dst = self._dst_buffer + self._out.size = chunk_size + self._out.pos = 0 + + self._in = ffi.new('ZSTD_inBuffer *') + self._in.src = ffi.NULL + self._in.size = 0 + self._in.pos = 0 + self._finished = False + + def compress(self, data): + if self._finished: + raise ZstdError('cannot call compress() after compression finished') + + if self._in.src != ffi.NULL: + raise ZstdError('cannot perform operation before consuming output ' + 'from previous operation') + + data_buffer = ffi.from_buffer(data) + + if not len(data_buffer): + return + + self._in.src = data_buffer + self._in.size = len(data_buffer) + self._in.pos = 0 + + while self._in.pos < self._in.size: + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + self._out, + self._in, + lib.ZSTD_e_continue) + + if self._in.pos == self._in.size: + self._in.src = ffi.NULL + self._in.size = 0 + self._in.pos = 0 + + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % + _zstd_error(zresult)) + + if self._out.pos == self._out.size: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + def flush(self): + if self._finished: + raise ZstdError('cannot call flush() after compression finished') + + if self._in.src != ffi.NULL: + raise ZstdError('cannot call flush() before consuming output from ' + 'previous operation') + + while True: + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + self._out, self._in, + lib.ZSTD_e_flush) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) + + if self._out.pos: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + if not zresult: + return + + def finish(self): + if self._finished: + raise 
ZstdError('cannot call finish() after compression finished') + + if self._in.src != ffi.NULL: + raise ZstdError('cannot call finish() before consuming output from ' + 'previous operation') + + while True: + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + self._out, self._in, + lib.ZSTD_e_end) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) + + if self._out.pos: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + if not zresult: + self._finished = True + return + + +class ZstdCompressionReader(object): + def __init__(self, compressor, source, read_size): + self._compressor = compressor + self._source = source + self._read_size = read_size + self._entered = False + self._closed = False + self._bytes_compressed = 0 + self._finished_input = False + self._finished_output = False + + self._in_buffer = ffi.new('ZSTD_inBuffer *') + # Holds a ref so backing bytes in self._in_buffer stay alive. + self._source_buffer = None + + def __enter__(self): + if self._entered: + raise ValueError('cannot __enter__ multiple times') + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self._closed = True + self._source = None + self._compressor = None + + return False + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return False + + def readline(self): + raise io.UnsupportedOperation() + + def readlines(self): + raise io.UnsupportedOperation() + + def write(self, data): + raise OSError('stream is not writable') + + def writelines(self, ignored): + raise OSError('stream is not writable') + + def isatty(self): + return False + + def flush(self): + return None + + def close(self): + self._closed = True + return None + + @property + def closed(self): + return self._closed + + def tell(self): + return self._bytes_compressed + + def readall(self): + chunks = [] + + while True: + chunk = self.read(1048576) + if not chunk: + break + + chunks.append(chunk) + + return b''.join(chunks) + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + next = __next__ + + def _read_input(self): + if self._finished_input: + return + + if hasattr(self._source, 'read'): + data = self._source.read(self._read_size) + + if not data: + self._finished_input = True + return + + self._source_buffer = ffi.from_buffer(data) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + else: + self._source_buffer = ffi.from_buffer(self._source) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + + def _compress_into_buffer(self, out_buffer): + if self._in_buffer.pos >= self._in_buffer.size: + return + + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, self._in_buffer, + lib.ZSTD_e_continue) + + self._bytes_compressed += out_buffer.pos - old_pos + + if self._in_buffer.pos == self._in_buffer.size: + self._in_buffer.src = ffi.NULL + self._in_buffer.pos = 0 + self._in_buffer.size = 0 + self._source_buffer = None + + if not hasattr(self._source, 'read'): + self._finished_input = True + + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s', + _zstd_error(zresult)) + + return out_buffer.pos and out_buffer.pos == out_buffer.size + + def read(self, size=-1): + if self._closed: + raise 
ValueError('stream is closed') + + if size < -1: + raise ValueError('cannot read negative amounts less than -1') + + if size == -1: + return self.readall() + + if self._finished_output or size == 0: + return b'' + + # Need a dedicated ref to dest buffer otherwise it gets collected. + dst_buffer = ffi.new('char[]', size) + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # EOF + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, self._in_buffer, + lib.ZSTD_e_end) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s', + _zstd_error(zresult)) + + if zresult == 0: + self._finished_output = True + + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def read1(self, size=-1): + if self._closed: + raise ValueError('stream is closed') + + if size < -1: + raise ValueError('cannot read negative amounts less than -1') + + if self._finished_output or size == 0: + return b'' + + # -1 returns arbitrary number of bytes. + if size == -1: + size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE + + dst_buffer = ffi.new('char[]', size) + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + # read1() dictates that we can perform at most 1 call to the + # underlying stream to get input. However, we can't satisfy this + # restriction with compression because not all input generates output. + # It is possible to perform a block flush in order to ensure output. + # But this may not be desirable behavior. So we allow multiple read() + # to the underlying stream. But unlike read(), we stop once we have + # any output. + + self._compress_into_buffer(out_buffer) + if out_buffer.pos: + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + + # If we've filled the output buffer, return immediately. + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # If we've populated the output buffer and we're not at EOF, + # also return, as we've satisfied the read1() limits. + if out_buffer.pos and not self._finished_input: + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # Else if we're at EOS and we have room left in the buffer, + # fall through to below and try to add more data to the output. + + # EOF. + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, self._in_buffer, + lib.ZSTD_e_end) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s' % + _zstd_error(zresult)) + + if zresult == 0: + self._finished_output = True + + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto(self, b): + if self._closed: + raise ValueError('stream is closed') + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. 
+ dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b'', 0) + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + while not self._finished_input: + self._read_input() + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + # EOF. + old_pos = out_buffer.pos + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, self._in_buffer, + lib.ZSTD_e_end) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s', + _zstd_error(zresult)) + + if zresult == 0: + self._finished_output = True + + return out_buffer.pos + + def readinto1(self, b): + if self._closed: + raise ValueError('stream is closed') + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b'', 0) + + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + self._compress_into_buffer(out_buffer) + if out_buffer.pos: + return out_buffer.pos + + while not self._finished_input: + self._read_input() + + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + if out_buffer.pos and not self._finished_input: + return out_buffer.pos + + # EOF. + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2(self._compressor._cctx, + out_buffer, self._in_buffer, + lib.ZSTD_e_end) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s' % + _zstd_error(zresult)) + + if zresult == 0: + self._finished_output = True + + return out_buffer.pos + + +class ZstdCompressor(object): + def __init__(self, level=3, dict_data=None, compression_params=None, + write_checksum=None, write_content_size=None, + write_dict_id=None, threads=0): + if level > lib.ZSTD_maxCLevel(): + raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel()) + + if threads < 0: + threads = _cpu_count() + + if compression_params and write_checksum is not None: + raise ValueError('cannot define compression_params and ' + 'write_checksum') + + if compression_params and write_content_size is not None: + raise ValueError('cannot define compression_params and ' + 'write_content_size') + + if compression_params and write_dict_id is not None: + raise ValueError('cannot define compression_params and ' + 'write_dict_id') + + if compression_params and threads: + raise ValueError('cannot define compression_params and threads') + + if compression_params: + self._params = _make_cctx_params(compression_params) + else: + if write_dict_id is None: + write_dict_id = True + + params = lib.ZSTD_createCCtxParams() + if params == ffi.NULL: + raise MemoryError() + + self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams) + + _set_compression_parameter(self._params, + lib.ZSTD_c_compressionLevel, + level) + + _set_compression_parameter( + self._params, + lib.ZSTD_c_contentSizeFlag, + write_content_size if write_content_size is not None else 1) + + _set_compression_parameter(self._params, + lib.ZSTD_c_checksumFlag, + 1 if write_checksum else 0) + + _set_compression_parameter(self._params, + lib.ZSTD_c_dictIDFlag, + 1 if write_dict_id else 0) + + if threads: + _set_compression_parameter(self._params, + lib.ZSTD_c_nbWorkers, + threads) + + cctx = 
lib.ZSTD_createCCtx() + if cctx == ffi.NULL: + raise MemoryError() + + self._cctx = cctx + self._dict_data = dict_data + + # We defer setting up garbage collection until after calling + # _setup_cctx() to ensure the memory size estimate is more accurate. + try: + self._setup_cctx() + finally: + self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx, + size=lib.ZSTD_sizeof_CCtx(cctx)) + + def _setup_cctx(self): + zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx, + self._params) + if lib.ZSTD_isError(zresult): + raise ZstdError('could not set compression parameters: %s' % + _zstd_error(zresult)) + + dict_data = self._dict_data + + if dict_data: + if dict_data._cdict: + zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict) + else: + zresult = lib.ZSTD_CCtx_loadDictionary_advanced( + self._cctx, dict_data.as_bytes(), len(dict_data), + lib.ZSTD_dlm_byRef, dict_data._dict_type) + + if lib.ZSTD_isError(zresult): + raise ZstdError('could not load compression dictionary: %s' % + _zstd_error(zresult)) + + def memory_size(self): + return lib.ZSTD_sizeof_CCtx(self._cctx) + + def compress(self, data): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + data_buffer = ffi.from_buffer(data) + + dest_size = lib.ZSTD_compressBound(len(data_buffer)) + out = new_nonzero('char[]', dest_size) + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + out_buffer = ffi.new('ZSTD_outBuffer *') + in_buffer = ffi.new('ZSTD_inBuffer *') + + out_buffer.dst = out + out_buffer.size = dest_size + out_buffer.pos = 0 + + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_compressStream2(self._cctx, + out_buffer, + in_buffer, + lib.ZSTD_e_end) + + if lib.ZSTD_isError(zresult): + raise ZstdError('cannot compress: %s' % + _zstd_error(zresult)) + elif zresult: + raise ZstdError('unexpected partial frame flush') + + return ffi.buffer(out, out_buffer.pos)[:] + + def compressobj(self, size=-1): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + cobj = ZstdCompressionObj() + cobj._out = ffi.new('ZSTD_outBuffer *') + cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE) + cobj._out.dst = cobj._dst_buffer + cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE + cobj._out.pos = 0 + cobj._compressor = self + cobj._finished = False + + return cobj + + def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + return ZstdCompressionChunker(self, chunk_size=chunk_size) + + def copy_stream(self, ifh, ofh, size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): + + if not hasattr(ifh, 'read'): + raise ValueError('first argument must have a read() method') + if not hasattr(ofh, 'write'): + raise ValueError('second argument must have a write() method') + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + 
+ if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + in_buffer = ffi.new('ZSTD_inBuffer *') + out_buffer = ffi.new('ZSTD_outBuffer *') + + dst_buffer = ffi.new('char[]', write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + total_read, total_write = 0, 0 + + while True: + data = ifh.read(read_size) + if not data: + break + + data_buffer = ffi.from_buffer(data) + total_read += len(data_buffer) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2(self._cctx, + out_buffer, + in_buffer, + lib.ZSTD_e_continue) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % + _zstd_error(zresult)) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + # We've finished reading. Flush the compressor. + while True: + zresult = lib.ZSTD_compressStream2(self._cctx, + out_buffer, + in_buffer, + lib.ZSTD_e_end) + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s' % + _zstd_error(zresult)) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + if zresult == 0: + break + + return total_read, total_write + + def stream_reader(self, source, size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + try: + size = len(source) + except Exception: + pass + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + return ZstdCompressionReader(self, source, read_size) + + def stream_writer(self, writer, size=-1, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + write_return_read=False): + + if not hasattr(writer, 'write'): + raise ValueError('must pass an object with a write() method') + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + return ZstdCompressionWriter(self, writer, size, write_size, + write_return_read) + + write_to = stream_writer + + def read_to_iter(self, reader, size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): + if hasattr(reader, 'read'): + have_read = True + elif hasattr(reader, '__getitem__'): + have_read = False + buffer_offset = 0 + size = len(reader) + else: + raise ValueError('must pass an object with a read() method or ' + 'conforms to buffer protocol') + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError('error setting source size: %s' % + _zstd_error(zresult)) + + in_buffer = ffi.new('ZSTD_inBuffer *') + out_buffer = ffi.new('ZSTD_outBuffer *') + + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + dst_buffer = ffi.new('char[]', write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + while True: + # We should never have output data sitting around after 
a previous + # iteration. + assert out_buffer.pos == 0 + + # Collect input data. + if have_read: + read_result = reader.read(read_size) + else: + remaining = len(reader) - buffer_offset + slice_size = min(remaining, read_size) + read_result = reader[buffer_offset:buffer_offset + slice_size] + buffer_offset += slice_size + + # No new input data. Break out of the read loop. + if not read_result: + break + + # Feed all read data into the compressor and emit output until + # exhausted. + read_buffer = ffi.from_buffer(read_result) + in_buffer.src = read_buffer + in_buffer.size = len(read_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2(self._cctx, out_buffer, in_buffer, + lib.ZSTD_e_continue) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd compress error: %s' % + _zstd_error(zresult)) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + assert out_buffer.pos == 0 + + # And repeat the loop to collect more data. + continue + + # If we get here, input is exhausted. End the stream and emit what + # remains. + while True: + assert out_buffer.pos == 0 + zresult = lib.ZSTD_compressStream2(self._cctx, + out_buffer, + in_buffer, + lib.ZSTD_e_end) + if lib.ZSTD_isError(zresult): + raise ZstdError('error ending compression stream: %s' % + _zstd_error(zresult)) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + if zresult == 0: + break + + read_from = read_to_iter + + def frame_progression(self): + progression = lib.ZSTD_getFrameProgression(self._cctx) + + return progression.ingested, progression.consumed, progression.produced + + +class FrameParameters(object): + def __init__(self, fparams): + self.content_size = fparams.frameContentSize + self.window_size = fparams.windowSize + self.dict_id = fparams.dictID + self.has_checksum = bool(fparams.checksumFlag) + + +def frame_content_size(data): + data_buffer = ffi.from_buffer(data) + + size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) + + if size == lib.ZSTD_CONTENTSIZE_ERROR: + raise ZstdError('error when determining content size') + elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN: + return -1 + else: + return size + + +def frame_header_size(data): + data_buffer = ffi.from_buffer(data) + + zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError('could not determine frame header size: %s' % + _zstd_error(zresult)) + + return zresult + + +def get_frame_parameters(data): + params = ffi.new('ZSTD_frameHeader *') + + data_buffer = ffi.from_buffer(data) + zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError('cannot get frame parameters: %s' % + _zstd_error(zresult)) + + if zresult: + raise ZstdError('not enough data for frame parameters; need %d bytes' % + zresult) + + return FrameParameters(params[0]) + + +class ZstdCompressionDict(object): + def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0): + assert isinstance(data, bytes_type) + self._data = data + self.k = k + self.d = d + + if dict_type not in (DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT, + DICT_TYPE_FULLDICT): + raise ValueError('invalid dictionary load mode: %d; must use ' + 'DICT_TYPE_* constants') + + self._dict_type = dict_type + self._cdict = None + + def __len__(self): + return len(self._data) + + def dict_id(self): + return int_type(lib.ZDICT_getDictID(self._data, 
len(self._data))) + + def as_bytes(self): + return self._data + + def precompute_compress(self, level=0, compression_params=None): + if level and compression_params: + raise ValueError('must only specify one of level or ' + 'compression_params') + + if not level and not compression_params: + raise ValueError('must specify one of level or compression_params') + + if level: + cparams = lib.ZSTD_getCParams(level, 0, len(self._data)) + else: + cparams = ffi.new('ZSTD_compressionParameters') + cparams.chainLog = compression_params.chain_log + cparams.hashLog = compression_params.hash_log + cparams.minMatch = compression_params.min_match + cparams.searchLog = compression_params.search_log + cparams.strategy = compression_params.compression_strategy + cparams.targetLength = compression_params.target_length + cparams.windowLog = compression_params.window_log + + cdict = lib.ZSTD_createCDict_advanced(self._data, len(self._data), + lib.ZSTD_dlm_byRef, + self._dict_type, + cparams, + lib.ZSTD_defaultCMem) + if cdict == ffi.NULL: + raise ZstdError('unable to precompute dictionary') + + self._cdict = ffi.gc(cdict, lib.ZSTD_freeCDict, + size=lib.ZSTD_sizeof_CDict(cdict)) + + @property + def _ddict(self): + ddict = lib.ZSTD_createDDict_advanced(self._data, len(self._data), + lib.ZSTD_dlm_byRef, + self._dict_type, + lib.ZSTD_defaultCMem) + + if ddict == ffi.NULL: + raise ZstdError('could not create decompression dict') + + ddict = ffi.gc(ddict, lib.ZSTD_freeDDict, + size=lib.ZSTD_sizeof_DDict(ddict)) + self.__dict__['_ddict'] = ddict + + return ddict + +def train_dictionary(dict_size, samples, k=0, d=0, notifications=0, dict_id=0, + level=0, steps=0, threads=0): + if not isinstance(samples, list): + raise TypeError('samples must be a list') + + if threads < 0: + threads = _cpu_count() + + total_size = sum(map(len, samples)) + + samples_buffer = new_nonzero('char[]', total_size) + sample_sizes = new_nonzero('size_t[]', len(samples)) + + offset = 0 + for i, sample in enumerate(samples): + if not isinstance(sample, bytes_type): + raise ValueError('samples must be bytes') + + l = len(sample) + ffi.memmove(samples_buffer + offset, sample, l) + offset += l + sample_sizes[i] = l + + dict_data = new_nonzero('char[]', dict_size) + + dparams = ffi.new('ZDICT_cover_params_t *')[0] + dparams.k = k + dparams.d = d + dparams.steps = steps + dparams.nbThreads = threads + dparams.zParams.notificationLevel = notifications + dparams.zParams.dictID = dict_id + dparams.zParams.compressionLevel = level + + if (not dparams.k and not dparams.d and not dparams.steps + and not dparams.nbThreads and not dparams.zParams.notificationLevel + and not dparams.zParams.dictID + and not dparams.zParams.compressionLevel): + zresult = lib.ZDICT_trainFromBuffer( + ffi.addressof(dict_data), dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), len(samples)) + elif dparams.steps or dparams.nbThreads: + zresult = lib.ZDICT_optimizeTrainFromBuffer_cover( + ffi.addressof(dict_data), dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), len(samples), + ffi.addressof(dparams)) + else: + zresult = lib.ZDICT_trainFromBuffer_cover( + ffi.addressof(dict_data), dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), len(samples), + dparams) + + if lib.ZDICT_isError(zresult): + msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode('utf-8') + raise ZstdError('cannot train dict: %s' % msg) + + return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:], + 
dict_type=DICT_TYPE_FULLDICT, + k=dparams.k, d=dparams.d) + + +class ZstdDecompressionObj(object): + def __init__(self, decompressor, write_size): + self._decompressor = decompressor + self._write_size = write_size + self._finished = False + + def decompress(self, data): + if self._finished: + raise ZstdError('cannot use a decompressobj multiple times') + + in_buffer = ffi.new('ZSTD_inBuffer *') + out_buffer = ffi.new('ZSTD_outBuffer *') + + data_buffer = ffi.from_buffer(data) + + if len(data_buffer) == 0: + return b'' + + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + dst_buffer = ffi.new('char[]', self._write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + chunks = [] + + while True: + zresult = lib.ZSTD_decompressStream(self._decompressor._dctx, + out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd decompressor error: %s' % + _zstd_error(zresult)) + + if zresult == 0: + self._finished = True + self._decompressor = None + + if out_buffer.pos: + chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) + + if (zresult == 0 or + (in_buffer.pos == in_buffer.size and out_buffer.pos == 0)): + break + + out_buffer.pos = 0 + + return b''.join(chunks) + + def flush(self, length=0): + pass + + +class ZstdDecompressionReader(object): + def __init__(self, decompressor, source, read_size, read_across_frames): + self._decompressor = decompressor + self._source = source + self._read_size = read_size + self._read_across_frames = bool(read_across_frames) + self._entered = False + self._closed = False + self._bytes_decompressed = 0 + self._finished_input = False + self._finished_output = False + self._in_buffer = ffi.new('ZSTD_inBuffer *') + # Holds a ref to self._in_buffer.src. + self._source_buffer = None + + def __enter__(self): + if self._entered: + raise ValueError('cannot __enter__ multiple times') + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self._closed = True + self._source = None + self._decompressor = None + + return False + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + + def readline(self): + raise io.UnsupportedOperation() + + def readlines(self): + raise io.UnsupportedOperation() + + def write(self, data): + raise io.UnsupportedOperation() + + def writelines(self, lines): + raise io.UnsupportedOperation() + + def isatty(self): + return False + + def flush(self): + return None + + def close(self): + self._closed = True + return None + + @property + def closed(self): + return self._closed + + def tell(self): + return self._bytes_decompressed + + def readall(self): + chunks = [] + + while True: + chunk = self.read(1048576) + if not chunk: + break + + chunks.append(chunk) + + return b''.join(chunks) + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + next = __next__ + + def _read_input(self): + # We have data left over in the input buffer. Use it. + if self._in_buffer.pos < self._in_buffer.size: + return + + # All input data exhausted. Nothing to do. + if self._finished_input: + return + + # Else populate the input buffer from our source. 
+ if hasattr(self._source, 'read'): + data = self._source.read(self._read_size) + + if not data: + self._finished_input = True + return + + self._source_buffer = ffi.from_buffer(data) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + else: + self._source_buffer = ffi.from_buffer(self._source) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + + def _decompress_into_buffer(self, out_buffer): + """Decompress available input into an output buffer. + + Returns True if data in output buffer should be emitted. + """ + zresult = lib.ZSTD_decompressStream(self._decompressor._dctx, + out_buffer, self._in_buffer) + + if self._in_buffer.pos == self._in_buffer.size: + self._in_buffer.src = ffi.NULL + self._in_buffer.pos = 0 + self._in_buffer.size = 0 + self._source_buffer = None + + if not hasattr(self._source, 'read'): + self._finished_input = True + + if lib.ZSTD_isError(zresult): + raise ZstdError('zstd decompress error: %s' % + _zstd_error(zresult)) + + # Emit data if there is data AND either: + # a) output buffer is full (read amount is satisfied) + # b) we're at end of a frame and not in frame spanning mode + return (out_buffer.pos and + (out_buffer.pos == out_buffer.size or + zresult == 0 and not self._read_across_frames)) + + def read(self, size=-1): + if self._closed: + raise ValueError('stream is closed') + + if size < -1: + raise ValueError('cannot read negative amounts less than -1') + + if size == -1: + # This is recursive. But it gets the job done. + return self.readall() + + if self._finished_output or size == 0: + return b'' + + # We /could/ call into readinto() here. But that introduces more + # overhead. + dst_buffer = ffi.new('char[]', size) + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto(self, b): + if self._closed: + raise ValueError('stream is closed') + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b'', 0) + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + while not self._finished_input: + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + def read1(self, size=-1): + if self._closed: + raise ValueError('stream is closed') + + if size < -1: + raise ValueError('cannot read negative amounts less than -1') + + if self._finished_output or size == 0: + return b'' + + # -1 returns arbitrary number of bytes. 
+ if size == -1: + size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE + + dst_buffer = ffi.new('char[]', size) + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + # read1() dictates that we can perform at most 1 call to underlying + # stream to get input. However, we can't satisfy this restriction with + # decompression because not all input generates output. So we allow + # multiple read(). But unlike read(), we stop once we have any output. + while not self._finished_input: + self._read_input() + self._decompress_into_buffer(out_buffer) + + if out_buffer.pos: + break + + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto1(self, b): + if self._closed: + raise ValueError('stream is closed') + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b'', 0) + + out_buffer = ffi.new('ZSTD_outBuffer *') + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + while not self._finished_input and not self._finished_output: + self._read_input() + self._decompress_into_buffer(out_buffer) + + if out_buffer.pos: + break + + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + def seek(self, pos, whence=os.SEEK_SET): + if self._closed: + raise ValueError('stream is closed') + + read_amount = 0 + + if whence == os.SEEK_SET: + if pos < 0: + raise ValueError('cannot seek to negative position with SEEK_SET') + + if pos < self._bytes_decompressed: + raise ValueError('cannot seek zstd decompression stream ' + 'backwards') + + read_amount = pos - self._bytes_decompressed + + elif whence == os.SEEK_CUR: + if pos < 0: + raise ValueError('cannot seek zstd decompression stream ' + 'backwards') + + read_amount = pos + elif whence == os.SEEK_END: + raise ValueError('zstd decompression streams cannot be seeked ' + 'with SEEK_END') + + while read_amount: + result = self.read(min(read_amount, + DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)) + + if not result: + break + + read_amount -= len(result) + + return self._bytes_decompressed + +class ZstdDecompressionWriter(object): + def __init__(self, decompressor, writer, write_size, write_return_read): + decompressor._ensure_dctx() + + self._decompressor = decompressor + self._writer = writer + self._write_size = write_size + self._write_return_read = bool(write_return_read) + self._entered = False + self._closed = False + + def __enter__(self): + if self._closed: + raise ValueError('stream is closed') + + if self._entered: + raise ZstdError('cannot __enter__ multiple times') + + self._entered = True + + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self.close() + + def memory_size(self): + return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx) + + def close(self): + if self._closed: + return + + try: + self.flush() + finally: + self._closed = True + + f = getattr(self._writer, 'close', None) + if f: + f() + + @property + def closed(self): + return self._closed + + def fileno(self): + f = getattr(self._writer, 'fileno', None) + if f: + return f() + else: + raise OSError('fileno not available on underlying writer') + + def flush(self): + if self._closed: + raise ValueError('stream is closed') + + f = getattr(self._writer, 'flush', None) + if f: + return f() + + def isatty(self): + return False + + def readable(self): + return False + + def readline(self, size=-1): 
+
+
+class ZstdDecompressionWriter(object):
+    def __init__(self, decompressor, writer, write_size, write_return_read):
+        decompressor._ensure_dctx()
+
+        self._decompressor = decompressor
+        self._writer = writer
+        self._write_size = write_size
+        self._write_return_read = bool(write_return_read)
+        self._entered = False
+        self._closed = False
+
+    def __enter__(self):
+        if self._closed:
+            raise ValueError('stream is closed')
+
+        if self._entered:
+            raise ZstdError('cannot __enter__ multiple times')
+
+        self._entered = True
+
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._entered = False
+        self.close()
+
+    def memory_size(self):
+        return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx)
+
+    def close(self):
+        if self._closed:
+            return
+
+        try:
+            self.flush()
+        finally:
+            self._closed = True
+
+        f = getattr(self._writer, 'close', None)
+        if f:
+            f()
+
+    @property
+    def closed(self):
+        return self._closed
+
+    def fileno(self):
+        f = getattr(self._writer, 'fileno', None)
+        if f:
+            return f()
+        else:
+            raise OSError('fileno not available on underlying writer')
+
+    def flush(self):
+        if self._closed:
+            raise ValueError('stream is closed')
+
+        f = getattr(self._writer, 'flush', None)
+        if f:
+            return f()
+
+    def isatty(self):
+        return False
+
+    def readable(self):
+        return False
+
+    def readline(self, size=-1):
+        raise io.UnsupportedOperation()
+
+    def readlines(self, hint=-1):
+        raise io.UnsupportedOperation()
+
+    def seek(self, offset, whence=None):
+        raise io.UnsupportedOperation()
+
+    def seekable(self):
+        return False
+
+    def tell(self):
+        raise io.UnsupportedOperation()
+
+    def truncate(self, size=None):
+        raise io.UnsupportedOperation()
+
+    def writable(self):
+        return True
+
+    def writelines(self, lines):
+        raise io.UnsupportedOperation()
+
+    def read(self, size=-1):
+        raise io.UnsupportedOperation()
+
+    def readall(self):
+        raise io.UnsupportedOperation()
+
+    def readinto(self, b):
+        raise io.UnsupportedOperation()
+
+    def write(self, data):
+        if self._closed:
+            raise ValueError('stream is closed')
+
+        total_write = 0
+
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+
+        data_buffer = ffi.from_buffer(data)
+        in_buffer.src = data_buffer
+        in_buffer.size = len(data_buffer)
+        in_buffer.pos = 0
+
+        dst_buffer = ffi.new('char[]', self._write_size)
+        out_buffer.dst = dst_buffer
+        out_buffer.size = len(dst_buffer)
+        out_buffer.pos = 0
+
+        dctx = self._decompressor._dctx
+
+        while in_buffer.pos < in_buffer.size:
+            zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer)
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('zstd decompress error: %s' %
+                                _zstd_error(zresult))
+
+            if out_buffer.pos:
+                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
+                total_write += out_buffer.pos
+                out_buffer.pos = 0
+
+        if self._write_return_read:
+            return in_buffer.pos
+        else:
+            return total_write
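Note the two return conventions in write() above: by default it reports decompressed bytes pushed to the inner writer, while write_return_read=True reports compressed bytes consumed, matching io.RawIOBase semantics. A short sketch under the same zstandard import assumption; the output is read before closing because close() also closes the inner writer:

    import io
    import zstandard as zstd

    frame = zstd.ZstdCompressor().compress(b'hello world')

    out = io.BytesIO()
    dctx = zstd.ZstdDecompressor()
    writer = dctx.stream_writer(out, write_return_read=True)
    consumed = writer.write(frame)       # compressed bytes consumed
    assert consumed == len(frame)
    assert out.getvalue() == b'hello world'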
+
+
+class ZstdDecompressor(object):
+    def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
+        self._dict_data = dict_data
+        self._max_window_size = max_window_size
+        self._format = format
+
+        dctx = lib.ZSTD_createDCtx()
+        if dctx == ffi.NULL:
+            raise MemoryError()
+
+        self._dctx = dctx
+
+        # Defer setting up garbage collection until the full state is loaded
+        # so the memory size is more accurate.
+        try:
+            self._ensure_dctx()
+        finally:
+            self._dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx,
+                                size=lib.ZSTD_sizeof_DCtx(dctx))
+
+    def memory_size(self):
+        return lib.ZSTD_sizeof_DCtx(self._dctx)
+
+    def decompress(self, data, max_output_size=0):
+        self._ensure_dctx()
+
+        data_buffer = ffi.from_buffer(data)
+
+        output_size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer))
+
+        if output_size == lib.ZSTD_CONTENTSIZE_ERROR:
+            raise ZstdError('error determining content size from frame header')
+        elif output_size == 0:
+            return b''
+        elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
+            if not max_output_size:
+                raise ZstdError('could not determine content size in frame header')
+
+            result_buffer = ffi.new('char[]', max_output_size)
+            result_size = max_output_size
+            output_size = 0
+        else:
+            result_buffer = ffi.new('char[]', output_size)
+            result_size = output_size
+
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+        out_buffer.dst = result_buffer
+        out_buffer.size = result_size
+        out_buffer.pos = 0
+
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        in_buffer.src = data_buffer
+        in_buffer.size = len(data_buffer)
+        in_buffer.pos = 0
+
+        zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
+        if lib.ZSTD_isError(zresult):
+            raise ZstdError('decompression error: %s' %
+                            _zstd_error(zresult))
+        elif zresult:
+            raise ZstdError('decompression error: did not decompress full frame')
+        elif output_size and out_buffer.pos != output_size:
+            raise ZstdError('decompression error: decompressed %d bytes; expected %d' %
+                            (out_buffer.pos, output_size))
+
+        return ffi.buffer(result_buffer, out_buffer.pos)[:]
+
+    def stream_reader(self, source, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
+                      read_across_frames=False):
+        self._ensure_dctx()
+        return ZstdDecompressionReader(self, source, read_size, read_across_frames)
+
+    def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
+        if write_size < 1:
+            raise ValueError('write_size must be positive')
+
+        self._ensure_dctx()
+        return ZstdDecompressionObj(self, write_size=write_size)
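decompress() above is one-shot: unless the frame header records the content size, a max_output_size cap must be supplied so the result buffer can be allocated up front. decompressobj() is the incremental, zlib-style alternative. A sketch, with an arbitrary split point chosen for illustration:

    import zstandard as zstd

    dctx = zstd.ZstdDecompressor()
    frame = zstd.ZstdCompressor().compress(b'x' * 1000)  # header records size

    assert dctx.decompress(frame) == b'x' * 1000

    # Incremental decompression of the same frame, fed in two pieces.
    dobj = dctx.decompressobj()
    out = dobj.decompress(frame[:5]) + dobj.decompress(frame[5:])
    assert out == b'x' * 1000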
+
+    def read_to_iter(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
+                     write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
+                     skip_bytes=0):
+        if skip_bytes >= read_size:
+            raise ValueError('skip_bytes must be smaller than read_size')
+
+        if hasattr(reader, 'read'):
+            have_read = True
+        elif hasattr(reader, '__getitem__'):
+            have_read = False
+            buffer_offset = 0
+            size = len(reader)
+        else:
+            raise ValueError('must pass an object with a read() method or '
+                             'that conforms to the buffer protocol')
+
+        if skip_bytes:
+            if have_read:
+                reader.read(skip_bytes)
+            else:
+                if skip_bytes > size:
+                    raise ValueError('skip_bytes larger than first input chunk')
+
+                buffer_offset = skip_bytes
+
+        self._ensure_dctx()
+
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+
+        dst_buffer = ffi.new('char[]', write_size)
+        out_buffer.dst = dst_buffer
+        out_buffer.size = len(dst_buffer)
+        out_buffer.pos = 0
+
+        while True:
+            assert out_buffer.pos == 0
+
+            if have_read:
+                read_result = reader.read(read_size)
+            else:
+                remaining = size - buffer_offset
+                slice_size = min(remaining, read_size)
+                read_result = reader[buffer_offset:buffer_offset + slice_size]
+                buffer_offset += slice_size
+
+            # No new input. Break out of read loop.
+            if not read_result:
+                break
+
+            # Feed all read data into the decompressor and emit output until
+            # exhausted.
+            read_buffer = ffi.from_buffer(read_result)
+            in_buffer.src = read_buffer
+            in_buffer.size = len(read_buffer)
+            in_buffer.pos = 0
+
+            while in_buffer.pos < in_buffer.size:
+                assert out_buffer.pos == 0
+
+                zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
+                if lib.ZSTD_isError(zresult):
+                    raise ZstdError('zstd decompress error: %s' %
+                                    _zstd_error(zresult))
+
+                if out_buffer.pos:
+                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
+                    out_buffer.pos = 0
+                    yield data
+
+                if zresult == 0:
+                    return
+
+            # Repeat loop to collect more input data.
+            continue
+
+        # If we get here, input is exhausted.
+
+    read_from = read_to_iter
+
+    def stream_writer(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
+                      write_return_read=False):
+        if not hasattr(writer, 'write'):
+            raise ValueError('must pass an object with a write() method')
+
+        return ZstdDecompressionWriter(self, writer, write_size,
+                                       write_return_read)
+
+    write_to = stream_writer
+
+    def copy_stream(self, ifh, ofh,
+                    read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
+                    write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
+        if not hasattr(ifh, 'read'):
+            raise ValueError('first argument must have a read() method')
+        if not hasattr(ofh, 'write'):
+            raise ValueError('second argument must have a write() method')
+
+        self._ensure_dctx()
+
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+
+        dst_buffer = ffi.new('char[]', write_size)
+        out_buffer.dst = dst_buffer
+        out_buffer.size = write_size
+        out_buffer.pos = 0
+
+        total_read, total_write = 0, 0
+
+        # Read all available input.
+        while True:
+            data = ifh.read(read_size)
+            if not data:
+                break
+
+            data_buffer = ffi.from_buffer(data)
+            total_read += len(data_buffer)
+            in_buffer.src = data_buffer
+            in_buffer.size = len(data_buffer)
+            in_buffer.pos = 0
+
+            # Flush all read data to output.
+            while in_buffer.pos < in_buffer.size:
+                zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
+                if lib.ZSTD_isError(zresult):
+                    raise ZstdError('zstd decompressor error: %s' %
+                                    _zstd_error(zresult))
+
+                if out_buffer.pos:
+                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
+                    total_write += out_buffer.pos
+                    out_buffer.pos = 0
+
+            # Continue loop to keep reading.
+
+        return total_read, total_write
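read_to_iter() and copy_stream() above are the two bulk-streaming entry points: the former yields decompressed chunks lazily, the latter pumps one file object into another and reports totals. A sketch under the same zstandard import assumption:

    import io
    import zstandard as zstd

    frame = zstd.ZstdCompressor().compress(b'data' * 4096)
    dctx = zstd.ZstdDecompressor()

    # Lazily iterate decompressed chunks.
    total = sum(len(chunk) for chunk in dctx.read_to_iter(io.BytesIO(frame)))
    assert total == 4 * 4096

    # Pump one stream into another.
    dst = io.BytesIO()
    read, written = dctx.copy_stream(io.BytesIO(frame), dst)
    assert (read, written) == (len(frame), 4 * 4096)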
+
+    def decompress_content_dict_chain(self, frames):
+        if not isinstance(frames, list):
+            raise TypeError('argument must be a list')
+
+        if not frames:
+            raise ValueError('empty input chain')
+
+        # The first chunk must not use a dictionary. We handle it specially.
+        chunk = frames[0]
+        if not isinstance(chunk, bytes_type):
+            raise ValueError('chunk 0 must be bytes')
+
+        # All chunks should be zstd frames and should have content size set.
+        chunk_buffer = ffi.from_buffer(chunk)
+        params = ffi.new('ZSTD_frameHeader *')
+        zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
+        if lib.ZSTD_isError(zresult):
+            raise ValueError('chunk 0 is not a valid zstd frame')
+        elif zresult:
+            raise ValueError('chunk 0 is too small to contain a zstd frame')
+
+        if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
+            raise ValueError('chunk 0 missing content size in frame')
+
+        self._ensure_dctx(load_dict=False)
+
+        last_buffer = ffi.new('char[]', params.frameContentSize)
+
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+        out_buffer.dst = last_buffer
+        out_buffer.size = len(last_buffer)
+        out_buffer.pos = 0
+
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        in_buffer.src = chunk_buffer
+        in_buffer.size = len(chunk_buffer)
+        in_buffer.pos = 0
+
+        zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
+        if lib.ZSTD_isError(zresult):
+            raise ZstdError('could not decompress chunk 0: %s' %
+                            _zstd_error(zresult))
+        elif zresult:
+            raise ZstdError('chunk 0 did not decompress full frame')
+
+        # Special case of a chain length of 1.
+        if len(frames) == 1:
+            return ffi.buffer(last_buffer, len(last_buffer))[:]
+
+        i = 1
+        while i < len(frames):
+            chunk = frames[i]
+            if not isinstance(chunk, bytes_type):
+                raise ValueError('chunk %d must be bytes' % i)
+
+            chunk_buffer = ffi.from_buffer(chunk)
+            zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
+            if lib.ZSTD_isError(zresult):
+                raise ValueError('chunk %d is not a valid zstd frame' % i)
+            elif zresult:
+                raise ValueError('chunk %d is too small to contain a zstd frame' % i)
+
+            if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
+                raise ValueError('chunk %d missing content size in frame' % i)
+
+            dest_buffer = ffi.new('char[]', params.frameContentSize)
+
+            out_buffer.dst = dest_buffer
+            out_buffer.size = len(dest_buffer)
+            out_buffer.pos = 0
+
+            in_buffer.src = chunk_buffer
+            in_buffer.size = len(chunk_buffer)
+            in_buffer.pos = 0
+
+            zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('could not decompress chunk %d: %s' %
+                                _zstd_error(zresult))
+            elif zresult:
+                raise ZstdError('chunk %d did not decompress full frame' % i)
+
+            last_buffer = dest_buffer
+            i += 1
+
+        return ffi.buffer(last_buffer, len(last_buffer))[:]
+
+    def _ensure_dctx(self, load_dict=True):
+        lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only)
+
+        if self._max_window_size:
+            zresult = lib.ZSTD_DCtx_setMaxWindowSize(self._dctx,
+                                                     self._max_window_size)
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('unable to set max window size: %s' %
+                                _zstd_error(zresult))
+
+        zresult = lib.ZSTD_DCtx_setFormat(self._dctx, self._format)
+        if lib.ZSTD_isError(zresult):
+            raise ZstdError('unable to set decoding format: %s' %
+                            _zstd_error(zresult))
+
+        if self._dict_data and load_dict:
+            zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict)
+            if lib.ZSTD_isError(zresult):
+                raise ZstdError('unable to reference prepared dictionary: %s' %
+                                _zstd_error(zresult))
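For reference, decompress_content_dict_chain() walks a list of one-shot frames and returns the final frame's content; in a real chain each frame after the first is compressed using the previous frame's uncompressed content as a dictionary. The sketch below uses independent frames purely to show the calling convention (every element must be bytes and every frame must record its content size):

    import zstandard as zstd

    cctx = zstd.ZstdCompressor()
    frames = [cctx.compress(b'first revision'),
              cctx.compress(b'second revision')]

    dctx = zstd.ZstdDecompressor()
    # Frames are decompressed in order; the last chunk's content is returned.
    assert dctx.decompress_content_dict_chain(frames) == b'second revision'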