contrib/python-zstandard/zstd_cffi.py
changeset 30435 b86a448a2965
child 30895 c32454d69b85
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/contrib/python-zstandard/zstd_cffi.py	Thu Nov 10 22:15:58 2016 -0800
@@ -0,0 +1,152 @@
+# Copyright (c) 2016-present, Gregory Szorc
+# All rights reserved.
+#
+# This software may be modified and distributed under the terms
+# of the BSD license. See the LICENSE file for details.
+
+"""Python interface to the Zstandard (zstd) compression library."""
+
+from __future__ import absolute_import, unicode_literals
+
+import io
+
+from _zstd_cffi import (
+    ffi,
+    lib,
+)
+
+
+# Buffer sizes queried once from the zstd library at import time.
+# _CSTREAM_IN_SIZE is the chunk size used when reading input to compress;
+# _CSTREAM_OUT_SIZE is the size of every output buffer allocated below.
+_CSTREAM_IN_SIZE = lib.ZSTD_CStreamInSize()
+_CSTREAM_OUT_SIZE = lib.ZSTD_CStreamOutSize()
+
+
+class _ZstdCompressionWriter(object):
+    def __init__(self, cstream, writer):
+        self._cstream = cstream
+        self._writer = writer
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        if not exc_type and not exc_value and not exc_tb:
+            out_buffer = ffi.new('ZSTD_outBuffer *')
+            out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
+            out_buffer.size = _CSTREAM_OUT_SIZE
+            out_buffer.pos = 0
+
+            while True:
+                res = lib.ZSTD_endStream(self._cstream, out_buffer)
+                if lib.ZSTD_isError(res):
+                    raise Exception('error ending compression stream: %s' % lib.ZSTD_getErrorName)
+
+                if out_buffer.pos:
+                    self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
+                    out_buffer.pos = 0
+
+                if res == 0:
+                    break
+
+        return False
+
+    def write(self, data):
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+        out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
+        out_buffer.size = _CSTREAM_OUT_SIZE
+        out_buffer.pos = 0
+
+        # TODO can we reuse existing memory?
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        in_buffer.src = ffi.new('char[]', data)
+        in_buffer.size = len(data)
+        in_buffer.pos = 0
+        while in_buffer.pos < in_buffer.size:
+            res = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer)
+            if lib.ZSTD_isError(res):
+                raise Exception('zstd compress error: %s' % lib.ZSTD_getErrorName(res))
+
+            if out_buffer.pos:
+                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
+                out_buffer.pos = 0
+
+
+class ZstdCompressor(object):
+    def __init__(self, level=3, dict_data=None, compression_params=None):
+        if dict_data:
+            raise Exception('dict_data not yet supported')
+        if compression_params:
+            raise Exception('compression_params not yet supported')
+
+        self._compression_level = level
+
+    def compress(self, data):
+        # Just use the stream API for now.
+        output = io.BytesIO()
+        with self.write_to(output) as compressor:
+            compressor.write(data)
+        return output.getvalue()
+
+    def copy_stream(self, ifh, ofh):
+        cstream = self._get_cstream()
+
+        in_buffer = ffi.new('ZSTD_inBuffer *')
+        out_buffer = ffi.new('ZSTD_outBuffer *')
+
+        out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
+        out_buffer.size = _CSTREAM_OUT_SIZE
+        out_buffer.pos = 0
+
+        total_read, total_write = 0, 0
+
+        while True:
+            data = ifh.read(_CSTREAM_IN_SIZE)
+            if not data:
+                break
+
+            total_read += len(data)
+
+            in_buffer.src = ffi.new('char[]', data)
+            in_buffer.size = len(data)
+            in_buffer.pos = 0
+
+            while in_buffer.pos < in_buffer.size:
+                res = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
+                if lib.ZSTD_isError(res):
+                    raise Exception('zstd compress error: %s' %
+                                    lib.ZSTD_getErrorName(res))
+
+                if out_buffer.pos:
+                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
+                    total_write = out_buffer.pos
+                    out_buffer.pos = 0
+
+        # We've finished reading. Flush the compressor.
+        while True:
+            res = lib.ZSTD_endStream(cstream, out_buffer)
+            if lib.ZSTD_isError(res):
+                raise Exception('error ending compression stream: %s' %
+                                lib.ZSTD_getErrorName(res))
+
+            if out_buffer.pos:
+                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
+                total_write += out_buffer.pos
+                out_buffer.pos = 0
+
+            if res == 0:
+                break
+
+        return total_read, total_write
+
+    def write_to(self, writer):
+        return _ZstdCompressionWriter(self._get_cstream(), writer)
+
+    def _get_cstream(self):
+        cstream = lib.ZSTD_createCStream()
+        cstream = ffi.gc(cstream, lib.ZSTD_freeCStream)
+
+        res = lib.ZSTD_initCStream(cstream, self._compression_level)
+        if lib.ZSTD_isError(res):
+            raise Exception('cannot init CStream: %s' %
+                            lib.ZSTD_getErrorName(res))
+
+        return cstream