Mercurial > public > mercurial-scm > hg
diff contrib/python-zstandard/c-ext/compressor.c @ 31796:e0dc40530c5a
zstd: vendor python-zstandard 0.8.0
Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
Updates relevant to Mercurial include:
* Support for multi-threaded compression (we can use this for
bundle and wire protocol compression).
* APIs for batch compression and decompression operations using
multiple threads and optimal memory allocation mechanism. (Can
be useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
containing N discrete items of known lengths. This type can be
used for very efficient 0-copy data operations.
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 15:24:03 -0700 |
parents | c32454d69b85 |
children | b1fb341d8a61 |
line wrap: on
line diff
--- a/contrib/python-zstandard/c-ext/compressor.c Sat Apr 01 13:43:52 2017 -0700 +++ b/contrib/python-zstandard/c-ext/compressor.c Sat Apr 01 15:24:03 2017 -0700 @@ -7,12 +7,17 @@ */ #include "python-zstandard.h" +#include "pool.h" extern PyObject* ZstdError; -int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) { +int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) { ZSTD_customMem zmem; - assert(!compressor->cdict); + + if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) { + return 0; + } + Py_BEGIN_ALLOW_THREADS memset(&zmem, 0, sizeof(zmem)); compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData, @@ -28,22 +33,32 @@ } /** -* Initialize a zstd CStream from a ZstdCompressor instance. -* -* Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python -* exception will be set. -*/ -ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) { - ZSTD_CStream* cstream; + * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized. + * + * Returns 0 on success. Other value on failure. Will set a Python exception + * on failure. + */ +int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) { ZSTD_parameters zparams; void* dictData = NULL; size_t dictSize = 0; size_t zresult; - cstream = ZSTD_createCStream(); - if (!cstream) { - PyErr_SetString(ZstdError, "cannot create CStream"); - return NULL; + if (compressor->cstream) { + zresult = ZSTD_resetCStream(compressor->cstream, sourceSize); + if (ZSTD_isError(zresult)) { + PyErr_Format(ZstdError, "could not reset CStream: %s", + ZSTD_getErrorName(zresult)); + return -1; + } + + return 0; + } + + compressor->cstream = ZSTD_createCStream(); + if (!compressor->cstream) { + PyErr_SetString(ZstdError, "could not create CStream"); + return -1; } if (compressor->dict) { @@ -63,15 +78,51 @@ zparams.fParams = compressor->fparams; - zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize); + zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize, + zparams, sourceSize); if (ZSTD_isError(zresult)) { - ZSTD_freeCStream(cstream); + ZSTD_freeCStream(compressor->cstream); + compressor->cstream = NULL; PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); - return NULL; + return -1; } - return cstream; + return 0;; +} + +int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) { + size_t zresult; + void* dictData = NULL; + size_t dictSize = 0; + ZSTD_parameters zparams; + + assert(compressor->mtcctx); + + if (compressor->dict) { + dictData = compressor->dict->dictData; + dictSize = compressor->dict->dictSize; + } + + memset(&zparams, 0, sizeof(zparams)); + if (compressor->cparams) { + ztopy_compression_parameters(compressor->cparams, &zparams.cParams); + } + else { + zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize); + } + + zparams.fParams = compressor->fparams; + + zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize, + zparams, sourceSize); + + if (ZSTD_isError(zresult)) { + PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); + return -1; + } + + return 0; } PyDoc_STRVAR(ZstdCompressor__doc__, @@ -103,6 +154,11 @@ " Determines whether the dictionary ID will be written into the compressed\n" " data. Defaults to True. Only adds content to the compressed data if\n" " a dictionary is being used.\n" +"threads\n" +" Number of threads to use to compress data concurrently. When set,\n" +" compression operations are performed on multiple threads. The default\n" +" value (0) disables multi-threaded compression. A value of ``-1`` means to\n" +" set the number of threads to the number of detected logical CPUs.\n" ); static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { @@ -113,6 +169,7 @@ "write_checksum", "write_content_size", "write_dict_id", + "threads", NULL }; @@ -122,16 +179,12 @@ PyObject* writeChecksum = NULL; PyObject* writeContentSize = NULL; PyObject* writeDictID = NULL; + int threads = 0; - self->cctx = NULL; - self->dict = NULL; - self->cparams = NULL; - self->cdict = NULL; - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor", + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor", kwlist, &level, &ZstdCompressionDictType, &dict, &CompressionParametersType, ¶ms, - &writeChecksum, &writeContentSize, &writeDictID)) { + &writeChecksum, &writeContentSize, &writeDictID, &threads)) { return -1; } @@ -146,12 +199,27 @@ return -1; } + if (threads < 0) { + threads = cpu_count(); + } + + self->threads = threads; + /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the overhead of each compression operation. */ - self->cctx = ZSTD_createCCtx(); - if (!self->cctx) { - PyErr_NoMemory(); - return -1; + if (threads) { + self->mtcctx = ZSTDMT_createCCtx(threads); + if (!self->mtcctx) { + PyErr_NoMemory(); + return -1; + } + } + else { + self->cctx = ZSTD_createCCtx(); + if (!self->cctx) { + PyErr_NoMemory(); + return -1; + } } self->compressionLevel = level; @@ -182,6 +250,11 @@ } static void ZstdCompressor_dealloc(ZstdCompressor* self) { + if (self->cstream) { + ZSTD_freeCStream(self->cstream); + self->cstream = NULL; + } + Py_XDECREF(self->cparams); Py_XDECREF(self->dict); @@ -195,6 +268,11 @@ self->cctx = NULL; } + if (self->mtcctx) { + ZSTDMT_freeCCtx(self->mtcctx); + self->mtcctx = NULL; + } + PyObject_Del(self); } @@ -229,7 +307,6 @@ Py_ssize_t sourceSize = 0; size_t inSize = ZSTD_CStreamInSize(); size_t outSize = ZSTD_CStreamOutSize(); - ZSTD_CStream* cstream; ZSTD_inBuffer input; ZSTD_outBuffer output; Py_ssize_t totalRead = 0; @@ -261,10 +338,17 @@ /* Prevent free on uninitialized memory in finally. */ output.dst = NULL; - cstream = CStream_from_ZstdCompressor(self, sourceSize); - if (!cstream) { - res = NULL; - goto finally; + if (self->mtcctx) { + if (init_mtcstream(self, sourceSize)) { + res = NULL; + goto finally; + } + } + else { + if (0 != init_cstream(self, sourceSize)) { + res = NULL; + goto finally; + } } output.dst = PyMem_Malloc(outSize); @@ -300,7 +384,12 @@ while (input.pos < input.size) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_compressStream(cstream, &output, &input); + if (self->mtcctx) { + zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input); + } + else { + zresult = ZSTD_compressStream(self->cstream, &output, &input); + } Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -325,7 +414,12 @@ /* We've finished reading. Now flush the compressor stream. */ while (1) { - zresult = ZSTD_endStream(cstream, &output); + if (self->mtcctx) { + zresult = ZSTDMT_endStream(self->mtcctx, &output); + } + else { + zresult = ZSTD_endStream(self->cstream, &output); + } if (ZSTD_isError(zresult)) { PyErr_Format(ZstdError, "error ending compression stream: %s", ZSTD_getErrorName(zresult)); @@ -350,24 +444,17 @@ } } - ZSTD_freeCStream(cstream); - cstream = NULL; - totalReadPy = PyLong_FromSsize_t(totalRead); totalWritePy = PyLong_FromSsize_t(totalWrite); res = PyTuple_Pack(2, totalReadPy, totalWritePy); - Py_DecRef(totalReadPy); - Py_DecRef(totalWritePy); + Py_DECREF(totalReadPy); + Py_DECREF(totalWritePy); finally: if (output.dst) { PyMem_Free(output.dst); } - if (cstream) { - ZSTD_freeCStream(cstream); - } - return res; } @@ -410,6 +497,18 @@ return NULL; } + if (self->threads && self->dict) { + PyErr_SetString(ZstdError, + "compress() cannot be used with both dictionaries and multi-threaded compression"); + return NULL; + } + + if (self->threads && self->cparams) { + PyErr_SetString(ZstdError, + "compress() cannot be used with both compression parameters and multi-threaded compression"); + return NULL; + } + /* Limitation in zstd C API doesn't let decompression side distinguish between content size of 0 and unknown content size. This can make round tripping via Python difficult. Until this is fixed, require a flag @@ -456,24 +555,28 @@ https://github.com/facebook/zstd/issues/358 contains more info. We could potentially add an argument somewhere to control this behavior. */ - if (dictData && !self->cdict) { - if (populate_cdict(self, dictData, dictSize, &zparams)) { - Py_DECREF(output); - return NULL; - } + if (0 != populate_cdict(self, &zparams)) { + Py_DECREF(output); + return NULL; } Py_BEGIN_ALLOW_THREADS - /* By avoiding ZSTD_compress(), we don't necessarily write out content - size. This means the argument to ZstdCompressor to control frame - parameters is honored. */ - if (self->cdict) { - zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize, - source, sourceSize, self->cdict); + if (self->mtcctx) { + zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize, + source, sourceSize, self->compressionLevel); } else { - zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, - source, sourceSize, dictData, dictSize, zparams); + /* By avoiding ZSTD_compress(), we don't necessarily write out content + size. This means the argument to ZstdCompressor to control frame + parameters is honored. */ + if (self->cdict) { + zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize, + source, sourceSize, self->cdict); + } + else { + zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, + source, sourceSize, dictData, dictSize, zparams); + } } Py_END_ALLOW_THREADS @@ -507,21 +610,30 @@ Py_ssize_t inSize = 0; size_t outSize = ZSTD_CStreamOutSize(); - ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType); - if (!result) { - return NULL; - } + ZstdCompressionObj* result = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) { return NULL; } - result->cstream = CStream_from_ZstdCompressor(self, inSize); - if (!result->cstream) { - Py_DECREF(result); + result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL); + if (!result) { return NULL; } + if (self->mtcctx) { + if (init_mtcstream(self, inSize)) { + Py_DECREF(result); + return NULL; + } + } + else { + if (0 != init_cstream(self, inSize)) { + Py_DECREF(result); + return NULL; + } + } + result->output.dst = PyMem_Malloc(outSize); if (!result->output.dst) { PyErr_NoMemory(); @@ -529,13 +641,9 @@ return NULL; } result->output.size = outSize; - result->output.pos = 0; - result->compressor = self; Py_INCREF(result->compressor); - result->finished = 0; - return result; } @@ -579,19 +687,10 @@ return NULL; } - result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType); + result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL); if (!result) { return NULL; } - - result->compressor = NULL; - result->reader = NULL; - result->buffer = NULL; - result->cstream = NULL; - result->input.src = NULL; - result->output.dst = NULL; - result->readResult = NULL; - if (PyObject_HasAttrString(reader, "read")) { result->reader = reader; Py_INCREF(result->reader); @@ -608,7 +707,6 @@ goto except; } - result->bufferOffset = 0; sourceSize = result->buffer->len; } else { @@ -621,9 +719,16 @@ Py_INCREF(result->compressor); result->sourceSize = sourceSize; - result->cstream = CStream_from_ZstdCompressor(self, sourceSize); - if (!result->cstream) { - goto except; + + if (self->mtcctx) { + if (init_mtcstream(self, sourceSize)) { + goto except; + } + } + else { + if (0 != init_cstream(self, sourceSize)) { + goto except; + } } result->inSize = inSize; @@ -635,26 +740,12 @@ goto except; } result->output.size = outSize; - result->output.pos = 0; - - result->input.src = NULL; - result->input.size = 0; - result->input.pos = 0; - - result->finishedInput = 0; - result->finishedOutput = 0; goto finally; except: - if (result->cstream) { - ZSTD_freeCStream(result->cstream); - result->cstream = NULL; - } - - Py_DecRef((PyObject*)result->compressor); - Py_DecRef(result->reader); - + Py_XDECREF(result->compressor); + Py_XDECREF(result->reader); Py_DECREF(result); result = NULL; @@ -703,7 +794,7 @@ return NULL; } - result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType); + result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL); if (!result) { return NULL; } @@ -715,11 +806,671 @@ Py_INCREF(result->writer); result->sourceSize = sourceSize; - result->outSize = outSize; - result->entered = 0; - result->cstream = NULL; + return result; +} + +typedef struct { + void* sourceData; + size_t sourceSize; +} DataSource; + +typedef struct { + DataSource* sources; + Py_ssize_t sourcesSize; + unsigned long long totalSourceSize; +} DataSources; + +typedef struct { + void* dest; + Py_ssize_t destSize; + BufferSegment* segments; + Py_ssize_t segmentsSize; +} DestBuffer; + +typedef enum { + WorkerError_none = 0, + WorkerError_zstd = 1, + WorkerError_no_memory = 2, +} WorkerError; + +/** + * Holds state for an individual worker performing multi_compress_to_buffer work. + */ +typedef struct { + /* Used for compression. */ + ZSTD_CCtx* cctx; + ZSTD_CDict* cdict; + int cLevel; + CompressionParametersObject* cParams; + ZSTD_frameParameters fParams; + + /* What to compress. */ + DataSource* sources; + Py_ssize_t sourcesSize; + Py_ssize_t startOffset; + Py_ssize_t endOffset; + unsigned long long totalSourceSize; + + /* Result storage. */ + DestBuffer* destBuffers; + Py_ssize_t destCount; + + /* Error tracking. */ + WorkerError error; + size_t zresult; + Py_ssize_t errorOffset; +} WorkerState; + +static void compress_worker(WorkerState* state) { + Py_ssize_t inputOffset = state->startOffset; + Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1; + Py_ssize_t currentBufferStartOffset = state->startOffset; + size_t zresult; + ZSTD_parameters zparams; + void* newDest; + size_t allocationSize; + size_t boundSize; + Py_ssize_t destOffset = 0; + DataSource* sources = state->sources; + DestBuffer* destBuffer; + + assert(!state->destBuffers); + assert(0 == state->destCount); + + if (state->cParams) { + ztopy_compression_parameters(state->cParams, &zparams.cParams); + } + + zparams.fParams = state->fParams; + + /* + * The total size of the compressed data is unknown until we actually + * compress data. That means we can't pre-allocate the exact size we need. + * + * There is a cost to every allocation and reallocation. So, it is in our + * interest to minimize the number of allocations. + * + * There is also a cost to too few allocations. If allocations are too + * large they may fail. If buffers are shared and all inputs become + * irrelevant at different lifetimes, then a reference to one segment + * in the buffer will keep the entire buffer alive. This leads to excessive + * memory usage. + * + * Our current strategy is to assume a compression ratio of 16:1 and + * allocate buffers of that size, rounded up to the nearest power of 2 + * (because computers like round numbers). That ratio is greater than what + * most inputs achieve. This is by design: we don't want to over-allocate. + * But we don't want to under-allocate and lead to too many buffers either. + */ + + state->destCount = 1; + + state->destBuffers = calloc(1, sizeof(DestBuffer)); + if (NULL == state->destBuffers) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer = &state->destBuffers[state->destCount - 1]; + + /* + * Rather than track bounds and grow the segments buffer, allocate space + * to hold remaining items then truncate when we're done with it. + */ + destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); + if (NULL == destBuffer->segments) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->segmentsSize = remainingItems; + + allocationSize = roundpow2(state->totalSourceSize >> 4); + + /* If the maximum size of the output is larger than that, round up. */ + boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize); + + if (boundSize > allocationSize) { + allocationSize = roundpow2(boundSize); + } + + destBuffer->dest = malloc(allocationSize); + if (NULL == destBuffer->dest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->destSize = allocationSize; + + for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) { + void* source = sources[inputOffset].sourceData; + size_t sourceSize = sources[inputOffset].sourceSize; + size_t destAvailable; + void* dest; + + destAvailable = destBuffer->destSize - destOffset; + boundSize = ZSTD_compressBound(sourceSize); + + /* + * Not enough space in current buffer to hold largest compressed output. + * So allocate and switch to a new output buffer. + */ + if (boundSize > destAvailable) { + /* + * The downsizing of the existing buffer is optional. It should be cheap + * (unlike growing). So we just do it. + */ + if (destAvailable) { + newDest = realloc(destBuffer->dest, destOffset); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->dest = newDest; + destBuffer->destSize = destOffset; + } + + /* Truncate segments buffer. */ + newDest = realloc(destBuffer->segments, + (inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment)); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->segments = newDest; + destBuffer->segmentsSize = inputOffset - currentBufferStartOffset; + + /* Grow space for new struct. */ + /* TODO consider over-allocating so we don't do this every time. */ + newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer)); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + state->destBuffers = newDest; + state->destCount++; + + destBuffer = &state->destBuffers[state->destCount - 1]; + + /* Don't take any chances with non-NULL pointers. */ + memset(destBuffer, 0, sizeof(DestBuffer)); + + /** + * We could dynamically update allocation size based on work done so far. + * For now, keep is simple. + */ + allocationSize = roundpow2(state->totalSourceSize >> 4); + + if (boundSize > allocationSize) { + allocationSize = roundpow2(boundSize); + } + + destBuffer->dest = malloc(allocationSize); + if (NULL == destBuffer->dest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->destSize = allocationSize; + destAvailable = allocationSize; + destOffset = 0; + + destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); + if (NULL == destBuffer->segments) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->segmentsSize = remainingItems; + currentBufferStartOffset = inputOffset; + } + + dest = (char*)destBuffer->dest + destOffset; + + if (state->cdict) { + zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable, + source, sourceSize, state->cdict); + } + else { + if (!state->cParams) { + zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0); + } + + zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable, + source, sourceSize, NULL, 0, zparams); + } + + if (ZSTD_isError(zresult)) { + state->error = WorkerError_zstd; + state->zresult = zresult; + state->errorOffset = inputOffset; + break; + } + + destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset; + destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult; + + destOffset += zresult; + remainingItems--; + } + + if (destBuffer->destSize > destOffset) { + newDest = realloc(destBuffer->dest, destOffset); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->dest = newDest; + destBuffer->destSize = destOffset; + } +} + +ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor, + DataSources* sources, unsigned int threadCount) { + ZSTD_parameters zparams; + unsigned long long bytesPerWorker; + POOL_ctx* pool = NULL; + WorkerState* workerStates = NULL; + Py_ssize_t i; + unsigned long long workerBytes = 0; + Py_ssize_t workerStartOffset = 0; + size_t currentThread = 0; + int errored = 0; + Py_ssize_t segmentsCount = 0; + Py_ssize_t segmentIndex; + PyObject* segmentsArg = NULL; + ZstdBufferWithSegments* buffer; + ZstdBufferWithSegmentsCollection* result = NULL; + + assert(sources->sourcesSize > 0); + assert(sources->totalSourceSize > 0); + assert(threadCount >= 1); + + /* More threads than inputs makes no sense. */ + threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize + : threadCount; + + /* TODO lower thread count when input size is too small and threads would add + overhead. */ + + /* + * When dictionaries are used, parameters are derived from the size of the + * first element. + * + * TODO come up with a better mechanism. + */ + memset(&zparams, 0, sizeof(zparams)); + if (compressor->cparams) { + ztopy_compression_parameters(compressor->cparams, &zparams.cParams); + } + else { + zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, + sources->sources[0].sourceSize, + compressor->dict ? compressor->dict->dictSize : 0); + } + + zparams.fParams = compressor->fparams; + + if (0 != populate_cdict(compressor, &zparams)) { + return NULL; + } + + workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState)); + if (NULL == workerStates) { + PyErr_NoMemory(); + goto finally; + } + + memset(workerStates, 0, threadCount * sizeof(WorkerState)); + + if (threadCount > 1) { + pool = POOL_create(threadCount, 1); + if (NULL == pool) { + PyErr_SetString(ZstdError, "could not initialize zstd thread pool"); + goto finally; + } + } + + bytesPerWorker = sources->totalSourceSize / threadCount; + + for (i = 0; i < threadCount; i++) { + workerStates[i].cctx = ZSTD_createCCtx(); + if (!workerStates[i].cctx) { + PyErr_NoMemory(); + goto finally; + } + + workerStates[i].cdict = compressor->cdict; + workerStates[i].cLevel = compressor->compressionLevel; + workerStates[i].cParams = compressor->cparams; + workerStates[i].fParams = compressor->fparams; + + workerStates[i].sources = sources->sources; + workerStates[i].sourcesSize = sources->sourcesSize; + } + + Py_BEGIN_ALLOW_THREADS + for (i = 0; i < sources->sourcesSize; i++) { + workerBytes += sources->sources[i].sourceSize; + + /* + * The last worker/thread needs to handle all remaining work. Don't + * trigger it prematurely. Defer to the block outside of the loop + * to run the last worker/thread. But do still process this loop + * so workerBytes is correct. + */ + if (currentThread == threadCount - 1) { + continue; + } + + if (workerBytes >= bytesPerWorker) { + assert(currentThread < threadCount); + workerStates[currentThread].totalSourceSize = workerBytes; + workerStates[currentThread].startOffset = workerStartOffset; + workerStates[currentThread].endOffset = i; + + if (threadCount > 1) { + POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]); + } + else { + compress_worker(&workerStates[currentThread]); + } + + currentThread++; + workerStartOffset = i + 1; + workerBytes = 0; + } + } + + if (workerBytes) { + assert(currentThread < threadCount); + workerStates[currentThread].totalSourceSize = workerBytes; + workerStates[currentThread].startOffset = workerStartOffset; + workerStates[currentThread].endOffset = sources->sourcesSize - 1; + + if (threadCount > 1) { + POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]); + } + else { + compress_worker(&workerStates[currentThread]); + } + } + + if (threadCount > 1) { + POOL_free(pool); + pool = NULL; + } + + Py_END_ALLOW_THREADS + + for (i = 0; i < threadCount; i++) { + switch (workerStates[i].error) { + case WorkerError_no_memory: + PyErr_NoMemory(); + errored = 1; + break; + + case WorkerError_zstd: + PyErr_Format(ZstdError, "error compressing item %zd: %s", + workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult)); + errored = 1; + break; + default: + ; + } + + if (errored) { + break; + } + + } + + if (errored) { + goto finally; + } + + segmentsCount = 0; + for (i = 0; i < threadCount; i++) { + WorkerState* state = &workerStates[i]; + segmentsCount += state->destCount; + } + + segmentsArg = PyTuple_New(segmentsCount); + if (NULL == segmentsArg) { + goto finally; + } + + segmentIndex = 0; + + for (i = 0; i < threadCount; i++) { + Py_ssize_t j; + WorkerState* state = &workerStates[i]; + + for (j = 0; j < state->destCount; j++) { + DestBuffer* destBuffer = &state->destBuffers[j]; + buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize, + destBuffer->segments, destBuffer->segmentsSize); + + if (NULL == buffer) { + goto finally; + } + + /* Tell instance to use free() instsead of PyMem_Free(). */ + buffer->useFree = 1; + + /* + * BufferWithSegments_FromMemory takes ownership of the backing memory. + * Unset it here so it doesn't get freed below. + */ + destBuffer->dest = NULL; + destBuffer->segments = NULL; + + PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer); + } + } + + result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject( + (PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg); + +finally: + Py_CLEAR(segmentsArg); + + if (pool) { + POOL_free(pool); + } + + if (workerStates) { + Py_ssize_t j; + + for (i = 0; i < threadCount; i++) { + WorkerState state = workerStates[i]; + + if (state.cctx) { + ZSTD_freeCCtx(state.cctx); + } + + /* malloc() is used in worker thread. */ + + for (j = 0; j < state.destCount; j++) { + if (state.destBuffers) { + free(state.destBuffers[j].dest); + free(state.destBuffers[j].segments); + } + } + + + free(state.destBuffers); + } + + PyMem_Free(workerStates); + } + + return result; +} + +PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__, +"Compress multiple pieces of data as a single operation\n" +"\n" +"Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n" +"a list of bytes like objects holding data to compress.\n" +"\n" +"Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n" +"\n" +"This function is optimized to perform multiple compression operations as\n" +"as possible with as little overhead as possbile.\n" +); + +static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { + static char* kwlist[] = { + "data", + "threads", + NULL + }; + + PyObject* data; + int threads = 0; + Py_buffer* dataBuffers = NULL; + DataSources sources; + Py_ssize_t i; + Py_ssize_t sourceCount = 0; + ZstdBufferWithSegmentsCollection* result = NULL; + + if (self->mtcctx) { + PyErr_SetString(ZstdError, + "function cannot be called on ZstdCompressor configured for multi-threaded compression"); + return NULL; + } + + memset(&sources, 0, sizeof(sources)); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist, + &data, &threads)) { + return NULL; + } + + if (threads < 0) { + threads = cpu_count(); + } + + if (threads < 2) { + threads = 1; + } + + if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) { + ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data; + + sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource)); + if (NULL == sources.sources) { + PyErr_NoMemory(); + goto finally; + } + + for (i = 0; i < buffer->segmentCount; i++) { + sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset; + sources.sources[i].sourceSize = buffer->segments[i].length; + sources.totalSourceSize += buffer->segments[i].length; + } + + sources.sourcesSize = buffer->segmentCount; + } + else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) { + Py_ssize_t j; + Py_ssize_t offset = 0; + ZstdBufferWithSegments* buffer; + ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data; + + sourceCount = BufferWithSegmentsCollection_length(collection); + + sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource)); + if (NULL == sources.sources) { + PyErr_NoMemory(); + goto finally; + } + + for (i = 0; i < collection->bufferCount; i++) { + buffer = collection->buffers[i]; + + for (j = 0; j < buffer->segmentCount; j++) { + sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset; + sources.sources[offset].sourceSize = buffer->segments[j].length; + sources.totalSourceSize += buffer->segments[j].length; + + offset++; + } + } + + sources.sourcesSize = sourceCount; + } + else if (PyList_Check(data)) { + sourceCount = PyList_GET_SIZE(data); + + sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource)); + if (NULL == sources.sources) { + PyErr_NoMemory(); + goto finally; + } + + /* + * It isn't clear whether the address referred to by Py_buffer.buf + * is still valid after PyBuffer_Release. We we hold a reference to all + * Py_buffer instances for the duration of the operation. + */ + dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer)); + if (NULL == dataBuffers) { + PyErr_NoMemory(); + goto finally; + } + + memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer)); + + for (i = 0; i < sourceCount; i++) { + if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i), + &dataBuffers[i], PyBUF_CONTIG_RO)) { + PyErr_Clear(); + PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i); + goto finally; + } + + sources.sources[i].sourceData = dataBuffers[i].buf; + sources.sources[i].sourceSize = dataBuffers[i].len; + sources.totalSourceSize += dataBuffers[i].len; + } + + sources.sourcesSize = sourceCount; + } + else { + PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments"); + goto finally; + } + + if (0 == sources.sourcesSize) { + PyErr_SetString(PyExc_ValueError, "no source elements found"); + goto finally; + } + + if (0 == sources.totalSourceSize) { + PyErr_SetString(PyExc_ValueError, "source elements are empty"); + goto finally; + } + + result = compress_from_datasources(self, &sources, threads); + +finally: + PyMem_Free(sources.sources); + + if (dataBuffers) { + for (i = 0; i < sourceCount; i++) { + PyBuffer_Release(&dataBuffers[i]); + } + + PyMem_Free(dataBuffers); + } return result; } @@ -735,6 +1486,8 @@ METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ }, { "write_to", (PyCFunction)ZstdCompressor_write_to, METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ }, + { "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer, + METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ }, { NULL, NULL } };