--- a/contrib/python-zstandard/c-ext/decompressionreader.c Thu Apr 04 15:24:03 2019 -0700
+++ b/contrib/python-zstandard/c-ext/decompressionreader.c Thu Apr 04 17:34:43 2019 -0700
@@ -102,6 +102,114 @@
Py_RETURN_FALSE;
}
+/**
+ * Make more compressed input available in self->input, if needed.
+ *
+ * This is a no-op when the source was already flagged as exhausted
+ * (self->finishedInput) or when self->input still holds unconsumed bytes.
+ * Otherwise input comes from one of two places:
+ *
+ * - self->reader: its read() method is called with self->readSize. The
+ *   returned object is stored in self->readResult; self->input.src keeps
+ *   pointing into that object after the Py_buffer view is released, so
+ *   the reference has to be held until the input has been consumed. An
+ *   empty result means EOF and sets self->finishedInput.
+ * - self->buffer: the whole buffer is installed as the input in one go.
+ *
+ * Returns 0 if no data was added to input (this includes hitting EOF).
+ * Returns 1 if new input data is available.
+ * Returns -1 on error and sets a Python exception as a side-effect.
+ */
+int read_decompressor_input(ZstdDecompressionReader* self) {
+    if (self->finishedInput) {
+        return 0;
+    }
+
+    /* Pending input must be fully consumed before more is fetched. */
+    if (self->input.pos != self->input.size) {
+        return 0;
+    }
+
+    if (self->reader) {
+        Py_buffer buffer;
+
+        assert(self->readResult == NULL);
+        /* NOTE(review): "k" consumes an unsigned long vararg; confirm that
+         * the declared type of readSize matches on all platforms. */
+        self->readResult = PyObject_CallMethod(self->reader, "read",
+            "k", self->readSize);
+        if (NULL == self->readResult) {
+            return -1;
+        }
+
+        memset(&buffer, 0, sizeof(buffer));
+
+        if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
+            /*
+             * read() returned something without buffer support. Drop it so
+             * the reference is not left behind and so the assertion above
+             * holds if the caller tries again.
+             */
+            Py_CLEAR(self->readResult);
+            return -1;
+        }
+
+        /* EOF */
+        if (0 == buffer.len) {
+            self->finishedInput = 1;
+            PyBuffer_Release(&buffer);
+            Py_CLEAR(self->readResult);
+            /* Nothing was added to the input. */
+            return 0;
+        }
+
+        self->input.src = buffer.buf;
+        self->input.size = buffer.len;
+        self->input.pos = 0;
+
+        PyBuffer_Release(&buffer);
+    }
+    else {
+        assert(self->buffer.buf);
+        /*
+         * We should only get here once since expectation is we always
+         * exhaust input buffer before reading again.
+         */
+        assert(self->input.src == NULL);
+
+        self->input.src = self->buffer.buf;
+        self->input.size = self->buffer.len;
+        self->input.pos = 0;
+    }
+
+    return 1;
+}
+
+/**
+ * Run one ZSTD_decompressStream() pass over the pending input.
+ *
+ * Bytes are consumed from self->input and appended to `output`. If the
+ * pass drains the input, the input tracking state and self->readResult
+ * are reset, and a buffer-backed source is marked as finished.
+ *
+ * Returns 0 if we need more input.
+ * Returns 1 if output buffer should be emitted.
+ * Returns -1 on error and sets a Python exception.
+ */
+int decompress_input(ZstdDecompressionReader* self, ZSTD_outBuffer* output) {
+    size_t status;
+
+    /* No pending input: the caller has to fetch some first. */
+    if (self->input.pos >= self->input.size) {
+        return 0;
+    }
+
+    /* The zstd call runs between the allow-threads macros. */
+    Py_BEGIN_ALLOW_THREADS
+    status = ZSTD_decompressStream(self->decompressor->dctx, output, &self->input);
+    Py_END_ALLOW_THREADS
+
+    /* Input exhausted. Clear our state tracking. */
+    if (self->input.pos == self->input.size) {
+        memset(&self->input, 0, sizeof(self->input));
+        Py_CLEAR(self->readResult);
+
+        /* A buffer source is handed over in one piece, so it is done now. */
+        if (self->buffer.buf) {
+            self->finishedInput = 1;
+        }
+    }
+
+    if (ZSTD_isError(status)) {
+        PyErr_Format(ZstdError, "zstd decompress error: %s", ZSTD_getErrorName(status));
+        return -1;
+    }
+
+    /* Without any output there is never anything to emit. */
+    if (0 == output->pos) {
+        return 0;
+    }
+
+    /* We fulfilled the full read request. Signal to emit. */
+    if (output->pos == output->size) {
+        return 1;
+    }
+
+    /* We're at the end of a frame and we aren't allowed to return data
+       spanning frames. */
+    if (0 == status && !self->readAcrossFrames) {
+        return 1;
+    }
+
+    /* There is more room in the output. Signal to collect more data. */
+    return 0;
+}
+
static PyObject* reader_read(ZstdDecompressionReader* self, PyObject* args, PyObject* kwargs) {
static char* kwlist[] = {
"size",
@@ -113,26 +221,30 @@
char* resultBuffer;
Py_ssize_t resultSize;
ZSTD_outBuffer output;
- size_t zresult;
+ int decompressResult, readResult;
if (self->closed) {
PyErr_SetString(PyExc_ValueError, "stream is closed");
return NULL;
}
- if (self->finishedOutput) {
- return PyBytes_FromStringAndSize("", 0);
- }
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "n", kwlist, &size)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) {
return NULL;
}
- if (size < 1) {
- PyErr_SetString(PyExc_ValueError, "cannot read negative or size 0 amounts");
+ if (size < -1) {
+ PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
return NULL;
}
+ if (size == -1) {
+ return PyObject_CallMethod((PyObject*)self, "readall", NULL);
+ }
+
+ if (self->finishedOutput || size == 0) {
+ return PyBytes_FromStringAndSize("", 0);
+ }
+
result = PyBytes_FromStringAndSize(NULL, size);
if (NULL == result) {
return NULL;
@@ -146,85 +258,38 @@
readinput:
- /* Consume input data left over from last time. */
- if (self->input.pos < self->input.size) {
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompress_generic(self->decompressor->dctx,
- &output, &self->input);
- Py_END_ALLOW_THREADS
+ decompressResult = decompress_input(self, &output);
- /* Input exhausted. Clear our state tracking. */
- if (self->input.pos == self->input.size) {
- memset(&self->input, 0, sizeof(self->input));
- Py_CLEAR(self->readResult);
+ if (-1 == decompressResult) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ else if (0 == decompressResult) { }
+ else if (1 == decompressResult) {
+ self->bytesDecompressed += output.pos;
- if (self->buffer.buf) {
- self->finishedInput = 1;
+ if (output.pos != output.size) {
+ if (safe_pybytes_resize(&result, output.pos)) {
+ Py_XDECREF(result);
+ return NULL;
}
}
-
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "zstd decompress error: %s", ZSTD_getErrorName(zresult));
- return NULL;
- }
- else if (0 == zresult) {
- self->finishedOutput = 1;
- }
-
- /* We fulfilled the full read request. Emit it. */
- if (output.pos && output.pos == output.size) {
- self->bytesDecompressed += output.size;
- return result;
- }
-
- /*
- * There is more room in the output. Fall through to try to collect
- * more data so we can try to fill the output.
- */
+ return result;
+ }
+ else {
+ assert(0);
}
- if (!self->finishedInput) {
- if (self->reader) {
- Py_buffer buffer;
-
- assert(self->readResult == NULL);
- self->readResult = PyObject_CallMethod(self->reader, "read",
- "k", self->readSize);
- if (NULL == self->readResult) {
- return NULL;
- }
-
- memset(&buffer, 0, sizeof(buffer));
-
- if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
- return NULL;
- }
+ readResult = read_decompressor_input(self);
- /* EOF */
- if (0 == buffer.len) {
- self->finishedInput = 1;
- Py_CLEAR(self->readResult);
- }
- else {
- self->input.src = buffer.buf;
- self->input.size = buffer.len;
- self->input.pos = 0;
- }
-
- PyBuffer_Release(&buffer);
- }
- else {
- assert(self->buffer.buf);
- /*
- * We should only get here once since above block will exhaust
- * source buffer until finishedInput is set.
- */
- assert(self->input.src == NULL);
-
- self->input.src = self->buffer.buf;
- self->input.size = self->buffer.len;
- self->input.pos = 0;
- }
+ if (-1 == readResult) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ else if (0 == readResult) {}
+ else if (1 == readResult) {}
+ else {
+ assert(0);
}
if (self->input.size) {
@@ -242,18 +307,288 @@
return result;
}
+/**
+ * read1([size]): return decompressed bytes as soon as any are available.
+ *
+ * `size` defaults to -1, which is replaced by ZSTD_DStreamOutSize().
+ * Values below -1 raise ValueError. A size of 0, or a stream whose output
+ * is already finished, yields an empty bytes object.
+ *
+ * Returns a new bytes object, or NULL with a Python exception set.
+ */
+static PyObject* reader_read1(ZstdDecompressionReader* self, PyObject* args, PyObject* kwargs) {
+    static char* kwlist[] = {
+        "size",
+        NULL
+    };
+
+    Py_ssize_t size = -1;
+    PyObject* chunk = NULL;
+    char* chunkData;
+    Py_ssize_t chunkCapacity;
+    ZSTD_outBuffer output;
+
+    if (self->closed) {
+        PyErr_SetString(PyExc_ValueError, "stream is closed");
+        return NULL;
+    }
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) {
+        return NULL;
+    }
+
+    if (size < -1) {
+        PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
+        return NULL;
+    }
+
+    if (self->finishedOutput || size == 0) {
+        return PyBytes_FromStringAndSize("", 0);
+    }
+
+    if (size == -1) {
+        size = ZSTD_DStreamOutSize();
+    }
+
+    chunk = PyBytes_FromStringAndSize(NULL, size);
+    if (NULL == chunk) {
+        return NULL;
+    }
+
+    PyBytes_AsStringAndSize(chunk, &chunkData, &chunkCapacity);
+
+    output.dst = chunkData;
+    output.size = chunkCapacity;
+    output.pos = 0;
+
+    /* read1() is supposed to use at most 1 read() from the underlying stream.
+     * However, we can't satisfy this requirement with decompression due to the
+     * nature of how decompression works. Our strategy is to read + decompress
+     * until we get any output, at which point we return. This satisfies the
+     * intent of the read1() API to limit read operations.
+     */
+    while (!self->finishedInput && 0 == output.pos) {
+        int status;
+
+        status = read_decompressor_input(self);
+        if (-1 == status) {
+            goto error;
+        }
+        /* Only -1, 0 and 1 are valid results. */
+        assert(0 == status || 1 == status);
+
+        status = decompress_input(self, &output);
+        if (-1 == status) {
+            goto error;
+        }
+        assert(0 == status || 1 == status);
+    }
+
+    /* Trim the bytes object to what was actually produced. */
+    self->bytesDecompressed += output.pos;
+    if (safe_pybytes_resize(&chunk, output.pos)) {
+        goto error;
+    }
+
+    return chunk;
+
+error:
+    Py_XDECREF(chunk);
+    return NULL;
+}
+
+/**
+ * readinto(b): decompress directly into a writable buffer.
+ *
+ * Alternates between decompress_input() and read_decompressor_input()
+ * until decompress_input() signals that the output should be emitted or
+ * no further input is available.
+ *
+ * Returns the number of bytes written as a Python int, or NULL with a
+ * Python exception set. Raises ValueError if the stream is closed or the
+ * destination is not a contiguous buffer of at most one dimension.
+ */
+static PyObject* reader_readinto(ZstdDecompressionReader* self, PyObject* args) {
+    Py_buffer dest;
+    ZSTD_outBuffer output;
+    int decompressResult, readResult;
+    PyObject* result = NULL;
+
+    if (self->closed) {
+        PyErr_SetString(PyExc_ValueError, "stream is closed");
+        return NULL;
+    }
+
+    if (self->finishedOutput) {
+        return PyLong_FromLong(0);
+    }
+
+    if (!PyArg_ParseTuple(args, "w*:readinto", &dest)) {
+        return NULL;
+    }
+
+    if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+        PyErr_SetString(PyExc_ValueError,
+            "destination buffer should be contiguous and have at most one dimension");
+        goto finally;
+    }
+
+    /*
+     * Mirror read(0): nothing can be stored in an empty destination.
+     * decompress_input() only signals "emit" once output.pos is non-zero,
+     * so without this guard the loop below could keep re-running the
+     * decompressor on the same pending input without making progress.
+     */
+    if (0 == dest.len) {
+        result = PyLong_FromLong(0);
+        goto finally;
+    }
+
+    output.dst = dest.buf;
+    output.size = dest.len;
+    output.pos = 0;
+
+readinput:
+
+    decompressResult = decompress_input(self, &output);
+
+    if (-1 == decompressResult) {
+        goto finally;
+    }
+    else if (0 == decompressResult) { }
+    else if (1 == decompressResult) {
+        self->bytesDecompressed += output.pos;
+        result = PyLong_FromSize_t(output.pos);
+        goto finally;
+    }
+    else {
+        assert(0);
+    }
+
+    readResult = read_decompressor_input(self);
+
+    if (-1 == readResult) {
+        goto finally;
+    }
+    else if (0 == readResult) {}
+    else if (1 == readResult) {}
+    else {
+        assert(0);
+    }
+
+    /* Pending input exists: go around again and decompress it. */
+    if (self->input.size) {
+        goto readinput;
+    }
+
+    /* EOF */
+    self->bytesDecompressed += output.pos;
+    result = PyLong_FromSize_t(output.pos);
+
+finally:
+    PyBuffer_Release(&dest);
+
+    return result;
+}
+
+/**
+ * readinto1(b): decompress into a writable buffer, returning as soon as
+ * any output has been produced.
+ *
+ * Reads and decompresses input until at least one byte has been written,
+ * or until the input or output is flagged as finished.
+ *
+ * Returns the number of bytes written as a Python int, or NULL with a
+ * Python exception set. Raises ValueError if the stream is closed or the
+ * destination is not a contiguous buffer of at most one dimension.
+ */
+static PyObject* reader_readinto1(ZstdDecompressionReader* self, PyObject* args) {
+    Py_buffer dest;
+    ZSTD_outBuffer output;
+    PyObject* result = NULL;
+
+    if (self->closed) {
+        PyErr_SetString(PyExc_ValueError, "stream is closed");
+        return NULL;
+    }
+
+    if (self->finishedOutput) {
+        return PyLong_FromLong(0);
+    }
+
+    if (!PyArg_ParseTuple(args, "w*:readinto1", &dest)) {
+        return NULL;
+    }
+
+    if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+        PyErr_SetString(PyExc_ValueError,
+            "destination buffer should be contiguous and have at most one dimension");
+        goto finally;
+    }
+
+    /*
+     * Mirror read1(0): nothing can be stored in an empty destination. The
+     * loop below only stops early once output.pos is non-zero, which can
+     * never happen when there is no room for output.
+     */
+    if (0 == dest.len) {
+        result = PyLong_FromLong(0);
+        goto finally;
+    }
+
+    output.dst = dest.buf;
+    output.size = dest.len;
+    output.pos = 0;
+
+    while (!self->finishedInput && !self->finishedOutput) {
+        int decompressResult, readResult;
+
+        readResult = read_decompressor_input(self);
+
+        if (-1 == readResult) {
+            goto finally;
+        }
+        else if (0 == readResult || 1 == readResult) {}
+        else {
+            assert(0);
+        }
+
+        decompressResult = decompress_input(self, &output);
+
+        if (-1 == decompressResult) {
+            goto finally;
+        }
+        else if (0 == decompressResult || 1 == decompressResult) {}
+        else {
+            assert(0);
+        }
+
+        /* Any output at all is enough for the "1" variant. */
+        if (output.pos) {
+            break;
+        }
+    }
+
+    self->bytesDecompressed += output.pos;
+    result = PyLong_FromSize_t(output.pos);
+
+finally:
+    PyBuffer_Release(&dest);
+
+    return result;
+}
+
static PyObject* reader_readall(PyObject* self) {
- PyErr_SetNone(PyExc_NotImplementedError);
- return NULL;
+    /*
+     * readall(): read until read() returns an empty bytes object and
+     * return everything as a single bytes object.
+     *
+     * Returns a new reference, or NULL with a Python exception set.
+     */
+    PyObject* chunks = NULL;
+    PyObject* empty = NULL;
+    PyObject* result = NULL;
+
+    /* Our strategy is to collect chunks into a list then join all the
+     * chunks at the end. We could potentially use e.g. an io.BytesIO. But
+     * this feels simple enough to implement and avoids potentially expensive
+     * reallocations of large buffers.
+     */
+    chunks = PyList_New(0);
+    if (NULL == chunks) {
+        return NULL;
+    }
+
+    while (1) {
+        Py_ssize_t chunkSize;
+        PyObject* chunk = PyObject_CallMethod(self, "read", "i", 1048576);
+        if (NULL == chunk) {
+            Py_DECREF(chunks);
+            return NULL;
+        }
+
+        /*
+         * PyBytes_Size() reports failure (e.g. a non-bytes object) as -1
+         * with an exception set. Propagate it instead of treating the
+         * chunk as data.
+         */
+        chunkSize = PyBytes_Size(chunk);
+        if (chunkSize < 0) {
+            Py_DECREF(chunk);
+            Py_DECREF(chunks);
+            return NULL;
+        }
+
+        /* An empty chunk marks the end of the stream. */
+        if (0 == chunkSize) {
+            Py_DECREF(chunk);
+            break;
+        }
+
+        if (PyList_Append(chunks, chunk)) {
+            Py_DECREF(chunk);
+            Py_DECREF(chunks);
+            return NULL;
+        }
+
+        /* The list holds its own reference now. */
+        Py_DECREF(chunk);
+    }
+
+    empty = PyBytes_FromStringAndSize("", 0);
+    if (NULL == empty) {
+        Py_DECREF(chunks);
+        return NULL;
+    }
+
+    /* Equivalent to b"".join(chunks) in Python. */
+    result = PyObject_CallMethod(empty, "join", "O", chunks);
+
+    Py_DECREF(empty);
+    Py_DECREF(chunks);
+
+    return result;
}
static PyObject* reader_readline(PyObject* self) {
- PyErr_SetNone(PyExc_NotImplementedError);
+ set_unsupported_operation(); /* readline() always fails; helper is defined elsewhere and presumably sets io.UnsupportedOperation (confirm). Replaces NotImplementedError. */
return NULL;
}
static PyObject* reader_readlines(PyObject* self) {
- PyErr_SetNone(PyExc_NotImplementedError);
+ set_unsupported_operation(); /* readlines() always fails; helper is defined elsewhere and presumably sets io.UnsupportedOperation (confirm). Replaces NotImplementedError. */
return NULL;
}
@@ -345,12 +680,12 @@
}
static PyObject* reader_iter(PyObject* self) {
- PyErr_SetNone(PyExc_NotImplementedError);
+ set_unsupported_operation(); /* iteration always fails; helper is defined elsewhere and presumably sets io.UnsupportedOperation (confirm). Replaces NotImplementedError. */
return NULL;
}
static PyObject* reader_iternext(PyObject* self) {
- PyErr_SetNone(PyExc_NotImplementedError);
+ set_unsupported_operation(); /* fetching the next item always fails; helper is defined elsewhere and presumably sets io.UnsupportedOperation (confirm). Replaces NotImplementedError. */
return NULL;
}
@@ -367,6 +702,10 @@
PyDoc_STR("Returns True") },
{ "read", (PyCFunction)reader_read, METH_VARARGS | METH_KEYWORDS,
PyDoc_STR("read compressed data") },
+ { "read1", (PyCFunction)reader_read1, METH_VARARGS | METH_KEYWORDS,
+ PyDoc_STR("read compressed data") },
+ { "readinto", (PyCFunction)reader_readinto, METH_VARARGS, NULL },
+ { "readinto1", (PyCFunction)reader_readinto1, METH_VARARGS, NULL },
{ "readall", (PyCFunction)reader_readall, METH_NOARGS, PyDoc_STR("Not implemented") },
{ "readline", (PyCFunction)reader_readline, METH_NOARGS, PyDoc_STR("Not implemented") },
{ "readlines", (PyCFunction)reader_readlines, METH_NOARGS, PyDoc_STR("Not implemented") },