contrib/python-zstandard/zstd/compress/zstdmt_compress.c
changeset 43994 de7838053207
parent 42937 69de49c4e39c
--- a/contrib/python-zstandard/zstd/compress/zstdmt_compress.c	Fri Dec 27 18:54:57 2019 -0500
+++ b/contrib/python-zstandard/zstd/compress/zstdmt_compress.c	Sat Dec 28 09:55:45 2019 -0800
@@ -668,7 +668,7 @@
 
     /* init */
     if (job->cdict) {
-        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
+        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
         assert(job->firstJob);  /* only allowed for first job */
         if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {  /* srcStart points at reloaded section */
@@ -680,7 +680,7 @@
                                         job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                                         ZSTD_dtlm_fast,
                                         NULL, /*cdict*/
-                                        jobParams, pledgedSrcSize);
+                                        &jobParams, pledgedSrcSize);
             if (ZSTD_isError(initError)) JOB_ERROR(initError);
     }   }
 
@@ -927,12 +927,18 @@
     unsigned jobID;
     DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
     for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
+        /* Copy the mutex/cond out */
+        ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
+        ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;
+
         DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
         ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
-        mtctx->jobs[jobID].dstBuff = g_nullBuffer;
-        mtctx->jobs[jobID].cSize = 0;
+
+        /* Clear the job description, but keep the mutex/cond */
+        memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
+        mtctx->jobs[jobID].job_mutex = mutex;
+        mtctx->jobs[jobID].job_cond = cond;
     }
-    memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
     mtctx->inBuff.buffer = g_nullBuffer;
     mtctx->inBuff.filled = 0;
     mtctx->allJobsCompleted = 1;
@@ -1028,9 +1034,9 @@
 
 /* Sets parameters relevant to the compression job,
  * initializing others to default values. */
-static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
+static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params)
 {
-    ZSTD_CCtx_params jobParams = params;
+    ZSTD_CCtx_params jobParams = *params;
     /* Clear parameters related to multithreading */
     jobParams.forceWindow = 0;
     jobParams.nbWorkers = 0;
@@ -1151,16 +1157,16 @@
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
 
-static unsigned ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
+static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
 {
     unsigned jobLog;
-    if (params.ldmParams.enableLdm) {
+    if (params->ldmParams.enableLdm) {
         /* In Long Range Mode, the windowLog is typically oversized.
          * In which case, it's preferable to determine the jobSize
          * based on chainLog instead. */
-        jobLog = MAX(21, params.cParams.chainLog + 4);
+        jobLog = MAX(21, params->cParams.chainLog + 4);
     } else {
-        jobLog = MAX(20, params.cParams.windowLog + 2);
+        jobLog = MAX(20, params->cParams.windowLog + 2);
     }
     return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX);
 }
@@ -1193,27 +1199,27 @@
     return ovlog;
 }
 
-static size_t ZSTDMT_computeOverlapSize(ZSTD_CCtx_params const params)
+static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
 {
-    int const overlapRLog = 9 - ZSTDMT_overlapLog(params.overlapLog, params.cParams.strategy);
-    int ovLog = (overlapRLog >= 8) ? 0 : (params.cParams.windowLog - overlapRLog);
+    int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy);
+    int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog);
     assert(0 <= overlapRLog && overlapRLog <= 8);
-    if (params.ldmParams.enableLdm) {
+    if (params->ldmParams.enableLdm) {
         /* In Long Range Mode, the windowLog is typically oversized.
          * In which case, it's preferable to determine the jobSize
          * based on chainLog instead.
          * Then, ovLog becomes a fraction of the jobSize, rather than windowSize */
-        ovLog = MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
+        ovLog = MIN(params->cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
                 - overlapRLog;
     }
     assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX);
-    DEBUGLOG(4, "overlapLog : %i", params.overlapLog);
+    DEBUGLOG(4, "overlapLog : %i", params->overlapLog);
     DEBUGLOG(4, "overlap size : %i", 1 << ovLog);
     return (ovLog==0) ? 0 : (size_t)1 << ovLog;
 }
 
 static unsigned
-ZSTDMT_computeNbJobs(ZSTD_CCtx_params params, size_t srcSize, unsigned nbWorkers)
+ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers)
 {
     assert(nbWorkers>0);
     {   size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params);
@@ -1236,9 +1242,9 @@
           const ZSTD_CDict* cdict,
                 ZSTD_CCtx_params params)
 {
-    ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
-    size_t const overlapSize = ZSTDMT_computeOverlapSize(params);
-    unsigned const nbJobs = ZSTDMT_computeNbJobs(params, srcSize, params.nbWorkers);
+    ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(&params);
+    size_t const overlapSize = ZSTDMT_computeOverlapSize(&params);
+    unsigned const nbJobs = ZSTDMT_computeNbJobs(&params, srcSize, params.nbWorkers);
     size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
     size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize;   /* avoid too small last block */
     const char* const srcStart = (const char*)src;
@@ -1256,7 +1262,7 @@
         ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
         DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode");
         if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
-        return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
+        return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams);
     }
 
     assert(avgJobSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
@@ -1404,12 +1410,12 @@
 
     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
     if (mtctx->singleBlockingThread) {
-        ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
+        ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(&params);
         DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
         assert(singleThreadParams.nbWorkers == 0);
         return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
                                          dict, dictSize, cdict,
-                                         singleThreadParams, pledgedSrcSize);
+                                         &singleThreadParams, pledgedSrcSize);
     }
 
     DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);
@@ -1435,11 +1441,11 @@
         mtctx->cdict = cdict;
     }
 
-    mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(params);
+    mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params);
     DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
     mtctx->targetSectionSize = params.jobSize;
     if (mtctx->targetSectionSize == 0) {
-        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
+        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
     }
     assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);