comparison contrib/python-zstandard/zstd_cffi.py @ 30895:c32454d69b85

zstd: vendor python-zstandard 0.7.0 Commit 3054ae3a66112970a091d3939fee32c2d0c1a23e from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). The vendored zstd library within has been upgraded from 1.1.2 to 1.1.3. This version introduced new APIs for threads, thread pools, multi-threaded compression, and a new dictionary builder (COVER). These features are not yet used by python-zstandard (or Mercurial for that matter). However, that will likely change in the next python-zstandard release (and I think there are opportunities for Mercurial to take advantage of the multi-threaded APIs). Relevant to Mercurial, the CFFI bindings are now fully implemented. This means zstd should "just work" with PyPy (although I haven't tried). The python-zstandard test suite also runs all tests against both the C extension and CFFI bindings to ensure feature parity. There is also a "decompress_content_dict_chain()" API. This was derived from discussions with Yann Collet on list about alternate ways of encoding delta chains. The change most relevant to Mercurial is a performance enhancement in the simple decompression API to reuse a data structure across operations. This makes decompression of multiple inputs significantly faster. (This scenario occurs when reading revlog delta chains, for example.) Using python-zstandard's bench.py to measure the performance difference... 
On changelog chunks in the mozilla-unified repo: decompress discrete decompress() reuse zctx 1.262243 wall; 1.260000 CPU; 1.260000 user; 0.000000 sys 170.43 MB/s (best of 3) 0.949106 wall; 0.950000 CPU; 0.950000 user; 0.000000 sys 226.66 MB/s (best of 4) decompress discrete dict decompress() reuse zctx 0.692170 wall; 0.690000 CPU; 0.690000 user; 0.000000 sys 310.80 MB/s (best of 5) 0.437088 wall; 0.440000 CPU; 0.440000 user; 0.000000 sys 492.17 MB/s (best of 7) On manifest chunks in the mozilla-unified repo: decompress discrete decompress() reuse zctx 1.367284 wall; 1.370000 CPU; 1.370000 user; 0.000000 sys 274.01 MB/s (best of 3) 1.086831 wall; 1.080000 CPU; 1.080000 user; 0.000000 sys 344.72 MB/s (best of 3) decompress discrete dict decompress() reuse zctx 0.993272 wall; 0.990000 CPU; 0.990000 user; 0.000000 sys 377.19 MB/s (best of 3) 0.678651 wall; 0.680000 CPU; 0.680000 user; 0.000000 sys 552.06 MB/s (best of 5) That should make reads on zstd revlogs a bit faster ;) # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Tue, 07 Feb 2017 23:24:47 -0800
parents b86a448a2965
children e0dc40530c5a
comparison
equal deleted inserted replaced
30894:5b60464efbde 30895:c32454d69b85
6 6
7 """Python interface to the Zstandard (zstd) compression library.""" 7 """Python interface to the Zstandard (zstd) compression library."""
8 8
9 from __future__ import absolute_import, unicode_literals 9 from __future__ import absolute_import, unicode_literals
10 10
11 import io 11 import sys
12 12
13 from _zstd_cffi import ( 13 from _zstd_cffi import (
14 ffi, 14 ffi,
15 lib, 15 lib,
16 ) 16 )
17 17
# Python 2/3 compatibility aliases. On Python 2, ``bytes`` is ``str`` and
# big integers are ``long``; on Python 3 the builtins are used directly.
if sys.version_info[0] == 2:
    bytes_type = str
    int_type = long
else:
    bytes_type = bytes
    int_type = int


# Buffer sizes recommended by the zstd library for streaming operation.
COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()

# Allocator that skips zero-filling new memory; used for scratch buffers
# that the C library overwrites completely.
new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)


MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
# First 4 bytes of every zstd frame (little-endian form of MAGIC_NUMBER).
FRAME_HEADER = b'\x28\xb5\x2f\xfd'
ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE)

# Valid ranges for the individual advanced compression parameters.
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX
SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX

# Compression strategy constants mirroring the ZSTD_strategy enum.
STRATEGY_FAST = lib.ZSTD_fast
STRATEGY_DFAST = lib.ZSTD_dfast
STRATEGY_GREEDY = lib.ZSTD_greedy
STRATEGY_LAZY = lib.ZSTD_lazy
STRATEGY_LAZY2 = lib.ZSTD_lazy2
STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
STRATEGY_BTOPT = lib.ZSTD_btopt

# Flush modes accepted by ZstdCompressionObj.flush().
COMPRESSOBJ_FLUSH_FINISH = 0
COMPRESSOBJ_FLUSH_BLOCK = 1
64
class ZstdError(Exception):
    """Raised when the zstd library reports an error."""
67
68
class CompressionParameters(object):
    """Validated container for zstd advanced compression parameters.

    Each value is range-checked against the limits exported by the zstd
    library; a ValueError is raised for any out-of-range value.
    """

    def __init__(self, window_log, chain_log, hash_log, search_log,
                 search_length, target_length, strategy):
        if window_log < WINDOWLOG_MIN or window_log > WINDOWLOG_MAX:
            raise ValueError('invalid window log value')

        if chain_log < CHAINLOG_MIN or chain_log > CHAINLOG_MAX:
            raise ValueError('invalid chain log value')

        if hash_log < HASHLOG_MIN or hash_log > HASHLOG_MAX:
            raise ValueError('invalid hash log value')

        if search_log < SEARCHLOG_MIN or search_log > SEARCHLOG_MAX:
            raise ValueError('invalid search log value')

        if search_length < SEARCHLENGTH_MIN or search_length > SEARCHLENGTH_MAX:
            raise ValueError('invalid search length value')

        if target_length < TARGETLENGTH_MIN or target_length > TARGETLENGTH_MAX:
            raise ValueError('invalid target length value')

        if strategy < STRATEGY_FAST or strategy > STRATEGY_BTOPT:
            raise ValueError('invalid strategy value')

        self.window_log = window_log
        self.chain_log = chain_log
        self.hash_log = hash_log
        self.search_log = search_log
        self.search_length = search_length
        self.target_length = target_length
        self.strategy = strategy

    def as_compression_parameters(self):
        """Return an equivalent ``ZSTD_compressionParameters`` cffi struct."""
        p = ffi.new('ZSTD_compressionParameters *')[0]
        p.windowLog = self.window_log
        p.chainLog = self.chain_log
        p.hashLog = self.hash_log
        p.searchLog = self.search_log
        p.searchLength = self.search_length
        p.targetLength = self.target_length
        p.strategy = self.strategy

        return p
112
def get_compression_parameters(level, source_size=0, dict_size=0):
    """Return the CompressionParameters zstd derives for a compression level.

    ``source_size`` and ``dict_size`` hints let the library tune the
    parameters for the expected input.
    """
    params = lib.ZSTD_getCParams(level, source_size, dict_size)
    return CompressionParameters(window_log=params.windowLog,
                                 chain_log=params.chainLog,
                                 hash_log=params.hashLog,
                                 search_log=params.searchLog,
                                 search_length=params.searchLength,
                                 target_length=params.targetLength,
                                 strategy=params.strategy)
122
123
def estimate_compression_context_size(params):
    """Estimate the memory a compression context will need for ``params``.

    ``params`` must be a CompressionParameters instance.
    """
    if not isinstance(params, CompressionParameters):
        raise ValueError('argument must be a CompressionParameters')

    cparams = params.as_compression_parameters()
    return lib.ZSTD_estimateCCtxSize(cparams)
130
131
def estimate_decompression_context_size():
    """Estimate the memory a decompression context will require."""
    return lib.ZSTD_estimateDCtxSize()
134
135
class ZstdCompressionWriter(object):
    """Context manager that compresses written data into ``writer``.

    Obtained from ``ZstdCompressor.write_to()``. The underlying CStream is
    created lazily on ``__enter__`` and finalized (frame epilogue flushed)
    on a clean ``__exit__``.
    """

    def __init__(self, compressor, writer, source_size, write_size):
        self._compressor = compressor
        self._writer = writer
        self._source_size = source_size
        self._write_size = write_size
        self._entered = False

    def __enter__(self):
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        self._cstream = self._compressor._get_cstream(self._source_size)
        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False

        if not exc_type and not exc_value and not exc_tb:
            out_buffer = ffi.new('ZSTD_outBuffer *')
            dst_buffer = ffi.new('char[]', self._write_size)
            out_buffer.dst = dst_buffer
            out_buffer.size = self._write_size
            out_buffer.pos = 0

            # Drain the stream epilogue; ZSTD_endStream returns 0 once the
            # frame is fully flushed.
            while True:
                zresult = lib.ZSTD_endStream(self._cstream, out_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('error ending compression stream: %s' %
                                    ffi.string(lib.ZSTD_getErrorName(zresult)))

                if out_buffer.pos:
                    self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                    out_buffer.pos = 0

                if zresult == 0:
                    break

            # Release references on clean exit; further writes would fail
            # anyway because self._entered is False.
            self._cstream = None
            self._compressor = None

        return False

    def memory_size(self):
        """Size in bytes of the active CStream; only valid inside ``with``."""
        if not self._entered:
            raise ZstdError('cannot determine size of an inactive compressor; '
                            'call when a context manager is active')

        return lib.ZSTD_sizeof_CStream(self._cstream)

    def write(self, data):
        """Compress ``data``; return the number of compressed bytes written."""
        if not self._entered:
            raise ZstdError('write() must be called from an active context '
                            'manager')

        total_write = 0

        data_buffer = ffi.from_buffer(data)

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        out_buffer = ffi.new('ZSTD_outBuffer *')
        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = self._write_size
        out_buffer.pos = 0

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if out_buffer.pos:
                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                total_write += out_buffer.pos
                out_buffer.pos = 0

        return total_write

    def flush(self):
        """Flush buffered compressed data; return the byte count written."""
        if not self._entered:
            raise ZstdError('flush must be called from an active context manager')

        total_write = 0

        out_buffer = ffi.new('ZSTD_outBuffer *')
        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = self._write_size
        out_buffer.pos = 0

        while True:
            zresult = lib.ZSTD_flushStream(self._cstream, out_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if not out_buffer.pos:
                break

            self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
            total_write += out_buffer.pos
            out_buffer.pos = 0

        return total_write
247
class ZstdCompressionObj(object):
    """Incremental compressor modeled on zlib's compressobj.

    Instances are created and wired up by ``ZstdCompressor.compressobj()``,
    which sets ``_cstream``, ``_out``, ``_dst_buffer``, ``_compressor`` and
    ``_finished``.
    """

    def compress(self, data):
        """Feed ``data`` to the compressor; return any output produced."""
        if self._finished:
            raise ZstdError('cannot call compress() after compressor finished')

        data_buffer = ffi.from_buffer(data)
        source = ffi.new('ZSTD_inBuffer *')
        source.src = data_buffer
        source.size = len(data_buffer)
        source.pos = 0

        chunks = []

        while source.pos < len(data):
            zresult = lib.ZSTD_compressStream(self._cstream, self._out, source)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

        return b''.join(chunks)

    def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
        """Flush buffered data.

        ``COMPRESSOBJ_FLUSH_BLOCK`` emits a complete block but keeps the
        stream usable; ``COMPRESSOBJ_FLUSH_FINISH`` ends the frame and
        marks the object finished.
        """
        if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK):
            raise ValueError('flush mode not recognized')

        if self._finished:
            raise ZstdError('compressor object already finished')

        assert self._out.pos == 0

        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
            zresult = lib.ZSTD_flushStream(self._cstream, self._out)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            # Output buffer is guaranteed to hold full block.
            assert zresult == 0

            if self._out.pos:
                result = ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0
                return result
            else:
                return b''

        assert flush_mode == COMPRESSOBJ_FLUSH_FINISH
        self._finished = True

        chunks = []

        while True:
            zresult = lib.ZSTD_endStream(self._cstream, self._out)
            if lib.ZSTD_isError(zresult):
                # BUG FIX: original called the misspelled lib.ZSTD_getErroName,
                # which would raise AttributeError instead of reporting the
                # real zstd error.
                raise ZstdError('error ending compression stream: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

            if not zresult:
                break

        # GC compression stream immediately.
        self._cstream = None

        return b''.join(chunks)
320
72 321
class ZstdCompressor(object):
    """Create compressed zstd data.

    Holds the compression configuration (level, optional dictionary,
    advanced parameters, frame flags) and exposes one-shot and streaming
    compression APIs.
    """

    def __init__(self, level=3, dict_data=None, compression_params=None,
                 write_checksum=False, write_content_size=False,
                 write_dict_id=True):
        if level < 1:
            raise ValueError('level must be greater than 0')
        elif level > lib.ZSTD_maxCLevel():
            raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel())

        self._compression_level = level
        self._dict_data = dict_data
        self._cparams = compression_params
        # Frame parameters control optional fields in the frame header.
        self._fparams = ffi.new('ZSTD_frameParameters *')[0]
        self._fparams.checksumFlag = write_checksum
        self._fparams.contentSizeFlag = write_content_size
        self._fparams.noDictIDFlag = not write_dict_id

        cctx = lib.ZSTD_createCCtx()
        if cctx == ffi.NULL:
            raise MemoryError()

        # Free the C context when the Python object is collected.
        self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx)

    def compress(self, data, allow_empty=False):
        """Compress ``data`` in one shot and return the compressed bytes."""
        if len(data) == 0 and self._fparams.contentSizeFlag and not allow_empty:
            raise ValueError('cannot write empty inputs when writing content sizes')

        # TODO use a CDict for performance.
        dict_data = ffi.NULL
        dict_size = 0

        if self._dict_data:
            dict_data = self._dict_data.as_bytes()
            dict_size = len(self._dict_data)

        params = ffi.new('ZSTD_parameters *')[0]
        if self._cparams:
            params.cParams = self._cparams.as_compression_parameters()
        else:
            params.cParams = lib.ZSTD_getCParams(self._compression_level, len(data),
                                                 dict_size)
        params.fParams = self._fparams

        # ZSTD_compressBound() is the worst-case output size for the input.
        dest_size = lib.ZSTD_compressBound(len(data))
        out = new_nonzero('char[]', dest_size)

        zresult = lib.ZSTD_compress_advanced(self._cctx,
                                             ffi.addressof(out), dest_size,
                                             data, len(data),
                                             dict_data, dict_size,
                                             params)

        if lib.ZSTD_isError(zresult):
            raise ZstdError('cannot compress: %s' %
                            ffi.string(lib.ZSTD_getErrorName(zresult)))

        return ffi.buffer(out, zresult)[:]

    def compressobj(self, size=0):
        """Return a zlib-style incremental compressor object."""
        cstream = self._get_cstream(size)
        cobj = ZstdCompressionObj()
        cobj._cstream = cstream
        cobj._out = ffi.new('ZSTD_outBuffer *')
        # Keep a reference to the destination buffer so it stays alive for
        # the lifetime of the compression object.
        cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
        cobj._out.dst = cobj._dst_buffer
        cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
        cobj._out.pos = 0
        cobj._compressor = self
        cobj._finished = False

        return cobj

    def copy_stream(self, ifh, ofh, size=0,
                    read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                    write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Compress ``ifh`` into ``ofh``; return (bytes_read, bytes_written)."""
        if not hasattr(ifh, 'read'):
            raise ValueError('first argument must have a read() method')
        if not hasattr(ofh, 'write'):
            raise ValueError('second argument must have a write() method')

        cstream = self._get_cstream(size)

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        total_read, total_write = 0, 0

        while True:
            data = ifh.read(read_size)
            if not data:
                break

            data_buffer = ffi.from_buffer(data)
            total_read += len(data_buffer)
            in_buffer.src = data_buffer
            in_buffer.size = len(data_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd compress error: %s' %
                                    ffi.string(lib.ZSTD_getErrorName(zresult)))

                if out_buffer.pos:
                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                    total_write += out_buffer.pos
                    out_buffer.pos = 0

        # We've finished reading. Flush the compressor.
        while True:
            zresult = lib.ZSTD_endStream(cstream, out_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('error ending compression stream: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if out_buffer.pos:
                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                total_write += out_buffer.pos
                out_buffer.pos = 0

            if zresult == 0:
                break

        return total_read, total_write

    def write_to(self, writer, size=0,
                 write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Return a context manager that compresses data written to it."""
        if not hasattr(writer, 'write'):
            raise ValueError('must pass an object with a write() method')

        return ZstdCompressionWriter(self, writer, size, write_size)

    def read_from(self, reader, size=0,
                  read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                  write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Generator yielding compressed chunks from a reader or buffer."""
        if hasattr(reader, 'read'):
            have_read = True
        elif hasattr(reader, '__getitem__'):
            have_read = False
            buffer_offset = 0
            size = len(reader)
        else:
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')

        cstream = self._get_cstream(size)

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        while True:
            # We should never have output data sitting around after a
            # previous iteration.
            assert out_buffer.pos == 0

            # Collect input data.
            if have_read:
                read_result = reader.read(read_size)
            else:
                remaining = len(reader) - buffer_offset
                slice_size = min(remaining, read_size)
                read_result = reader[buffer_offset:buffer_offset + slice_size]
                buffer_offset += slice_size

            # No new input data. Break out of the read loop.
            if not read_result:
                break

            # Feed all read data into the compressor and emit output until
            # exhausted.
            read_buffer = ffi.from_buffer(read_result)
            in_buffer.src = read_buffer
            in_buffer.size = len(read_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd compress error: %s' %
                                    ffi.string(lib.ZSTD_getErrorName(zresult)))

                if out_buffer.pos:
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

            assert out_buffer.pos == 0

            # And repeat the loop to collect more data.
            continue

        # If we get here, input is exhausted. End the stream and emit what
        # remains.
        while True:
            assert out_buffer.pos == 0
            zresult = lib.ZSTD_endStream(cstream, out_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('error ending compression stream: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if out_buffer.pos:
                data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                out_buffer.pos = 0
                yield data

            if zresult == 0:
                break

    def _get_cstream(self, size):
        """Create and initialize a CStream for this compressor's settings."""
        cstream = lib.ZSTD_createCStream()
        if cstream == ffi.NULL:
            raise MemoryError()

        cstream = ffi.gc(cstream, lib.ZSTD_freeCStream)

        dict_data = ffi.NULL
        dict_size = 0
        if self._dict_data:
            dict_data = self._dict_data.as_bytes()
            dict_size = len(self._dict_data)

        zparams = ffi.new('ZSTD_parameters *')[0]
        if self._cparams:
            zparams.cParams = self._cparams.as_compression_parameters()
        else:
            zparams.cParams = lib.ZSTD_getCParams(self._compression_level,
                                                  size, dict_size)
        zparams.fParams = self._fparams

        zresult = lib.ZSTD_initCStream_advanced(cstream, dict_data, dict_size,
                                                zparams, size)
        if lib.ZSTD_isError(zresult):
            # Consistency fix: raise ZstdError like every other error path
            # (original raised bare Exception; ZstdError is a subclass, so
            # existing callers catching Exception still work).
            raise ZstdError('cannot init CStream: %s' %
                            ffi.string(lib.ZSTD_getErrorName(zresult)))

        return cstream
575
576
class FrameParameters(object):
    """Plain-attribute view of a parsed ``ZSTD_frameParams`` struct.

    Exposes content_size, window_size, dict_id and has_checksum as read
    from the frame header.
    """

    def __init__(self, fparams):
        self.content_size = fparams.frameContentSize
        self.window_size = fparams.windowSize
        self.dict_id = fparams.dictID
        self.has_checksum = bool(fparams.checksumFlag)
584
def get_frame_parameters(data):
    """Parse a zstd frame header from ``data`` and return FrameParameters.

    Raises TypeError if ``data`` is not bytes and ZstdError if the header
    is invalid or incomplete.
    """
    if not isinstance(data, bytes_type):
        raise TypeError('argument must be bytes')

    params = ffi.new('ZSTD_frameParams *')

    zresult = lib.ZSTD_getFrameParams(params, data, len(data))
    if lib.ZSTD_isError(zresult):
        raise ZstdError('cannot get frame parameters: %s' %
                        ffi.string(lib.ZSTD_getErrorName(zresult)))

    # A non-zero, non-error result is the number of input bytes still
    # needed to parse the header.
    if zresult:
        raise ZstdError('not enough data for frame parameters; need %d bytes' %
                        zresult)

    return FrameParameters(params[0])
601
602
class ZstdCompressionDict(object):
    """Wrapper around raw compression dictionary data (bytes)."""

    def __init__(self, data):
        assert isinstance(data, bytes_type)
        self._data = data

    def __len__(self):
        return len(self._data)

    def dict_id(self):
        """Return the numeric dictionary ID embedded in the data."""
        return int_type(lib.ZDICT_getDictID(self._data, len(self._data)))

    def as_bytes(self):
        """Return the raw dictionary content."""
        return self._data
617
def train_dictionary(dict_size, samples, parameters=None):
    """Train a compression dictionary of up to ``dict_size`` bytes.

    ``samples`` must be a list of bytes. Returns a ZstdCompressionDict.
    ``parameters`` is accepted but not yet wired to the C API in this
    version.
    """
    if not isinstance(samples, list):
        raise TypeError('samples must be a list')

    total_size = sum(map(len, samples))

    # Concatenate all samples into one C buffer with a parallel size array,
    # as ZDICT_trainFromBuffer() requires.
    samples_buffer = new_nonzero('char[]', total_size)
    sample_sizes = new_nonzero('size_t[]', len(samples))

    offset = 0
    for i, sample in enumerate(samples):
        if not isinstance(sample, bytes_type):
            raise ValueError('samples must be bytes')

        l = len(sample)
        ffi.memmove(samples_buffer + offset, sample, l)
        offset += l
        sample_sizes[i] = l

    dict_data = new_nonzero('char[]', dict_size)

    zresult = lib.ZDICT_trainFromBuffer(ffi.addressof(dict_data), dict_size,
                                        ffi.addressof(samples_buffer),
                                        ffi.addressof(sample_sizes, 0),
                                        len(samples))
    if lib.ZDICT_isError(zresult):
        raise ZstdError('Cannot train dict: %s' %
                        ffi.string(lib.ZDICT_getErrorName(zresult)))

    # On success, zresult is the actual size of the trained dictionary.
    return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:])
648
649
class ZstdDecompressionObj(object):
    """Incremental decompressor modeled on zlib's decompressobj.

    Single use: once the end of the frame is reached, further calls raise.
    """

    def __init__(self, decompressor):
        self._decompressor = decompressor
        self._dstream = self._decompressor._get_dstream()
        self._finished = False

    def decompress(self, data):
        """Feed compressed bytes; return any decompressed output."""
        if self._finished:
            raise ZstdError('cannot use a decompressobj multiple times')

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        data_buffer = ffi.from_buffer(data)
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        dst_buffer = ffi.new('char[]', DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        chunks = []

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompressor error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if zresult == 0:
                # End of frame; release resources and refuse further use.
                self._finished = True
                self._dstream = None
                self._decompressor = None

            if out_buffer.pos:
                chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                out_buffer.pos = 0

        return b''.join(chunks)
691
692
class ZstdDecompressionWriter(object):
    """Context manager that decompresses written data into ``writer``.

    Obtained from ``ZstdDecompressor.write_to()``.
    """

    def __init__(self, decompressor, writer, write_size):
        self._decompressor = decompressor
        self._writer = writer
        self._write_size = write_size
        self._dstream = None
        self._entered = False

    def __enter__(self):
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        self._dstream = self._decompressor._get_dstream()
        self._entered = True

        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False
        self._dstream = None

    def memory_size(self):
        """Size in bytes of the active DStream; only valid inside ``with``."""
        if not self._dstream:
            raise ZstdError('cannot determine size of inactive decompressor '
                            'call when context manager is active')

        return lib.ZSTD_sizeof_DStream(self._dstream)

    def write(self, data):
        """Decompress ``data``; return the number of decompressed bytes written."""
        if not self._entered:
            raise ZstdError('write must be called from an active context manager')

        total_write = 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        data_buffer = ffi.from_buffer(data)
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if out_buffer.pos:
                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                total_write += out_buffer.pos
                out_buffer.pos = 0

        return total_write
752
753
754 class ZstdDecompressor(object):
755 def __init__(self, dict_data=None):
756 self._dict_data = dict_data
757
758 dctx = lib.ZSTD_createDCtx()
759 if dctx == ffi.NULL:
760 raise MemoryError()
761
762 self._refdctx = ffi.gc(dctx, lib.ZSTD_freeDCtx)
763
764 @property
765 def _ddict(self):
766 if self._dict_data:
767 dict_data = self._dict_data.as_bytes()
768 dict_size = len(self._dict_data)
769
770 ddict = lib.ZSTD_createDDict(dict_data, dict_size)
771 if ddict == ffi.NULL:
772 raise ZstdError('could not create decompression dict')
773 else:
774 ddict = None
775
776 self.__dict__['_ddict'] = ddict
777 return ddict
778
779 def decompress(self, data, max_output_size=0):
780 data_buffer = ffi.from_buffer(data)
781
782 orig_dctx = new_nonzero('char[]', lib.ZSTD_sizeof_DCtx(self._refdctx))
783 dctx = ffi.cast('ZSTD_DCtx *', orig_dctx)
784 lib.ZSTD_copyDCtx(dctx, self._refdctx)
785
786 ddict = self._ddict
787
788 output_size = lib.ZSTD_getDecompressedSize(data_buffer, len(data_buffer))
789 if output_size:
790 result_buffer = ffi.new('char[]', output_size)
791 result_size = output_size
792 else:
793 if not max_output_size:
794 raise ZstdError('input data invalid or missing content size '
795 'in frame header')
796
797 result_buffer = ffi.new('char[]', max_output_size)
798 result_size = max_output_size
799
800 if ddict:
801 zresult = lib.ZSTD_decompress_usingDDict(dctx,
802 result_buffer, result_size,
803 data_buffer, len(data_buffer),
804 ddict)
805 else:
806 zresult = lib.ZSTD_decompressDCtx(dctx,
807 result_buffer, result_size,
808 data_buffer, len(data_buffer))
809 if lib.ZSTD_isError(zresult):
810 raise ZstdError('decompression error: %s' %
811 ffi.string(lib.ZSTD_getErrorName(zresult)))
812 elif output_size and zresult != output_size:
813 raise ZstdError('decompression error: decompressed %d bytes; expected %d' %
814 (zresult, output_size))
815
816 return ffi.buffer(result_buffer, zresult)[:]
817
818 def decompressobj(self):
819 return ZstdDecompressionObj(self)
820
    def read_from(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
                  write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
                  skip_bytes=0):
        """Generator that reads compressed data from ``reader`` and yields
        decompressed chunks.

        ``reader`` is either an object with a ``read(size)`` method or an
        object supporting ``__getitem__``/``len()`` (buffer-protocol style).
        Input is consumed ``read_size`` bytes at a time; each yielded chunk
        is at most ``write_size`` bytes. The first ``skip_bytes`` bytes of
        input are discarded before decompression starts.

        Raises ``ValueError`` for invalid arguments and ``ZstdError`` if the
        zstd library reports a decompression error.
        """
        # skip_bytes must fit inside a single read; spanning multiple reads
        # is not supported by this implementation.
        if skip_bytes >= read_size:
            raise ValueError('skip_bytes must be smaller than read_size')

        if hasattr(reader, 'read'):
            have_read = True
        elif hasattr(reader, '__getitem__'):
            # Buffer-protocol path: we slice the object manually and track
            # our own offset.
            have_read = False
            buffer_offset = 0
            size = len(reader)
        else:
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')

        if skip_bytes:
            if have_read:
                reader.read(skip_bytes)
            else:
                if skip_bytes > size:
                    raise ValueError('skip_bytes larger than first input chunk')

                buffer_offset = skip_bytes

        dstream = self._get_dstream()

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        # One output buffer is allocated up front and reused for every
        # yielded chunk; its contents are copied out before each yield.
        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        while True:
            # Output buffer must have been fully drained by the inner loop.
            assert out_buffer.pos == 0

            if have_read:
                read_result = reader.read(read_size)
            else:
                remaining = size - buffer_offset
                slice_size = min(remaining, read_size)
                read_result = reader[buffer_offset:buffer_offset + slice_size]
                buffer_offset += slice_size

            # No new input. Break out of read loop.
            if not read_result:
                break

            # Feed all read data into decompressor and emit output until
            # exhausted.
            read_buffer = ffi.from_buffer(read_result)
            in_buffer.src = read_buffer
            in_buffer.size = len(read_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                assert out_buffer.pos == 0

                zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd decompress error: %s' %
                                    ffi.string(lib.ZSTD_getErrorName(zresult)))

                if out_buffer.pos:
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

                # zresult of 0 signals the end of the frame; any input past
                # this point is deliberately ignored.
                if zresult == 0:
                    return

                # Repeat loop to collect more input data.
                continue

            # If we get here, input is exhausted.
898
899 def write_to(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
900 if not hasattr(writer, 'write'):
901 raise ValueError('must pass an object with a write() method')
902
903 return ZstdDecompressionWriter(self, writer, write_size)
904
905 def copy_stream(self, ifh, ofh,
906 read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
907 write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
908 if not hasattr(ifh, 'read'):
909 raise ValueError('first argument must have a read() method')
910 if not hasattr(ofh, 'write'):
911 raise ValueError('second argument must have a write() method')
912
913 dstream = self._get_dstream()
914
915 in_buffer = ffi.new('ZSTD_inBuffer *')
916 out_buffer = ffi.new('ZSTD_outBuffer *')
917
918 dst_buffer = ffi.new('char[]', write_size)
919 out_buffer.dst = dst_buffer
920 out_buffer.size = write_size
921 out_buffer.pos = 0
922
923 total_read, total_write = 0, 0
924
925 # Read all available input.
926 while True:
927 data = ifh.read(read_size)
928 if not data:
929 break
930
931 data_buffer = ffi.from_buffer(data)
932 total_read += len(data_buffer)
933 in_buffer.src = data_buffer
934 in_buffer.size = len(data_buffer)
935 in_buffer.pos = 0
936
937 # Flush all read data to output.
938 while in_buffer.pos < in_buffer.size:
939 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
940 if lib.ZSTD_isError(zresult):
941 raise ZstdError('zstd decompressor error: %s' %
942 ffi.string(lib.ZSTD_getErrorName(zresult)))
943
944 if out_buffer.pos:
945 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
946 total_write += out_buffer.pos
947 out_buffer.pos = 0
948
949 # Continue loop to keep reading.
950
951 return total_read, total_write
952
953 def decompress_content_dict_chain(self, frames):
954 if not isinstance(frames, list):
955 raise TypeError('argument must be a list')
956
957 if not frames:
958 raise ValueError('empty input chain')
959
960 # First chunk should not be using a dictionary. We handle it specially.
961 chunk = frames[0]
962 if not isinstance(chunk, bytes_type):
963 raise ValueError('chunk 0 must be bytes')
964
965 # All chunks should be zstd frames and should have content size set.
966 chunk_buffer = ffi.from_buffer(chunk)
967 params = ffi.new('ZSTD_frameParams *')
968 zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer))
969 if lib.ZSTD_isError(zresult):
970 raise ValueError('chunk 0 is not a valid zstd frame')
971 elif zresult:
972 raise ValueError('chunk 0 is too small to contain a zstd frame')
973
974 if not params.frameContentSize:
975 raise ValueError('chunk 0 missing content size in frame')
976
977 dctx = lib.ZSTD_createDCtx()
978 if dctx == ffi.NULL:
979 raise MemoryError()
980
981 dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx)
982
983 last_buffer = ffi.new('char[]', params.frameContentSize)
984
985 zresult = lib.ZSTD_decompressDCtx(dctx, last_buffer, len(last_buffer),
986 chunk_buffer, len(chunk_buffer))
987 if lib.ZSTD_isError(zresult):
988 raise ZstdError('could not decompress chunk 0: %s' %
989 ffi.string(lib.ZSTD_getErrorName(zresult)))
990
991 # Special case of chain length of 1
992 if len(frames) == 1:
993 return ffi.buffer(last_buffer, len(last_buffer))[:]
994
995 i = 1
996 while i < len(frames):
997 chunk = frames[i]
998 if not isinstance(chunk, bytes_type):
999 raise ValueError('chunk %d must be bytes' % i)
1000
1001 chunk_buffer = ffi.from_buffer(chunk)
1002 zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer))
1003 if lib.ZSTD_isError(zresult):
1004 raise ValueError('chunk %d is not a valid zstd frame' % i)
1005 elif zresult:
1006 raise ValueError('chunk %d is too small to contain a zstd frame' % i)
1007
1008 if not params.frameContentSize:
1009 raise ValueError('chunk %d missing content size in frame' % i)
1010
1011 dest_buffer = ffi.new('char[]', params.frameContentSize)
1012
1013 zresult = lib.ZSTD_decompress_usingDict(dctx, dest_buffer, len(dest_buffer),
1014 chunk_buffer, len(chunk_buffer),
1015 last_buffer, len(last_buffer))
1016 if lib.ZSTD_isError(zresult):
1017 raise ZstdError('could not decompress chunk %d' % i)
1018
1019 last_buffer = dest_buffer
1020 i += 1
1021
1022 return ffi.buffer(last_buffer, len(last_buffer))[:]
1023
1024 def _get_dstream(self):
1025 dstream = lib.ZSTD_createDStream()
1026 if dstream == ffi.NULL:
1027 raise MemoryError()
1028
1029 dstream = ffi.gc(dstream, lib.ZSTD_freeDStream)
1030
1031 if self._dict_data:
1032 zresult = lib.ZSTD_initDStream_usingDict(dstream,
1033 self._dict_data.as_bytes(),
1034 len(self._dict_data))
1035 else:
1036 zresult = lib.ZSTD_initDStream(dstream)
1037
1038 if lib.ZSTD_isError(zresult):
1039 raise ZstdError('could not initialize DStream: %s' %
1040 ffi.string(lib.ZSTD_getErrorName(zresult)))
1041
1042 return dstream