diff contrib/python-zstandard/c-ext/compressionreader.c @ 42070:675775c33ab6

zstandard: vendor python-zstandard 0.11 The upstream source distribution from PyPI was extracted. Unwanted files were removed. The clang-format ignore list was updated to reflect the new source of files. The project contains a vendored copy of zstandard 1.3.8. The old version was 1.3.6. This should result in some minor performance wins. test-check-py3-compat.t was updated to reflect now-passing tests on Python 3.8. Some HTTP tests were updated to reflect new zstd compression output. # no-check-commit because 3rd party code has different style guidelines Differential Revision: https://phab.mercurial-scm.org/D6199
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 04 Apr 2019 17:34:43 -0700
parents 73fef626dae3
children e92ca942ddca
line wrap: on
line diff
--- a/contrib/python-zstandard/c-ext/compressionreader.c	Thu Apr 04 15:24:03 2019 -0700
+++ b/contrib/python-zstandard/c-ext/compressionreader.c	Thu Apr 04 17:34:43 2019 -0700
@@ -128,6 +128,96 @@
 	return PyLong_FromUnsignedLongLong(self->bytesCompressed);
 }
 
+int read_compressor_input(ZstdCompressionReader* self) {
+	if (self->finishedInput) {
+		return 0;
+	}
+
+	if (self->input.pos != self->input.size) {
+		return 0;
+	}
+
+	if (self->reader) {
+		Py_buffer buffer;
+
+		assert(self->readResult == NULL);
+
+		self->readResult = PyObject_CallMethod(self->reader, "read",
+		    "k", self->readSize);
+
+		if (NULL == self->readResult) {
+			return -1;
+		}
+
+		memset(&buffer, 0, sizeof(buffer));
+
+		if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
+			return -1;
+		}
+
+		/* EOF */
+		if (0 == buffer.len) {
+			self->finishedInput = 1;
+			Py_CLEAR(self->readResult);
+		}
+		else {
+			self->input.src = buffer.buf;
+			self->input.size = buffer.len;
+			self->input.pos = 0;
+		}
+
+		PyBuffer_Release(&buffer);
+	}
+	else {
+		assert(self->buffer.buf);
+
+		self->input.src = self->buffer.buf;
+		self->input.size = self->buffer.len;
+		self->input.pos = 0;
+	}
+
+	return 1;
+}
+
+int compress_input(ZstdCompressionReader* self, ZSTD_outBuffer* output) {
+	size_t oldPos;
+	size_t zresult;
+
+	/* If we have data left over, consume it. */
+	if (self->input.pos < self->input.size) {
+		oldPos = output->pos;
+
+		Py_BEGIN_ALLOW_THREADS
+		zresult = ZSTD_compressStream2(self->compressor->cctx,
+		    output, &self->input, ZSTD_e_continue);
+		Py_END_ALLOW_THREADS
+
+		self->bytesCompressed += output->pos - oldPos;
+
+		/* Input exhausted. Clear out state tracking. */
+		if (self->input.pos == self->input.size) {
+			memset(&self->input, 0, sizeof(self->input));
+			Py_CLEAR(self->readResult);
+
+			if (self->buffer.buf) {
+				self->finishedInput = 1;
+			}
+		}
+
+		if (ZSTD_isError(zresult)) {
+			PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
+			return -1;
+		}
+	}
+
+    if (output->pos && output->pos == output->size) {
+        return 1;
+    }
+    else {
+        return 0;
+    }
+}
+
 static PyObject* reader_read(ZstdCompressionReader* self, PyObject* args, PyObject* kwargs) {
 	static char* kwlist[] = {
 		"size",
@@ -140,25 +230,30 @@
 	Py_ssize_t resultSize;
 	size_t zresult;
 	size_t oldPos;
+	int readResult, compressResult;
 
 	if (self->closed) {
 		PyErr_SetString(PyExc_ValueError, "stream is closed");
 		return NULL;
 	}
 
-	if (self->finishedOutput) {
-		return PyBytes_FromStringAndSize("", 0);
-	}
-
-	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "n", kwlist, &size)) {
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) {
 		return NULL;
 	}
 
-	if (size < 1) {
-		PyErr_SetString(PyExc_ValueError, "cannot read negative or size 0 amounts");
+	if (size < -1) {
+		PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
 		return NULL;
 	}
 
+	if (size == -1) {
+		return PyObject_CallMethod((PyObject*)self, "readall", NULL);
+	}
+
+	if (self->finishedOutput || size == 0) {
+		return PyBytes_FromStringAndSize("", 0);
+	}
+
 	result = PyBytes_FromStringAndSize(NULL, size);
 	if (NULL == result) {
 		return NULL;
@@ -172,86 +267,34 @@
 
 readinput:
 
-	/* If we have data left over, consume it. */
-	if (self->input.pos < self->input.size) {
-		oldPos = self->output.pos;
-
-		Py_BEGIN_ALLOW_THREADS
-		zresult = ZSTD_compress_generic(self->compressor->cctx,
-			&self->output, &self->input, ZSTD_e_continue);
-
-		Py_END_ALLOW_THREADS
-
-		self->bytesCompressed += self->output.pos - oldPos;
-
-		/* Input exhausted. Clear out state tracking. */
-		if (self->input.pos == self->input.size) {
-			memset(&self->input, 0, sizeof(self->input));
-			Py_CLEAR(self->readResult);
+    compressResult = compress_input(self, &self->output);
 
-			if (self->buffer.buf) {
-				self->finishedInput = 1;
-			}
-		}
-
-		if (ZSTD_isError(zresult)) {
-			PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
-			return NULL;
-		}
-
-		if (self->output.pos) {
-			/* If no more room in output, emit it. */
-			if (self->output.pos == self->output.size) {
-				memset(&self->output, 0, sizeof(self->output));
-				return result;
-			}
-
-			/*
-			 * There is room in the output. We fall through to below, which will either
-			 * get more input for us or will attempt to end the stream.
-			 */
-		}
-
-		/* Fall through to gather more input. */
+	if (-1 == compressResult) {
+		Py_XDECREF(result);
+		return NULL;
+	}
+	else if (0 == compressResult) {
+		/* There is room in the output. We fall through to below, which will
+		 * either get more input for us or will attempt to end the stream.
+		 */
+	}
+	else if (1 == compressResult) {
+		memset(&self->output, 0, sizeof(self->output));
+		return result;
+	}
+	else {
+		assert(0);
 	}
 
-	if (!self->finishedInput) {
-		if (self->reader) {
-			Py_buffer buffer;
-
-			assert(self->readResult == NULL);
-			self->readResult = PyObject_CallMethod(self->reader, "read",
-				"k", self->readSize);
-			if (self->readResult == NULL) {
-				return NULL;
-			}
-
-			memset(&buffer, 0, sizeof(buffer));
-
-			if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
-				return NULL;
-			}
+	readResult = read_compressor_input(self);
 
-			/* EOF */
-			if (0 == buffer.len) {
-				self->finishedInput = 1;
-				Py_CLEAR(self->readResult);
-			}
-			else {
-				self->input.src = buffer.buf;
-				self->input.size = buffer.len;
-				self->input.pos = 0;
-			}
-
-			PyBuffer_Release(&buffer);
-		}
-		else {
-			assert(self->buffer.buf);
-
-			self->input.src = self->buffer.buf;
-			self->input.size = self->buffer.len;
-			self->input.pos = 0;
-		}
+	if (-1 == readResult) {
+		return NULL;
+	}
+	else if (0 == readResult) { }
+	else if (1 == readResult) { }
+	else {
+		assert(0);
 	}
 
 	if (self->input.size) {
@@ -261,7 +304,7 @@
 	/* Else EOF */
 	oldPos = self->output.pos;
 
-	zresult = ZSTD_compress_generic(self->compressor->cctx, &self->output,
+	zresult = ZSTD_compressStream2(self->compressor->cctx, &self->output,
 		&self->input, ZSTD_e_end);
 
 	self->bytesCompressed += self->output.pos - oldPos;
@@ -269,6 +312,7 @@
 	if (ZSTD_isError(zresult)) {
 		PyErr_Format(ZstdError, "error ending compression stream: %s",
 			ZSTD_getErrorName(zresult));
+		Py_XDECREF(result);
 		return NULL;
 	}
 
@@ -288,9 +332,394 @@
 	return result;
 }
 
+static PyObject* reader_read1(ZstdCompressionReader* self, PyObject* args, PyObject* kwargs) {
+	static char* kwlist[] = {
+		"size",
+		NULL
+	};
+
+	Py_ssize_t size = -1;
+	PyObject* result = NULL;
+	char* resultBuffer;
+	Py_ssize_t resultSize;
+	ZSTD_outBuffer output;
+	int compressResult;
+	size_t oldPos;
+	size_t zresult;
+
+	if (self->closed) {
+		PyErr_SetString(PyExc_ValueError, "stream is closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:read1", kwlist, &size)) {
+		return NULL;
+	}
+
+	if (size < -1) {
+		PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
+		return NULL;
+	}
+
+	if (self->finishedOutput || size == 0) {
+		return PyBytes_FromStringAndSize("", 0);
+	}
+
+	if (size == -1) {
+		size = ZSTD_CStreamOutSize();
+	}
+
+	result = PyBytes_FromStringAndSize(NULL, size);
+	if (NULL == result) {
+		return NULL;
+	}
+
+	PyBytes_AsStringAndSize(result, &resultBuffer, &resultSize);
+
+	output.dst = resultBuffer;
+	output.size = resultSize;
+	output.pos = 0;
+
+	/* read1() is supposed to use at most 1 read() from the underlying stream.
+	   However, we can't satisfy this requirement with compression because
+	   not every input will generate output. We /could/ flush the compressor,
+	   but this may not be desirable. We allow multiple read() from the
+	   underlying stream. But unlike read(), we return as soon as output data
+	   is available.
+	*/
+
+	compressResult = compress_input(self, &output);
+
+	if (-1 == compressResult) {
+		Py_XDECREF(result);
+		return NULL;
+	}
+	else if (0 == compressResult || 1 == compressResult) { }
+	else {
+		assert(0);
+	}
+
+	if (output.pos) {
+		goto finally;
+	}
+
+	while (!self->finishedInput) {
+		int readResult = read_compressor_input(self);
+
+		if (-1 == readResult) {
+			Py_XDECREF(result);
+			return NULL;
+		}
+		else if (0 == readResult || 1 == readResult) { }
+		else {
+			assert(0);
+		}
+
+		compressResult = compress_input(self, &output);
+
+		if (-1 == compressResult) {
+			Py_XDECREF(result);
+			return NULL;
+		}
+		else if (0 == compressResult || 1 == compressResult) { }
+		else {
+			assert(0);
+		}
+
+		if (output.pos) {
+			goto finally;
+		}
+	}
+
+	/* EOF */
+	oldPos = output.pos;
+
+	zresult = ZSTD_compressStream2(self->compressor->cctx, &output, &self->input,
+        ZSTD_e_end);
+
+	self->bytesCompressed += output.pos - oldPos;
+
+	if (ZSTD_isError(zresult)) {
+		PyErr_Format(ZstdError, "error ending compression stream: %s",
+		    ZSTD_getErrorName(zresult));
+		Py_XDECREF(result);
+		return NULL;
+	}
+
+	if (zresult == 0) {
+		self->finishedOutput = 1;
+	}
+
+finally:
+	if (result) {
+		if (safe_pybytes_resize(&result, output.pos)) {
+			Py_XDECREF(result);
+			return NULL;
+		}
+	}
+
+	return result;
+}
+
 static PyObject* reader_readall(PyObject* self) {
-	PyErr_SetNone(PyExc_NotImplementedError);
-	return NULL;
+	PyObject* chunks = NULL;
+	PyObject* empty = NULL;
+	PyObject* result = NULL;
+
+	/* Our strategy is to collect chunks into a list then join all the
+	 * chunks at the end. We could potentially use e.g. an io.BytesIO. But
+	 * this feels simple enough to implement and avoids potentially expensive
+	 * reallocations of large buffers.
+	 */
+	chunks = PyList_New(0);
+	if (NULL == chunks) {
+		return NULL;
+	}
+
+	while (1) {
+		PyObject* chunk = PyObject_CallMethod(self, "read", "i", 1048576);
+		if (NULL == chunk) {
+			Py_DECREF(chunks);
+			return NULL;
+		}
+
+		if (!PyBytes_Size(chunk)) {
+			Py_DECREF(chunk);
+			break;
+		}
+
+		if (PyList_Append(chunks, chunk)) {
+			Py_DECREF(chunk);
+			Py_DECREF(chunks);
+			return NULL;
+		}
+
+		Py_DECREF(chunk);
+	}
+
+	empty = PyBytes_FromStringAndSize("", 0);
+	if (NULL == empty) {
+		Py_DECREF(chunks);
+		return NULL;
+	}
+
+	result = PyObject_CallMethod(empty, "join", "O", chunks);
+
+	Py_DECREF(empty);
+	Py_DECREF(chunks);
+
+	return result;
+}
+
+static PyObject* reader_readinto(ZstdCompressionReader* self, PyObject* args) {
+	Py_buffer dest;
+	ZSTD_outBuffer output;
+	int readResult, compressResult;
+	PyObject* result = NULL;
+	size_t zresult;
+	size_t oldPos;
+
+	if (self->closed) {
+		PyErr_SetString(PyExc_ValueError, "stream is closed");
+		return NULL;
+	}
+
+	if (self->finishedOutput) {
+		return PyLong_FromLong(0);
+	}
+
+	if (!PyArg_ParseTuple(args, "w*:readinto", &dest)) {
+		return NULL;
+	}
+
+	if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+		PyErr_SetString(PyExc_ValueError,
+		    "destination buffer should be contiguous and have at most one dimension");
+		goto finally;
+	}
+
+	output.dst = dest.buf;
+	output.size = dest.len;
+	output.pos = 0;
+
+	compressResult = compress_input(self, &output);
+
+	if (-1 == compressResult) {
+		goto finally;
+	}
+	else if (0 == compressResult) {	}
+	else if (1 == compressResult) {
+		result = PyLong_FromSize_t(output.pos);
+		goto finally;
+	}
+	else {
+		assert(0);
+	}
+
+	while (!self->finishedInput) {
+		readResult = read_compressor_input(self);
+
+		if (-1 == readResult) {
+			goto finally;
+		}
+		else if (0 == readResult || 1 == readResult) {}
+		else {
+			assert(0);
+		}
+
+		compressResult = compress_input(self, &output);
+
+		if (-1 == compressResult) {
+			goto finally;
+		}
+		else if (0 == compressResult) { }
+		else if (1 == compressResult) {
+			result = PyLong_FromSize_t(output.pos);
+			goto finally;
+		}
+		else {
+			assert(0);
+		}
+	}
+
+	/* EOF */
+	oldPos = output.pos;
+
+	zresult = ZSTD_compressStream2(self->compressor->cctx, &output, &self->input,
+	    ZSTD_e_end);
+
+	self->bytesCompressed += self->output.pos - oldPos;
+
+	if (ZSTD_isError(zresult)) {
+		PyErr_Format(ZstdError, "error ending compression stream: %s",
+		    ZSTD_getErrorName(zresult));
+		goto finally;
+	}
+
+	assert(output.pos);
+
+	if (0 == zresult) {
+		self->finishedOutput = 1;
+	}
+
+	result = PyLong_FromSize_t(output.pos);
+
+finally:
+	PyBuffer_Release(&dest);
+
+	return result;
+}
+
+static PyObject* reader_readinto1(ZstdCompressionReader* self, PyObject* args) {
+	Py_buffer dest;
+	PyObject* result = NULL;
+	ZSTD_outBuffer output;
+	int compressResult;
+	size_t oldPos;
+	size_t zresult;
+
+	if (self->closed) {
+		PyErr_SetString(PyExc_ValueError, "stream is closed");
+		return NULL;
+	}
+
+	if (self->finishedOutput) {
+		return PyLong_FromLong(0);
+	}
+
+	if (!PyArg_ParseTuple(args, "w*:readinto1", &dest)) {
+		return NULL;
+	}
+
+	if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+		PyErr_SetString(PyExc_ValueError,
+		    "destination buffer should be contiguous and have at most one dimension");
+		goto finally;
+	}
+
+	output.dst = dest.buf;
+	output.size = dest.len;
+	output.pos = 0;
+
+	compressResult = compress_input(self, &output);
+
+	if (-1 == compressResult) {
+		goto finally;
+	}
+	else if (0 == compressResult || 1 == compressResult) { }
+	else {
+		assert(0);
+	}
+
+	if (output.pos) {
+		result = PyLong_FromSize_t(output.pos);
+		goto finally;
+	}
+
+	while (!self->finishedInput) {
+		int readResult = read_compressor_input(self);
+
+		if (-1 == readResult) {
+			goto finally;
+		}
+		else if (0 == readResult || 1 == readResult) { }
+		else {
+			assert(0);
+		}
+
+		compressResult = compress_input(self, &output);
+
+		if (-1 == compressResult) {
+			goto finally;
+		}
+		else if (0 == compressResult) { }
+		else if (1 == compressResult) {
+			result = PyLong_FromSize_t(output.pos);
+			goto finally;
+		}
+		else {
+			assert(0);
+		}
+
+		/* If we produced output and we're not done with input, emit
+		 * that output now, as we've hit restrictions of read1().
+		 */
+		if (output.pos && !self->finishedInput) {
+			result = PyLong_FromSize_t(output.pos);
+			goto finally;
+		}
+
+		/* Otherwise we either have no output or we've exhausted the
+		 * input. Either we try to get more input or we fall through
+		 * to EOF below */
+	}
+
+	/* EOF */
+	oldPos = output.pos;
+
+	zresult = ZSTD_compressStream2(self->compressor->cctx, &output, &self->input,
+	    ZSTD_e_end);
+
+	self->bytesCompressed += self->output.pos - oldPos;
+
+	if (ZSTD_isError(zresult)) {
+		PyErr_Format(ZstdError, "error ending compression stream: %s",
+		    ZSTD_getErrorName(zresult));
+		goto finally;
+	}
+
+	assert(output.pos);
+
+	if (0 == zresult) {
+		self->finishedOutput = 1;
+	}
+
+	result = PyLong_FromSize_t(output.pos);
+
+finally:
+	PyBuffer_Release(&dest);
+
+	return result;
 }
 
 static PyObject* reader_iter(PyObject* self) {
@@ -315,7 +744,10 @@
 	{ "readable", (PyCFunction)reader_readable, METH_NOARGS,
 	PyDoc_STR("Returns True") },
 	{ "read", (PyCFunction)reader_read, METH_VARARGS | METH_KEYWORDS, PyDoc_STR("read compressed data") },
+	{ "read1", (PyCFunction)reader_read1, METH_VARARGS | METH_KEYWORDS, NULL },
 	{ "readall", (PyCFunction)reader_readall, METH_NOARGS, PyDoc_STR("Not implemented") },
+	{ "readinto", (PyCFunction)reader_readinto, METH_VARARGS, NULL },
+	{ "readinto1", (PyCFunction)reader_readinto1, METH_VARARGS, NULL },
 	{ "readline", (PyCFunction)reader_readline, METH_VARARGS, PyDoc_STR("Not implemented") },
 	{ "readlines", (PyCFunction)reader_readlines, METH_VARARGS, PyDoc_STR("Not implemented") },
 	{ "seekable", (PyCFunction)reader_seekable, METH_NOARGS,