# Copyright (c) 2016-present, Gregory Szorc
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the BSD license. See the LICENSE file for details.

"""Python interface to the Zstandard (zstd) compression library."""

from __future__ import absolute_import, unicode_literals

import io

from _zstd_cffi import (
    ffi,
    lib,
)


# Buffer sizes reported by the zstd library for its streaming API.
_CSTREAM_IN_SIZE = lib.ZSTD_CStreamInSize()
_CSTREAM_OUT_SIZE = lib.ZSTD_CStreamOutSize()


def _zstd_error_name(code):
    """Return the zstd library's name for error ``code`` as text.

    ``ZSTD_getErrorName`` returns a C string; it is converted here so that
    error messages contain the text rather than the repr of a cdata pointer.
    """
    return ffi.string(lib.ZSTD_getErrorName(code)).decode('utf-8', 'replace')


def _new_out_buffer():
    """Allocate a ``ZSTD_outBuffer`` of ``_CSTREAM_OUT_SIZE`` bytes.

    Returns a ``(out_buffer, owner)`` tuple. ``owner`` is the cdata object
    that owns the memory ``out_buffer.dst`` points to. Storing a pointer in
    a struct field does not keep the owning cdata alive, so the caller must
    hold on to ``owner`` for as long as ``out_buffer`` is in use.
    """
    owner = ffi.new('char[]', _CSTREAM_OUT_SIZE)
    out_buffer = ffi.new('ZSTD_outBuffer *')
    out_buffer.dst = owner
    out_buffer.size = _CSTREAM_OUT_SIZE
    out_buffer.pos = 0
    return out_buffer, owner


def _drain(out_buffer, write):
    """Hand pending output to ``write`` and rewind ``out_buffer``.

    Returns the number of bytes passed to ``write`` (0 if nothing pending).
    """
    pending = out_buffer.pos
    if pending:
        # Slice to a bytes copy: the buffer memory is reused on the next
        # library call, so the writer must not be given a live view of it.
        write(ffi.buffer(out_buffer.dst, pending)[:])
        out_buffer.pos = 0
    return pending


def _compress_chunk(cstream, data, out_buffer, write):
    """Feed ``data`` through ``cstream``, passing output to ``write``.

    Returns the number of compressed bytes emitted while consuming ``data``.
    Raises ``Exception`` if the library reports a compression error.
    """
    # TODO can we reuse existing memory?
    # Keep ``src`` bound for the whole call: it owns the memory that
    # ``in_buffer.src`` points to.
    src = ffi.new('char[]', data)
    in_buffer = ffi.new('ZSTD_inBuffer *')
    in_buffer.src = src
    in_buffer.size = len(data)
    in_buffer.pos = 0

    emitted = 0
    while in_buffer.pos < in_buffer.size:
        res = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
        if lib.ZSTD_isError(res):
            raise Exception('zstd compress error: %s' %
                            _zstd_error_name(res))
        emitted += _drain(out_buffer, write)
    return emitted


def _end_stream(cstream, out_buffer, write):
    """Finish the compression stream, passing remaining output to ``write``.

    Calls ``ZSTD_endStream`` until it returns 0 and returns the number of
    bytes emitted. Raises ``Exception`` if the library reports an error.
    """
    emitted = 0
    while True:
        res = lib.ZSTD_endStream(cstream, out_buffer)
        if lib.ZSTD_isError(res):
            raise Exception('error ending compression stream: %s' %
                            _zstd_error_name(res))
        emitted += _drain(out_buffer, write)
        if res == 0:
            return emitted


class _ZstdCompressionWriter(object):
    """Context manager that compresses written data into ``writer``.

    Data passed to ``write()`` is compressed with ``cstream`` and the
    compressed output is forwarded to ``writer.write()``. Leaving the
    ``with`` block without an exception ends the compression stream.
    """

    def __init__(self, cstream, writer):
        self._cstream = cstream
        self._writer = writer

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Only finish the stream on a clean exit; on error the pending
        # exception propagates and nothing more is written.
        if not exc_type and not exc_value and not exc_tb:
            # ``_owner`` keeps the output memory alive; see _new_out_buffer.
            out_buffer, _owner = _new_out_buffer()
            _end_stream(self._cstream, out_buffer, self._writer.write)

        return False

    def write(self, data):
        """Compress ``data`` and forward any produced output to the writer."""
        # ``_owner`` keeps the output memory alive; see _new_out_buffer.
        out_buffer, _owner = _new_out_buffer()
        _compress_chunk(self._cstream, data, out_buffer, self._writer.write)


class ZstdCompressor(object):
    """Compress data with zstd at a fixed compression level.

    ``dict_data`` and ``compression_params`` are accepted for interface
    compatibility but are not supported yet; passing a truthy value for
    either raises ``Exception``.
    """

    def __init__(self, level=3, dict_data=None, compression_params=None):
        if dict_data:
            raise Exception('dict_data not yet supported')
        if compression_params:
            raise Exception('compression_params not yet supported')

        self._compression_level = level

    def compress(self, data):
        """Return ``data`` compressed as a single bytes value."""
        # Just use the stream API for now.
        output = io.BytesIO()
        with self.write_to(output) as compressor:
            compressor.write(data)
        return output.getvalue()

    def copy_stream(self, ifh, ofh):
        """Compress everything read from ``ifh`` and write it to ``ofh``.

        ``ifh`` is read in chunks of ``_CSTREAM_IN_SIZE`` until ``read()``
        returns an empty value. Returns a ``(bytes_read, bytes_written)``
        tuple.
        """
        cstream = self._get_cstream()
        # ``_owner`` keeps the output memory alive; see _new_out_buffer.
        out_buffer, _owner = _new_out_buffer()

        total_read, total_write = 0, 0

        while True:
            data = ifh.read(_CSTREAM_IN_SIZE)
            if not data:
                break

            total_read += len(data)
            total_write += _compress_chunk(cstream, data, out_buffer,
                                           ofh.write)

        # We've finished reading. Flush the compressor.
        total_write += _end_stream(cstream, out_buffer, ofh.write)

        return total_read, total_write

    def write_to(self, writer):
        """Return a context manager that compresses writes into ``writer``."""
        return _ZstdCompressionWriter(self._get_cstream(), writer)

    def _get_cstream(self):
        """Create and initialize a new compression stream.

        The stream is released with ``ZSTD_freeCStream`` when the returned
        object is garbage collected.
        """
        cstream = lib.ZSTD_createCStream()
        if cstream == ffi.NULL:
            raise MemoryError('cannot allocate CStream')
        cstream = ffi.gc(cstream, lib.ZSTD_freeCStream)

        res = lib.ZSTD_initCStream(cstream, self._compression_level)
        if lib.ZSTD_isError(res):
            raise Exception('cannot init CStream: %s' %
                            _zstd_error_name(res))

        return cstream