--- a/contrib/python-zstandard/c-ext/compressor.c Sat Apr 01 13:43:52 2017 -0700
+++ b/contrib/python-zstandard/c-ext/compressor.c Sat Apr 01 15:24:03 2017 -0700
@@ -7,12 +7,17 @@
*/
#include "python-zstandard.h"
+#include "pool.h"
extern PyObject* ZstdError;
-int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) {
+int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) {
ZSTD_customMem zmem;
- assert(!compressor->cdict);
+
+ if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) {
+ return 0;
+ }
+
Py_BEGIN_ALLOW_THREADS
memset(&zmem, 0, sizeof(zmem));
compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData,
@@ -28,22 +33,32 @@
}
/**
-* Initialize a zstd CStream from a ZstdCompressor instance.
-*
-* Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python
-* exception will be set.
-*/
-ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
- ZSTD_CStream* cstream;
+ * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized.
+ *
+ * Returns 0 on success and a non-zero value on failure, in which case a
+ * Python exception will be set.
+ */
+int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) {
ZSTD_parameters zparams;
void* dictData = NULL;
size_t dictSize = 0;
size_t zresult;
- cstream = ZSTD_createCStream();
- if (!cstream) {
- PyErr_SetString(ZstdError, "cannot create CStream");
- return NULL;
+ if (compressor->cstream) {
+ zresult = ZSTD_resetCStream(compressor->cstream, sourceSize);
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "could not reset CStream: %s",
+ ZSTD_getErrorName(zresult));
+ return -1;
+ }
+
+ return 0;
+ }
+
+ compressor->cstream = ZSTD_createCStream();
+ if (!compressor->cstream) {
+ PyErr_SetString(ZstdError, "could not create CStream");
+ return -1;
}
if (compressor->dict) {
@@ -63,15 +78,51 @@
zparams.fParams = compressor->fparams;
- zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize);
+ zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize,
+ zparams, sourceSize);
if (ZSTD_isError(zresult)) {
- ZSTD_freeCStream(cstream);
+ ZSTD_freeCStream(compressor->cstream);
+ compressor->cstream = NULL;
PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
- return NULL;
+ return -1;
}
- return cstream;
+ return 0;
+}
+
+int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
+ size_t zresult;
+ void* dictData = NULL;
+ size_t dictSize = 0;
+ ZSTD_parameters zparams;
+
+ assert(compressor->mtcctx);
+
+ if (compressor->dict) {
+ dictData = compressor->dict->dictData;
+ dictSize = compressor->dict->dictSize;
+ }
+
+ memset(&zparams, 0, sizeof(zparams));
+ if (compressor->cparams) {
+ ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
+ }
+ else {
+ zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize);
+ }
+
+ zparams.fParams = compressor->fparams;
+
+ zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize,
+ zparams, sourceSize);
+
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
+ return -1;
+ }
+
+ return 0;
}
PyDoc_STRVAR(ZstdCompressor__doc__,
@@ -103,6 +154,11 @@
" Determines whether the dictionary ID will be written into the compressed\n"
" data. Defaults to True. Only adds content to the compressed data if\n"
" a dictionary is being used.\n"
+"threads\n"
+" Number of threads to use to compress data concurrently. When set,\n"
+" compression operations are performed on multiple threads. The default\n"
+" value (0) disables multi-threaded compression. A value of ``-1`` means to\n"
+" set the number of threads to the number of detected logical CPUs.\n"
);
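To make the documented semantics concrete, here is a minimal C sketch of the thread-count resolution that `ZstdCompressor_init` performs below; `resolve_threads()` is a hypothetical name used only for illustration, while `cpu_count()` is the helper this extension already calls:

```c
/* Illustrative sketch only; mirrors the logic in ZstdCompressor_init.
   resolve_threads() is a hypothetical helper name. */
static int resolve_threads(int threads) {
    if (threads < 0) {
        /* Negative (e.g. -1): use the number of detected logical CPUs. */
        threads = cpu_count();
    }
    /* 0 keeps single-threaded compression: a plain ZSTD_CCtx is created
       instead of a ZSTDMT_CCtx. */
    return threads;
}
```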
static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
@@ -113,6 +169,7 @@
"write_checksum",
"write_content_size",
"write_dict_id",
+ "threads",
NULL
};
@@ -122,16 +179,12 @@
PyObject* writeChecksum = NULL;
PyObject* writeContentSize = NULL;
PyObject* writeDictID = NULL;
+ int threads = 0;
- self->cctx = NULL;
- self->dict = NULL;
- self->cparams = NULL;
- self->cdict = NULL;
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor",
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor",
kwlist, &level, &ZstdCompressionDictType, &dict,
&CompressionParametersType, ¶ms,
- &writeChecksum, &writeContentSize, &writeDictID)) {
+ &writeChecksum, &writeContentSize, &writeDictID, &threads)) {
return -1;
}
@@ -146,12 +199,27 @@
return -1;
}
+ if (threads < 0) {
+ threads = cpu_count();
+ }
+
+ self->threads = threads;
+
/* We create a ZSTD_CCtx for reuse among multiple operations to reduce the
overhead of each compression operation. */
- self->cctx = ZSTD_createCCtx();
- if (!self->cctx) {
- PyErr_NoMemory();
- return -1;
+ if (threads) {
+ self->mtcctx = ZSTDMT_createCCtx(threads);
+ if (!self->mtcctx) {
+ PyErr_NoMemory();
+ return -1;
+ }
+ }
+ else {
+ self->cctx = ZSTD_createCCtx();
+ if (!self->cctx) {
+ PyErr_NoMemory();
+ return -1;
+ }
}
self->compressionLevel = level;
@@ -182,6 +250,11 @@
}
static void ZstdCompressor_dealloc(ZstdCompressor* self) {
+ if (self->cstream) {
+ ZSTD_freeCStream(self->cstream);
+ self->cstream = NULL;
+ }
+
Py_XDECREF(self->cparams);
Py_XDECREF(self->dict);
@@ -195,6 +268,11 @@
self->cctx = NULL;
}
+ if (self->mtcctx) {
+ ZSTDMT_freeCCtx(self->mtcctx);
+ self->mtcctx = NULL;
+ }
+
PyObject_Del(self);
}
@@ -229,7 +307,6 @@
Py_ssize_t sourceSize = 0;
size_t inSize = ZSTD_CStreamInSize();
size_t outSize = ZSTD_CStreamOutSize();
- ZSTD_CStream* cstream;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
Py_ssize_t totalRead = 0;
@@ -261,10 +338,17 @@
/* Prevent free on uninitialized memory in finally. */
output.dst = NULL;
- cstream = CStream_from_ZstdCompressor(self, sourceSize);
- if (!cstream) {
- res = NULL;
- goto finally;
+ if (self->mtcctx) {
+ if (init_mtcstream(self, sourceSize)) {
+ res = NULL;
+ goto finally;
+ }
+ }
+ else {
+ if (0 != init_cstream(self, sourceSize)) {
+ res = NULL;
+ goto finally;
+ }
}
output.dst = PyMem_Malloc(outSize);
@@ -300,7 +384,12 @@
while (input.pos < input.size) {
Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_compressStream(cstream, &output, &input);
+ if (self->mtcctx) {
+ zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input);
+ }
+ else {
+ zresult = ZSTD_compressStream(self->cstream, &output, &input);
+ }
Py_END_ALLOW_THREADS
if (ZSTD_isError(zresult)) {
@@ -325,7 +414,12 @@
/* We've finished reading. Now flush the compressor stream. */
while (1) {
- zresult = ZSTD_endStream(cstream, &output);
+ if (self->mtcctx) {
+ zresult = ZSTDMT_endStream(self->mtcctx, &output);
+ }
+ else {
+ zresult = ZSTD_endStream(self->cstream, &output);
+ }
if (ZSTD_isError(zresult)) {
PyErr_Format(ZstdError, "error ending compression stream: %s",
ZSTD_getErrorName(zresult));
@@ -350,24 +444,17 @@
}
}
- ZSTD_freeCStream(cstream);
- cstream = NULL;
-
totalReadPy = PyLong_FromSsize_t(totalRead);
totalWritePy = PyLong_FromSsize_t(totalWrite);
res = PyTuple_Pack(2, totalReadPy, totalWritePy);
- Py_DecRef(totalReadPy);
- Py_DecRef(totalWritePy);
+ Py_DECREF(totalReadPy);
+ Py_DECREF(totalWritePy);
finally:
if (output.dst) {
PyMem_Free(output.dst);
}
- if (cstream) {
- ZSTD_freeCStream(cstream);
- }
-
return res;
}
@@ -410,6 +497,18 @@
return NULL;
}
+ if (self->threads && self->dict) {
+ PyErr_SetString(ZstdError,
+ "compress() cannot be used with both dictionaries and multi-threaded compression");
+ return NULL;
+ }
+
+ if (self->threads && self->cparams) {
+ PyErr_SetString(ZstdError,
+ "compress() cannot be used with both compression parameters and multi-threaded compression");
+ return NULL;
+ }
+
/* Limitation in zstd C API doesn't let decompression side distinguish
between content size of 0 and unknown content size. This can make round
tripping via Python difficult. Until this is fixed, require a flag
@@ -456,24 +555,28 @@
https://github.com/facebook/zstd/issues/358 contains more info. We could
potentially add an argument somewhere to control this behavior.
*/
- if (dictData && !self->cdict) {
- if (populate_cdict(self, dictData, dictSize, &zparams)) {
- Py_DECREF(output);
- return NULL;
- }
+ if (0 != populate_cdict(self, &zparams)) {
+ Py_DECREF(output);
+ return NULL;
}
Py_BEGIN_ALLOW_THREADS
- /* By avoiding ZSTD_compress(), we don't necessarily write out content
- size. This means the argument to ZstdCompressor to control frame
- parameters is honored. */
- if (self->cdict) {
- zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
- source, sourceSize, self->cdict);
+ if (self->mtcctx) {
+ zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize,
+ source, sourceSize, self->compressionLevel);
}
else {
- zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
- source, sourceSize, dictData, dictSize, zparams);
+ /* By avoiding ZSTD_compress(), we don't necessarily write out content
+ size. This means the argument to ZstdCompressor to control frame
+ parameters is honored. */
+ if (self->cdict) {
+ zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
+ source, sourceSize, self->cdict);
+ }
+ else {
+ zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
+ source, sourceSize, dictData, dictSize, zparams);
+ }
}
Py_END_ALLOW_THREADS
@@ -507,21 +610,30 @@
Py_ssize_t inSize = 0;
size_t outSize = ZSTD_CStreamOutSize();
- ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType);
- if (!result) {
- return NULL;
- }
+ ZstdCompressionObj* result = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) {
return NULL;
}
- result->cstream = CStream_from_ZstdCompressor(self, inSize);
- if (!result->cstream) {
- Py_DECREF(result);
+ result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL);
+ if (!result) {
return NULL;
}
+ if (self->mtcctx) {
+ if (init_mtcstream(self, inSize)) {
+ Py_DECREF(result);
+ return NULL;
+ }
+ }
+ else {
+ if (0 != init_cstream(self, inSize)) {
+ Py_DECREF(result);
+ return NULL;
+ }
+ }
+
result->output.dst = PyMem_Malloc(outSize);
if (!result->output.dst) {
PyErr_NoMemory();
@@ -529,13 +641,9 @@
return NULL;
}
result->output.size = outSize;
- result->output.pos = 0;
-
result->compressor = self;
Py_INCREF(result->compressor);
- result->finished = 0;
-
return result;
}
@@ -579,19 +687,10 @@
return NULL;
}
- result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType);
+ result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL);
if (!result) {
return NULL;
}
-
- result->compressor = NULL;
- result->reader = NULL;
- result->buffer = NULL;
- result->cstream = NULL;
- result->input.src = NULL;
- result->output.dst = NULL;
- result->readResult = NULL;
-
if (PyObject_HasAttrString(reader, "read")) {
result->reader = reader;
Py_INCREF(result->reader);
@@ -608,7 +707,6 @@
goto except;
}
- result->bufferOffset = 0;
sourceSize = result->buffer->len;
}
else {
@@ -621,9 +719,16 @@
Py_INCREF(result->compressor);
result->sourceSize = sourceSize;
- result->cstream = CStream_from_ZstdCompressor(self, sourceSize);
- if (!result->cstream) {
- goto except;
+
+ if (self->mtcctx) {
+ if (init_mtcstream(self, sourceSize)) {
+ goto except;
+ }
+ }
+ else {
+ if (0 != init_cstream(self, sourceSize)) {
+ goto except;
+ }
}
result->inSize = inSize;
@@ -635,26 +740,12 @@
goto except;
}
result->output.size = outSize;
- result->output.pos = 0;
-
- result->input.src = NULL;
- result->input.size = 0;
- result->input.pos = 0;
-
- result->finishedInput = 0;
- result->finishedOutput = 0;
goto finally;
except:
- if (result->cstream) {
- ZSTD_freeCStream(result->cstream);
- result->cstream = NULL;
- }
-
- Py_DecRef((PyObject*)result->compressor);
- Py_DecRef(result->reader);
-
+ Py_XDECREF(result->compressor);
+ Py_XDECREF(result->reader);
Py_DECREF(result);
result = NULL;
@@ -703,7 +794,7 @@
return NULL;
}
- result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType);
+ result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL);
if (!result) {
return NULL;
}
@@ -715,11 +806,671 @@
Py_INCREF(result->writer);
result->sourceSize = sourceSize;
-
result->outSize = outSize;
- result->entered = 0;
- result->cstream = NULL;
+ return result;
+}
+
+typedef struct {
+ void* sourceData;
+ size_t sourceSize;
+} DataSource;
+
+typedef struct {
+ DataSource* sources;
+ Py_ssize_t sourcesSize;
+ unsigned long long totalSourceSize;
+} DataSources;
+
+typedef struct {
+ void* dest;
+ Py_ssize_t destSize;
+ BufferSegment* segments;
+ Py_ssize_t segmentsSize;
+} DestBuffer;
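For orientation: each `DestBuffer` owns a single contiguous `dest` allocation, and `segments[i]` records the offset and length of compressed frame `i` within it. A minimal sketch, using a hypothetical helper name, of how a segment maps back into the buffer (matching the bookkeeping `compress_worker` does below):

```c
/* Sketch: segment_ptr() is a hypothetical helper; the offset/length
   bookkeeping matches what compress_worker() records per input. */
static void* segment_ptr(const DestBuffer* buffer, Py_ssize_t i) {
    assert(i < buffer->segmentsSize);
    /* The frame occupies buffer->segments[i].length bytes from here. */
    return (char*)buffer->dest + buffer->segments[i].offset;
}
```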
+
+typedef enum {
+ WorkerError_none = 0,
+ WorkerError_zstd = 1,
+ WorkerError_no_memory = 2,
+} WorkerError;
+
+/**
+ * Holds state for an individual worker performing multi_compress_to_buffer work.
+ */
+typedef struct {
+ /* Used for compression. */
+ ZSTD_CCtx* cctx;
+ ZSTD_CDict* cdict;
+ int cLevel;
+ CompressionParametersObject* cParams;
+ ZSTD_frameParameters fParams;
+
+ /* What to compress. */
+ DataSource* sources;
+ Py_ssize_t sourcesSize;
+ Py_ssize_t startOffset;
+ Py_ssize_t endOffset;
+ unsigned long long totalSourceSize;
+
+ /* Result storage. */
+ DestBuffer* destBuffers;
+ Py_ssize_t destCount;
+
+ /* Error tracking. */
+ WorkerError error;
+ size_t zresult;
+ Py_ssize_t errorOffset;
+} WorkerState;
+
+static void compress_worker(WorkerState* state) {
+ Py_ssize_t inputOffset = state->startOffset;
+ Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
+ Py_ssize_t currentBufferStartOffset = state->startOffset;
+ size_t zresult;
+ ZSTD_parameters zparams;
+ void* newDest;
+ size_t allocationSize;
+ size_t boundSize;
+ Py_ssize_t destOffset = 0;
+ DataSource* sources = state->sources;
+ DestBuffer* destBuffer;
+
+ assert(!state->destBuffers);
+ assert(0 == state->destCount);
+
+ if (state->cParams) {
+ ztopy_compression_parameters(state->cParams, &zparams.cParams);
+ }
+
+ zparams.fParams = state->fParams;
+
+ /*
+ * The total size of the compressed data is unknown until we actually
+ * compress data. That means we can't pre-allocate the exact size we need.
+ *
+ * There is a cost to every allocation and reallocation. So, it is in our
+ * interest to minimize the number of allocations.
+ *
+ * There is also a cost to too few allocations. If allocations are too
+ * large, they may fail. And if buffers are shared but their segments fall
+ * out of use at different times, a reference to a single segment will keep
+ * the entire backing buffer alive, leading to excessive memory usage.
+ *
+ * Our current strategy is to assume a compression ratio of 16:1 and
+ * allocate buffers of 1/16 the total input size, rounded up to the nearest
+ * power of 2 (because computers like round numbers). That ratio is greater
+ * than what most inputs achieve. This is by design: we don't want to
+ * over-allocate. But we don't want to under-allocate so much that we end
+ * up with too many buffers either.
+ */
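As a worked example of the strategy above: for 10 MiB of total input, `totalSourceSize >> 4` is 640 KiB, which rounds up to a 1 MiB allocation. A minimal sketch of such a round-up-to-power-of-2 helper, assuming a 64-bit `size_t`; the project's actual `roundpow2()` may be implemented differently:

```c
/* Sketch: round v up to the nearest power of 2 (e.g. 640 KiB -> 1 MiB).
   Assumes 64-bit size_t; not necessarily the project's roundpow2(). */
static size_t roundpow2_sketch(size_t v) {
    v--;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    v |= v >> 32;
    return v + 1;
}
```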
+
+ state->destCount = 1;
+
+ state->destBuffers = calloc(1, sizeof(DestBuffer));
+ if (NULL == state->destBuffers) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer = &state->destBuffers[state->destCount - 1];
+
+ /*
+ * Rather than track bounds and grow the segments buffer, allocate space
+ * to hold remaining items then truncate when we're done with it.
+ */
+ destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+ if (NULL == destBuffer->segments) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->segmentsSize = remainingItems;
+
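+ /* >> 4 is the assumed 16:1 compression ratio described above. */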
+ allocationSize = roundpow2(state->totalSourceSize >> 4);
+
+ /* If the maximum size of the output is larger than that, round up. */
+ boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize);
+
+ if (boundSize > allocationSize) {
+ allocationSize = roundpow2(boundSize);
+ }
+
+ destBuffer->dest = malloc(allocationSize);
+ if (NULL == destBuffer->dest) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->destSize = allocationSize;
+
+ for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) {
+ void* source = sources[inputOffset].sourceData;
+ size_t sourceSize = sources[inputOffset].sourceSize;
+ size_t destAvailable;
+ void* dest;
+
+ destAvailable = destBuffer->destSize - destOffset;
+ boundSize = ZSTD_compressBound(sourceSize);
+
+ /*
+ * Not enough space in current buffer to hold largest compressed output.
+ * So allocate and switch to a new output buffer.
+ */
+ if (boundSize > destAvailable) {
+ /*
+ * The downsizing of the existing buffer is optional. It should be cheap
+ * (unlike growing). So we just do it.
+ */
+ if (destAvailable) {
+ newDest = realloc(destBuffer->dest, destOffset);
+ if (NULL == newDest) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->dest = newDest;
+ destBuffer->destSize = destOffset;
+ }
+
+ /* Truncate segments buffer. */
+ newDest = realloc(destBuffer->segments,
+ (inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment));
+ if (NULL == newDest) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->segments = newDest;
+ destBuffer->segmentsSize = inputOffset - currentBufferStartOffset;
+
+ /* Grow space for new struct. */
+ /* TODO consider over-allocating so we don't do this every time. */
+ newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
+ if (NULL == newDest) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ state->destBuffers = newDest;
+ state->destCount++;
+
+ destBuffer = &state->destBuffers[state->destCount - 1];
+
+ /* Don't take any chances with non-NULL pointers. */
+ memset(destBuffer, 0, sizeof(DestBuffer));
+
+ /**
+ * We could dynamically update allocation size based on work done so far.
+ * For now, keep it simple.
+ */
+ allocationSize = roundpow2(state->totalSourceSize >> 4);
+
+ if (boundSize > allocationSize) {
+ allocationSize = roundpow2(boundSize);
+ }
+
+ destBuffer->dest = malloc(allocationSize);
+ if (NULL == destBuffer->dest) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->destSize = allocationSize;
+ destAvailable = allocationSize;
+ destOffset = 0;
+
+ destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+ if (NULL == destBuffer->segments) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->segmentsSize = remainingItems;
+ currentBufferStartOffset = inputOffset;
+ }
+
+ dest = (char*)destBuffer->dest + destOffset;
+
+ if (state->cdict) {
+ zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable,
+ source, sourceSize, state->cdict);
+ }
+ else {
+ if (!state->cParams) {
+ zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0);
+ }
+
+ zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable,
+ source, sourceSize, NULL, 0, zparams);
+ }
+
+ if (ZSTD_isError(zresult)) {
+ state->error = WorkerError_zstd;
+ state->zresult = zresult;
+ state->errorOffset = inputOffset;
+ break;
+ }
+
+ destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset;
+ destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult;
+
+ destOffset += zresult;
+ remainingItems--;
+ }
+
+ if (destBuffer->destSize > destOffset) {
+ newDest = realloc(destBuffer->dest, destOffset);
+ if (NULL == newDest) {
+ state->error = WorkerError_no_memory;
+ return;
+ }
+
+ destBuffer->dest = newDest;
+ destBuffer->destSize = destOffset;
+ }
+}
+
+ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor,
+ DataSources* sources, unsigned int threadCount) {
+ ZSTD_parameters zparams;
+ unsigned long long bytesPerWorker;
+ POOL_ctx* pool = NULL;
+ WorkerState* workerStates = NULL;
+ Py_ssize_t i;
+ unsigned long long workerBytes = 0;
+ Py_ssize_t workerStartOffset = 0;
+ size_t currentThread = 0;
+ int errored = 0;
+ Py_ssize_t segmentsCount = 0;
+ Py_ssize_t segmentIndex;
+ PyObject* segmentsArg = NULL;
+ ZstdBufferWithSegments* buffer;
+ ZstdBufferWithSegmentsCollection* result = NULL;
+
+ assert(sources->sourcesSize > 0);
+ assert(sources->totalSourceSize > 0);
+ assert(threadCount >= 1);
+
+ /* More threads than inputs makes no sense. */
+ threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize
+ : threadCount;
+
+ /* TODO lower thread count when input size is too small and threads would add
+ overhead. */
+
+ /*
+ * When dictionaries are used, parameters are derived from the size of the
+ * first element.
+ *
+ * TODO come up with a better mechanism.
+ */
+ memset(&zparams, 0, sizeof(zparams));
+ if (compressor->cparams) {
+ ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
+ }
+ else {
+ zparams.cParams = ZSTD_getCParams(compressor->compressionLevel,
+ sources->sources[0].sourceSize,
+ compressor->dict ? compressor->dict->dictSize : 0);
+ }
+
+ zparams.fParams = compressor->fparams;
+
+ if (0 != populate_cdict(compressor, &zparams)) {
+ return NULL;
+ }
+
+ workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
+ if (NULL == workerStates) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ memset(workerStates, 0, threadCount * sizeof(WorkerState));
+
+ if (threadCount > 1) {
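+ /* zstd's internal thread pool; arguments are (numThreads, queueSize). */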
+ pool = POOL_create(threadCount, 1);
+ if (NULL == pool) {
+ PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
+ goto finally;
+ }
+ }
+
+ bytesPerWorker = sources->totalSourceSize / threadCount;
+
+ for (i = 0; i < threadCount; i++) {
+ workerStates[i].cctx = ZSTD_createCCtx();
+ if (!workerStates[i].cctx) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ workerStates[i].cdict = compressor->cdict;
+ workerStates[i].cLevel = compressor->compressionLevel;
+ workerStates[i].cParams = compressor->cparams;
+ workerStates[i].fParams = compressor->fparams;
+
+ workerStates[i].sources = sources->sources;
+ workerStates[i].sourcesSize = sources->sourcesSize;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ for (i = 0; i < sources->sourcesSize; i++) {
+ workerBytes += sources->sources[i].sourceSize;
+
+ /*
+ * The last worker/thread needs to handle all remaining work. Don't
+ * trigger it prematurely. Defer to the block outside of the loop
+ * to run the last worker/thread. But do still process this loop
+ * so workerBytes is correct.
+ */
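+ /* Worked example: 10 sources totalling 100 bytes across 3 threads
+ * gives bytesPerWorker = 33; threads 0 and 1 dispatch once their
+ * running byte total reaches 33, and the last thread receives every
+ * remaining source in the block below this loop. */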
+ if (currentThread == threadCount - 1) {
+ continue;
+ }
+
+ if (workerBytes >= bytesPerWorker) {
+ assert(currentThread < threadCount);
+ workerStates[currentThread].totalSourceSize = workerBytes;
+ workerStates[currentThread].startOffset = workerStartOffset;
+ workerStates[currentThread].endOffset = i;
+
+ if (threadCount > 1) {
+ POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
+ }
+ else {
+ compress_worker(&workerStates[currentThread]);
+ }
+
+ currentThread++;
+ workerStartOffset = i + 1;
+ workerBytes = 0;
+ }
+ }
+
+ if (workerBytes) {
+ assert(currentThread < threadCount);
+ workerStates[currentThread].totalSourceSize = workerBytes;
+ workerStates[currentThread].startOffset = workerStartOffset;
+ workerStates[currentThread].endOffset = sources->sourcesSize - 1;
+
+ if (threadCount > 1) {
+ POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
+ }
+ else {
+ compress_worker(&workerStates[currentThread]);
+ }
+ }
+
+ if (threadCount > 1) {
+ POOL_free(pool);
+ pool = NULL;
+ }
+
+ Py_END_ALLOW_THREADS
+
+ for (i = 0; i < threadCount; i++) {
+ switch (workerStates[i].error) {
+ case WorkerError_no_memory:
+ PyErr_NoMemory();
+ errored = 1;
+ break;
+
+ case WorkerError_zstd:
+ PyErr_Format(ZstdError, "error compressing item %zd: %s",
+ workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
+ errored = 1;
+ break;
+ default:
+ break;
+ }
+
+ if (errored) {
+ break;
+ }
+ }
+
+ if (errored) {
+ goto finally;
+ }
+
+ segmentsCount = 0;
+ for (i = 0; i < threadCount; i++) {
+ WorkerState* state = &workerStates[i];
+ segmentsCount += state->destCount;
+ }
+
+ segmentsArg = PyTuple_New(segmentsCount);
+ if (NULL == segmentsArg) {
+ goto finally;
+ }
+
+ segmentIndex = 0;
+
+ for (i = 0; i < threadCount; i++) {
+ Py_ssize_t j;
+ WorkerState* state = &workerStates[i];
+
+ for (j = 0; j < state->destCount; j++) {
+ DestBuffer* destBuffer = &state->destBuffers[j];
+ buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
+ destBuffer->segments, destBuffer->segmentsSize);
+
+ if (NULL == buffer) {
+ goto finally;
+ }
+
+ /* Tell instance to use free() instead of PyMem_Free(). */
+ buffer->useFree = 1;
+
+ /*
+ * BufferWithSegments_FromMemory takes ownership of the backing memory.
+ * Unset it here so it doesn't get freed below.
+ */
+ destBuffer->dest = NULL;
+ destBuffer->segments = NULL;
+
+ PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer);
+ }
+ }
+
+ result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
+ (PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg);
+
+finally:
+ Py_CLEAR(segmentsArg);
+
+ if (pool) {
+ POOL_free(pool);
+ }
+
+ if (workerStates) {
+ Py_ssize_t j;
+
+ for (i = 0; i < threadCount; i++) {
+ WorkerState state = workerStates[i];
+
+ if (state.cctx) {
+ ZSTD_freeCCtx(state.cctx);
+ }
+
+ /* free(), not PyMem_Free(): these buffers were allocated with
+ malloc() in the worker thread. */
+
+ for (j = 0; j < state.destCount; j++) {
+ if (state.destBuffers) {
+ free(state.destBuffers[j].dest);
+ free(state.destBuffers[j].segments);
+ }
+ }
+
+ free(state.destBuffers);
+ }
+
+ PyMem_Free(workerStates);
+ }
+
+ return result;
+}
+
+PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__,
+"Compress multiple pieces of data as a single operation\n"
+"\n"
+"Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n"
+"a list of bytes like objects holding data to compress.\n"
+"\n"
+"Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n"
+"\n"
+"This function is optimized to perform multiple compression operations as\n"
+"as possible with as little overhead as possbile.\n"
+);
+
+static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
+ static char* kwlist[] = {
+ "data",
+ "threads",
+ NULL
+ };
+
+ PyObject* data;
+ int threads = 0;
+ Py_buffer* dataBuffers = NULL;
+ DataSources sources;
+ Py_ssize_t i;
+ Py_ssize_t sourceCount = 0;
+ ZstdBufferWithSegmentsCollection* result = NULL;
+
+ if (self->mtcctx) {
+ PyErr_SetString(ZstdError,
+ "function cannot be called on ZstdCompressor configured for multi-threaded compression");
+ return NULL;
+ }
+
+ memset(&sources, 0, sizeof(sources));
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist,
+ &data, &threads)) {
+ return NULL;
+ }
+
+ if (threads < 0) {
+ threads = cpu_count();
+ }
+
+ if (threads < 2) {
+ threads = 1;
+ }
+
+ if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) {
+ ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data;
+
+ sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource));
+ if (NULL == sources.sources) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ for (i = 0; i < buffer->segmentCount; i++) {
+ sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset;
+ sources.sources[i].sourceSize = buffer->segments[i].length;
+ sources.totalSourceSize += buffer->segments[i].length;
+ }
+
+ sources.sourcesSize = buffer->segmentCount;
+ }
+ else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) {
+ Py_ssize_t j;
+ Py_ssize_t offset = 0;
+ ZstdBufferWithSegments* buffer;
+ ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data;
+
+ sourceCount = BufferWithSegmentsCollection_length(collection);
+
+ sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
+ if (NULL == sources.sources) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ for (i = 0; i < collection->bufferCount; i++) {
+ buffer = collection->buffers[i];
+
+ for (j = 0; j < buffer->segmentCount; j++) {
+ sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset;
+ sources.sources[offset].sourceSize = buffer->segments[j].length;
+ sources.totalSourceSize += buffer->segments[j].length;
+
+ offset++;
+ }
+ }
+
+ sources.sourcesSize = sourceCount;
+ }
+ else if (PyList_Check(data)) {
+ sourceCount = PyList_GET_SIZE(data);
+
+ sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
+ if (NULL == sources.sources) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ /*
+ * It isn't clear whether the address referred to by Py_buffer.buf
+ * is still valid after PyBuffer_Release(). So we hold a reference to all
+ * Py_buffer instances for the duration of the operation.
+ */
+ dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer));
+ if (NULL == dataBuffers) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer));
+
+ for (i = 0; i < sourceCount; i++) {
+ if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i),
+ &dataBuffers[i], PyBUF_CONTIG_RO)) {
+ PyErr_Clear();
+ PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
+ goto finally;
+ }
+
+ sources.sources[i].sourceData = dataBuffers[i].buf;
+ sources.sources[i].sourceSize = dataBuffers[i].len;
+ sources.totalSourceSize += dataBuffers[i].len;
+ }
+
+ sources.sourcesSize = sourceCount;
+ }
+ else {
+ PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments");
+ goto finally;
+ }
+
+ if (0 == sources.sourcesSize) {
+ PyErr_SetString(PyExc_ValueError, "no source elements found");
+ goto finally;
+ }
+
+ if (0 == sources.totalSourceSize) {
+ PyErr_SetString(PyExc_ValueError, "source elements are empty");
+ goto finally;
+ }
+
+ result = compress_from_datasources(self, &sources, threads);
+
+finally:
+ PyMem_Free(sources.sources);
+
+ if (dataBuffers) {
+ for (i = 0; i < sourceCount; i++) {
+ PyBuffer_Release(&dataBuffers[i]);
+ }
+
+ PyMem_Free(dataBuffers);
+ }
return result;
}
@@ -735,6 +1486,8 @@
METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ },
{ "write_to", (PyCFunction)ZstdCompressor_write_to,
METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ },
+ { "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer,
+ METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ },
{ NULL, NULL }
};