diff -r 668eff08387f -r 675775c33ab6 contrib/python-zstandard/zstd_cffi.py
--- a/contrib/python-zstandard/zstd_cffi.py	Thu Apr 04 15:24:03 2019 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1952 +0,0 @@
-# Copyright (c) 2016-present, Gregory Szorc
-# All rights reserved.
-#
-# This software may be modified and distributed under the terms
-# of the BSD license. See the LICENSE file for details.
-
-"""Python interface to the Zstandard (zstd) compression library."""
-
-from __future__ import absolute_import, unicode_literals
-
-# This should match what the C extension exports.
-__all__ = [
-    #'BufferSegment',
-    #'BufferSegments',
-    #'BufferWithSegments',
-    #'BufferWithSegmentsCollection',
-    'CompressionParameters',
-    'ZstdCompressionDict',
-    'ZstdCompressionParameters',
-    'ZstdCompressor',
-    'ZstdError',
-    'ZstdDecompressor',
-    'FrameParameters',
-    'estimate_decompression_context_size',
-    'frame_content_size',
-    'frame_header_size',
-    'get_frame_parameters',
-    'train_dictionary',
-
-    # Constants.
-    'COMPRESSOBJ_FLUSH_FINISH',
-    'COMPRESSOBJ_FLUSH_BLOCK',
-    'ZSTD_VERSION',
-    'FRAME_HEADER',
-    'CONTENTSIZE_UNKNOWN',
-    'CONTENTSIZE_ERROR',
-    'MAX_COMPRESSION_LEVEL',
-    'COMPRESSION_RECOMMENDED_INPUT_SIZE',
-    'COMPRESSION_RECOMMENDED_OUTPUT_SIZE',
-    'DECOMPRESSION_RECOMMENDED_INPUT_SIZE',
-    'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE',
-    'MAGIC_NUMBER',
-    'BLOCKSIZELOG_MAX',
-    'BLOCKSIZE_MAX',
-    'WINDOWLOG_MIN',
-    'WINDOWLOG_MAX',
-    'CHAINLOG_MIN',
-    'CHAINLOG_MAX',
-    'HASHLOG_MIN',
-    'HASHLOG_MAX',
-    'HASHLOG3_MAX',
-    'SEARCHLOG_MIN',
-    'SEARCHLOG_MAX',
-    'SEARCHLENGTH_MIN',
-    'SEARCHLENGTH_MAX',
-    'TARGETLENGTH_MIN',
-    'TARGETLENGTH_MAX',
-    'LDM_MINMATCH_MIN',
-    'LDM_MINMATCH_MAX',
-    'LDM_BUCKETSIZELOG_MAX',
-    'STRATEGY_FAST',
-    'STRATEGY_DFAST',
-    'STRATEGY_GREEDY',
-    'STRATEGY_LAZY',
-    'STRATEGY_LAZY2',
-    'STRATEGY_BTLAZY2',
-    'STRATEGY_BTOPT',
-    'STRATEGY_BTULTRA',
-    'DICT_TYPE_AUTO',
-    'DICT_TYPE_RAWCONTENT',
-    'DICT_TYPE_FULLDICT',
-    'FORMAT_ZSTD1',
-    'FORMAT_ZSTD1_MAGICLESS',
-]
-
-import io
-import os
-import sys
-
-from _zstd_cffi import (
-    ffi,
-    lib,
-)
-
-if sys.version_info[0] == 2:
-    bytes_type = str
-    int_type = long
-else:
-    bytes_type = bytes
-    int_type = int
-
-
-COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
-COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
-DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
-DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()
-
-new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)
-
-
-MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
-MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
-FRAME_HEADER = b'\x28\xb5\x2f\xfd'
-CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN
-CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
-ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE)
-
-BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
-BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
-WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
-WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
-CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
-CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
-HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
-HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
-HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX
-SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
-SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
-SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
-SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
-TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
-TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
-LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
-LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
-LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX
-
-STRATEGY_FAST = lib.ZSTD_fast
-STRATEGY_DFAST = lib.ZSTD_dfast
-STRATEGY_GREEDY = lib.ZSTD_greedy
-STRATEGY_LAZY = lib.ZSTD_lazy
-STRATEGY_LAZY2 = lib.ZSTD_lazy2
-STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
-STRATEGY_BTOPT = lib.ZSTD_btopt
-STRATEGY_BTULTRA = lib.ZSTD_btultra
-
-DICT_TYPE_AUTO = lib.ZSTD_dct_auto
-DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent
-DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict
-
-FORMAT_ZSTD1 = lib.ZSTD_f_zstd1
-FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless
-
-COMPRESSOBJ_FLUSH_FINISH = 0
-COMPRESSOBJ_FLUSH_BLOCK = 1
-
-
-def _cpu_count():
-    # os.cpu_count() was introduced in Python 3.4.
-    try:
-        return os.cpu_count() or 0
-    except AttributeError:
-        pass
-
-    # Linux.
-    try:
-        if sys.version_info[0] == 2:
-            return os.sysconf(b'SC_NPROCESSORS_ONLN')
-        else:
-            return os.sysconf(u'SC_NPROCESSORS_ONLN')
-    except (AttributeError, ValueError):
-        pass
-
-    # TODO implement on other platforms.
-    return 0
-
-
-class ZstdError(Exception):
-    pass
-
-
-def _zstd_error(zresult):
-    # Resolves to bytes on Python 2 and 3. We use the string for formatting
-    # into error messages, which will be literal unicode. So convert it to
-    # unicode.
-    return ffi.string(lib.ZSTD_getErrorName(zresult)).decode('utf-8')
-
-def _make_cctx_params(params):
-    res = lib.ZSTD_createCCtxParams()
-    if res == ffi.NULL:
-        raise MemoryError()
-
-    res = ffi.gc(res, lib.ZSTD_freeCCtxParams)
-
-    attrs = [
-        (lib.ZSTD_p_format, params.format),
-        (lib.ZSTD_p_compressionLevel, params.compression_level),
-        (lib.ZSTD_p_windowLog, params.window_log),
-        (lib.ZSTD_p_hashLog, params.hash_log),
-        (lib.ZSTD_p_chainLog, params.chain_log),
-        (lib.ZSTD_p_searchLog, params.search_log),
-        (lib.ZSTD_p_minMatch, params.min_match),
-        (lib.ZSTD_p_targetLength, params.target_length),
-        (lib.ZSTD_p_compressionStrategy, params.compression_strategy),
-        (lib.ZSTD_p_contentSizeFlag, params.write_content_size),
-        (lib.ZSTD_p_checksumFlag, params.write_checksum),
-        (lib.ZSTD_p_dictIDFlag, params.write_dict_id),
-        (lib.ZSTD_p_nbWorkers, params.threads),
-        (lib.ZSTD_p_jobSize, params.job_size),
-        (lib.ZSTD_p_overlapSizeLog, params.overlap_size_log),
-        (lib.ZSTD_p_forceMaxWindow, params.force_max_window),
-        (lib.ZSTD_p_enableLongDistanceMatching, params.enable_ldm),
-        (lib.ZSTD_p_ldmHashLog, params.ldm_hash_log),
-        (lib.ZSTD_p_ldmMinMatch, params.ldm_min_match),
-        (lib.ZSTD_p_ldmBucketSizeLog, params.ldm_bucket_size_log),
-        (lib.ZSTD_p_ldmHashEveryLog, params.ldm_hash_every_log),
-    ]
-
-    for param, value in attrs:
-        _set_compression_parameter(res, param, value)
-
-    return res
-
-class ZstdCompressionParameters(object):
-    @staticmethod
-    def from_level(level, source_size=0, dict_size=0, **kwargs):
-        params = lib.ZSTD_getCParams(level, source_size, dict_size)
-
-        args = {
-            'window_log': 'windowLog',
-            'chain_log': 'chainLog',
-            'hash_log': 'hashLog',
-            'search_log': 'searchLog',
-            'min_match': 'searchLength',
-            'target_length': 'targetLength',
-            'compression_strategy': 'strategy',
-        }
-
-        for arg, attr in args.items():
-            if arg not in kwargs:
-                kwargs[arg] = getattr(params, attr)
-
-        return ZstdCompressionParameters(**kwargs)
-
-    def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0,
-                 chain_log=0, search_log=0, min_match=0, target_length=0,
-                 compression_strategy=0, write_content_size=1, write_checksum=0,
-                 write_dict_id=0, job_size=0, overlap_size_log=0,
-                 force_max_window=0, enable_ldm=0, ldm_hash_log=0,
-                 ldm_min_match=0,
ldm_bucket_size_log=0, ldm_hash_every_log=0, - threads=0): - - if threads < 0: - threads = _cpu_count() - - self.format = format - self.compression_level = compression_level - self.window_log = window_log - self.hash_log = hash_log - self.chain_log = chain_log - self.search_log = search_log - self.min_match = min_match - self.target_length = target_length - self.compression_strategy = compression_strategy - self.write_content_size = write_content_size - self.write_checksum = write_checksum - self.write_dict_id = write_dict_id - self.job_size = job_size - self.overlap_size_log = overlap_size_log - self.force_max_window = force_max_window - self.enable_ldm = enable_ldm - self.ldm_hash_log = ldm_hash_log - self.ldm_min_match = ldm_min_match - self.ldm_bucket_size_log = ldm_bucket_size_log - self.ldm_hash_every_log = ldm_hash_every_log - self.threads = threads - - self.params = _make_cctx_params(self) - - def estimated_compression_context_size(self): - return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self.params) - -CompressionParameters = ZstdCompressionParameters - -def estimate_decompression_context_size(): - return lib.ZSTD_estimateDCtxSize() - - -def _set_compression_parameter(params, param, value): - zresult = lib.ZSTD_CCtxParam_setParameter(params, param, - ffi.cast('unsigned', value)) - if lib.ZSTD_isError(zresult): - raise ZstdError('unable to set compression context parameter: %s' % - _zstd_error(zresult)) - -class ZstdCompressionWriter(object): - def __init__(self, compressor, writer, source_size, write_size): - self._compressor = compressor - self._writer = writer - self._source_size = source_size - self._write_size = write_size - self._entered = False - self._bytes_compressed = 0 - - def __enter__(self): - if self._entered: - raise ZstdError('cannot __enter__ multiple times') - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._compressor._cctx, - self._source_size) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - self._entered = True - return self - - def __exit__(self, exc_type, exc_value, exc_tb): - self._entered = False - - if not exc_type and not exc_value and not exc_tb: - dst_buffer = ffi.new('char[]', self._write_size) - - out_buffer = ffi.new('ZSTD_outBuffer *') - in_buffer = ffi.new('ZSTD_inBuffer *') - - out_buffer.dst = dst_buffer - out_buffer.size = len(dst_buffer) - out_buffer.pos = 0 - - in_buffer.src = ffi.NULL - in_buffer.size = 0 - in_buffer.pos = 0 - - while True: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - out_buffer, in_buffer, - lib.ZSTD_e_end) - - if lib.ZSTD_isError(zresult): - raise ZstdError('error ending compression stream: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) - out_buffer.pos = 0 - - if zresult == 0: - break - - self._compressor = None - - return False - - def memory_size(self): - if not self._entered: - raise ZstdError('cannot determine size of an inactive compressor; ' - 'call when a context manager is active') - - return lib.ZSTD_sizeof_CCtx(self._compressor._cctx) - - def write(self, data): - if not self._entered: - raise ZstdError('write() must be called from an active context ' - 'manager') - - total_write = 0 - - data_buffer = ffi.from_buffer(data) - - in_buffer = ffi.new('ZSTD_inBuffer *') - in_buffer.src = data_buffer - in_buffer.size = len(data_buffer) - in_buffer.pos = 0 - - out_buffer = ffi.new('ZSTD_outBuffer *') - dst_buffer = ffi.new('char[]', self._write_size) - 
out_buffer.dst = dst_buffer - out_buffer.size = self._write_size - out_buffer.pos = 0 - - while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - out_buffer, in_buffer, - lib.ZSTD_e_continue) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) - total_write += out_buffer.pos - self._bytes_compressed += out_buffer.pos - out_buffer.pos = 0 - - return total_write - - def flush(self): - if not self._entered: - raise ZstdError('flush must be called from an active context manager') - - total_write = 0 - - out_buffer = ffi.new('ZSTD_outBuffer *') - dst_buffer = ffi.new('char[]', self._write_size) - out_buffer.dst = dst_buffer - out_buffer.size = self._write_size - out_buffer.pos = 0 - - in_buffer = ffi.new('ZSTD_inBuffer *') - in_buffer.src = ffi.NULL - in_buffer.size = 0 - in_buffer.pos = 0 - - while True: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - out_buffer, in_buffer, - lib.ZSTD_e_flush) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) - total_write += out_buffer.pos - self._bytes_compressed += out_buffer.pos - out_buffer.pos = 0 - - if not zresult: - break - - return total_write - - def tell(self): - return self._bytes_compressed - - -class ZstdCompressionObj(object): - def compress(self, data): - if self._finished: - raise ZstdError('cannot call compress() after compressor finished') - - data_buffer = ffi.from_buffer(data) - source = ffi.new('ZSTD_inBuffer *') - source.src = data_buffer - source.size = len(data_buffer) - source.pos = 0 - - chunks = [] - - while source.pos < len(data): - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - self._out, - source, - lib.ZSTD_e_continue) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % - _zstd_error(zresult)) - - if self._out.pos: - chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) - self._out.pos = 0 - - return b''.join(chunks) - - def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): - if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK): - raise ValueError('flush mode not recognized') - - if self._finished: - raise ZstdError('compressor object already finished') - - if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: - z_flush_mode = lib.ZSTD_e_flush - elif flush_mode == COMPRESSOBJ_FLUSH_FINISH: - z_flush_mode = lib.ZSTD_e_end - self._finished = True - else: - raise ZstdError('unhandled flush mode') - - assert self._out.pos == 0 - - in_buffer = ffi.new('ZSTD_inBuffer *') - in_buffer.src = ffi.NULL - in_buffer.size = 0 - in_buffer.pos = 0 - - chunks = [] - - while True: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - self._out, - in_buffer, - z_flush_mode) - if lib.ZSTD_isError(zresult): - raise ZstdError('error ending compression stream: %s' % - _zstd_error(zresult)) - - if self._out.pos: - chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) - self._out.pos = 0 - - if not zresult: - break - - return b''.join(chunks) - - -class ZstdCompressionChunker(object): - def __init__(self, compressor, chunk_size): - self._compressor = compressor - self._out = ffi.new('ZSTD_outBuffer *') - self._dst_buffer = ffi.new('char[]', chunk_size) - self._out.dst = self._dst_buffer - self._out.size = chunk_size - self._out.pos = 0 - - 
self._in = ffi.new('ZSTD_inBuffer *') - self._in.src = ffi.NULL - self._in.size = 0 - self._in.pos = 0 - self._finished = False - - def compress(self, data): - if self._finished: - raise ZstdError('cannot call compress() after compression finished') - - if self._in.src != ffi.NULL: - raise ZstdError('cannot perform operation before consuming output ' - 'from previous operation') - - data_buffer = ffi.from_buffer(data) - - if not len(data_buffer): - return - - self._in.src = data_buffer - self._in.size = len(data_buffer) - self._in.pos = 0 - - while self._in.pos < self._in.size: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - self._out, - self._in, - lib.ZSTD_e_continue) - - if self._in.pos == self._in.size: - self._in.src = ffi.NULL - self._in.size = 0 - self._in.pos = 0 - - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % - _zstd_error(zresult)) - - if self._out.pos == self._out.size: - yield ffi.buffer(self._out.dst, self._out.pos)[:] - self._out.pos = 0 - - def flush(self): - if self._finished: - raise ZstdError('cannot call flush() after compression finished') - - if self._in.src != ffi.NULL: - raise ZstdError('cannot call flush() before consuming output from ' - 'previous operation') - - while True: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - self._out, self._in, - lib.ZSTD_e_flush) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) - - if self._out.pos: - yield ffi.buffer(self._out.dst, self._out.pos)[:] - self._out.pos = 0 - - if not zresult: - return - - def finish(self): - if self._finished: - raise ZstdError('cannot call finish() after compression finished') - - if self._in.src != ffi.NULL: - raise ZstdError('cannot call finish() before consuming output from ' - 'previous operation') - - while True: - zresult = lib.ZSTD_compress_generic(self._compressor._cctx, - self._out, self._in, - lib.ZSTD_e_end) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) - - if self._out.pos: - yield ffi.buffer(self._out.dst, self._out.pos)[:] - self._out.pos = 0 - - if not zresult: - self._finished = True - return - - -class CompressionReader(object): - def __init__(self, compressor, source, read_size): - self._compressor = compressor - self._source = source - self._read_size = read_size - self._entered = False - self._closed = False - self._bytes_compressed = 0 - self._finished_input = False - self._finished_output = False - - self._in_buffer = ffi.new('ZSTD_inBuffer *') - # Holds a ref so backing bytes in self._in_buffer stay alive. 
-        self._source_buffer = None
-
-    def __enter__(self):
-        if self._entered:
-            raise ValueError('cannot __enter__ multiple times')
-
-        self._entered = True
-        return self
-
-    def __exit__(self, exc_type, exc_value, exc_tb):
-        self._entered = False
-        self._closed = True
-        self._source = None
-        self._compressor = None
-
-        return False
-
-    def readable(self):
-        return True
-
-    def writable(self):
-        return False
-
-    def seekable(self):
-        return False
-
-    def readline(self):
-        raise io.UnsupportedOperation()
-
-    def readlines(self):
-        raise io.UnsupportedOperation()
-
-    def write(self, data):
-        raise OSError('stream is not writable')
-
-    def writelines(self, ignored):
-        raise OSError('stream is not writable')
-
-    def isatty(self):
-        return False
-
-    def flush(self):
-        return None
-
-    def close(self):
-        self._closed = True
-        return None
-
-    @property
-    def closed(self):
-        return self._closed
-
-    def tell(self):
-        return self._bytes_compressed
-
-    def readall(self):
-        raise NotImplementedError()
-
-    def __iter__(self):
-        raise io.UnsupportedOperation()
-
-    def __next__(self):
-        raise io.UnsupportedOperation()
-
-    next = __next__
-
-    def read(self, size=-1):
-        if self._closed:
-            raise ValueError('stream is closed')
-
-        if self._finished_output:
-            return b''
-
-        if size < 1:
-            raise ValueError('cannot read negative or size 0 amounts')
-
-        # Need a dedicated ref to dest buffer otherwise it gets collected.
-        dst_buffer = ffi.new('char[]', size)
-        out_buffer = ffi.new('ZSTD_outBuffer *')
-        out_buffer.dst = dst_buffer
-        out_buffer.size = size
-        out_buffer.pos = 0
-
-        def compress_input():
-            if self._in_buffer.pos >= self._in_buffer.size:
-                return
-
-            old_pos = out_buffer.pos
-
-            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
-                                                out_buffer, self._in_buffer,
-                                                lib.ZSTD_e_continue)
-
-            self._bytes_compressed += out_buffer.pos - old_pos
-
-            if self._in_buffer.pos == self._in_buffer.size:
-                self._in_buffer.src = ffi.NULL
-                self._in_buffer.pos = 0
-                self._in_buffer.size = 0
-                self._source_buffer = None
-
-                if not hasattr(self._source, 'read'):
-                    self._finished_input = True
-
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('zstd compress error: %s' %
-                                _zstd_error(zresult))
-
-            if out_buffer.pos and out_buffer.pos == out_buffer.size:
-                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
-
-        def get_input():
-            if self._finished_input:
-                return
-
-            if hasattr(self._source, 'read'):
-                data = self._source.read(self._read_size)
-
-                if not data:
-                    self._finished_input = True
-                    return
-
-                self._source_buffer = ffi.from_buffer(data)
-                self._in_buffer.src = self._source_buffer
-                self._in_buffer.size = len(self._source_buffer)
-                self._in_buffer.pos = 0
-            else:
-                self._source_buffer = ffi.from_buffer(self._source)
-                self._in_buffer.src = self._source_buffer
-                self._in_buffer.size = len(self._source_buffer)
-                self._in_buffer.pos = 0
-
-        result = compress_input()
-        if result:
-            return result
-
-        while not self._finished_input:
-            get_input()
-            result = compress_input()
-            if result:
-                return result
-
-        # EOF
-        old_pos = out_buffer.pos
-
-        zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
-                                            out_buffer, self._in_buffer,
-                                            lib.ZSTD_e_end)
-
-        self._bytes_compressed += out_buffer.pos - old_pos
-
-        if lib.ZSTD_isError(zresult):
-            raise ZstdError('error ending compression stream: %s' %
-                            _zstd_error(zresult))
-
-        if zresult == 0:
-            self._finished_output = True
-
-        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
-
-class ZstdCompressor(object):
-    def __init__(self, level=3, dict_data=None,
compression_params=None, - write_checksum=None, write_content_size=None, - write_dict_id=None, threads=0): - if level > lib.ZSTD_maxCLevel(): - raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel()) - - if threads < 0: - threads = _cpu_count() - - if compression_params and write_checksum is not None: - raise ValueError('cannot define compression_params and ' - 'write_checksum') - - if compression_params and write_content_size is not None: - raise ValueError('cannot define compression_params and ' - 'write_content_size') - - if compression_params and write_dict_id is not None: - raise ValueError('cannot define compression_params and ' - 'write_dict_id') - - if compression_params and threads: - raise ValueError('cannot define compression_params and threads') - - if compression_params: - self._params = _make_cctx_params(compression_params) - else: - if write_dict_id is None: - write_dict_id = True - - params = lib.ZSTD_createCCtxParams() - if params == ffi.NULL: - raise MemoryError() - - self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams) - - _set_compression_parameter(self._params, - lib.ZSTD_p_compressionLevel, - level) - - _set_compression_parameter( - self._params, - lib.ZSTD_p_contentSizeFlag, - write_content_size if write_content_size is not None else 1) - - _set_compression_parameter(self._params, - lib.ZSTD_p_checksumFlag, - 1 if write_checksum else 0) - - _set_compression_parameter(self._params, - lib.ZSTD_p_dictIDFlag, - 1 if write_dict_id else 0) - - if threads: - _set_compression_parameter(self._params, - lib.ZSTD_p_nbWorkers, - threads) - - cctx = lib.ZSTD_createCCtx() - if cctx == ffi.NULL: - raise MemoryError() - - self._cctx = cctx - self._dict_data = dict_data - - # We defer setting up garbage collection until after calling - # _setup_cctx() to ensure the memory size estimate is more accurate. 
- try: - self._setup_cctx() - finally: - self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx, - size=lib.ZSTD_sizeof_CCtx(cctx)) - - def _setup_cctx(self): - zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx, - self._params) - if lib.ZSTD_isError(zresult): - raise ZstdError('could not set compression parameters: %s' % - _zstd_error(zresult)) - - dict_data = self._dict_data - - if dict_data: - if dict_data._cdict: - zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict) - else: - zresult = lib.ZSTD_CCtx_loadDictionary_advanced( - self._cctx, dict_data.as_bytes(), len(dict_data), - lib.ZSTD_dlm_byRef, dict_data._dict_type) - - if lib.ZSTD_isError(zresult): - raise ZstdError('could not load compression dictionary: %s' % - _zstd_error(zresult)) - - def memory_size(self): - return lib.ZSTD_sizeof_CCtx(self._cctx) - - def compress(self, data): - lib.ZSTD_CCtx_reset(self._cctx) - - data_buffer = ffi.from_buffer(data) - - dest_size = lib.ZSTD_compressBound(len(data_buffer)) - out = new_nonzero('char[]', dest_size) - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer)) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - out_buffer = ffi.new('ZSTD_outBuffer *') - in_buffer = ffi.new('ZSTD_inBuffer *') - - out_buffer.dst = out - out_buffer.size = dest_size - out_buffer.pos = 0 - - in_buffer.src = data_buffer - in_buffer.size = len(data_buffer) - in_buffer.pos = 0 - - zresult = lib.ZSTD_compress_generic(self._cctx, - out_buffer, - in_buffer, - lib.ZSTD_e_end) - - if lib.ZSTD_isError(zresult): - raise ZstdError('cannot compress: %s' % - _zstd_error(zresult)) - elif zresult: - raise ZstdError('unexpected partial frame flush') - - return ffi.buffer(out, out_buffer.pos)[:] - - def compressobj(self, size=-1): - lib.ZSTD_CCtx_reset(self._cctx) - - if size < 0: - size = lib.ZSTD_CONTENTSIZE_UNKNOWN - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - cobj = ZstdCompressionObj() - cobj._out = ffi.new('ZSTD_outBuffer *') - cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE) - cobj._out.dst = cobj._dst_buffer - cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE - cobj._out.pos = 0 - cobj._compressor = self - cobj._finished = False - - return cobj - - def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): - lib.ZSTD_CCtx_reset(self._cctx) - - if size < 0: - size = lib.ZSTD_CONTENTSIZE_UNKNOWN - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - return ZstdCompressionChunker(self, chunk_size=chunk_size) - - def copy_stream(self, ifh, ofh, size=-1, - read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, - write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): - - if not hasattr(ifh, 'read'): - raise ValueError('first argument must have a read() method') - if not hasattr(ofh, 'write'): - raise ValueError('second argument must have a write() method') - - lib.ZSTD_CCtx_reset(self._cctx) - - if size < 0: - size = lib.ZSTD_CONTENTSIZE_UNKNOWN - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - in_buffer = ffi.new('ZSTD_inBuffer *') - out_buffer = ffi.new('ZSTD_outBuffer *') - - dst_buffer = ffi.new('char[]', write_size) - 
out_buffer.dst = dst_buffer - out_buffer.size = write_size - out_buffer.pos = 0 - - total_read, total_write = 0, 0 - - while True: - data = ifh.read(read_size) - if not data: - break - - data_buffer = ffi.from_buffer(data) - total_read += len(data_buffer) - in_buffer.src = data_buffer - in_buffer.size = len(data_buffer) - in_buffer.pos = 0 - - while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_compress_generic(self._cctx, - out_buffer, - in_buffer, - lib.ZSTD_e_continue) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) - total_write += out_buffer.pos - out_buffer.pos = 0 - - # We've finished reading. Flush the compressor. - while True: - zresult = lib.ZSTD_compress_generic(self._cctx, - out_buffer, - in_buffer, - lib.ZSTD_e_end) - if lib.ZSTD_isError(zresult): - raise ZstdError('error ending compression stream: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) - total_write += out_buffer.pos - out_buffer.pos = 0 - - if zresult == 0: - break - - return total_read, total_write - - def stream_reader(self, source, size=-1, - read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE): - lib.ZSTD_CCtx_reset(self._cctx) - - try: - size = len(source) - except Exception: - pass - - if size < 0: - size = lib.ZSTD_CONTENTSIZE_UNKNOWN - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - return CompressionReader(self, source, read_size) - - def stream_writer(self, writer, size=-1, - write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): - - if not hasattr(writer, 'write'): - raise ValueError('must pass an object with a write() method') - - lib.ZSTD_CCtx_reset(self._cctx) - - if size < 0: - size = lib.ZSTD_CONTENTSIZE_UNKNOWN - - return ZstdCompressionWriter(self, writer, size, write_size) - - write_to = stream_writer - - def read_to_iter(self, reader, size=-1, - read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, - write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): - if hasattr(reader, 'read'): - have_read = True - elif hasattr(reader, '__getitem__'): - have_read = False - buffer_offset = 0 - size = len(reader) - else: - raise ValueError('must pass an object with a read() method or ' - 'conforms to buffer protocol') - - lib.ZSTD_CCtx_reset(self._cctx) - - if size < 0: - size = lib.ZSTD_CONTENTSIZE_UNKNOWN - - zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) - if lib.ZSTD_isError(zresult): - raise ZstdError('error setting source size: %s' % - _zstd_error(zresult)) - - in_buffer = ffi.new('ZSTD_inBuffer *') - out_buffer = ffi.new('ZSTD_outBuffer *') - - in_buffer.src = ffi.NULL - in_buffer.size = 0 - in_buffer.pos = 0 - - dst_buffer = ffi.new('char[]', write_size) - out_buffer.dst = dst_buffer - out_buffer.size = write_size - out_buffer.pos = 0 - - while True: - # We should never have output data sitting around after a previous - # iteration. - assert out_buffer.pos == 0 - - # Collect input data. - if have_read: - read_result = reader.read(read_size) - else: - remaining = len(reader) - buffer_offset - slice_size = min(remaining, read_size) - read_result = reader[buffer_offset:buffer_offset + slice_size] - buffer_offset += slice_size - - # No new input data. Break out of the read loop. - if not read_result: - break - - # Feed all read data into the compressor and emit output until - # exhausted. 
- read_buffer = ffi.from_buffer(read_result) - in_buffer.src = read_buffer - in_buffer.size = len(read_buffer) - in_buffer.pos = 0 - - while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_compress_generic(self._cctx, out_buffer, in_buffer, - lib.ZSTD_e_continue) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd compress error: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] - out_buffer.pos = 0 - yield data - - assert out_buffer.pos == 0 - - # And repeat the loop to collect more data. - continue - - # If we get here, input is exhausted. End the stream and emit what - # remains. - while True: - assert out_buffer.pos == 0 - zresult = lib.ZSTD_compress_generic(self._cctx, - out_buffer, - in_buffer, - lib.ZSTD_e_end) - if lib.ZSTD_isError(zresult): - raise ZstdError('error ending compression stream: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] - out_buffer.pos = 0 - yield data - - if zresult == 0: - break - - read_from = read_to_iter - - def frame_progression(self): - progression = lib.ZSTD_getFrameProgression(self._cctx) - - return progression.ingested, progression.consumed, progression.produced - - -class FrameParameters(object): - def __init__(self, fparams): - self.content_size = fparams.frameContentSize - self.window_size = fparams.windowSize - self.dict_id = fparams.dictID - self.has_checksum = bool(fparams.checksumFlag) - - -def frame_content_size(data): - data_buffer = ffi.from_buffer(data) - - size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) - - if size == lib.ZSTD_CONTENTSIZE_ERROR: - raise ZstdError('error when determining content size') - elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN: - return -1 - else: - return size - - -def frame_header_size(data): - data_buffer = ffi.from_buffer(data) - - zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer)) - if lib.ZSTD_isError(zresult): - raise ZstdError('could not determine frame header size: %s' % - _zstd_error(zresult)) - - return zresult - - -def get_frame_parameters(data): - params = ffi.new('ZSTD_frameHeader *') - - data_buffer = ffi.from_buffer(data) - zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer)) - if lib.ZSTD_isError(zresult): - raise ZstdError('cannot get frame parameters: %s' % - _zstd_error(zresult)) - - if zresult: - raise ZstdError('not enough data for frame parameters; need %d bytes' % - zresult) - - return FrameParameters(params[0]) - - -class ZstdCompressionDict(object): - def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0): - assert isinstance(data, bytes_type) - self._data = data - self.k = k - self.d = d - - if dict_type not in (DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT, - DICT_TYPE_FULLDICT): - raise ValueError('invalid dictionary load mode: %d; must use ' - 'DICT_TYPE_* constants') - - self._dict_type = dict_type - self._cdict = None - - def __len__(self): - return len(self._data) - - def dict_id(self): - return int_type(lib.ZDICT_getDictID(self._data, len(self._data))) - - def as_bytes(self): - return self._data - - def precompute_compress(self, level=0, compression_params=None): - if level and compression_params: - raise ValueError('must only specify one of level or ' - 'compression_params') - - if not level and not compression_params: - raise ValueError('must specify one of level or compression_params') - - if level: - cparams = lib.ZSTD_getCParams(level, 0, len(self._data)) - else: - cparams = 
ffi.new('ZSTD_compressionParameters *')[0]
-            cparams.chainLog = compression_params.chain_log
-            cparams.hashLog = compression_params.hash_log
-            cparams.searchLength = compression_params.min_match
-            cparams.searchLog = compression_params.search_log
-            cparams.strategy = compression_params.compression_strategy
-            cparams.targetLength = compression_params.target_length
-            cparams.windowLog = compression_params.window_log
-
-        cdict = lib.ZSTD_createCDict_advanced(self._data, len(self._data),
-                                              lib.ZSTD_dlm_byRef,
-                                              self._dict_type,
-                                              cparams,
-                                              lib.ZSTD_defaultCMem)
-        if cdict == ffi.NULL:
-            raise ZstdError('unable to precompute dictionary')
-
-        self._cdict = ffi.gc(cdict, lib.ZSTD_freeCDict,
-                             size=lib.ZSTD_sizeof_CDict(cdict))
-
-    @property
-    def _ddict(self):
-        ddict = lib.ZSTD_createDDict_advanced(self._data, len(self._data),
-                                              lib.ZSTD_dlm_byRef,
-                                              self._dict_type,
-                                              lib.ZSTD_defaultCMem)
-
-        if ddict == ffi.NULL:
-            raise ZstdError('could not create decompression dict')
-
-        ddict = ffi.gc(ddict, lib.ZSTD_freeDDict,
-                       size=lib.ZSTD_sizeof_DDict(ddict))
-        self.__dict__['_ddict'] = ddict
-
-        return ddict
-
-def train_dictionary(dict_size, samples, k=0, d=0, notifications=0, dict_id=0,
-                     level=0, steps=0, threads=0):
-    if not isinstance(samples, list):
-        raise TypeError('samples must be a list')
-
-    if threads < 0:
-        threads = _cpu_count()
-
-    total_size = sum(map(len, samples))
-
-    samples_buffer = new_nonzero('char[]', total_size)
-    sample_sizes = new_nonzero('size_t[]', len(samples))
-
-    offset = 0
-    for i, sample in enumerate(samples):
-        if not isinstance(sample, bytes_type):
-            raise ValueError('samples must be bytes')
-
-        l = len(sample)
-        ffi.memmove(samples_buffer + offset, sample, l)
-        offset += l
-        sample_sizes[i] = l
-
-    dict_data = new_nonzero('char[]', dict_size)
-
-    dparams = ffi.new('ZDICT_cover_params_t *')[0]
-    dparams.k = k
-    dparams.d = d
-    dparams.steps = steps
-    dparams.nbThreads = threads
-    dparams.zParams.notificationLevel = notifications
-    dparams.zParams.dictID = dict_id
-    dparams.zParams.compressionLevel = level
-
-    if (not dparams.k and not dparams.d and not dparams.steps
-            and not dparams.nbThreads and not dparams.zParams.notificationLevel
-            and not dparams.zParams.dictID
-            and not dparams.zParams.compressionLevel):
-        zresult = lib.ZDICT_trainFromBuffer(
-            ffi.addressof(dict_data), dict_size,
-            ffi.addressof(samples_buffer),
-            ffi.addressof(sample_sizes, 0), len(samples))
-    elif dparams.steps or dparams.nbThreads:
-        zresult = lib.ZDICT_optimizeTrainFromBuffer_cover(
-            ffi.addressof(dict_data), dict_size,
-            ffi.addressof(samples_buffer),
-            ffi.addressof(sample_sizes, 0), len(samples),
-            ffi.addressof(dparams))
-    else:
-        zresult = lib.ZDICT_trainFromBuffer_cover(
-            ffi.addressof(dict_data), dict_size,
-            ffi.addressof(samples_buffer),
-            ffi.addressof(sample_sizes, 0), len(samples),
-            dparams)
-
-    if lib.ZDICT_isError(zresult):
-        msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode('utf-8')
-        raise ZstdError('cannot train dict: %s' % msg)
-
-    return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:],
-                               dict_type=DICT_TYPE_FULLDICT,
-                               k=dparams.k, d=dparams.d)
-
-
-class ZstdDecompressionObj(object):
-    def __init__(self, decompressor, write_size):
-        self._decompressor = decompressor
-        self._write_size = write_size
-        self._finished = False
-
-    def decompress(self, data):
-        if self._finished:
-            raise ZstdError('cannot use a decompressobj multiple times')
-
-        in_buffer = ffi.new('ZSTD_inBuffer *')
-        out_buffer = ffi.new('ZSTD_outBuffer *')
-
-        data_buffer = ffi.from_buffer(data)
-        in_buffer.src = data_buffer
-        in_buffer.size = len(data_buffer)
-        in_buffer.pos = 0
-
-        dst_buffer = ffi.new('char[]', self._write_size)
-        out_buffer.dst = dst_buffer
-        out_buffer.size = len(dst_buffer)
-        out_buffer.pos = 0
-
-        chunks = []
-
-        while True:
-            zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
-                                                  out_buffer, in_buffer)
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('zstd decompressor error: %s' %
-                                _zstd_error(zresult))
-
-            if zresult == 0:
-                self._finished = True
-                self._decompressor = None
-
-            if out_buffer.pos:
-                chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
-
-            if (zresult == 0 or
-                    (in_buffer.pos == in_buffer.size and out_buffer.pos == 0)):
-                break
-
-            out_buffer.pos = 0
-
-        return b''.join(chunks)
-
-
-class DecompressionReader(object):
-    def __init__(self, decompressor, source, read_size):
-        self._decompressor = decompressor
-        self._source = source
-        self._read_size = read_size
-        self._entered = False
-        self._closed = False
-        self._bytes_decompressed = 0
-        self._finished_input = False
-        self._finished_output = False
-        self._in_buffer = ffi.new('ZSTD_inBuffer *')
-        # Holds a ref to self._in_buffer.src.
-        self._source_buffer = None
-
-    def __enter__(self):
-        if self._entered:
-            raise ValueError('cannot __enter__ multiple times')
-
-        self._entered = True
-        return self
-
-    def __exit__(self, exc_type, exc_value, exc_tb):
-        self._entered = False
-        self._closed = True
-        self._source = None
-        self._decompressor = None
-
-        return False
-
-    def readable(self):
-        return True
-
-    def writable(self):
-        return False
-
-    def seekable(self):
-        return True
-
-    def readline(self):
-        raise NotImplementedError()
-
-    def readlines(self):
-        raise NotImplementedError()
-
-    def write(self, data):
-        raise io.UnsupportedOperation()
-
-    def writelines(self, lines):
-        raise io.UnsupportedOperation()
-
-    def isatty(self):
-        return False
-
-    def flush(self):
-        return None
-
-    def close(self):
-        self._closed = True
-        return None
-
-    @property
-    def closed(self):
-        return self._closed
-
-    def tell(self):
-        return self._bytes_decompressed
-
-    def readall(self):
-        raise NotImplementedError()
-
-    def __iter__(self):
-        raise NotImplementedError()
-
-    def __next__(self):
-        raise NotImplementedError()
-
-    next = __next__
-
-    def read(self, size):
-        if self._closed:
-            raise ValueError('stream is closed')
-
-        if self._finished_output:
-            return b''
-
-        if size < 1:
-            raise ValueError('cannot read negative or size 0 amounts')
-
-        dst_buffer = ffi.new('char[]', size)
-        out_buffer = ffi.new('ZSTD_outBuffer *')
-        out_buffer.dst = dst_buffer
-        out_buffer.size = size
-        out_buffer.pos = 0
-
-        def decompress():
-            zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
-                                                  out_buffer, self._in_buffer)
-
-            if self._in_buffer.pos == self._in_buffer.size:
-                self._in_buffer.src = ffi.NULL
-                self._in_buffer.pos = 0
-                self._in_buffer.size = 0
-                self._source_buffer = None
-
-                if not hasattr(self._source, 'read'):
-                    self._finished_input = True
-
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('zstd decompress error: %s' %
-                                _zstd_error(zresult))
-            elif zresult == 0:
-                self._finished_output = True
-
-            if out_buffer.pos and out_buffer.pos == out_buffer.size:
-                self._bytes_decompressed += out_buffer.size
-                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
-
-        def get_input():
-            if self._finished_input:
-                return
-
-            if hasattr(self._source, 'read'):
-                data = self._source.read(self._read_size)
-
-                if not data:
-                    self._finished_input = True
-                    return
-
-                self._source_buffer = ffi.from_buffer(data)
-                self._in_buffer.src = self._source_buffer
-                self._in_buffer.size = len(self._source_buffer)
-                self._in_buffer.pos = 0
-            else:
-                self._source_buffer = ffi.from_buffer(self._source)
-                self._in_buffer.src = self._source_buffer
-                self._in_buffer.size = len(self._source_buffer)
-                self._in_buffer.pos = 0
-
-        get_input()
-        result = decompress()
-        if result:
-            return result
-
-        while not self._finished_input:
-            get_input()
-            result = decompress()
-            if result:
-                return result
-
-        self._bytes_decompressed += out_buffer.pos
-        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
-
-    def seek(self, pos, whence=os.SEEK_SET):
-        if self._closed:
-            raise ValueError('stream is closed')
-
-        read_amount = 0
-
-        if whence == os.SEEK_SET:
-            if pos < 0:
-                raise ValueError('cannot seek to negative position with SEEK_SET')
-
-            if pos < self._bytes_decompressed:
-                raise ValueError('cannot seek zstd decompression stream '
-                                 'backwards')
-
-            read_amount = pos - self._bytes_decompressed
-
-        elif whence == os.SEEK_CUR:
-            if pos < 0:
-                raise ValueError('cannot seek zstd decompression stream '
-                                 'backwards')
-
-            read_amount = pos
-        elif whence == os.SEEK_END:
-            raise ValueError('zstd decompression streams cannot be seeked '
-                             'with SEEK_END')
-
-        while read_amount:
-            result = self.read(min(read_amount,
-                                   DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE))
-
-            if not result:
-                break
-
-            read_amount -= len(result)
-
-        return self._bytes_decompressed
-
-class ZstdDecompressionWriter(object):
-    def __init__(self, decompressor, writer, write_size):
-        self._decompressor = decompressor
-        self._writer = writer
-        self._write_size = write_size
-        self._entered = False
-
-    def __enter__(self):
-        if self._entered:
-            raise ZstdError('cannot __enter__ multiple times')
-
-        self._decompressor._ensure_dctx()
-        self._entered = True
-
-        return self
-
-    def __exit__(self, exc_type, exc_value, exc_tb):
-        self._entered = False
-
-    def memory_size(self):
-        if not self._decompressor._dctx:
-            raise ZstdError('cannot determine size of an inactive decompressor; '
-                            'call when a context manager is active')
-
-        return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx)
-
-    def write(self, data):
-        if not self._entered:
-            raise ZstdError('write must be called from an active context manager')
-
-        total_write = 0
-
-        in_buffer = ffi.new('ZSTD_inBuffer *')
-        out_buffer = ffi.new('ZSTD_outBuffer *')
-
-        data_buffer = ffi.from_buffer(data)
-        in_buffer.src = data_buffer
-        in_buffer.size = len(data_buffer)
-        in_buffer.pos = 0
-
-        dst_buffer = ffi.new('char[]', self._write_size)
-        out_buffer.dst = dst_buffer
-        out_buffer.size = len(dst_buffer)
-        out_buffer.pos = 0
-
-        dctx = self._decompressor._dctx
-
-        while in_buffer.pos < in_buffer.size:
-            zresult = lib.ZSTD_decompress_generic(dctx, out_buffer, in_buffer)
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('zstd decompress error: %s' %
-                                _zstd_error(zresult))
-
-            if out_buffer.pos:
-                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
-                total_write += out_buffer.pos
-                out_buffer.pos = 0
-
-        return total_write
-
-
-class ZstdDecompressor(object):
-    def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
-        self._dict_data = dict_data
-        self._max_window_size = max_window_size
-        self._format = format
-
-        dctx = lib.ZSTD_createDCtx()
-        if dctx == ffi.NULL:
-            raise MemoryError()
-
-        self._dctx = dctx
-
-        # Defer setting up garbage collection until full state is loaded so
-        # the memory size is more accurate.
- try: - self._ensure_dctx() - finally: - self._dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx, - size=lib.ZSTD_sizeof_DCtx(dctx)) - - def memory_size(self): - return lib.ZSTD_sizeof_DCtx(self._dctx) - - def decompress(self, data, max_output_size=0): - self._ensure_dctx() - - data_buffer = ffi.from_buffer(data) - - output_size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) - - if output_size == lib.ZSTD_CONTENTSIZE_ERROR: - raise ZstdError('error determining content size from frame header') - elif output_size == 0: - return b'' - elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN: - if not max_output_size: - raise ZstdError('could not determine content size in frame header') - - result_buffer = ffi.new('char[]', max_output_size) - result_size = max_output_size - output_size = 0 - else: - result_buffer = ffi.new('char[]', output_size) - result_size = output_size - - out_buffer = ffi.new('ZSTD_outBuffer *') - out_buffer.dst = result_buffer - out_buffer.size = result_size - out_buffer.pos = 0 - - in_buffer = ffi.new('ZSTD_inBuffer *') - in_buffer.src = data_buffer - in_buffer.size = len(data_buffer) - in_buffer.pos = 0 - - zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer) - if lib.ZSTD_isError(zresult): - raise ZstdError('decompression error: %s' % - _zstd_error(zresult)) - elif zresult: - raise ZstdError('decompression error: did not decompress full frame') - elif output_size and out_buffer.pos != output_size: - raise ZstdError('decompression error: decompressed %d bytes; expected %d' % - (zresult, output_size)) - - return ffi.buffer(result_buffer, out_buffer.pos)[:] - - def stream_reader(self, source, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE): - self._ensure_dctx() - return DecompressionReader(self, source, read_size) - - def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): - if write_size < 1: - raise ValueError('write_size must be positive') - - self._ensure_dctx() - return ZstdDecompressionObj(self, write_size=write_size) - - def read_to_iter(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, - write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, - skip_bytes=0): - if skip_bytes >= read_size: - raise ValueError('skip_bytes must be smaller than read_size') - - if hasattr(reader, 'read'): - have_read = True - elif hasattr(reader, '__getitem__'): - have_read = False - buffer_offset = 0 - size = len(reader) - else: - raise ValueError('must pass an object with a read() method or ' - 'conforms to buffer protocol') - - if skip_bytes: - if have_read: - reader.read(skip_bytes) - else: - if skip_bytes > size: - raise ValueError('skip_bytes larger than first input chunk') - - buffer_offset = skip_bytes - - self._ensure_dctx() - - in_buffer = ffi.new('ZSTD_inBuffer *') - out_buffer = ffi.new('ZSTD_outBuffer *') - - dst_buffer = ffi.new('char[]', write_size) - out_buffer.dst = dst_buffer - out_buffer.size = len(dst_buffer) - out_buffer.pos = 0 - - while True: - assert out_buffer.pos == 0 - - if have_read: - read_result = reader.read(read_size) - else: - remaining = size - buffer_offset - slice_size = min(remaining, read_size) - read_result = reader[buffer_offset:buffer_offset + slice_size] - buffer_offset += slice_size - - # No new input. Break out of read loop. - if not read_result: - break - - # Feed all read data into decompressor and emit output until - # exhausted. 
- read_buffer = ffi.from_buffer(read_result) - in_buffer.src = read_buffer - in_buffer.size = len(read_buffer) - in_buffer.pos = 0 - - while in_buffer.pos < in_buffer.size: - assert out_buffer.pos == 0 - - zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd decompress error: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] - out_buffer.pos = 0 - yield data - - if zresult == 0: - return - - # Repeat loop to collect more input data. - continue - - # If we get here, input is exhausted. - - read_from = read_to_iter - - def stream_writer(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): - if not hasattr(writer, 'write'): - raise ValueError('must pass an object with a write() method') - - return ZstdDecompressionWriter(self, writer, write_size) - - write_to = stream_writer - - def copy_stream(self, ifh, ofh, - read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, - write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): - if not hasattr(ifh, 'read'): - raise ValueError('first argument must have a read() method') - if not hasattr(ofh, 'write'): - raise ValueError('second argument must have a write() method') - - self._ensure_dctx() - - in_buffer = ffi.new('ZSTD_inBuffer *') - out_buffer = ffi.new('ZSTD_outBuffer *') - - dst_buffer = ffi.new('char[]', write_size) - out_buffer.dst = dst_buffer - out_buffer.size = write_size - out_buffer.pos = 0 - - total_read, total_write = 0, 0 - - # Read all available input. - while True: - data = ifh.read(read_size) - if not data: - break - - data_buffer = ffi.from_buffer(data) - total_read += len(data_buffer) - in_buffer.src = data_buffer - in_buffer.size = len(data_buffer) - in_buffer.pos = 0 - - # Flush all read data to output. - while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer) - if lib.ZSTD_isError(zresult): - raise ZstdError('zstd decompressor error: %s' % - _zstd_error(zresult)) - - if out_buffer.pos: - ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) - total_write += out_buffer.pos - out_buffer.pos = 0 - - # Continue loop to keep reading. - - return total_read, total_write - - def decompress_content_dict_chain(self, frames): - if not isinstance(frames, list): - raise TypeError('argument must be a list') - - if not frames: - raise ValueError('empty input chain') - - # First chunk should not be using a dictionary. We handle it specially. - chunk = frames[0] - if not isinstance(chunk, bytes_type): - raise ValueError('chunk 0 must be bytes') - - # All chunks should be zstd frames and should have content size set. 
-        chunk_buffer = ffi.from_buffer(chunk)
-        params = ffi.new('ZSTD_frameHeader *')
-        zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
-        if lib.ZSTD_isError(zresult):
-            raise ValueError('chunk 0 is not a valid zstd frame')
-        elif zresult:
-            raise ValueError('chunk 0 is too small to contain a zstd frame')
-
-        if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
-            raise ValueError('chunk 0 missing content size in frame')
-
-        self._ensure_dctx(load_dict=False)
-
-        last_buffer = ffi.new('char[]', params.frameContentSize)
-
-        out_buffer = ffi.new('ZSTD_outBuffer *')
-        out_buffer.dst = last_buffer
-        out_buffer.size = len(last_buffer)
-        out_buffer.pos = 0
-
-        in_buffer = ffi.new('ZSTD_inBuffer *')
-        in_buffer.src = chunk_buffer
-        in_buffer.size = len(chunk_buffer)
-        in_buffer.pos = 0
-
-        zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
-        if lib.ZSTD_isError(zresult):
-            raise ZstdError('could not decompress chunk 0: %s' %
-                            _zstd_error(zresult))
-        elif zresult:
-            raise ZstdError('chunk 0 did not decompress full frame')
-
-        # Special case of chain length of 1
-        if len(frames) == 1:
-            return ffi.buffer(last_buffer, len(last_buffer))[:]
-
-        i = 1
-        while i < len(frames):
-            chunk = frames[i]
-            if not isinstance(chunk, bytes_type):
-                raise ValueError('chunk %d must be bytes' % i)
-
-            chunk_buffer = ffi.from_buffer(chunk)
-            zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
-            if lib.ZSTD_isError(zresult):
-                raise ValueError('chunk %d is not a valid zstd frame' % i)
-            elif zresult:
-                raise ValueError('chunk %d is too small to contain a zstd frame' % i)
-
-            if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
-                raise ValueError('chunk %d missing content size in frame' % i)
-
-            dest_buffer = ffi.new('char[]', params.frameContentSize)
-
-            out_buffer.dst = dest_buffer
-            out_buffer.size = len(dest_buffer)
-            out_buffer.pos = 0
-
-            in_buffer.src = chunk_buffer
-            in_buffer.size = len(chunk_buffer)
-            in_buffer.pos = 0
-
-            zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('could not decompress chunk %d: %s' %
-                                (i, _zstd_error(zresult)))
-            elif zresult:
-                raise ZstdError('chunk %d did not decompress full frame' % i)
-
-            last_buffer = dest_buffer
-            i += 1
-
-        return ffi.buffer(last_buffer, len(last_buffer))[:]
-
-    def _ensure_dctx(self, load_dict=True):
-        lib.ZSTD_DCtx_reset(self._dctx)
-
-        if self._max_window_size:
-            zresult = lib.ZSTD_DCtx_setMaxWindowSize(self._dctx,
-                                                     self._max_window_size)
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('unable to set max window size: %s' %
-                                _zstd_error(zresult))
-
-        zresult = lib.ZSTD_DCtx_setFormat(self._dctx, self._format)
-        if lib.ZSTD_isError(zresult):
-            raise ZstdError('unable to set decoding format: %s' %
-                            _zstd_error(zresult))
-
-        if self._dict_data and load_dict:
-            zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict)
-            if lib.ZSTD_isError(zresult):
-                raise ZstdError('unable to reference prepared dictionary: %s' %
-                                _zstd_error(zresult))
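
For context, the module removed above mirrors the API of the C extension. A minimal sketch of its one-shot calls, assuming the _zstd_cffi CFFI extension has been built so the module imports:

import zstd_cffi

# One-shot compression. compress() pledges the source size and the content
# size flag defaults to on, so the frame header records the decompressed
# size and decompress() can allocate its output without max_output_size.
cctx = zstd_cffi.ZstdCompressor(level=3)
frame = cctx.compress(b'data to compress')

dctx = zstd_cffi.ZstdDecompressor()
assert dctx.decompress(frame) == b'data to compress'

# Frame introspection helpers from the same module.
params = zstd_cffi.get_frame_parameters(frame)
print(params.content_size, params.has_checksum)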
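
The streaming wrappers (ZstdCompressionWriter, CompressionReader, DecompressionReader) drive ZSTD_compress_generic/ZSTD_decompress_generic incrementally; a sketch under the same build assumption:

import io

import zstd_cffi

cctx = zstd_cffi.ZstdCompressor()
dest = io.BytesIO()

# stream_writer() returns a ZstdCompressionWriter; leaving the context
# manager issues ZSTD_e_end, which flushes and finishes the frame.
with cctx.stream_writer(dest) as compressor:
    compressor.write(b'chunk 0')
    compressor.write(b'chunk 1')

# stream_reader() returns a reader whose read() requires a positive size
# (per the size < 1 check above); it returns b'' once the frame is done.
dctx = zstd_cffi.ZstdDecompressor()
with dctx.stream_reader(io.BytesIO(dest.getvalue())) as reader:
    out = []
    while True:
        chunk = reader.read(8192)
        if not chunk:
            break
        out.append(chunk)

assert b''.join(out) == b'chunk 0chunk 1'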
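
Dictionary support goes through train_dictionary() and ZstdCompressionDict; a sketch with made-up sample data (a hypothetical corpus; ZDICT training can fail with ZstdError on tiny or uniform sample sets):

import zstd_cffi

# Hypothetical samples; train_dictionary() requires a list of bytes.
samples = [b'common prefix, record #' + str(i).encode('ascii')
           for i in range(1000)]

# With no cover/zparams arguments this takes the ZDICT_trainFromBuffer()
# path above. Training may raise ZstdError on unsuitable corpora.
d = zstd_cffi.train_dictionary(16384, samples)
print(d.dict_id(), len(d))

# The same ZstdCompressionDict serves both directions.
cctx = zstd_cffi.ZstdCompressor(dict_data=d)
dctx = zstd_cffi.ZstdDecompressor(dict_data=d)
frames = [cctx.compress(s) for s in samples]
assert [dctx.decompress(f) for f in frames] == samples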