Mercurial > public > mercurial-scm > hg-stable
diff contrib/python-zstandard/c-ext/decompressionreader.c @ 42070:675775c33ab6
zstandard: vendor python-zstandard 0.11
The upstream source distribution from PyPI was extracted. Unwanted
files were removed.
The clang-format ignore list was updated to reflect the new source
of files.
The project contains a vendored copy of zstandard 1.3.8. The old
version was 1.3.6. This should result in some minor performance wins.
test-check-py3-compat.t was updated to reflect now-passing tests on
Python 3.8.
Some HTTP tests were updated to reflect new zstd compression output.
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D6199
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Thu, 04 Apr 2019 17:34:43 -0700 |
parents | 73fef626dae3 |
children | 69de49c4e39c |
line wrap: on
line diff
--- a/contrib/python-zstandard/c-ext/decompressionreader.c Thu Apr 04 15:24:03 2019 -0700 +++ b/contrib/python-zstandard/c-ext/decompressionreader.c Thu Apr 04 17:34:43 2019 -0700 @@ -102,6 +102,114 @@ Py_RETURN_FALSE; } +/** + * Read available input. + * + * Returns 0 if no data was added to input. + * Returns 1 if new input data is available. + * Returns -1 on error and sets a Python exception as a side-effect. + */ +int read_decompressor_input(ZstdDecompressionReader* self) { + if (self->finishedInput) { + return 0; + } + + if (self->input.pos != self->input.size) { + return 0; + } + + if (self->reader) { + Py_buffer buffer; + + assert(self->readResult == NULL); + self->readResult = PyObject_CallMethod(self->reader, "read", + "k", self->readSize); + if (NULL == self->readResult) { + return -1; + } + + memset(&buffer, 0, sizeof(buffer)); + + if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) { + return -1; + } + + /* EOF */ + if (0 == buffer.len) { + self->finishedInput = 1; + Py_CLEAR(self->readResult); + } + else { + self->input.src = buffer.buf; + self->input.size = buffer.len; + self->input.pos = 0; + } + + PyBuffer_Release(&buffer); + } + else { + assert(self->buffer.buf); + /* + * We should only get here once since expectation is we always + * exhaust input buffer before reading again. + */ + assert(self->input.src == NULL); + + self->input.src = self->buffer.buf; + self->input.size = self->buffer.len; + self->input.pos = 0; + } + + return 1; +} + +/** + * Decompresses available input into an output buffer. + * + * Returns 0 if we need more input. + * Returns 1 if output buffer should be emitted. + * Returns -1 on error and sets a Python exception. + */ +int decompress_input(ZstdDecompressionReader* self, ZSTD_outBuffer* output) { + size_t zresult; + + if (self->input.pos >= self->input.size) { + return 0; + } + + Py_BEGIN_ALLOW_THREADS + zresult = ZSTD_decompressStream(self->decompressor->dctx, output, &self->input); + Py_END_ALLOW_THREADS + + /* Input exhausted. Clear our state tracking. */ + if (self->input.pos == self->input.size) { + memset(&self->input, 0, sizeof(self->input)); + Py_CLEAR(self->readResult); + + if (self->buffer.buf) { + self->finishedInput = 1; + } + } + + if (ZSTD_isError(zresult)) { + PyErr_Format(ZstdError, "zstd decompress error: %s", ZSTD_getErrorName(zresult)); + return -1; + } + + /* We fulfilled the full read request. Signal to emit. */ + if (output->pos && output->pos == output->size) { + return 1; + } + /* We're at the end of a frame and we aren't allowed to return data + spanning frames. */ + else if (output->pos && zresult == 0 && !self->readAcrossFrames) { + return 1; + } + + /* There is more room in the output. Signal to collect more data. */ + return 0; +} + static PyObject* reader_read(ZstdDecompressionReader* self, PyObject* args, PyObject* kwargs) { static char* kwlist[] = { "size", @@ -113,26 +221,30 @@ char* resultBuffer; Py_ssize_t resultSize; ZSTD_outBuffer output; - size_t zresult; + int decompressResult, readResult; if (self->closed) { PyErr_SetString(PyExc_ValueError, "stream is closed"); return NULL; } - if (self->finishedOutput) { - return PyBytes_FromStringAndSize("", 0); - } - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "n", kwlist, &size)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) { return NULL; } - if (size < 1) { - PyErr_SetString(PyExc_ValueError, "cannot read negative or size 0 amounts"); + if (size < -1) { + PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1"); return NULL; } + if (size == -1) { + return PyObject_CallMethod((PyObject*)self, "readall", NULL); + } + + if (self->finishedOutput || size == 0) { + return PyBytes_FromStringAndSize("", 0); + } + result = PyBytes_FromStringAndSize(NULL, size); if (NULL == result) { return NULL; @@ -146,85 +258,38 @@ readinput: - /* Consume input data left over from last time. */ - if (self->input.pos < self->input.size) { - Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_decompress_generic(self->decompressor->dctx, - &output, &self->input); - Py_END_ALLOW_THREADS + decompressResult = decompress_input(self, &output); - /* Input exhausted. Clear our state tracking. */ - if (self->input.pos == self->input.size) { - memset(&self->input, 0, sizeof(self->input)); - Py_CLEAR(self->readResult); + if (-1 == decompressResult) { + Py_XDECREF(result); + return NULL; + } + else if (0 == decompressResult) { } + else if (1 == decompressResult) { + self->bytesDecompressed += output.pos; - if (self->buffer.buf) { - self->finishedInput = 1; + if (output.pos != output.size) { + if (safe_pybytes_resize(&result, output.pos)) { + Py_XDECREF(result); + return NULL; } } - - if (ZSTD_isError(zresult)) { - PyErr_Format(ZstdError, "zstd decompress error: %s", ZSTD_getErrorName(zresult)); - return NULL; - } - else if (0 == zresult) { - self->finishedOutput = 1; - } - - /* We fulfilled the full read request. Emit it. */ - if (output.pos && output.pos == output.size) { - self->bytesDecompressed += output.size; - return result; - } - - /* - * There is more room in the output. Fall through to try to collect - * more data so we can try to fill the output. - */ + return result; + } + else { + assert(0); } - if (!self->finishedInput) { - if (self->reader) { - Py_buffer buffer; - - assert(self->readResult == NULL); - self->readResult = PyObject_CallMethod(self->reader, "read", - "k", self->readSize); - if (NULL == self->readResult) { - return NULL; - } - - memset(&buffer, 0, sizeof(buffer)); - - if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) { - return NULL; - } + readResult = read_decompressor_input(self); - /* EOF */ - if (0 == buffer.len) { - self->finishedInput = 1; - Py_CLEAR(self->readResult); - } - else { - self->input.src = buffer.buf; - self->input.size = buffer.len; - self->input.pos = 0; - } - - PyBuffer_Release(&buffer); - } - else { - assert(self->buffer.buf); - /* - * We should only get here once since above block will exhaust - * source buffer until finishedInput is set. - */ - assert(self->input.src == NULL); - - self->input.src = self->buffer.buf; - self->input.size = self->buffer.len; - self->input.pos = 0; - } + if (-1 == readResult) { + Py_XDECREF(result); + return NULL; + } + else if (0 == readResult) {} + else if (1 == readResult) {} + else { + assert(0); } if (self->input.size) { @@ -242,18 +307,288 @@ return result; } +static PyObject* reader_read1(ZstdDecompressionReader* self, PyObject* args, PyObject* kwargs) { + static char* kwlist[] = { + "size", + NULL + }; + + Py_ssize_t size = -1; + PyObject* result = NULL; + char* resultBuffer; + Py_ssize_t resultSize; + ZSTD_outBuffer output; + + if (self->closed) { + PyErr_SetString(PyExc_ValueError, "stream is closed"); + return NULL; + } + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) { + return NULL; + } + + if (size < -1) { + PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1"); + return NULL; + } + + if (self->finishedOutput || size == 0) { + return PyBytes_FromStringAndSize("", 0); + } + + if (size == -1) { + size = ZSTD_DStreamOutSize(); + } + + result = PyBytes_FromStringAndSize(NULL, size); + if (NULL == result) { + return NULL; + } + + PyBytes_AsStringAndSize(result, &resultBuffer, &resultSize); + + output.dst = resultBuffer; + output.size = resultSize; + output.pos = 0; + + /* read1() is supposed to use at most 1 read() from the underlying stream. + * However, we can't satisfy this requirement with decompression due to the + * nature of how decompression works. Our strategy is to read + decompress + * until we get any output, at which point we return. This satisfies the + * intent of the read1() API to limit read operations. + */ + while (!self->finishedInput) { + int readResult, decompressResult; + + readResult = read_decompressor_input(self); + if (-1 == readResult) { + Py_XDECREF(result); + return NULL; + } + else if (0 == readResult || 1 == readResult) { } + else { + assert(0); + } + + decompressResult = decompress_input(self, &output); + + if (-1 == decompressResult) { + Py_XDECREF(result); + return NULL; + } + else if (0 == decompressResult || 1 == decompressResult) { } + else { + assert(0); + } + + if (output.pos) { + break; + } + } + + self->bytesDecompressed += output.pos; + if (safe_pybytes_resize(&result, output.pos)) { + Py_XDECREF(result); + return NULL; + } + + return result; +} + +static PyObject* reader_readinto(ZstdDecompressionReader* self, PyObject* args) { + Py_buffer dest; + ZSTD_outBuffer output; + int decompressResult, readResult; + PyObject* result = NULL; + + if (self->closed) { + PyErr_SetString(PyExc_ValueError, "stream is closed"); + return NULL; + } + + if (self->finishedOutput) { + return PyLong_FromLong(0); + } + + if (!PyArg_ParseTuple(args, "w*:readinto", &dest)) { + return NULL; + } + + if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) { + PyErr_SetString(PyExc_ValueError, + "destination buffer should be contiguous and have at most one dimension"); + goto finally; + } + + output.dst = dest.buf; + output.size = dest.len; + output.pos = 0; + +readinput: + + decompressResult = decompress_input(self, &output); + + if (-1 == decompressResult) { + goto finally; + } + else if (0 == decompressResult) { } + else if (1 == decompressResult) { + self->bytesDecompressed += output.pos; + result = PyLong_FromSize_t(output.pos); + goto finally; + } + else { + assert(0); + } + + readResult = read_decompressor_input(self); + + if (-1 == readResult) { + goto finally; + } + else if (0 == readResult) {} + else if (1 == readResult) {} + else { + assert(0); + } + + if (self->input.size) { + goto readinput; + } + + /* EOF */ + self->bytesDecompressed += output.pos; + result = PyLong_FromSize_t(output.pos); + +finally: + PyBuffer_Release(&dest); + + return result; +} + +static PyObject* reader_readinto1(ZstdDecompressionReader* self, PyObject* args) { + Py_buffer dest; + ZSTD_outBuffer output; + PyObject* result = NULL; + + if (self->closed) { + PyErr_SetString(PyExc_ValueError, "stream is closed"); + return NULL; + } + + if (self->finishedOutput) { + return PyLong_FromLong(0); + } + + if (!PyArg_ParseTuple(args, "w*:readinto1", &dest)) { + return NULL; + } + + if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) { + PyErr_SetString(PyExc_ValueError, + "destination buffer should be contiguous and have at most one dimension"); + goto finally; + } + + output.dst = dest.buf; + output.size = dest.len; + output.pos = 0; + + while (!self->finishedInput && !self->finishedOutput) { + int decompressResult, readResult; + + readResult = read_decompressor_input(self); + + if (-1 == readResult) { + goto finally; + } + else if (0 == readResult || 1 == readResult) {} + else { + assert(0); + } + + decompressResult = decompress_input(self, &output); + + if (-1 == decompressResult) { + goto finally; + } + else if (0 == decompressResult || 1 == decompressResult) {} + else { + assert(0); + } + + if (output.pos) { + break; + } + } + + self->bytesDecompressed += output.pos; + result = PyLong_FromSize_t(output.pos); + +finally: + PyBuffer_Release(&dest); + + return result; +} + static PyObject* reader_readall(PyObject* self) { - PyErr_SetNone(PyExc_NotImplementedError); - return NULL; + PyObject* chunks = NULL; + PyObject* empty = NULL; + PyObject* result = NULL; + + /* Our strategy is to collect chunks into a list then join all the + * chunks at the end. We could potentially use e.g. an io.BytesIO. But + * this feels simple enough to implement and avoids potentially expensive + * reallocations of large buffers. + */ + chunks = PyList_New(0); + if (NULL == chunks) { + return NULL; + } + + while (1) { + PyObject* chunk = PyObject_CallMethod(self, "read", "i", 1048576); + if (NULL == chunk) { + Py_DECREF(chunks); + return NULL; + } + + if (!PyBytes_Size(chunk)) { + Py_DECREF(chunk); + break; + } + + if (PyList_Append(chunks, chunk)) { + Py_DECREF(chunk); + Py_DECREF(chunks); + return NULL; + } + + Py_DECREF(chunk); + } + + empty = PyBytes_FromStringAndSize("", 0); + if (NULL == empty) { + Py_DECREF(chunks); + return NULL; + } + + result = PyObject_CallMethod(empty, "join", "O", chunks); + + Py_DECREF(empty); + Py_DECREF(chunks); + + return result; } static PyObject* reader_readline(PyObject* self) { - PyErr_SetNone(PyExc_NotImplementedError); + set_unsupported_operation(); return NULL; } static PyObject* reader_readlines(PyObject* self) { - PyErr_SetNone(PyExc_NotImplementedError); + set_unsupported_operation(); return NULL; } @@ -345,12 +680,12 @@ } static PyObject* reader_iter(PyObject* self) { - PyErr_SetNone(PyExc_NotImplementedError); + set_unsupported_operation(); return NULL; } static PyObject* reader_iternext(PyObject* self) { - PyErr_SetNone(PyExc_NotImplementedError); + set_unsupported_operation(); return NULL; } @@ -367,6 +702,10 @@ PyDoc_STR("Returns True") }, { "read", (PyCFunction)reader_read, METH_VARARGS | METH_KEYWORDS, PyDoc_STR("read compressed data") }, + { "read1", (PyCFunction)reader_read1, METH_VARARGS | METH_KEYWORDS, + PyDoc_STR("read compressed data") }, + { "readinto", (PyCFunction)reader_readinto, METH_VARARGS, NULL }, + { "readinto1", (PyCFunction)reader_readinto1, METH_VARARGS, NULL }, { "readall", (PyCFunction)reader_readall, METH_NOARGS, PyDoc_STR("Not implemented") }, { "readline", (PyCFunction)reader_readline, METH_NOARGS, PyDoc_STR("Not implemented") }, { "readlines", (PyCFunction)reader_readlines, METH_NOARGS, PyDoc_STR("Not implemented") },