diff contrib/python-zstandard/c-ext/compressor.c @ 31796:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0 Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). Updates relevant to Mercurial include: * Support for multi-threaded compression (we can use this for bundle and wire protocol compression). * APIs for batch compression and decompression operations using multiple threads and optimal memory allocation mechanism. (Can be useful for revlog perf improvements.) * A ``BufferWithSegments`` type that models a single memory buffer containing N discrete items of known lengths. This type can be used for very efficient 0-copy data operations. # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents c32454d69b85
children b1fb341d8a61
line wrap: on
line diff
--- a/contrib/python-zstandard/c-ext/compressor.c	Sat Apr 01 13:43:52 2017 -0700
+++ b/contrib/python-zstandard/c-ext/compressor.c	Sat Apr 01 15:24:03 2017 -0700
@@ -7,12 +7,17 @@
 */
 
 #include "python-zstandard.h"
+#include "pool.h"
 
 extern PyObject* ZstdError;
 
-int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) {
+int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) {
 	ZSTD_customMem zmem;
-	assert(!compressor->cdict);
+
+	if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) {
+		return 0;
+	}
+
 	Py_BEGIN_ALLOW_THREADS
 	memset(&zmem, 0, sizeof(zmem));
 	compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData,
@@ -28,22 +33,32 @@
 }
 
 /**
-* Initialize a zstd CStream from a ZstdCompressor instance.
-*
-* Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python
-* exception will be set.
-*/
-ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
-	ZSTD_CStream* cstream;
+ * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized.
+ *
+ * Returns 0 on success. Other value on failure. Will set a Python exception
+ * on failure.
+ */
+int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) {
 	ZSTD_parameters zparams;
 	void* dictData = NULL;
 	size_t dictSize = 0;
 	size_t zresult;
 
-	cstream = ZSTD_createCStream();
-	if (!cstream) {
-		PyErr_SetString(ZstdError, "cannot create CStream");
-		return NULL;
+	if (compressor->cstream) {
+		zresult = ZSTD_resetCStream(compressor->cstream, sourceSize);
+		if (ZSTD_isError(zresult)) {
+			PyErr_Format(ZstdError, "could not reset CStream: %s",
+				ZSTD_getErrorName(zresult));
+			return -1;
+		}
+
+		return 0;
+	}
+
+	compressor->cstream = ZSTD_createCStream();
+	if (!compressor->cstream) {
+		PyErr_SetString(ZstdError, "could not create CStream");
+		return -1;
 	}
 
 	if (compressor->dict) {
@@ -63,15 +78,51 @@
 
 	zparams.fParams = compressor->fparams;
 
-	zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize);
+	zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize,
+		zparams, sourceSize);
 
 	if (ZSTD_isError(zresult)) {
-		ZSTD_freeCStream(cstream);
+		ZSTD_freeCStream(compressor->cstream);
+		compressor->cstream = NULL;
 		PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
-		return NULL;
+		return -1;
 	}
 
-	return cstream;
+	return 0;
+}
+
+int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
+	size_t zresult;
+	void* dictData = NULL;
+	size_t dictSize = 0;
+	ZSTD_parameters zparams;
+
+	assert(compressor->mtcctx);
+
+	if (compressor->dict) {
+		dictData = compressor->dict->dictData;
+		dictSize = compressor->dict->dictSize;
+	}
+
+	memset(&zparams, 0, sizeof(zparams));
+	if (compressor->cparams) {
+		ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
+	}
+	else {
+		zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize);
+	}
+
+	zparams.fParams = compressor->fparams;
+
+	zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize,
+		zparams, sourceSize);
+
+	if (ZSTD_isError(zresult)) {
+		PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
+		return -1;
+	}
+
+	return 0;
 }
 
 PyDoc_STRVAR(ZstdCompressor__doc__,
@@ -103,6 +154,11 @@
 "   Determines whether the dictionary ID will be written into the compressed\n"
 "   data. Defaults to True. Only adds content to the compressed data if\n"
 "   a dictionary is being used.\n"
+"threads\n"
+"   Number of threads to use to compress data concurrently. When set,\n"
+"   compression operations are performed on multiple threads. The default\n"
+"   value (0) disables multi-threaded compression. A value of ``-1`` means to\n"
+"   set the number of threads to the number of detected logical CPUs.\n"
 );
 
 static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
@@ -113,6 +169,7 @@
 		"write_checksum",
 		"write_content_size",
 		"write_dict_id",
+		"threads",
 		NULL
 	};
 
@@ -122,16 +179,12 @@
 	PyObject* writeChecksum = NULL;
 	PyObject* writeContentSize = NULL;
 	PyObject* writeDictID = NULL;
+	int threads = 0;
 
-	self->cctx = NULL;
-	self->dict = NULL;
-	self->cparams = NULL;
-	self->cdict = NULL;
-
-	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor",
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor",
 		kwlist,	&level, &ZstdCompressionDictType, &dict,
 		&CompressionParametersType, &params,
-		&writeChecksum, &writeContentSize, &writeDictID)) {
+		&writeChecksum, &writeContentSize, &writeDictID, &threads)) {
 		return -1;
 	}
 
@@ -146,12 +199,27 @@
 		return -1;
 	}
 
+	if (threads < 0) {
+		threads = cpu_count();
+	}
+
+	self->threads = threads;
+
 	/* We create a ZSTD_CCtx for reuse among multiple operations to reduce the
 	   overhead of each compression operation. */
-	self->cctx = ZSTD_createCCtx();
-	if (!self->cctx) {
-		PyErr_NoMemory();
-		return -1;
+	if (threads) {
+		self->mtcctx = ZSTDMT_createCCtx(threads);
+		if (!self->mtcctx) {
+			PyErr_NoMemory();
+			return -1;
+		}
+	}
+	else {
+		self->cctx = ZSTD_createCCtx();
+		if (!self->cctx) {
+			PyErr_NoMemory();
+			return -1;
+		}
 	}
 
 	self->compressionLevel = level;
@@ -182,6 +250,11 @@
 }
 
 static void ZstdCompressor_dealloc(ZstdCompressor* self) {
+	if (self->cstream) {
+		ZSTD_freeCStream(self->cstream);
+		self->cstream = NULL;
+	}
+
 	Py_XDECREF(self->cparams);
 	Py_XDECREF(self->dict);
 
@@ -195,6 +268,11 @@
 		self->cctx = NULL;
 	}
 
+	if (self->mtcctx) {
+		ZSTDMT_freeCCtx(self->mtcctx);
+		self->mtcctx = NULL;
+	}
+
 	PyObject_Del(self);
 }
 
@@ -229,7 +307,6 @@
 	Py_ssize_t sourceSize = 0;
 	size_t inSize = ZSTD_CStreamInSize();
 	size_t outSize = ZSTD_CStreamOutSize();
-	ZSTD_CStream* cstream;
 	ZSTD_inBuffer input;
 	ZSTD_outBuffer output;
 	Py_ssize_t totalRead = 0;
@@ -261,10 +338,17 @@
 	/* Prevent free on uninitialized memory in finally. */
 	output.dst = NULL;
 
-	cstream = CStream_from_ZstdCompressor(self, sourceSize);
-	if (!cstream) {
-		res = NULL;
-		goto finally;
+	if (self->mtcctx) {
+		if (init_mtcstream(self, sourceSize)) {
+			res = NULL;
+			goto finally;
+		}
+	}
+	else {
+		if (0 != init_cstream(self, sourceSize)) {
+			res = NULL;
+			goto finally;
+		}
 	}
 
 	output.dst = PyMem_Malloc(outSize);
@@ -300,7 +384,12 @@
 
 		while (input.pos < input.size) {
 			Py_BEGIN_ALLOW_THREADS
-			zresult = ZSTD_compressStream(cstream, &output, &input);
+			if (self->mtcctx) {
+				zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input);
+			}
+			else {
+				zresult = ZSTD_compressStream(self->cstream, &output, &input);
+			}
 			Py_END_ALLOW_THREADS
 
 			if (ZSTD_isError(zresult)) {
@@ -325,7 +414,12 @@
 
 	/* We've finished reading. Now flush the compressor stream. */
 	while (1) {
-		zresult = ZSTD_endStream(cstream, &output);
+		if (self->mtcctx) {
+			zresult = ZSTDMT_endStream(self->mtcctx, &output);
+		}
+		else {
+			zresult = ZSTD_endStream(self->cstream, &output);
+		}
 		if (ZSTD_isError(zresult)) {
 			PyErr_Format(ZstdError, "error ending compression stream: %s",
 				ZSTD_getErrorName(zresult));
@@ -350,24 +444,17 @@
 		}
 	}
 
-	ZSTD_freeCStream(cstream);
-	cstream = NULL;
-
 	totalReadPy = PyLong_FromSsize_t(totalRead);
 	totalWritePy = PyLong_FromSsize_t(totalWrite);
 	res = PyTuple_Pack(2, totalReadPy, totalWritePy);
-	Py_DecRef(totalReadPy);
-	Py_DecRef(totalWritePy);
+	Py_DECREF(totalReadPy);
+	Py_DECREF(totalWritePy);
 
 finally:
 	if (output.dst) {
 		PyMem_Free(output.dst);
 	}
 
-	if (cstream) {
-		ZSTD_freeCStream(cstream);
-	}
-
 	return res;
 }
 
@@ -410,6 +497,18 @@
 		return NULL;
 	}
 
+	if (self->threads && self->dict) {
+		PyErr_SetString(ZstdError,
+			"compress() cannot be used with both dictionaries and multi-threaded compression");
+		return NULL;
+	}
+
+	if (self->threads && self->cparams) {
+		PyErr_SetString(ZstdError,
+			"compress() cannot be used with both compression parameters and multi-threaded compression");
+		return NULL;
+	}
+
 	/* Limitation in zstd C API doesn't let decompression side distinguish
 	   between content size of 0 and unknown content size. This can make round
 	   tripping via Python difficult. Until this is fixed, require a flag
@@ -456,24 +555,28 @@
 	https://github.com/facebook/zstd/issues/358 contains more info. We could
 	potentially add an argument somewhere to control this behavior.
 	*/
-	if (dictData && !self->cdict) {
-		if (populate_cdict(self, dictData, dictSize, &zparams)) {
-			Py_DECREF(output);
-			return NULL;
-		}
+	if (0 != populate_cdict(self, &zparams)) {
+		Py_DECREF(output);
+		return NULL;
 	}
 
 	Py_BEGIN_ALLOW_THREADS
-	/* By avoiding ZSTD_compress(), we don't necessarily write out content
-	   size. This means the argument to ZstdCompressor to control frame
-	   parameters is honored. */
-	if (self->cdict) {
-		zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
-			source, sourceSize, self->cdict);
+	if (self->mtcctx) {
+		zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize,
+			source, sourceSize, self->compressionLevel);
 	}
 	else {
-		zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
-			source, sourceSize, dictData, dictSize, zparams);
+		/* By avoiding ZSTD_compress(), we don't necessarily write out content
+		   size. This means the argument to ZstdCompressor to control frame
+		   parameters is honored. */
+		if (self->cdict) {
+			zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
+				source, sourceSize, self->cdict);
+		}
+		else {
+			zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
+				source, sourceSize, dictData, dictSize, zparams);
+		}
 	}
 	Py_END_ALLOW_THREADS
 
@@ -507,21 +610,30 @@
 
 	Py_ssize_t inSize = 0;
 	size_t outSize = ZSTD_CStreamOutSize();
-	ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType);
-	if (!result) {
-		return NULL;
-	}
+	ZstdCompressionObj* result = NULL;
 
 	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) {
 		return NULL;
 	}
 
-	result->cstream = CStream_from_ZstdCompressor(self, inSize);
-	if (!result->cstream) {
-		Py_DECREF(result);
+	result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL);
+	if (!result) {
 		return NULL;
 	}
 
+	if (self->mtcctx) {
+		if (init_mtcstream(self, inSize)) {
+			Py_DECREF(result);
+			return NULL;
+		}
+	}
+	else {
+		if (0 != init_cstream(self, inSize)) {
+			Py_DECREF(result);
+			return NULL;
+		}
+	}
+
 	result->output.dst = PyMem_Malloc(outSize);
 	if (!result->output.dst) {
 		PyErr_NoMemory();
@@ -529,13 +641,9 @@
 		return NULL;
 	}
 	result->output.size = outSize;
-	result->output.pos = 0;
-
 	result->compressor = self;
 	Py_INCREF(result->compressor);
 
-	result->finished = 0;
-
 	return result;
 }
 
@@ -579,19 +687,10 @@
 		return NULL;
 	}
 
-	result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType);
+	result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL);
 	if (!result) {
 		return NULL;
 	}
-
-	result->compressor = NULL;
-	result->reader = NULL;
-	result->buffer = NULL;
-	result->cstream = NULL;
-	result->input.src = NULL;
-	result->output.dst = NULL;
-	result->readResult = NULL;
-
 	if (PyObject_HasAttrString(reader, "read")) {
 		result->reader = reader;
 		Py_INCREF(result->reader);
@@ -608,7 +707,6 @@
 			goto except;
 		}
 
-		result->bufferOffset = 0;
 		sourceSize = result->buffer->len;
 	}
 	else {
@@ -621,9 +719,16 @@
 	Py_INCREF(result->compressor);
 
 	result->sourceSize = sourceSize;
-	result->cstream = CStream_from_ZstdCompressor(self, sourceSize);
-	if (!result->cstream) {
-		goto except;
+
+	if (self->mtcctx) {
+		if (init_mtcstream(self, sourceSize)) {
+			goto except;
+		}
+	}
+	else {
+		if (0 != init_cstream(self, sourceSize)) {
+			goto except;
+		}
 	}
 
 	result->inSize = inSize;
@@ -635,26 +740,12 @@
 		goto except;
 	}
 	result->output.size = outSize;
-	result->output.pos = 0;
-
-	result->input.src = NULL;
-	result->input.size = 0;
-	result->input.pos = 0;
-
-	result->finishedInput = 0;
-	result->finishedOutput = 0;
 
 	goto finally;
 
 except:
-	if (result->cstream) {
-		ZSTD_freeCStream(result->cstream);
-		result->cstream = NULL;
-	}
-
-	Py_DecRef((PyObject*)result->compressor);
-	Py_DecRef(result->reader);
-
+	Py_XDECREF(result->compressor);
+	Py_XDECREF(result->reader);
 	Py_DECREF(result);
 	result = NULL;
 
@@ -703,7 +794,7 @@
 		return NULL;
 	}
 
-	result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType);
+	result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL);
 	if (!result) {
 		return NULL;
 	}
@@ -715,11 +806,671 @@
 	Py_INCREF(result->writer);
 
 	result->sourceSize = sourceSize;
-
 	result->outSize = outSize;
 
-	result->entered = 0;
-	result->cstream = NULL;
+	return result;
+}
+
+typedef struct {
+	void* sourceData;
+	size_t sourceSize;
+} DataSource;
+
+typedef struct {
+	DataSource* sources;
+	Py_ssize_t sourcesSize;
+	unsigned long long totalSourceSize;
+} DataSources;
+
+typedef struct {
+	void* dest;
+	Py_ssize_t destSize;
+	BufferSegment* segments;
+	Py_ssize_t segmentsSize;
+} DestBuffer;
+
+typedef enum {
+	WorkerError_none = 0,
+	WorkerError_zstd = 1,
+	WorkerError_no_memory = 2,
+} WorkerError;
+
+/**
+ * Holds state for an individual worker performing multi_compress_to_buffer work.
+ */
+typedef struct {
+	/* Used for compression. */
+	ZSTD_CCtx* cctx;
+	ZSTD_CDict* cdict;
+	int cLevel;
+	CompressionParametersObject* cParams;
+	ZSTD_frameParameters fParams;
+
+	/* What to compress. */
+	DataSource* sources;
+	Py_ssize_t sourcesSize;
+	Py_ssize_t startOffset;
+	Py_ssize_t endOffset;
+	unsigned long long totalSourceSize;
+
+	/* Result storage. */
+	DestBuffer* destBuffers;
+	Py_ssize_t destCount;
+
+	/* Error tracking. */
+	WorkerError error;
+	size_t zresult;
+	Py_ssize_t errorOffset;
+} WorkerState;
+
+static void compress_worker(WorkerState* state) {
+	Py_ssize_t inputOffset = state->startOffset;
+	Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
+	Py_ssize_t currentBufferStartOffset = state->startOffset;
+	size_t zresult;
+	ZSTD_parameters zparams;
+	void* newDest;
+	size_t allocationSize;
+	size_t boundSize;
+	Py_ssize_t destOffset = 0;
+	DataSource* sources = state->sources;
+	DestBuffer* destBuffer;
+
+	assert(!state->destBuffers);
+	assert(0 == state->destCount);
+
+	if (state->cParams) {
+		ztopy_compression_parameters(state->cParams, &zparams.cParams);
+	}
+
+	zparams.fParams = state->fParams;
+
+	/*
+	 * The total size of the compressed data is unknown until we actually
+	 * compress data. That means we can't pre-allocate the exact size we need.
+	 * 
+	 * There is a cost to every allocation and reallocation. So, it is in our
+	 * interest to minimize the number of allocations.
+	 *
+	 * There is also a cost to too few allocations. If allocations are too
+	 * large they may fail. If buffers are shared and all inputs become
+	 * irrelevant at different lifetimes, then a reference to one segment
+	 * in the buffer will keep the entire buffer alive. This leads to excessive
+	 * memory usage.
+	 *
+	 * Our current strategy is to assume a compression ratio of 16:1 and
+	 * allocate buffers of that size, rounded up to the nearest power of 2
+	 * (because computers like round numbers). That ratio is greater than what
+	 * most inputs achieve. This is by design: we don't want to over-allocate.
+	 * But we don't want to under-allocate and lead to too many buffers either.
+	 */
+
+	state->destCount = 1;
+
+	state->destBuffers = calloc(1, sizeof(DestBuffer));
+	if (NULL == state->destBuffers) {
+		state->error = WorkerError_no_memory;
+		return;
+	}
+
+	destBuffer = &state->destBuffers[state->destCount - 1];
+
+	/*
+	 * Rather than track bounds and grow the segments buffer, allocate space
+	 * to hold remaining items then truncate when we're done with it.
+	 */
+	destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+	if (NULL == destBuffer->segments) {
+		state->error = WorkerError_no_memory;
+		return;
+	}
+
+	destBuffer->segmentsSize = remainingItems;
+
+	allocationSize = roundpow2(state->totalSourceSize >> 4);
+
+	/* If the maximum size of the output is larger than that, round up. */
+	boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize);
+
+	if (boundSize > allocationSize) {
+		allocationSize = roundpow2(boundSize);
+	}
+
+	destBuffer->dest = malloc(allocationSize);
+	if (NULL == destBuffer->dest) {
+		state->error = WorkerError_no_memory;
+		return;
+	}
+
+	destBuffer->destSize = allocationSize;
+
+	for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) {
+		void* source = sources[inputOffset].sourceData;
+		size_t sourceSize = sources[inputOffset].sourceSize;
+		size_t destAvailable;
+		void* dest;
+
+		destAvailable = destBuffer->destSize - destOffset;
+		boundSize = ZSTD_compressBound(sourceSize);
+
+		/*
+		 * Not enough space in current buffer to hold largest compressed output.
+		 * So allocate and switch to a new output buffer.
+		 */
+		if (boundSize > destAvailable) {
+			/*
+			 * The downsizing of the existing buffer is optional. It should be cheap
+			 * (unlike growing). So we just do it.
+			 */
+			if (destAvailable) {
+				newDest = realloc(destBuffer->dest, destOffset);
+				if (NULL == newDest) {
+					state->error = WorkerError_no_memory;
+					return;
+				}
+
+				destBuffer->dest = newDest;
+				destBuffer->destSize = destOffset;
+			}
+
+			/* Truncate segments buffer. */
+			newDest = realloc(destBuffer->segments,
+				(inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment));
+			if (NULL == newDest) {
+				state->error = WorkerError_no_memory;
+				return;
+			}
+
+			destBuffer->segments = newDest;
+			destBuffer->segmentsSize = inputOffset - currentBufferStartOffset;
+
+			/* Grow space for new struct. */
+			/* TODO consider over-allocating so we don't do this every time. */
+			newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
+			if (NULL == newDest) {
+				state->error = WorkerError_no_memory;
+				return;
+			}
+
+			state->destBuffers = newDest;
+			state->destCount++;
+
+			destBuffer = &state->destBuffers[state->destCount - 1];
+
+			/* Don't take any chances with non-NULL pointers. */
+			memset(destBuffer, 0, sizeof(DestBuffer));
+
+			/**
+			 * We could dynamically update allocation size based on work done so far.
+			 * For now, keep it simple.
+			 */
+			allocationSize = roundpow2(state->totalSourceSize >> 4);
+
+			if (boundSize > allocationSize) {
+				allocationSize = roundpow2(boundSize);
+			}
+
+			destBuffer->dest = malloc(allocationSize);
+			if (NULL == destBuffer->dest) {
+				state->error = WorkerError_no_memory;
+				return;
+			}
+
+			destBuffer->destSize = allocationSize;
+			destAvailable = allocationSize;
+			destOffset = 0;
+
+			destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+			if (NULL == destBuffer->segments) {
+				state->error = WorkerError_no_memory;
+				return;
+			}
+
+			destBuffer->segmentsSize = remainingItems;
+			currentBufferStartOffset = inputOffset;
+		}
+
+		dest = (char*)destBuffer->dest + destOffset;
+
+		if (state->cdict) {
+			zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable,
+				source, sourceSize, state->cdict);
+		}
+		else {
+			if (!state->cParams) {
+				zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0);
+			}
+
+			zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable,
+				source, sourceSize, NULL, 0, zparams);
+		}
+
+		if (ZSTD_isError(zresult)) {
+			state->error = WorkerError_zstd;
+			state->zresult = zresult;
+			state->errorOffset = inputOffset;
+			break;
+		}
+
+		destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset;
+		destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult;
+
+		destOffset += zresult;
+		remainingItems--;
+	}
+
+	if (destBuffer->destSize > destOffset) {
+		newDest = realloc(destBuffer->dest, destOffset);
+		if (NULL == newDest) {
+			state->error = WorkerError_no_memory;
+			return;
+		}
+
+		destBuffer->dest = newDest;
+		destBuffer->destSize = destOffset;
+	}
+}
+
+ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor,
+	DataSources* sources, unsigned int threadCount) {
+	ZSTD_parameters zparams;
+	unsigned long long bytesPerWorker;
+	POOL_ctx* pool = NULL;
+	WorkerState* workerStates = NULL;
+	Py_ssize_t i;
+	unsigned long long workerBytes = 0;
+	Py_ssize_t workerStartOffset = 0;
+	size_t currentThread = 0;
+	int errored = 0;
+	Py_ssize_t segmentsCount = 0;
+	Py_ssize_t segmentIndex;
+	PyObject* segmentsArg = NULL;
+	ZstdBufferWithSegments* buffer;
+	ZstdBufferWithSegmentsCollection* result = NULL;
+
+	assert(sources->sourcesSize > 0);
+	assert(sources->totalSourceSize > 0);
+	assert(threadCount >= 1);
+
+	/* More threads than inputs makes no sense. */
+	threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize
+													 : threadCount;
+
+	/* TODO lower thread count when input size is too small and threads would add
+	overhead. */
+
+	/*
+	 * When dictionaries are used, parameters are derived from the size of the
+	 * first element.
+	 *
+	 * TODO come up with a better mechanism.
+	 */
+	memset(&zparams, 0, sizeof(zparams));
+	if (compressor->cparams) {
+		ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
+	}
+	else {
+		zparams.cParams = ZSTD_getCParams(compressor->compressionLevel,
+			sources->sources[0].sourceSize,
+			compressor->dict ? compressor->dict->dictSize : 0);
+	}
+
+	zparams.fParams = compressor->fparams;
+
+	if (0 != populate_cdict(compressor, &zparams)) {
+		return NULL;
+	}
+
+	workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
+	if (NULL == workerStates) {
+		PyErr_NoMemory();
+		goto finally;
+	}
+
+	memset(workerStates, 0, threadCount * sizeof(WorkerState));
+
+	if (threadCount > 1) {
+		pool = POOL_create(threadCount, 1);
+		if (NULL == pool) {
+			PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
+			goto finally;
+		}
+	}
+
+	bytesPerWorker = sources->totalSourceSize / threadCount;
+
+	for (i = 0; i < threadCount; i++) {
+		workerStates[i].cctx = ZSTD_createCCtx();
+		if (!workerStates[i].cctx) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		workerStates[i].cdict = compressor->cdict;
+		workerStates[i].cLevel = compressor->compressionLevel;
+		workerStates[i].cParams = compressor->cparams;
+		workerStates[i].fParams = compressor->fparams;
+
+		workerStates[i].sources = sources->sources;
+		workerStates[i].sourcesSize = sources->sourcesSize;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	for (i = 0; i < sources->sourcesSize; i++) {
+		workerBytes += sources->sources[i].sourceSize;
+
+		/*
+		 * The last worker/thread needs to handle all remaining work. Don't
+		 * trigger it prematurely. Defer to the block outside of the loop
+		 * to run the last worker/thread. But do still process this loop
+		 * so workerBytes is correct.
+		 */
+		if (currentThread == threadCount - 1) {
+			continue;
+		}
+
+		if (workerBytes >= bytesPerWorker) {
+			assert(currentThread < threadCount);
+			workerStates[currentThread].totalSourceSize = workerBytes;
+			workerStates[currentThread].startOffset = workerStartOffset;
+			workerStates[currentThread].endOffset = i;
+
+			if (threadCount > 1) {
+				POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
+			}
+			else {
+				compress_worker(&workerStates[currentThread]);
+			}
+
+			currentThread++;
+			workerStartOffset = i + 1;
+			workerBytes = 0;
+		}
+	}
+
+	if (workerBytes) {
+		assert(currentThread < threadCount);
+		workerStates[currentThread].totalSourceSize = workerBytes;
+		workerStates[currentThread].startOffset = workerStartOffset;
+		workerStates[currentThread].endOffset = sources->sourcesSize - 1;
+
+		if (threadCount > 1) {
+			POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
+		}
+		else {
+			compress_worker(&workerStates[currentThread]);
+		}
+	}
+
+	if (threadCount > 1) {
+		POOL_free(pool);
+		pool = NULL;
+	}
+
+	Py_END_ALLOW_THREADS
+
+	for (i = 0; i < threadCount; i++) {
+		switch (workerStates[i].error) {
+		case WorkerError_no_memory:
+			PyErr_NoMemory();
+			errored = 1;
+			break;
+
+		case WorkerError_zstd:
+			PyErr_Format(ZstdError, "error compressing item %zd: %s",
+				workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
+			errored = 1;
+			break;
+		default:
+			;
+		}
+
+		if (errored) {
+			break;
+		}
+
+	}
+
+	if (errored) {
+		goto finally;
+	}
+
+	segmentsCount = 0;
+	for (i = 0; i < threadCount; i++) {
+		WorkerState* state = &workerStates[i];
+		segmentsCount += state->destCount;
+	}
+
+	segmentsArg = PyTuple_New(segmentsCount);
+	if (NULL == segmentsArg) {
+		goto finally;
+	}
+
+	segmentIndex = 0;
+
+	for (i = 0; i < threadCount; i++) {
+		Py_ssize_t j;
+		WorkerState* state = &workerStates[i];
+
+		for (j = 0; j < state->destCount; j++) {
+			DestBuffer* destBuffer = &state->destBuffers[j];
+			buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
+				destBuffer->segments, destBuffer->segmentsSize);
+
+			if (NULL == buffer) {
+				goto finally;
+			}
+
+			/* Tell instance to use free() instead of PyMem_Free(). */
+			buffer->useFree = 1;
+
+			/*
+			 * BufferWithSegments_FromMemory takes ownership of the backing memory.
+			 * Unset it here so it doesn't get freed below.
+			 */
+			destBuffer->dest = NULL;
+			destBuffer->segments = NULL;
+
+			PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer);
+		}
+	}
+
+	result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
+		(PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg);
+
+finally:
+	Py_CLEAR(segmentsArg);
+
+	if (pool) {
+		POOL_free(pool);
+	}
+
+	if (workerStates) {
+		Py_ssize_t j;
+
+		for (i = 0; i < threadCount; i++) {
+			WorkerState state = workerStates[i];
+
+			if (state.cctx) {
+				ZSTD_freeCCtx(state.cctx);
+			}
+
+			/* malloc() is used in worker thread. */
+
+			for (j = 0; j < state.destCount; j++) {
+				if (state.destBuffers) {
+					free(state.destBuffers[j].dest);
+					free(state.destBuffers[j].segments);
+				}
+			}
+
+
+			free(state.destBuffers);
+		}
+
+		PyMem_Free(workerStates);
+	}
+
+	return result;
+}
+
+PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__,
+"Compress multiple pieces of data as a single operation\n"
+"\n"
+"Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n"
+"a list of bytes like objects holding data to compress.\n"
+"\n"
+"Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n"
+"\n"
+"This function is optimized to perform multiple compression operations\n"
+"with as little overhead as possible.\n"
+);
+
+static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
+	static char* kwlist[] = {
+		"data",
+		"threads",
+		NULL
+	};
+
+	PyObject* data;
+	int threads = 0;
+	Py_buffer* dataBuffers = NULL;
+	DataSources sources;
+	Py_ssize_t i;
+	Py_ssize_t sourceCount = 0;
+	ZstdBufferWithSegmentsCollection* result = NULL;
+
+	if (self->mtcctx) {
+		PyErr_SetString(ZstdError,
+			"function cannot be called on ZstdCompressor configured for multi-threaded compression");
+		return NULL;
+	}
+
+	memset(&sources, 0, sizeof(sources));
+
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist,
+		&data, &threads)) {
+		return NULL;
+	}
+
+	if (threads < 0) {
+		threads = cpu_count();
+	}
+
+	if (threads < 2) {
+		threads = 1;
+	}
+
+	if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) {
+		ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data;
+
+		sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource));
+		if (NULL == sources.sources) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		for (i = 0; i < buffer->segmentCount; i++) {
+			sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset;
+			sources.sources[i].sourceSize = buffer->segments[i].length;
+			sources.totalSourceSize += buffer->segments[i].length;
+		}
+
+		sources.sourcesSize = buffer->segmentCount;
+	}
+	else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) {
+		Py_ssize_t j;
+		Py_ssize_t offset = 0;
+		ZstdBufferWithSegments* buffer;
+		ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data;
+
+		sourceCount = BufferWithSegmentsCollection_length(collection);
+
+		sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
+		if (NULL == sources.sources) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		for (i = 0; i < collection->bufferCount; i++) {
+			buffer = collection->buffers[i];
+
+			for (j = 0; j < buffer->segmentCount; j++) {
+				sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset;
+				sources.sources[offset].sourceSize = buffer->segments[j].length;
+				sources.totalSourceSize += buffer->segments[j].length;
+
+				offset++;
+			}
+		}
+
+		sources.sourcesSize = sourceCount;
+	}
+	else if (PyList_Check(data)) {
+		sourceCount = PyList_GET_SIZE(data);
+
+		sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
+		if (NULL == sources.sources) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		/*
+		 * It isn't clear whether the address referred to by Py_buffer.buf
+		 * is still valid after PyBuffer_Release. So we hold a reference to all
+		 * Py_buffer instances for the duration of the operation.
+		 */
+		dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer));
+		if (NULL == dataBuffers) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer));
+
+		for (i = 0; i < sourceCount; i++) {
+			if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i),
+				&dataBuffers[i], PyBUF_CONTIG_RO)) {
+				PyErr_Clear();
+				PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
+				goto finally;
+			}
+
+			sources.sources[i].sourceData = dataBuffers[i].buf;
+			sources.sources[i].sourceSize = dataBuffers[i].len;
+			sources.totalSourceSize += dataBuffers[i].len;
+		}
+
+		sources.sourcesSize = sourceCount;
+	}
+	else {
+		PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments");
+		goto finally;
+	}
+
+	if (0 == sources.sourcesSize) {
+		PyErr_SetString(PyExc_ValueError, "no source elements found");
+		goto finally;
+	}
+
+	if (0 == sources.totalSourceSize) {
+		PyErr_SetString(PyExc_ValueError, "source elements are empty");
+		goto finally;
+	}
+
+	result = compress_from_datasources(self, &sources, threads);
+
+finally:
+	PyMem_Free(sources.sources);
+
+	if (dataBuffers) {
+		for (i = 0; i < sourceCount; i++) {
+			PyBuffer_Release(&dataBuffers[i]);
+		}
+
+		PyMem_Free(dataBuffers);
+	}
 
 	return result;
 }
@@ -735,6 +1486,8 @@
 	METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ },
 	{ "write_to", (PyCFunction)ZstdCompressor_write_to,
 	METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ },
+	{ "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer,
+	METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ },
 	{ NULL, NULL }
 };