Mercurial > public > mercurial-scm > hg
comparison contrib/python-zstandard/zstd_cffi.py @ 30895:c32454d69b85
zstd: vendor python-zstandard 0.7.0
Commit 3054ae3a66112970a091d3939fee32c2d0c1a23e from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
The vendored zstd library within has been upgraded from 1.1.2 to
1.1.3. This version introduced new APIs for threads, thread
pools, multi-threaded compression, and a new dictionary
builder (COVER). These features are not yet used by
python-zstandard (or Mercurial for that matter). However,
that will likely change in the next python-zstandard release
(and I think there are opportunities for Mercurial to take
advantage of the multi-threaded APIs).
Relevant to Mercurial, the CFFI bindings are now fully
implemented. This means zstd should "just work" with PyPy
(although I haven't tried). The python-zstandard test suite also
runs all tests against both the C extension and CFFI bindings to
ensure feature parity.
There is also a "decompress_content_dict_chain()" API. This was
derived from discussions with Yann Collet on the mailing list about
alternate ways of encoding delta chains.
The change most relevant to Mercurial is a performance enhancement in
the simple decompression API to reuse a data structure across
operations. This makes decompression of multiple inputs significantly
faster. (This scenario occurs when reading revlog delta chains, for
example.)
Using python-zstandard's bench.py to measure the performance
difference...
On changelog chunks in the mozilla-unified repo:
decompress discrete decompress() reuse zctx
1.262243 wall; 1.260000 CPU; 1.260000 user; 0.000000 sys 170.43 MB/s (best of 3)
0.949106 wall; 0.950000 CPU; 0.950000 user; 0.000000 sys 226.66 MB/s (best of 4)
decompress discrete dict decompress() reuse zctx
0.692170 wall; 0.690000 CPU; 0.690000 user; 0.000000 sys 310.80 MB/s (best of 5)
0.437088 wall; 0.440000 CPU; 0.440000 user; 0.000000 sys 492.17 MB/s (best of 7)
On manifest chunks in the mozilla-unified repo:
decompress discrete decompress() reuse zctx
1.367284 wall; 1.370000 CPU; 1.370000 user; 0.000000 sys 274.01 MB/s (best of 3)
1.086831 wall; 1.080000 CPU; 1.080000 user; 0.000000 sys 344.72 MB/s (best of 3)
decompress discrete dict decompress() reuse zctx
0.993272 wall; 0.990000 CPU; 0.990000 user; 0.000000 sys 377.19 MB/s (best of 3)
0.678651 wall; 0.680000 CPU; 0.680000 user; 0.000000 sys 552.06 MB/s (best of 5)
That should make reads on zstd revlogs a bit faster ;)
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Tue, 07 Feb 2017 23:24:47 -0800 |
parents | b86a448a2965 |
children | e0dc40530c5a |
comparison
equal
deleted
inserted
replaced
30894:5b60464efbde | 30895:c32454d69b85 |
---|---|
6 | 6 |
7 """Python interface to the Zstandard (zstd) compression library.""" | 7 """Python interface to the Zstandard (zstd) compression library.""" |
8 | 8 |
9 from __future__ import absolute_import, unicode_literals | 9 from __future__ import absolute_import, unicode_literals |
10 | 10 |
11 import io | 11 import sys |
12 | 12 |
13 from _zstd_cffi import ( | 13 from _zstd_cffi import ( |
14 ffi, | 14 ffi, |
15 lib, | 15 lib, |
16 ) | 16 ) |
17 | 17 |
18 | 18 if sys.version_info[0] == 2: |
19 _CSTREAM_IN_SIZE = lib.ZSTD_CStreamInSize() | 19 bytes_type = str |
20 _CSTREAM_OUT_SIZE = lib.ZSTD_CStreamOutSize() | 20 int_type = long |
21 | 21 else: |
22 | 22 bytes_type = bytes |
23 class _ZstdCompressionWriter(object): | 23 int_type = int |
24 def __init__(self, cstream, writer): | 24 |
25 self._cstream = cstream | 25 |
26 COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() | |
27 COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() | |
28 DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() | |
29 DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() | |
30 | |
31 new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) | |
32 | |
33 | |
34 MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() | |
35 MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER | |
36 FRAME_HEADER = b'\x28\xb5\x2f\xfd' | |
37 ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE) | |
38 | |
39 WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN | |
40 WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX | |
41 CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN | |
42 CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX | |
43 HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN | |
44 HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX | |
45 HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX | |
46 SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN | |
47 SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX | |
48 SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN | |
49 SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX | |
50 TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN | |
51 TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX | |
52 | |
53 STRATEGY_FAST = lib.ZSTD_fast | |
54 STRATEGY_DFAST = lib.ZSTD_dfast | |
55 STRATEGY_GREEDY = lib.ZSTD_greedy | |
56 STRATEGY_LAZY = lib.ZSTD_lazy | |
57 STRATEGY_LAZY2 = lib.ZSTD_lazy2 | |
58 STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 | |
59 STRATEGY_BTOPT = lib.ZSTD_btopt | |
60 | |
61 COMPRESSOBJ_FLUSH_FINISH = 0 | |
62 COMPRESSOBJ_FLUSH_BLOCK = 1 | |
63 | |
64 | |
65 class ZstdError(Exception): | |
66 pass | |
67 | |
68 | |
69 class CompressionParameters(object): | |
70 def __init__(self, window_log, chain_log, hash_log, search_log, | |
71 search_length, target_length, strategy): | |
72 if window_log < WINDOWLOG_MIN or window_log > WINDOWLOG_MAX: | |
73 raise ValueError('invalid window log value') | |
74 | |
75 if chain_log < CHAINLOG_MIN or chain_log > CHAINLOG_MAX: | |
76 raise ValueError('invalid chain log value') | |
77 | |
78 if hash_log < HASHLOG_MIN or hash_log > HASHLOG_MAX: | |
79 raise ValueError('invalid hash log value') | |
80 | |
81 if search_log < SEARCHLOG_MIN or search_log > SEARCHLOG_MAX: | |
82 raise ValueError('invalid search log value') | |
83 | |
84 if search_length < SEARCHLENGTH_MIN or search_length > SEARCHLENGTH_MAX: | |
85 raise ValueError('invalid search length value') | |
86 | |
87 if target_length < TARGETLENGTH_MIN or target_length > TARGETLENGTH_MAX: | |
88 raise ValueError('invalid target length value') | |
89 | |
90 if strategy < STRATEGY_FAST or strategy > STRATEGY_BTOPT: | |
91 raise ValueError('invalid strategy value') | |
92 | |
93 self.window_log = window_log | |
94 self.chain_log = chain_log | |
95 self.hash_log = hash_log | |
96 self.search_log = search_log | |
97 self.search_length = search_length | |
98 self.target_length = target_length | |
99 self.strategy = strategy | |
100 | |
101 def as_compression_parameters(self): | |
102 p = ffi.new('ZSTD_compressionParameters *')[0] | |
103 p.windowLog = self.window_log | |
104 p.chainLog = self.chain_log | |
105 p.hashLog = self.hash_log | |
106 p.searchLog = self.search_log | |
107 p.searchLength = self.search_length | |
108 p.targetLength = self.target_length | |
109 p.strategy = self.strategy | |
110 | |
111 return p | |
112 | |
113 def get_compression_parameters(level, source_size=0, dict_size=0): | |
114 params = lib.ZSTD_getCParams(level, source_size, dict_size) | |
115 return CompressionParameters(window_log=params.windowLog, | |
116 chain_log=params.chainLog, | |
117 hash_log=params.hashLog, | |
118 search_log=params.searchLog, | |
119 search_length=params.searchLength, | |
120 target_length=params.targetLength, | |
121 strategy=params.strategy) | |
122 | |
123 | |
124 def estimate_compression_context_size(params): | |
125 if not isinstance(params, CompressionParameters): | |
126 raise ValueError('argument must be a CompressionParameters') | |
127 | |
128 cparams = params.as_compression_parameters() | |
129 return lib.ZSTD_estimateCCtxSize(cparams) | |
130 | |
131 | |
132 def estimate_decompression_context_size(): | |
133 return lib.ZSTD_estimateDCtxSize() | |
134 | |
135 | |
136 class ZstdCompressionWriter(object): | |
137 def __init__(self, compressor, writer, source_size, write_size): | |
138 self._compressor = compressor | |
26 self._writer = writer | 139 self._writer = writer |
140 self._source_size = source_size | |
141 self._write_size = write_size | |
142 self._entered = False | |
27 | 143 |
28 def __enter__(self): | 144 def __enter__(self): |
145 if self._entered: | |
146 raise ZstdError('cannot __enter__ multiple times') | |
147 | |
148 self._cstream = self._compressor._get_cstream(self._source_size) | |
149 self._entered = True | |
29 return self | 150 return self |
30 | 151 |
31 def __exit__(self, exc_type, exc_value, exc_tb): | 152 def __exit__(self, exc_type, exc_value, exc_tb): |
153 self._entered = False | |
154 | |
32 if not exc_type and not exc_value and not exc_tb: | 155 if not exc_type and not exc_value and not exc_tb: |
33 out_buffer = ffi.new('ZSTD_outBuffer *') | 156 out_buffer = ffi.new('ZSTD_outBuffer *') |
34 out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE) | 157 dst_buffer = ffi.new('char[]', self._write_size) |
35 out_buffer.size = _CSTREAM_OUT_SIZE | 158 out_buffer.dst = dst_buffer |
159 out_buffer.size = self._write_size | |
36 out_buffer.pos = 0 | 160 out_buffer.pos = 0 |
37 | 161 |
38 while True: | 162 while True: |
39 res = lib.ZSTD_endStream(self._cstream, out_buffer) | 163 zresult = lib.ZSTD_endStream(self._cstream, out_buffer) |
40 if lib.ZSTD_isError(res): | 164 if lib.ZSTD_isError(zresult): |
41 raise Exception('error ending compression stream: %s' % lib.ZSTD_getErrorName) | 165 raise ZstdError('error ending compression stream: %s' % |
166 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
42 | 167 |
43 if out_buffer.pos: | 168 if out_buffer.pos: |
44 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | 169 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) |
45 out_buffer.pos = 0 | 170 out_buffer.pos = 0 |
46 | 171 |
47 if res == 0: | 172 if zresult == 0: |
48 break | 173 break |
49 | 174 |
175 self._cstream = None | |
176 self._compressor = None | |
177 | |
50 return False | 178 return False |
51 | 179 |
180 def memory_size(self): | |
181 if not self._entered: | |
182 raise ZstdError('cannot determine size of an inactive compressor; ' | |
183 'call when a context manager is active') | |
184 | |
185 return lib.ZSTD_sizeof_CStream(self._cstream) | |
186 | |
52 def write(self, data): | 187 def write(self, data): |
188 if not self._entered: | |
189 raise ZstdError('write() must be called from an active context ' | |
190 'manager') | |
191 | |
192 total_write = 0 | |
193 | |
194 data_buffer = ffi.from_buffer(data) | |
195 | |
196 in_buffer = ffi.new('ZSTD_inBuffer *') | |
197 in_buffer.src = data_buffer | |
198 in_buffer.size = len(data_buffer) | |
199 in_buffer.pos = 0 | |
200 | |
53 out_buffer = ffi.new('ZSTD_outBuffer *') | 201 out_buffer = ffi.new('ZSTD_outBuffer *') |
54 out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE) | 202 dst_buffer = ffi.new('char[]', self._write_size) |
55 out_buffer.size = _CSTREAM_OUT_SIZE | 203 out_buffer.dst = dst_buffer |
204 out_buffer.size = self._write_size | |
56 out_buffer.pos = 0 | 205 out_buffer.pos = 0 |
57 | 206 |
58 # TODO can we reuse existing memory? | |
59 in_buffer = ffi.new('ZSTD_inBuffer *') | |
60 in_buffer.src = ffi.new('char[]', data) | |
61 in_buffer.size = len(data) | |
62 in_buffer.pos = 0 | |
63 while in_buffer.pos < in_buffer.size: | 207 while in_buffer.pos < in_buffer.size: |
64 res = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer) | 208 zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer) |
65 if lib.ZSTD_isError(res): | 209 if lib.ZSTD_isError(zresult): |
66 raise Exception('zstd compress error: %s' % lib.ZSTD_getErrorName(res)) | 210 raise ZstdError('zstd compress error: %s' % |
211 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
67 | 212 |
68 if out_buffer.pos: | 213 if out_buffer.pos: |
69 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | 214 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) |
215 total_write += out_buffer.pos | |
70 out_buffer.pos = 0 | 216 out_buffer.pos = 0 |
71 | 217 |
218 return total_write | |
219 | |
220 def flush(self): | |
221 if not self._entered: | |
222 raise ZstdError('flush must be called from an active context manager') | |
223 | |
224 total_write = 0 | |
225 | |
226 out_buffer = ffi.new('ZSTD_outBuffer *') | |
227 dst_buffer = ffi.new('char[]', self._write_size) | |
228 out_buffer.dst = dst_buffer | |
229 out_buffer.size = self._write_size | |
230 out_buffer.pos = 0 | |
231 | |
232 while True: | |
233 zresult = lib.ZSTD_flushStream(self._cstream, out_buffer) | |
234 if lib.ZSTD_isError(zresult): | |
235 raise ZstdError('zstd compress error: %s' % | |
236 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
237 | |
238 if not out_buffer.pos: | |
239 break | |
240 | |
241 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | |
242 total_write += out_buffer.pos | |
243 out_buffer.pos = 0 | |
244 | |
245 return total_write | |
246 | |
247 | |
248 class ZstdCompressionObj(object): | |
249 def compress(self, data): | |
250 if self._finished: | |
251 raise ZstdError('cannot call compress() after compressor finished') | |
252 | |
253 data_buffer = ffi.from_buffer(data) | |
254 source = ffi.new('ZSTD_inBuffer *') | |
255 source.src = data_buffer | |
256 source.size = len(data_buffer) | |
257 source.pos = 0 | |
258 | |
259 chunks = [] | |
260 | |
261 while source.pos < len(data): | |
262 zresult = lib.ZSTD_compressStream(self._cstream, self._out, source) | |
263 if lib.ZSTD_isError(zresult): | |
264 raise ZstdError('zstd compress error: %s' % | |
265 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
266 | |
267 if self._out.pos: | |
268 chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) | |
269 self._out.pos = 0 | |
270 | |
271 return b''.join(chunks) | |
272 | |
273 def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): | |
274 if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK): | |
275 raise ValueError('flush mode not recognized') | |
276 | |
277 if self._finished: | |
278 raise ZstdError('compressor object already finished') | |
279 | |
280 assert self._out.pos == 0 | |
281 | |
282 if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: | |
283 zresult = lib.ZSTD_flushStream(self._cstream, self._out) | |
284 if lib.ZSTD_isError(zresult): | |
285 raise ZstdError('zstd compress error: %s' % | |
286 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
287 | |
288 # Output buffer is guaranteed to hold full block. | |
289 assert zresult == 0 | |
290 | |
291 if self._out.pos: | |
292 result = ffi.buffer(self._out.dst, self._out.pos)[:] | |
293 self._out.pos = 0 | |
294 return result | |
295 else: | |
296 return b'' | |
297 | |
298 assert flush_mode == COMPRESSOBJ_FLUSH_FINISH | |
299 self._finished = True | |
300 | |
301 chunks = [] | |
302 | |
303 while True: | |
304 zresult = lib.ZSTD_endStream(self._cstream, self._out) | |
305 if lib.ZSTD_isError(zresult): | |
306 raise ZstdError('error ending compression stream: %s' % | |
307 ffi.string(lib.ZSTD_getErroName(zresult))) | |
308 | |
309 if self._out.pos: | |
310 chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) | |
311 self._out.pos = 0 | |
312 | |
313 if not zresult: | |
314 break | |
315 | |
316 # GC compression stream immediately. | |
317 self._cstream = None | |
318 | |
319 return b''.join(chunks) | |
320 | |
72 | 321 |
73 class ZstdCompressor(object): | 322 class ZstdCompressor(object): |
74 def __init__(self, level=3, dict_data=None, compression_params=None): | 323 def __init__(self, level=3, dict_data=None, compression_params=None, |
75 if dict_data: | 324 write_checksum=False, write_content_size=False, |
76 raise Exception('dict_data not yet supported') | 325 write_dict_id=True): |
77 if compression_params: | 326 if level < 1: |
78 raise Exception('compression_params not yet supported') | 327 raise ValueError('level must be greater than 0') |
328 elif level > lib.ZSTD_maxCLevel(): | |
329 raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel()) | |
79 | 330 |
80 self._compression_level = level | 331 self._compression_level = level |
81 | 332 self._dict_data = dict_data |
82 def compress(self, data): | 333 self._cparams = compression_params |
83 # Just use the stream API for now. | 334 self._fparams = ffi.new('ZSTD_frameParameters *')[0] |
84 output = io.BytesIO() | 335 self._fparams.checksumFlag = write_checksum |
85 with self.write_to(output) as compressor: | 336 self._fparams.contentSizeFlag = write_content_size |
86 compressor.write(data) | 337 self._fparams.noDictIDFlag = not write_dict_id |
87 return output.getvalue() | 338 |
88 | 339 cctx = lib.ZSTD_createCCtx() |
89 def copy_stream(self, ifh, ofh): | 340 if cctx == ffi.NULL: |
90 cstream = self._get_cstream() | 341 raise MemoryError() |
342 | |
343 self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx) | |
344 | |
345 def compress(self, data, allow_empty=False): | |
346 if len(data) == 0 and self._fparams.contentSizeFlag and not allow_empty: | |
347 raise ValueError('cannot write empty inputs when writing content sizes') | |
348 | |
349 # TODO use a CDict for performance. | |
350 dict_data = ffi.NULL | |
351 dict_size = 0 | |
352 | |
353 if self._dict_data: | |
354 dict_data = self._dict_data.as_bytes() | |
355 dict_size = len(self._dict_data) | |
356 | |
357 params = ffi.new('ZSTD_parameters *')[0] | |
358 if self._cparams: | |
359 params.cParams = self._cparams.as_compression_parameters() | |
360 else: | |
361 params.cParams = lib.ZSTD_getCParams(self._compression_level, len(data), | |
362 dict_size) | |
363 params.fParams = self._fparams | |
364 | |
365 dest_size = lib.ZSTD_compressBound(len(data)) | |
366 out = new_nonzero('char[]', dest_size) | |
367 | |
368 zresult = lib.ZSTD_compress_advanced(self._cctx, | |
369 ffi.addressof(out), dest_size, | |
370 data, len(data), | |
371 dict_data, dict_size, | |
372 params) | |
373 | |
374 if lib.ZSTD_isError(zresult): | |
375 raise ZstdError('cannot compress: %s' % | |
376 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
377 | |
378 return ffi.buffer(out, zresult)[:] | |
379 | |
380 def compressobj(self, size=0): | |
381 cstream = self._get_cstream(size) | |
382 cobj = ZstdCompressionObj() | |
383 cobj._cstream = cstream | |
384 cobj._out = ffi.new('ZSTD_outBuffer *') | |
385 cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE) | |
386 cobj._out.dst = cobj._dst_buffer | |
387 cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE | |
388 cobj._out.pos = 0 | |
389 cobj._compressor = self | |
390 cobj._finished = False | |
391 | |
392 return cobj | |
393 | |
394 def copy_stream(self, ifh, ofh, size=0, | |
395 read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, | |
396 write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): | |
397 | |
398 if not hasattr(ifh, 'read'): | |
399 raise ValueError('first argument must have a read() method') | |
400 if not hasattr(ofh, 'write'): | |
401 raise ValueError('second argument must have a write() method') | |
402 | |
403 cstream = self._get_cstream(size) | |
91 | 404 |
92 in_buffer = ffi.new('ZSTD_inBuffer *') | 405 in_buffer = ffi.new('ZSTD_inBuffer *') |
93 out_buffer = ffi.new('ZSTD_outBuffer *') | 406 out_buffer = ffi.new('ZSTD_outBuffer *') |
94 | 407 |
95 out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE) | 408 dst_buffer = ffi.new('char[]', write_size) |
96 out_buffer.size = _CSTREAM_OUT_SIZE | 409 out_buffer.dst = dst_buffer |
410 out_buffer.size = write_size | |
97 out_buffer.pos = 0 | 411 out_buffer.pos = 0 |
98 | 412 |
99 total_read, total_write = 0, 0 | 413 total_read, total_write = 0, 0 |
100 | 414 |
101 while True: | 415 while True: |
102 data = ifh.read(_CSTREAM_IN_SIZE) | 416 data = ifh.read(read_size) |
103 if not data: | 417 if not data: |
104 break | 418 break |
105 | 419 |
106 total_read += len(data) | 420 data_buffer = ffi.from_buffer(data) |
107 | 421 total_read += len(data_buffer) |
108 in_buffer.src = ffi.new('char[]', data) | 422 in_buffer.src = data_buffer |
109 in_buffer.size = len(data) | 423 in_buffer.size = len(data_buffer) |
110 in_buffer.pos = 0 | 424 in_buffer.pos = 0 |
111 | 425 |
112 while in_buffer.pos < in_buffer.size: | 426 while in_buffer.pos < in_buffer.size: |
113 res = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) | 427 zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) |
114 if lib.ZSTD_isError(res): | 428 if lib.ZSTD_isError(zresult): |
115 raise Exception('zstd compress error: %s' % | 429 raise ZstdError('zstd compress error: %s' % |
116 lib.ZSTD_getErrorName(res)) | 430 ffi.string(lib.ZSTD_getErrorName(zresult))) |
117 | 431 |
118 if out_buffer.pos: | 432 if out_buffer.pos: |
119 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | 433 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) |
120 total_write = out_buffer.pos | 434 total_write += out_buffer.pos |
121 out_buffer.pos = 0 | 435 out_buffer.pos = 0 |
122 | 436 |
123 # We've finished reading. Flush the compressor. | 437 # We've finished reading. Flush the compressor. |
124 while True: | 438 while True: |
125 res = lib.ZSTD_endStream(cstream, out_buffer) | 439 zresult = lib.ZSTD_endStream(cstream, out_buffer) |
126 if lib.ZSTD_isError(res): | 440 if lib.ZSTD_isError(zresult): |
127 raise Exception('error ending compression stream: %s' % | 441 raise ZstdError('error ending compression stream: %s' % |
128 lib.ZSTD_getErrorName(res)) | 442 ffi.string(lib.ZSTD_getErrorName(zresult))) |
129 | 443 |
130 if out_buffer.pos: | 444 if out_buffer.pos: |
131 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | 445 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) |
132 total_write += out_buffer.pos | 446 total_write += out_buffer.pos |
133 out_buffer.pos = 0 | 447 out_buffer.pos = 0 |
134 | 448 |
135 if res == 0: | 449 if zresult == 0: |
136 break | 450 break |
137 | 451 |
138 return total_read, total_write | 452 return total_read, total_write |
139 | 453 |
140 def write_to(self, writer): | 454 def write_to(self, writer, size=0, |
141 return _ZstdCompressionWriter(self._get_cstream(), writer) | 455 write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): |
142 | 456 |
143 def _get_cstream(self): | 457 if not hasattr(writer, 'write'): |
458 raise ValueError('must pass an object with a write() method') | |
459 | |
460 return ZstdCompressionWriter(self, writer, size, write_size) | |
461 | |
462 def read_from(self, reader, size=0, | |
463 read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, | |
464 write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): | |
465 if hasattr(reader, 'read'): | |
466 have_read = True | |
467 elif hasattr(reader, '__getitem__'): | |
468 have_read = False | |
469 buffer_offset = 0 | |
470 size = len(reader) | |
471 else: | |
472 raise ValueError('must pass an object with a read() method or ' | |
473 'conforms to buffer protocol') | |
474 | |
475 cstream = self._get_cstream(size) | |
476 | |
477 in_buffer = ffi.new('ZSTD_inBuffer *') | |
478 out_buffer = ffi.new('ZSTD_outBuffer *') | |
479 | |
480 in_buffer.src = ffi.NULL | |
481 in_buffer.size = 0 | |
482 in_buffer.pos = 0 | |
483 | |
484 dst_buffer = ffi.new('char[]', write_size) | |
485 out_buffer.dst = dst_buffer | |
486 out_buffer.size = write_size | |
487 out_buffer.pos = 0 | |
488 | |
489 while True: | |
490 # We should never have output data sitting around after a previous | |
491 # iteration. | |
492 assert out_buffer.pos == 0 | |
493 | |
494 # Collect input data. | |
495 if have_read: | |
496 read_result = reader.read(read_size) | |
497 else: | |
498 remaining = len(reader) - buffer_offset | |
499 slice_size = min(remaining, read_size) | |
500 read_result = reader[buffer_offset:buffer_offset + slice_size] | |
501 buffer_offset += slice_size | |
502 | |
503 # No new input data. Break out of the read loop. | |
504 if not read_result: | |
505 break | |
506 | |
507 # Feed all read data into the compressor and emit output until | |
508 # exhausted. | |
509 read_buffer = ffi.from_buffer(read_result) | |
510 in_buffer.src = read_buffer | |
511 in_buffer.size = len(read_buffer) | |
512 in_buffer.pos = 0 | |
513 | |
514 while in_buffer.pos < in_buffer.size: | |
515 zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) | |
516 if lib.ZSTD_isError(zresult): | |
517 raise ZstdError('zstd compress error: %s' % | |
518 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
519 | |
520 if out_buffer.pos: | |
521 data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | |
522 out_buffer.pos = 0 | |
523 yield data | |
524 | |
525 assert out_buffer.pos == 0 | |
526 | |
527 # And repeat the loop to collect more data. | |
528 continue | |
529 | |
530 # If we get here, input is exhausted. End the stream and emit what | |
531 # remains. | |
532 while True: | |
533 assert out_buffer.pos == 0 | |
534 zresult = lib.ZSTD_endStream(cstream, out_buffer) | |
535 if lib.ZSTD_isError(zresult): | |
536 raise ZstdError('error ending compression stream: %s' % | |
537 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
538 | |
539 if out_buffer.pos: | |
540 data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | |
541 out_buffer.pos = 0 | |
542 yield data | |
543 | |
544 if zresult == 0: | |
545 break | |
546 | |
547 def _get_cstream(self, size): | |
144 cstream = lib.ZSTD_createCStream() | 548 cstream = lib.ZSTD_createCStream() |
549 if cstream == ffi.NULL: | |
550 raise MemoryError() | |
551 | |
145 cstream = ffi.gc(cstream, lib.ZSTD_freeCStream) | 552 cstream = ffi.gc(cstream, lib.ZSTD_freeCStream) |
146 | 553 |
147 res = lib.ZSTD_initCStream(cstream, self._compression_level) | 554 dict_data = ffi.NULL |
148 if lib.ZSTD_isError(res): | 555 dict_size = 0 |
556 if self._dict_data: | |
557 dict_data = self._dict_data.as_bytes() | |
558 dict_size = len(self._dict_data) | |
559 | |
560 zparams = ffi.new('ZSTD_parameters *')[0] | |
561 if self._cparams: | |
562 zparams.cParams = self._cparams.as_compression_parameters() | |
563 else: | |
564 zparams.cParams = lib.ZSTD_getCParams(self._compression_level, | |
565 size, dict_size) | |
566 zparams.fParams = self._fparams | |
567 | |
568 zresult = lib.ZSTD_initCStream_advanced(cstream, dict_data, dict_size, | |
569 zparams, size) | |
570 if lib.ZSTD_isError(zresult): | |
149 raise Exception('cannot init CStream: %s' % | 571 raise Exception('cannot init CStream: %s' % |
150 lib.ZSTD_getErrorName(res)) | 572 ffi.string(lib.ZSTD_getErrorName(zresult))) |
151 | 573 |
152 return cstream | 574 return cstream |
575 | |
576 | |
577 class FrameParameters(object): | |
578 def __init__(self, fparams): | |
579 self.content_size = fparams.frameContentSize | |
580 self.window_size = fparams.windowSize | |
581 self.dict_id = fparams.dictID | |
582 self.has_checksum = bool(fparams.checksumFlag) | |
583 | |
584 | |
585 def get_frame_parameters(data): | |
586 if not isinstance(data, bytes_type): | |
587 raise TypeError('argument must be bytes') | |
588 | |
589 params = ffi.new('ZSTD_frameParams *') | |
590 | |
591 zresult = lib.ZSTD_getFrameParams(params, data, len(data)) | |
592 if lib.ZSTD_isError(zresult): | |
593 raise ZstdError('cannot get frame parameters: %s' % | |
594 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
595 | |
596 if zresult: | |
597 raise ZstdError('not enough data for frame parameters; need %d bytes' % | |
598 zresult) | |
599 | |
600 return FrameParameters(params[0]) | |
601 | |
602 | |
603 class ZstdCompressionDict(object): | |
604 def __init__(self, data): | |
605 assert isinstance(data, bytes_type) | |
606 self._data = data | |
607 | |
608 def __len__(self): | |
609 return len(self._data) | |
610 | |
611 def dict_id(self): | |
612 return int_type(lib.ZDICT_getDictID(self._data, len(self._data))) | |
613 | |
614 def as_bytes(self): | |
615 return self._data | |
616 | |
617 | |
618 def train_dictionary(dict_size, samples, parameters=None): | |
619 if not isinstance(samples, list): | |
620 raise TypeError('samples must be a list') | |
621 | |
622 total_size = sum(map(len, samples)) | |
623 | |
624 samples_buffer = new_nonzero('char[]', total_size) | |
625 sample_sizes = new_nonzero('size_t[]', len(samples)) | |
626 | |
627 offset = 0 | |
628 for i, sample in enumerate(samples): | |
629 if not isinstance(sample, bytes_type): | |
630 raise ValueError('samples must be bytes') | |
631 | |
632 l = len(sample) | |
633 ffi.memmove(samples_buffer + offset, sample, l) | |
634 offset += l | |
635 sample_sizes[i] = l | |
636 | |
637 dict_data = new_nonzero('char[]', dict_size) | |
638 | |
639 zresult = lib.ZDICT_trainFromBuffer(ffi.addressof(dict_data), dict_size, | |
640 ffi.addressof(samples_buffer), | |
641 ffi.addressof(sample_sizes, 0), | |
642 len(samples)) | |
643 if lib.ZDICT_isError(zresult): | |
644 raise ZstdError('Cannot train dict: %s' % | |
645 ffi.string(lib.ZDICT_getErrorName(zresult))) | |
646 | |
647 return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:]) | |
648 | |
649 | |
650 class ZstdDecompressionObj(object): | |
651 def __init__(self, decompressor): | |
652 self._decompressor = decompressor | |
653 self._dstream = self._decompressor._get_dstream() | |
654 self._finished = False | |
655 | |
656 def decompress(self, data): | |
657 if self._finished: | |
658 raise ZstdError('cannot use a decompressobj multiple times') | |
659 | |
660 in_buffer = ffi.new('ZSTD_inBuffer *') | |
661 out_buffer = ffi.new('ZSTD_outBuffer *') | |
662 | |
663 data_buffer = ffi.from_buffer(data) | |
664 in_buffer.src = data_buffer | |
665 in_buffer.size = len(data_buffer) | |
666 in_buffer.pos = 0 | |
667 | |
668 dst_buffer = ffi.new('char[]', DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) | |
669 out_buffer.dst = dst_buffer | |
670 out_buffer.size = len(dst_buffer) | |
671 out_buffer.pos = 0 | |
672 | |
673 chunks = [] | |
674 | |
675 while in_buffer.pos < in_buffer.size: | |
676 zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) | |
677 if lib.ZSTD_isError(zresult): | |
678 raise ZstdError('zstd decompressor error: %s' % | |
679 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
680 | |
681 if zresult == 0: | |
682 self._finished = True | |
683 self._dstream = None | |
684 self._decompressor = None | |
685 | |
686 if out_buffer.pos: | |
687 chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | |
688 out_buffer.pos = 0 | |
689 | |
690 return b''.join(chunks) | |
691 | |
692 | |
class ZstdDecompressionWriter(object):
    """Context manager that decompresses data written to it.

    Instances are obtained from ``ZstdDecompressor.write_to()``. A
    ZSTD_DStream is allocated on ``__enter__`` and released on ``__exit__``;
    decompressed output is forwarded to the wrapped writer.
    """

    def __init__(self, decompressor, writer, write_size):
        self._decompressor = decompressor
        self._writer = writer
        self._write_size = write_size
        self._dstream = None
        self._entered = False

    def __enter__(self):
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        self._dstream = self._decompressor._get_dstream()
        self._entered = True

        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Dropping the reference releases the DStream; _get_dstream()
        # registered the destructor via ffi.gc().
        self._entered = False
        self._dstream = None

    def memory_size(self):
        """Return the size in bytes of the underlying ZSTD_DStream.

        Only valid while the context manager is active.
        """
        if not self._dstream:
            raise ZstdError('cannot determine size of inactive decompressor '
                            'call when context manager is active')

        return lib.ZSTD_sizeof_DStream(self._dstream)

    def write(self, data):
        """Decompress ``data`` and write output to the wrapped writer.

        Returns the number of decompressed bytes written.
        """
        if not self._entered:
            raise ZstdError('write must be called from an active context manager')

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        src = ffi.from_buffer(data)
        in_buffer.src = src
        in_buffer.size = len(src)
        in_buffer.pos = 0

        dst = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst
        out_buffer.size = len(dst)
        out_buffer.pos = 0

        written = 0

        # Feed the entire input chunk through the decompressor, flushing
        # output to the writer whenever the output buffer has content.
        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer,
                                                in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                ffi.string(lib.ZSTD_getErrorName(zresult)))

            if out_buffer.pos:
                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                written += out_buffer.pos
                out_buffer.pos = 0

        return written
752 | |
753 | |
class ZstdDecompressor(object):
    """Reusable zstd decompression context.

    :param dict_data: optional dictionary object (with ``as_bytes()`` and
        ``__len__``) used to seed decompression operations.
    """

    def __init__(self, dict_data=None):
        self._dict_data = dict_data

        refdctx = lib.ZSTD_createDCtx()
        if refdctx == ffi.NULL:
            raise MemoryError()

        # The reference DCtx is copied by decompress() for each operation;
        # ffi.gc() frees it when this object is collected.
        self._refdctx = ffi.gc(refdctx, lib.ZSTD_freeDCtx)
763 | |
764 @property | |
765 def _ddict(self): | |
766 if self._dict_data: | |
767 dict_data = self._dict_data.as_bytes() | |
768 dict_size = len(self._dict_data) | |
769 | |
770 ddict = lib.ZSTD_createDDict(dict_data, dict_size) | |
771 if ddict == ffi.NULL: | |
772 raise ZstdError('could not create decompression dict') | |
773 else: | |
774 ddict = None | |
775 | |
776 self.__dict__['_ddict'] = ddict | |
777 return ddict | |
778 | |
    def decompress(self, data, max_output_size=0):
        """Decompress ``data`` in a single operation.

        The result must fit in memory. When the zstd frame header records
        the decompressed size, a buffer of exactly that size is allocated.
        Otherwise ``max_output_size`` must be provided and bounds the output.

        :param data: buffer-protocol object holding a complete zstd frame.
        :param max_output_size: required when the frame header carries no
            content size; size of the output buffer in that case.
        :raises ZstdError: on missing content size without
            ``max_output_size`` or on a decompression failure.
        :return: decompressed data as ``bytes``.
        """
        data_buffer = ffi.from_buffer(data)

        # Copy the pre-initialized reference DCtx instead of creating and
        # initializing a fresh one — this is the fast path for repeated
        # decompress() calls. NOTE: ``orig_dctx`` owns the raw memory that
        # ``dctx`` points into, so it must stay referenced while ``dctx``
        # is in use.
        orig_dctx = new_nonzero('char[]', lib.ZSTD_sizeof_DCtx(self._refdctx))
        dctx = ffi.cast('ZSTD_DCtx *', orig_dctx)
        lib.ZSTD_copyDCtx(dctx, self._refdctx)

        ddict = self._ddict

        # NOTE(review): ZSTD_getDecompressedSize() returns 0 both for
        # "unknown" and for a genuinely empty frame; this code treats 0 as
        # "unknown" and therefore demands max_output_size in that case.
        output_size = lib.ZSTD_getDecompressedSize(data_buffer, len(data_buffer))
        if output_size:
            result_buffer = ffi.new('char[]', output_size)
            result_size = output_size
        else:
            if not max_output_size:
                raise ZstdError('input data invalid or missing content size '
                                'in frame header')

            result_buffer = ffi.new('char[]', max_output_size)
            result_size = max_output_size

        if ddict:
            zresult = lib.ZSTD_decompress_usingDDict(dctx,
                                                     result_buffer, result_size,
                                                     data_buffer, len(data_buffer),
                                                     ddict)
        else:
            zresult = lib.ZSTD_decompressDCtx(dctx,
                                              result_buffer, result_size,
                                              data_buffer, len(data_buffer))
        if lib.ZSTD_isError(zresult):
            raise ZstdError('decompression error: %s' %
                            ffi.string(lib.ZSTD_getErrorName(zresult)))
        elif output_size and zresult != output_size:
            # Sanity check: the frame promised output_size bytes.
            raise ZstdError('decompression error: decompressed %d bytes; expected %d' %
                            (zresult, output_size))

        return ffi.buffer(result_buffer, zresult)[:]
817 | |
818 def decompressobj(self): | |
819 return ZstdDecompressionObj(self) | |
820 | |
    def read_from(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
                  write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
                  skip_bytes=0):
        """Generator that decompresses ``reader`` and yields output chunks.

        ``reader`` is either an object with a ``read(size)`` method or an
        object supporting ``__getitem__`` and ``len()`` (buffer protocol).

        :param read_size: number of compressed bytes consumed per iteration.
        :param write_size: maximum size of each yielded decompressed chunk.
        :param skip_bytes: leading input bytes to discard before decoding;
            must be smaller than ``read_size``.
        :raises ValueError: for an unsupported reader or bad ``skip_bytes``.
        :raises ZstdError: if zstd reports a decompression error.
        """
        if skip_bytes >= read_size:
            raise ValueError('skip_bytes must be smaller than read_size')

        if hasattr(reader, 'read'):
            have_read = True
        elif hasattr(reader, '__getitem__'):
            have_read = False
            buffer_offset = 0
            size = len(reader)
        else:
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')

        if skip_bytes:
            if have_read:
                # NOTE(review): a single read() may legally return fewer
                # than skip_bytes bytes; short reads are not retried here.
                reader.read(skip_bytes)
            else:
                if skip_bytes > size:
                    raise ValueError('skip_bytes larger than first input chunk')

                buffer_offset = skip_bytes

        dstream = self._get_dstream()

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        while True:
            assert out_buffer.pos == 0

            if have_read:
                read_result = reader.read(read_size)
            else:
                remaining = size - buffer_offset
                slice_size = min(remaining, read_size)
                read_result = reader[buffer_offset:buffer_offset + slice_size]
                buffer_offset += slice_size

            # No new input. Break out of read loop.
            if not read_result:
                break

            # Feed all read data into decompressor and emit output until
            # exhausted.
            read_buffer = ffi.from_buffer(read_result)
            in_buffer.src = read_buffer
            in_buffer.size = len(read_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                assert out_buffer.pos == 0

                zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd decompress error: %s' %
                                    ffi.string(lib.ZSTD_getErrorName(zresult)))

                if out_buffer.pos:
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

                # A return value of 0 means the frame is fully decoded;
                # stop without consuming further input.
                if zresult == 0:
                    return

            # Repeat loop to collect more input data.
            continue

        # If we get here, input is exhausted.
898 | |
899 def write_to(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): | |
900 if not hasattr(writer, 'write'): | |
901 raise ValueError('must pass an object with a write() method') | |
902 | |
903 return ZstdDecompressionWriter(self, writer, write_size) | |
904 | |
905 def copy_stream(self, ifh, ofh, | |
906 read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, | |
907 write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): | |
908 if not hasattr(ifh, 'read'): | |
909 raise ValueError('first argument must have a read() method') | |
910 if not hasattr(ofh, 'write'): | |
911 raise ValueError('second argument must have a write() method') | |
912 | |
913 dstream = self._get_dstream() | |
914 | |
915 in_buffer = ffi.new('ZSTD_inBuffer *') | |
916 out_buffer = ffi.new('ZSTD_outBuffer *') | |
917 | |
918 dst_buffer = ffi.new('char[]', write_size) | |
919 out_buffer.dst = dst_buffer | |
920 out_buffer.size = write_size | |
921 out_buffer.pos = 0 | |
922 | |
923 total_read, total_write = 0, 0 | |
924 | |
925 # Read all available input. | |
926 while True: | |
927 data = ifh.read(read_size) | |
928 if not data: | |
929 break | |
930 | |
931 data_buffer = ffi.from_buffer(data) | |
932 total_read += len(data_buffer) | |
933 in_buffer.src = data_buffer | |
934 in_buffer.size = len(data_buffer) | |
935 in_buffer.pos = 0 | |
936 | |
937 # Flush all read data to output. | |
938 while in_buffer.pos < in_buffer.size: | |
939 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer) | |
940 if lib.ZSTD_isError(zresult): | |
941 raise ZstdError('zstd decompressor error: %s' % | |
942 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
943 | |
944 if out_buffer.pos: | |
945 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | |
946 total_write += out_buffer.pos | |
947 out_buffer.pos = 0 | |
948 | |
949 # Continue loop to keep reading. | |
950 | |
951 return total_read, total_write | |
952 | |
    def decompress_content_dict_chain(self, frames):
        """Decompress a chain of frames where each frame's decompressed
        output serves as the dictionary for the next frame.

        :param frames: list of ``bytes``, each a complete zstd frame whose
            header records its content size.
        :raises TypeError: if ``frames`` is not a list.
        :raises ValueError: for an empty chain, an invalid or undersized
            frame, or a frame lacking a content size.
        :raises ZstdError: if zstd fails to decompress a frame.
        :return: decompressed content of the final frame as ``bytes``.
        """
        if not isinstance(frames, list):
            raise TypeError('argument must be a list')

        if not frames:
            raise ValueError('empty input chain')

        # First chunk should not be using a dictionary. We handle it specially.
        chunk = frames[0]
        if not isinstance(chunk, bytes_type):
            raise ValueError('chunk 0 must be bytes')

        # All chunks should be zstd frames and should have content size set.
        chunk_buffer = ffi.from_buffer(chunk)
        params = ffi.new('ZSTD_frameParams *')
        zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer))
        if lib.ZSTD_isError(zresult):
            raise ValueError('chunk 0 is not a valid zstd frame')
        elif zresult:
            # A non-zero, non-error result means more header bytes were
            # needed than the chunk provides.
            raise ValueError('chunk 0 is too small to contain a zstd frame')

        if not params.frameContentSize:
            raise ValueError('chunk 0 missing content size in frame')

        dctx = lib.ZSTD_createDCtx()
        if dctx == ffi.NULL:
            raise MemoryError()

        # Freed automatically when the local reference is collected.
        dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx)

        last_buffer = ffi.new('char[]', params.frameContentSize)

        zresult = lib.ZSTD_decompressDCtx(dctx, last_buffer, len(last_buffer),
                                          chunk_buffer, len(chunk_buffer))
        if lib.ZSTD_isError(zresult):
            raise ZstdError('could not decompress chunk 0: %s' %
                            ffi.string(lib.ZSTD_getErrorName(zresult)))

        # Special case of chain length of 1
        if len(frames) == 1:
            return ffi.buffer(last_buffer, len(last_buffer))[:]

        # Each subsequent frame is decoded with the previous frame's output
        # as its dictionary.
        i = 1
        while i < len(frames):
            chunk = frames[i]
            if not isinstance(chunk, bytes_type):
                raise ValueError('chunk %d must be bytes' % i)

            chunk_buffer = ffi.from_buffer(chunk)
            zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer))
            if lib.ZSTD_isError(zresult):
                raise ValueError('chunk %d is not a valid zstd frame' % i)
            elif zresult:
                raise ValueError('chunk %d is too small to contain a zstd frame' % i)

            if not params.frameContentSize:
                raise ValueError('chunk %d missing content size in frame' % i)

            dest_buffer = ffi.new('char[]', params.frameContentSize)

            zresult = lib.ZSTD_decompress_usingDict(dctx, dest_buffer, len(dest_buffer),
                                                    chunk_buffer, len(chunk_buffer),
                                                    last_buffer, len(last_buffer))
            if lib.ZSTD_isError(zresult):
                raise ZstdError('could not decompress chunk %d' % i)

            last_buffer = dest_buffer
            i += 1

        return ffi.buffer(last_buffer, len(last_buffer))[:]
1023 | |
1024 def _get_dstream(self): | |
1025 dstream = lib.ZSTD_createDStream() | |
1026 if dstream == ffi.NULL: | |
1027 raise MemoryError() | |
1028 | |
1029 dstream = ffi.gc(dstream, lib.ZSTD_freeDStream) | |
1030 | |
1031 if self._dict_data: | |
1032 zresult = lib.ZSTD_initDStream_usingDict(dstream, | |
1033 self._dict_data.as_bytes(), | |
1034 len(self._dict_data)) | |
1035 else: | |
1036 zresult = lib.ZSTD_initDStream(dstream) | |
1037 | |
1038 if lib.ZSTD_isError(zresult): | |
1039 raise ZstdError('could not initialize DStream: %s' % | |
1040 ffi.string(lib.ZSTD_getErrorName(zresult))) | |
1041 | |
1042 return dstream |