--- a/contrib/python-zstandard/c-ext/compressionreader.c Thu Apr 04 15:24:03 2019 -0700
+++ b/contrib/python-zstandard/c-ext/compressionreader.c Thu Apr 04 17:34:43 2019 -0700
@@ -128,6 +128,96 @@
return PyLong_FromUnsignedLongLong(self->bytesCompressed);
}
+int read_compressor_input(ZstdCompressionReader* self) {
+ if (self->finishedInput) {
+ return 0;
+ }
+
+ if (self->input.pos != self->input.size) {
+ return 0;
+ }
+
+ if (self->reader) {
+ Py_buffer buffer;
+
+ assert(self->readResult == NULL);
+
+ self->readResult = PyObject_CallMethod(self->reader, "read",
+ "k", self->readSize);
+
+ if (NULL == self->readResult) {
+ return -1;
+ }
+
+ memset(&buffer, 0, sizeof(buffer));
+
+ if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
+ return -1;
+ }
+
+ /* EOF */
+ if (0 == buffer.len) {
+ self->finishedInput = 1;
+ Py_CLEAR(self->readResult);
+ }
+ else {
+ self->input.src = buffer.buf;
+ self->input.size = buffer.len;
+ self->input.pos = 0;
+ }
+
+ PyBuffer_Release(&buffer);
+ }
+ else {
+ assert(self->buffer.buf);
+
+ self->input.src = self->buffer.buf;
+ self->input.size = self->buffer.len;
+ self->input.pos = 0;
+ }
+
+ return 1;
+}
+
+int compress_input(ZstdCompressionReader* self, ZSTD_outBuffer* output) {
+ size_t oldPos;
+ size_t zresult;
+
+ /* If we have data left over, consume it. */
+ if (self->input.pos < self->input.size) {
+ oldPos = output->pos;
+
+ Py_BEGIN_ALLOW_THREADS
+ zresult = ZSTD_compressStream2(self->compressor->cctx,
+ output, &self->input, ZSTD_e_continue);
+ Py_END_ALLOW_THREADS
+
+ self->bytesCompressed += output->pos - oldPos;
+
+ /* Input exhausted. Clear out state tracking. */
+ if (self->input.pos == self->input.size) {
+ memset(&self->input, 0, sizeof(self->input));
+ Py_CLEAR(self->readResult);
+
+ if (self->buffer.buf) {
+ self->finishedInput = 1;
+ }
+ }
+
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
+ return -1;
+ }
+ }
+
+ if (output->pos && output->pos == output->size) {
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
static PyObject* reader_read(ZstdCompressionReader* self, PyObject* args, PyObject* kwargs) {
static char* kwlist[] = {
"size",
@@ -140,25 +230,30 @@
Py_ssize_t resultSize;
size_t zresult;
size_t oldPos;
+ int readResult, compressResult;
if (self->closed) {
PyErr_SetString(PyExc_ValueError, "stream is closed");
return NULL;
}
- if (self->finishedOutput) {
- return PyBytes_FromStringAndSize("", 0);
- }
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "n", kwlist, &size)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) {
return NULL;
}
- if (size < 1) {
- PyErr_SetString(PyExc_ValueError, "cannot read negative or size 0 amounts");
+ if (size < -1) {
+ PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
return NULL;
}
+ if (size == -1) {
+ return PyObject_CallMethod((PyObject*)self, "readall", NULL);
+ }
+
+ if (self->finishedOutput || size == 0) {
+ return PyBytes_FromStringAndSize("", 0);
+ }
+
result = PyBytes_FromStringAndSize(NULL, size);
if (NULL == result) {
return NULL;
@@ -172,86 +267,34 @@
readinput:
- /* If we have data left over, consume it. */
- if (self->input.pos < self->input.size) {
- oldPos = self->output.pos;
-
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_compress_generic(self->compressor->cctx,
- &self->output, &self->input, ZSTD_e_continue);
-
- Py_END_ALLOW_THREADS
-
- self->bytesCompressed += self->output.pos - oldPos;
-
- /* Input exhausted. Clear out state tracking. */
- if (self->input.pos == self->input.size) {
- memset(&self->input, 0, sizeof(self->input));
- Py_CLEAR(self->readResult);
+ compressResult = compress_input(self, &self->output);
- if (self->buffer.buf) {
- self->finishedInput = 1;
- }
- }
-
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
- return NULL;
- }
-
- if (self->output.pos) {
- /* If no more room in output, emit it. */
- if (self->output.pos == self->output.size) {
- memset(&self->output, 0, sizeof(self->output));
- return result;
- }
-
- /*
- * There is room in the output. We fall through to below, which will either
- * get more input for us or will attempt to end the stream.
- */
- }
-
- /* Fall through to gather more input. */
+ if (-1 == compressResult) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ else if (0 == compressResult) {
+ /* There is room in the output. We fall through to below, which will
+ * either get more input for us or will attempt to end the stream.
+ */
+ }
+ else if (1 == compressResult) {
+ memset(&self->output, 0, sizeof(self->output));
+ return result;
+ }
+ else {
+ assert(0);
}
- if (!self->finishedInput) {
- if (self->reader) {
- Py_buffer buffer;
-
- assert(self->readResult == NULL);
- self->readResult = PyObject_CallMethod(self->reader, "read",
- "k", self->readSize);
- if (self->readResult == NULL) {
- return NULL;
- }
-
- memset(&buffer, 0, sizeof(buffer));
-
- if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
- return NULL;
- }
+ readResult = read_compressor_input(self);
- /* EOF */
- if (0 == buffer.len) {
- self->finishedInput = 1;
- Py_CLEAR(self->readResult);
- }
- else {
- self->input.src = buffer.buf;
- self->input.size = buffer.len;
- self->input.pos = 0;
- }
-
- PyBuffer_Release(&buffer);
- }
- else {
- assert(self->buffer.buf);
-
- self->input.src = self->buffer.buf;
- self->input.size = self->buffer.len;
- self->input.pos = 0;
- }
+ if (-1 == readResult) {
+ return NULL;
+ }
+ else if (0 == readResult) { }
+ else if (1 == readResult) { }
+ else {
+ assert(0);
}
if (self->input.size) {
@@ -261,7 +304,7 @@
/* Else EOF */
oldPos = self->output.pos;
- zresult = ZSTD_compress_generic(self->compressor->cctx, &self->output,
+ zresult = ZSTD_compressStream2(self->compressor->cctx, &self->output,
&self->input, ZSTD_e_end);
self->bytesCompressed += self->output.pos - oldPos;
@@ -269,6 +312,7 @@
if (ZSTD_isError(zresult)) {
PyErr_Format(ZstdError, "error ending compression stream: %s",
ZSTD_getErrorName(zresult));
+ Py_XDECREF(result);
return NULL;
}
@@ -288,9 +332,394 @@
return result;
}
+static PyObject* reader_read1(ZstdCompressionReader* self, PyObject* args, PyObject* kwargs) {
+ static char* kwlist[] = {
+ "size",
+ NULL
+ };
+
+ Py_ssize_t size = -1;
+ PyObject* result = NULL;
+ char* resultBuffer;
+ Py_ssize_t resultSize;
+ ZSTD_outBuffer output;
+ int compressResult;
+ size_t oldPos;
+ size_t zresult;
+
+ if (self->closed) {
+ PyErr_SetString(PyExc_ValueError, "stream is closed");
+ return NULL;
+ }
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:read1", kwlist, &size)) {
+ return NULL;
+ }
+
+ if (size < -1) {
+ PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
+ return NULL;
+ }
+
+ if (self->finishedOutput || size == 0) {
+ return PyBytes_FromStringAndSize("", 0);
+ }
+
+ if (size == -1) {
+ size = ZSTD_CStreamOutSize();
+ }
+
+ result = PyBytes_FromStringAndSize(NULL, size);
+ if (NULL == result) {
+ return NULL;
+ }
+
+ PyBytes_AsStringAndSize(result, &resultBuffer, &resultSize);
+
+ output.dst = resultBuffer;
+ output.size = resultSize;
+ output.pos = 0;
+
+ /* read1() is supposed to use at most 1 read() from the underlying stream.
+ However, we can't satisfy this requirement with compression because
+ not every input will generate output. We /could/ flush the compressor,
+ but this may not be desirable. We allow multiple read() from the
+ underlying stream. But unlike read(), we return as soon as output data
+ is available.
+ */
+
+ compressResult = compress_input(self, &output);
+
+ if (-1 == compressResult) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ else if (0 == compressResult || 1 == compressResult) { }
+ else {
+ assert(0);
+ }
+
+ if (output.pos) {
+ goto finally;
+ }
+
+ while (!self->finishedInput) {
+ int readResult = read_compressor_input(self);
+
+ if (-1 == readResult) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ else if (0 == readResult || 1 == readResult) { }
+ else {
+ assert(0);
+ }
+
+ compressResult = compress_input(self, &output);
+
+ if (-1 == compressResult) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ else if (0 == compressResult || 1 == compressResult) { }
+ else {
+ assert(0);
+ }
+
+ if (output.pos) {
+ goto finally;
+ }
+ }
+
+ /* EOF */
+ oldPos = output.pos;
+
+ zresult = ZSTD_compressStream2(self->compressor->cctx, &output, &self->input,
+ ZSTD_e_end);
+
+ self->bytesCompressed += output.pos - oldPos;
+
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "error ending compression stream: %s",
+ ZSTD_getErrorName(zresult));
+ Py_XDECREF(result);
+ return NULL;
+ }
+
+ if (zresult == 0) {
+ self->finishedOutput = 1;
+ }
+
+finally:
+ if (result) {
+ if (safe_pybytes_resize(&result, output.pos)) {
+ Py_XDECREF(result);
+ return NULL;
+ }
+ }
+
+ return result;
+}
+
static PyObject* reader_readall(PyObject* self) {
- PyErr_SetNone(PyExc_NotImplementedError);
- return NULL;
+ PyObject* chunks = NULL;
+ PyObject* empty = NULL;
+ PyObject* result = NULL;
+
+ /* Our strategy is to collect chunks into a list then join all the
+ * chunks at the end. We could potentially use e.g. an io.BytesIO. But
+ * this feels simple enough to implement and avoids potentially expensive
+ * reallocations of large buffers.
+ */
+ chunks = PyList_New(0);
+ if (NULL == chunks) {
+ return NULL;
+ }
+
+ while (1) {
+ PyObject* chunk = PyObject_CallMethod(self, "read", "i", 1048576);
+ if (NULL == chunk) {
+ Py_DECREF(chunks);
+ return NULL;
+ }
+
+ if (!PyBytes_Size(chunk)) {
+ Py_DECREF(chunk);
+ break;
+ }
+
+ if (PyList_Append(chunks, chunk)) {
+ Py_DECREF(chunk);
+ Py_DECREF(chunks);
+ return NULL;
+ }
+
+ Py_DECREF(chunk);
+ }
+
+ empty = PyBytes_FromStringAndSize("", 0);
+ if (NULL == empty) {
+ Py_DECREF(chunks);
+ return NULL;
+ }
+
+ result = PyObject_CallMethod(empty, "join", "O", chunks);
+
+ Py_DECREF(empty);
+ Py_DECREF(chunks);
+
+ return result;
+}
+
+static PyObject* reader_readinto(ZstdCompressionReader* self, PyObject* args) {
+ Py_buffer dest;
+ ZSTD_outBuffer output;
+ int readResult, compressResult;
+ PyObject* result = NULL;
+ size_t zresult;
+ size_t oldPos;
+
+ if (self->closed) {
+ PyErr_SetString(PyExc_ValueError, "stream is closed");
+ return NULL;
+ }
+
+ if (self->finishedOutput) {
+ return PyLong_FromLong(0);
+ }
+
+ if (!PyArg_ParseTuple(args, "w*:readinto", &dest)) {
+ return NULL;
+ }
+
+ if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+ PyErr_SetString(PyExc_ValueError,
+ "destination buffer should be contiguous and have at most one dimension");
+ goto finally;
+ }
+
+ output.dst = dest.buf;
+ output.size = dest.len;
+ output.pos = 0;
+
+ compressResult = compress_input(self, &output);
+
+ if (-1 == compressResult) {
+ goto finally;
+ }
+ else if (0 == compressResult) { }
+ else if (1 == compressResult) {
+ result = PyLong_FromSize_t(output.pos);
+ goto finally;
+ }
+ else {
+ assert(0);
+ }
+
+ while (!self->finishedInput) {
+ readResult = read_compressor_input(self);
+
+ if (-1 == readResult) {
+ goto finally;
+ }
+ else if (0 == readResult || 1 == readResult) {}
+ else {
+ assert(0);
+ }
+
+ compressResult = compress_input(self, &output);
+
+ if (-1 == compressResult) {
+ goto finally;
+ }
+ else if (0 == compressResult) { }
+ else if (1 == compressResult) {
+ result = PyLong_FromSize_t(output.pos);
+ goto finally;
+ }
+ else {
+ assert(0);
+ }
+ }
+
+ /* EOF */
+ oldPos = output.pos;
+
+ zresult = ZSTD_compressStream2(self->compressor->cctx, &output, &self->input,
+ ZSTD_e_end);
+
+ self->bytesCompressed += self->output.pos - oldPos;
+
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "error ending compression stream: %s",
+ ZSTD_getErrorName(zresult));
+ goto finally;
+ }
+
+ assert(output.pos);
+
+ if (0 == zresult) {
+ self->finishedOutput = 1;
+ }
+
+ result = PyLong_FromSize_t(output.pos);
+
+finally:
+ PyBuffer_Release(&dest);
+
+ return result;
+}
+
+static PyObject* reader_readinto1(ZstdCompressionReader* self, PyObject* args) {
+ Py_buffer dest;
+ PyObject* result = NULL;
+ ZSTD_outBuffer output;
+ int compressResult;
+ size_t oldPos;
+ size_t zresult;
+
+ if (self->closed) {
+ PyErr_SetString(PyExc_ValueError, "stream is closed");
+ return NULL;
+ }
+
+ if (self->finishedOutput) {
+ return PyLong_FromLong(0);
+ }
+
+ if (!PyArg_ParseTuple(args, "w*:readinto1", &dest)) {
+ return NULL;
+ }
+
+ if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+ PyErr_SetString(PyExc_ValueError,
+ "destination buffer should be contiguous and have at most one dimension");
+ goto finally;
+ }
+
+ output.dst = dest.buf;
+ output.size = dest.len;
+ output.pos = 0;
+
+ compressResult = compress_input(self, &output);
+
+ if (-1 == compressResult) {
+ goto finally;
+ }
+ else if (0 == compressResult || 1 == compressResult) { }
+ else {
+ assert(0);
+ }
+
+ if (output.pos) {
+ result = PyLong_FromSize_t(output.pos);
+ goto finally;
+ }
+
+ while (!self->finishedInput) {
+ int readResult = read_compressor_input(self);
+
+ if (-1 == readResult) {
+ goto finally;
+ }
+ else if (0 == readResult || 1 == readResult) { }
+ else {
+ assert(0);
+ }
+
+ compressResult = compress_input(self, &output);
+
+ if (-1 == compressResult) {
+ goto finally;
+ }
+ else if (0 == compressResult) { }
+ else if (1 == compressResult) {
+ result = PyLong_FromSize_t(output.pos);
+ goto finally;
+ }
+ else {
+ assert(0);
+ }
+
+ /* If we produced output and we're not done with input, emit
+ * that output now, as we've hit restrictions of read1().
+ */
+ if (output.pos && !self->finishedInput) {
+ result = PyLong_FromSize_t(output.pos);
+ goto finally;
+ }
+
+ /* Otherwise we either have no output or we've exhausted the
+ * input. Either we try to get more input or we fall through
+ * to EOF below */
+ }
+
+ /* EOF */
+ oldPos = output.pos;
+
+ zresult = ZSTD_compressStream2(self->compressor->cctx, &output, &self->input,
+ ZSTD_e_end);
+
+ self->bytesCompressed += self->output.pos - oldPos;
+
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "error ending compression stream: %s",
+ ZSTD_getErrorName(zresult));
+ goto finally;
+ }
+
+ assert(output.pos);
+
+ if (0 == zresult) {
+ self->finishedOutput = 1;
+ }
+
+ result = PyLong_FromSize_t(output.pos);
+
+finally:
+ PyBuffer_Release(&dest);
+
+ return result;
}
static PyObject* reader_iter(PyObject* self) {
@@ -315,7 +744,10 @@
{ "readable", (PyCFunction)reader_readable, METH_NOARGS,
PyDoc_STR("Returns True") },
{ "read", (PyCFunction)reader_read, METH_VARARGS | METH_KEYWORDS, PyDoc_STR("read compressed data") },
+ { "read1", (PyCFunction)reader_read1, METH_VARARGS | METH_KEYWORDS, NULL },
{ "readall", (PyCFunction)reader_readall, METH_NOARGS, PyDoc_STR("Not implemented") },
+ { "readinto", (PyCFunction)reader_readinto, METH_VARARGS, NULL },
+ { "readinto1", (PyCFunction)reader_readinto1, METH_VARARGS, NULL },
{ "readline", (PyCFunction)reader_readline, METH_VARARGS, PyDoc_STR("Not implemented") },
{ "readlines", (PyCFunction)reader_readlines, METH_VARARGS, PyDoc_STR("Not implemented") },
{ "seekable", (PyCFunction)reader_seekable, METH_NOARGS,