diff contrib/python-zstandard/zstd/compress/zstdmt_compress.c @ 42937:69de49c4e39c

zstandard: vendor python-zstandard 0.12 The upstream source distribution from PyPI was extracted. Unwanted files were removed. The clang-format ignore list was updated to reflect the new source of files. test-repo-compengines.t was updated to reflect a change in behavior of the zstd library. The project contains a vendored copy of zstandard 1.4.3. The old version was 1.3.8. This should result in some minor performance wins. # no-check-commit because 3rd party code has different style guidelines Differential Revision: https://phab.mercurial-scm.org/D6858
author Gregory Szorc <gregory.szorc@gmail.com>
date Sun, 15 Sep 2019 20:04:00 -0700
parents 675775c33ab6
children de7838053207
line wrap: on
line diff
--- a/contrib/python-zstandard/zstd/compress/zstdmt_compress.c	Sun Sep 15 00:07:30 2019 -0400
+++ b/contrib/python-zstandard/zstd/compress/zstdmt_compress.c	Sun Sep 15 20:04:00 2019 -0700
@@ -22,6 +22,7 @@
 /* ======   Dependencies   ====== */
 #include <string.h>      /* memcpy, memset */
 #include <limits.h>      /* INT_MAX, UINT_MAX */
+#include "mem.h"         /* MEM_STATIC */
 #include "pool.h"        /* threadpool */
 #include "threading.h"   /* mutex */
 #include "zstd_compress_internal.h"  /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
@@ -456,7 +457,7 @@
      * Must be acquired after the main mutex when acquiring both.
      */
     ZSTD_pthread_mutex_t ldmWindowMutex;
-    ZSTD_pthread_cond_t ldmWindowCond;  /* Signaled when ldmWindow is udpated */
+    ZSTD_pthread_cond_t ldmWindowCond;  /* Signaled when ldmWindow is updated */
     ZSTD_window_t ldmWindow;  /* A thread-safe copy of ldmState.window */
 } serialState_t;
 
@@ -647,7 +648,7 @@
     buffer_t dstBuff = job->dstBuff;
     size_t lastCBlockSize = 0;
 
-    /* ressources */
+    /* resources */
     if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
     if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
         dstBuff = ZSTDMT_getBuffer(job->bufPool);
@@ -672,7 +673,7 @@
         if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {  /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
-        {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
+        {   size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
             if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
         }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
@@ -864,14 +865,10 @@
  * Internal use only */
 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
 {
-    if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX;
-    params->nbWorkers = nbWorkers;
-    params->overlapLog = ZSTDMT_OVERLAPLOG_DEFAULT;
-    params->jobSize = 0;
-    return nbWorkers;
+    return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
 }
 
-ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem)
 {
     ZSTDMT_CCtx* mtctx;
     U32 nbJobs = nbWorkers + 2;
@@ -906,6 +903,17 @@
     return mtctx;
 }
 
+ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+{
+#ifdef ZSTD_MULTITHREAD
+    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem);
+#else
+    (void)nbWorkers;
+    (void)cMem;
+    return NULL;
+#endif
+}
+
 ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
 {
     return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
@@ -986,26 +994,13 @@
     {
     case ZSTDMT_p_jobSize :
         DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value);
-        if ( value != 0  /* default */
-          && value < ZSTDMT_JOBSIZE_MIN)
-            value = ZSTDMT_JOBSIZE_MIN;
-        assert(value >= 0);
-        if (value > ZSTDMT_JOBSIZE_MAX) value = ZSTDMT_JOBSIZE_MAX;
-        params->jobSize = value;
-        return value;
-
+        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value);
     case ZSTDMT_p_overlapLog :
         DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value);
-        if (value < ZSTD_OVERLAPLOG_MIN) value = ZSTD_OVERLAPLOG_MIN;
-        if (value > ZSTD_OVERLAPLOG_MAX) value = ZSTD_OVERLAPLOG_MAX;
-        params->overlapLog = value;
-        return value;
-
+        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value);
     case ZSTDMT_p_rsyncable :
-        value = (value != 0);
-        params->rsyncable = value;
-        return value;
-
+        DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value);
+        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value);
     default :
         return ERROR(parameter_unsupported);
     }
@@ -1021,32 +1016,29 @@
 {
     switch (parameter) {
     case ZSTDMT_p_jobSize:
-        assert(mtctx->params.jobSize <= INT_MAX);
-        *value = (int)(mtctx->params.jobSize);
-        break;
+        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value);
     case ZSTDMT_p_overlapLog:
-        *value = mtctx->params.overlapLog;
-        break;
+        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value);
     case ZSTDMT_p_rsyncable:
-        *value = mtctx->params.rsyncable;
-        break;
+        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value);
     default:
         return ERROR(parameter_unsupported);
     }
-    return 0;
 }
 
 /* Sets parameters relevant to the compression job,
  * initializing others to default values. */
 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
 {
-    ZSTD_CCtx_params jobParams;
-    memset(&jobParams, 0, sizeof(jobParams));
-
-    jobParams.cParams = params.cParams;
-    jobParams.fParams = params.fParams;
-    jobParams.compressionLevel = params.compressionLevel;
-
+    ZSTD_CCtx_params jobParams = params;
+    /* Clear parameters related to multithreading */
+    jobParams.forceWindow = 0;
+    jobParams.nbWorkers = 0;
+    jobParams.jobSize = 0;
+    jobParams.overlapLog = 0;
+    jobParams.rsyncable = 0;
+    memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t));
+    memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem));
     return jobParams;
 }
 
@@ -1056,7 +1048,7 @@
 static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
 {
     if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
-    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
     mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
     if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
     mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
@@ -1137,9 +1129,14 @@
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
             size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
             assert(flushed <= produced);
+            assert(jobPtr->consumed <= jobPtr->src.size);
             toFlush = produced - flushed;
-            if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
-                /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
+            /* if toFlush==0, nothing is available to flush.
+             * However, jobID is expected to still be active:
+             * if jobID was already completed and fully flushed,
+             * ZSTDMT_flushProduced() should have already moved onto next job.
+             * Therefore, some input has not yet been consumed. */
+            if (toFlush==0) {
                 assert(jobPtr->consumed < jobPtr->src.size);
             }
         }
@@ -1156,12 +1153,16 @@
 
 static unsigned ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
 {
-    if (params.ldmParams.enableLdm)
+    unsigned jobLog;
+    if (params.ldmParams.enableLdm) {
         /* In Long Range Mode, the windowLog is typically oversized.
          * In which case, it's preferable to determine the jobSize
          * based on chainLog instead. */
-        return MAX(21, params.cParams.chainLog + 4);
-    return MAX(20, params.cParams.windowLog + 2);
+        jobLog = MAX(21, params.cParams.chainLog + 4);
+    } else {
+        jobLog = MAX(20, params.cParams.windowLog + 2);
+    }
+    return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX);
 }
 
 static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)
@@ -1205,7 +1206,7 @@
         ovLog = MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
                 - overlapRLog;
     }
-    assert(0 <= ovLog && ovLog <= 30);
+    assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX);
     DEBUGLOG(4, "overlapLog : %i", params.overlapLog);
     DEBUGLOG(4, "overlap size : %i", 1 << ovLog);
     return (ovLog==0) ? 0 : (size_t)1 << ovLog;
@@ -1263,7 +1264,7 @@
     if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
         return ERROR(memory_allocation);
 
-    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */
+    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */
 
     {   unsigned u;
         for (u=0; u<nbJobs; u++) {
@@ -1396,10 +1397,10 @@
 
     /* init */
     if (params.nbWorkers != mtctx->params.nbWorkers)
-        CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
+        FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) );
 
     if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
-    if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
+    if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX;
 
     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
     if (mtctx->singleBlockingThread) {
@@ -1440,6 +1441,8 @@
     if (mtctx->targetSectionSize == 0) {
         mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
     }
+    assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
+
     if (params.rsyncable) {
         /* Aim for the targetsectionSize as the average job size. */
         U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20);
@@ -1547,7 +1550,7 @@
 /* ZSTDMT_writeLastEmptyBlock()
  * Write a single empty block with an end-of-frame to finish a frame.
  * Job must be created from streaming variant.
- * This function is always successfull if expected conditions are fulfilled.
+ * This function is always successful if expected conditions are fulfilled.
  */
 static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
 {
@@ -1987,7 +1990,7 @@
     assert(input->pos  <= input->size);
 
     if (mtctx->singleBlockingThread) {  /* delegate to single-thread (synchronous) */
-        return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
+        return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp);
     }
 
     if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
@@ -2051,7 +2054,7 @@
       || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
         size_t const jobSize = mtctx->inBuff.filled;
         assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
-        CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
+        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
     }
 
     /* check for potential compressed data ready to be flushed */
@@ -2065,7 +2068,7 @@
 
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
-    CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
+    FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
 
     /* recommended next input size : fill current input buffer */
     return mtctx->targetSectionSize - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
@@ -2082,7 +2085,7 @@
       || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) {  /* need a last 0-size block to end frame */
            DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
                         (U32)srcSize, (U32)endFrame);
-        CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
+        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
     }
 
     /* check if there is any data available to flush */