contrib/python-zstandard/zstd/compress/zstdmt_compress.c
changeset 40121 73fef626dae3
parent 37495 b1fb341d8a61
child 42070 675775c33ab6
comparison: 40120:89742f1fa6cb to 40121:73fef626dae3
@@ -35 +35 @@
  * Until then, comment the code out since it is unused.
  */
 #define ZSTD_RESIZE_SEQPOOL 0

 /* ======   Debug   ====== */
-#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
+#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
+    && !defined(_MSC_VER) \
+    && !defined(__MINGW32__)

 #  include <stdio.h>
 #  include <unistd.h>
 #  include <sys/times.h>
-#  define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); }

 #  define DEBUG_PRINTHEX(l,p,n) {            \
     unsigned debug_u;                        \
     for (debug_u=0; debug_u<(n); debug_u++)  \
-        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
-    DEBUGLOGRAW(l, " \n");                   \
+        RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
+    RAWLOG(l, " \n");                        \
 }

 static unsigned long long GetCurrentClockTimeMicroseconds(void)
 {
    static clock_t _ticksPerSecond = 0;
@@ -60 +61 @@
      return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); }
 }

 #define MUTEX_WAIT_TIME_DLEVEL 6
 #define ZSTD_PTHREAD_MUTEX_LOCK(mutex) {          \
-    if (ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL) {   \
+    if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) {   \
         unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
         ZSTD_pthread_mutex_lock(mutex);           \
         {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
             unsigned long long const elapsedTime = (afterTime-beforeTime); \
             if (elapsedTime > 1000) {  /* or whatever threshold you like; I'm using 1 millisecond here */ \
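The debug block now keys off the shared DEBUGLEVEL/RAWLOG helpers instead of the local ZSTD_DEBUG/DEBUGLOGRAW pair, and is compiled out on MSVC/MinGW, presumably because <unistd.h> and <sys/times.h> are not available there. A small illustration of the DEBUG_PRINTHEX macro defined above; the level, name and buffer are arbitrary and only serve the example:

    /* illustration only : dump 16 bytes at debug level 5.
     * Compiles only when the guard above is active (DEBUGLEVEL >= 2, not MSVC/MinGW),
     * and prints only when DEBUGLEVEL is at least the level passed in. */
    static void example_dumpPrefix(const void* p)
    {
        DEBUG_PRINTHEX(5, p, 16);
    }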
@@ -156 +157 @@
 {
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
     bufPool->bufferSize = bSize;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
+}
+
+
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+{
+    unsigned const maxNbBuffers = 2*nbWorkers + 3;
+    if (srcBufPool==NULL) return NULL;
+    if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
+        return srcBufPool;
+    /* need a larger buffer pool */
+    {   ZSTD_customMem const cMem = srcBufPool->cMem;
+        size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
+        ZSTDMT_bufferPool* newBufPool;
+        ZSTDMT_freeBufferPool(srcBufPool);
+        newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+        if (newBufPool==NULL) return newBufPool;
+        ZSTDMT_setBufferSize(newBufPool, bSize);
+        return newBufPool;
+    }
 }

 /** ZSTDMT_getBuffer() :
  *  assumption : bufPool must be valid
  * @return : a buffer, with start pointer and size
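The new ZSTDMT_expandBufferPool() grows a pool by freeing it and recreating it for the larger worker count, forwarding the configured bufferSize; callers must therefore treat the returned pointer as the new pool and check it for NULL. A minimal caller sketch, assuming it sits inside zstdmt_compress.c next to a ZSTDMT_CCtx* mtctx (the same pattern ZSTDMT_resize() uses further down in this changeset); the function name is illustrative:

    /* sketch only : grow the shared buffer pool when the worker count increases.
     * The old pool pointer is invalid after the call, so always reassign it. */
    static size_t example_growBufferPool(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
    {
        mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
        if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
        return 0;
    }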
@@ -227 +247 @@
 #endif

 /* store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
+    DEBUGLOG(5, "ZSTDMT_releaseBuffer");
     if (buf.start == NULL) return;   /* compatible with release on NULL */
-    DEBUGLOG(5, "ZSTDMT_releaseBuffer");
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;  /* stored for later use */
         DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
                     (U32)buf.capacity, (U32)(bufPool->nbBuffers-1));
@@ -298 +318 @@
   ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq));
 }

 static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
 {
-    ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    if (seqPool == NULL) return NULL;
     ZSTDMT_setNbSeq(seqPool, 0);
     return seqPool;
 }

 static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
 {
     ZSTDMT_freeBufferPool(seqPool);
 }

+static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
+{
+    return ZSTDMT_expandBufferPool(pool, nbWorkers);
+}


 /* =====   CCtx Pool   ===== */
 /* a single CCtx Pool can be invoked from multiple threads in parallel */

@@ -351 +376 @@
     cctxPool->availCCtx = 1;   /* at least one cctx for single-thread mode */
     cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
     if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
     DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);
     return cctxPool;
+}
+
+static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
+                                              unsigned nbWorkers)
+{
+    if (srcPool==NULL) return NULL;
+    if (nbWorkers <= srcPool->totalCCtx) return srcPool;   /* good enough */
+    /* need a larger cctx pool */
+    {   ZSTD_customMem const cMem = srcPool->cMem;
+        ZSTDMT_freeCCtxPool(srcPool);
+        return ZSTDMT_createCCtxPool(nbWorkers, cMem);
+    }
 }

 /* only works during initialization phase, not during compression */
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
@@ -423 +460 @@
     ZSTD_pthread_mutex_t ldmWindowMutex;
     ZSTD_pthread_cond_t ldmWindowCond;  /* Signaled when ldmWindow is udpated */
     ZSTD_window_t ldmWindow;  /* A thread-safe copy of ldmState.window */
 } serialState_t;

-static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
+static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize)
 {
     /* Adjust parameters */
     if (params.ldmParams.enableLdm) {
         DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
-        params.ldmParams.windowLog = params.cParams.windowLog;
         ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
         assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
         assert(params.ldmParams.hashEveryLog < 32);
         serialState->ldmState.hashPower =
                 ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
@@ -451 +487 @@
         size_t const bucketSize = (size_t)1 << bucketLog;
         unsigned const prevBucketLog =
             serialState->params.ldmParams.hashLog -
             serialState->params.ldmParams.bucketSizeLog;
         /* Size the seq pool tables */
-        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
+        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
         /* Reset the window */
         ZSTD_window_clear(&serialState->ldmState.window);
         serialState->ldmWindow = serialState->ldmState.window;
         /* Resize tables and output space if necessary. */
         if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) {
@@ -471 +507 @@
         /* Zero the tables */
         memset(serialState->ldmState.hashTable, 0, hashSize);
         memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
     }
     serialState->params = params;
+    serialState->params.jobSize = (U32)jobSize;
     return 0;
 }

 static int ZSTDMT_serialState_init(serialState_t* serialState)
 {
@@ -503 +540 @@
                                       range_t src, unsigned jobID)
 {
     /* Wait for our turn */
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
     while (serialState->nextJobID < jobID) {
+        DEBUGLOG(5, "wait for serialState->cond");
         ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
     }
     /* A future job may error and skip our job */
     if (serialState->nextJobID == jobID) {
         /* It is now our turn, do any processing necessary */
         if (serialState->params.ldmParams.enableLdm) {
             size_t error;
             assert(seqStore.seq != NULL && seqStore.pos == 0 &&
                    seqStore.size == 0 && seqStore.capacity > 0);
+            assert(src.size <= serialState->params.jobSize);
             ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
             error = ZSTD_ldm_generateSequences(
                 &serialState->ldmState, &seqStore,
                 &serialState->params.ldmParams, src.start, src.size);
             /* We provide a large enough buffer to never fail. */
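ZSTDMT_serialState_reset() now receives the job size explicitly instead of reading params.jobSize, stores it in serialState->params.jobSize, and ZSTDMT_serialState_update() asserts that no job's source exceeds it. Both call sites in this changeset pass the effective section size; a condensed sketch of the call, assuming the mtctx fields used later in this file:

    /* sketch only : reset the serial (LDM + checksum) state for a new frame,
     * sizing the sequence pool for jobs of mtctx->targetSectionSize bytes. */
    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
        return ERROR(memory_allocation);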
@@ -591 +630 @@
     unsigned long long fullFrameSize;    /* set by mtctx, then read by worker => no barrier */
     size_t   dstFlushed;                 /* used only by mtctx */
     unsigned frameChecksumNeeded;        /* used only by mtctx */
 } ZSTDMT_jobDescription;

+#define JOB_ERROR(e) {                          \
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
+    job->cSize = e;                             \
+    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
+    goto _endJob;                               \
+}
+
 /* ZSTDMT_compressionJob() is a POOL_function type */
-void ZSTDMT_compressionJob(void* jobDescription)
+static void ZSTDMT_compressionJob(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
     rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
     buffer_t dstBuff = job->dstBuff;
+    size_t lastCBlockSize = 0;
+
+    /* ressources */
+    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
+    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
+        dstBuff = ZSTDMT_getBuffer(job->bufPool);
+        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
+        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
+    }
+    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
+        JOB_ERROR(ERROR(memory_allocation));

     /* Don't compute the checksum for chunks, since we compute it externally,
      * but write it in the header.
      */
     if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
     /* Don't run LDM for the chunks, since we handle it externally */
     jobParams.ldmParams.enableLdm = 0;

-    /* ressources */
-    if (cctx==NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
-    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
-        dstBuff = ZSTDMT_getBuffer(job->bufPool);
-        if (dstBuff.start==NULL) {
-            job->cSize = ERROR(memory_allocation);
-            goto _endJob;
-        }
-        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
-    }

     /* init */
     if (job->cdict) {
-        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, job->cdict, jobParams, job->fullFrameSize);
+        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
         assert(job->firstJob);  /* only allowed for first job */
-        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+        if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {  /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
         {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
-            if (ZSTD_isError(forceWindowError)) {
-                job->cSize = forceWindowError;
-                goto _endJob;
-        }   }
+            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
+        }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                                         job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+                                        ZSTD_dtlm_fast,
                                         NULL, /*cdict*/
                                         jobParams, pledgedSrcSize);
-            if (ZSTD_isError(initError)) {
-                job->cSize = initError;
-                goto _endJob;
-    }   }   }
+            if (ZSTD_isError(initError)) JOB_ERROR(initError);
+    }   }

     /* Perform serial step as early as possible, but after CCtx initialization */
     ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);

     if (!job->firstJob) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
-        if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
+        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
         DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
     }

     /* compress */
@@ -665 +706 @@
         if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize);   /* check overflow */
         DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks);
         assert(job->cSize == 0);
         for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             ip += chunkSize;
             op += cSize; assert(op < oend);
             /* stats */
             ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
             job->cSize += cSize;
@@ -678 +719 @@
                         (U32)cSize, (U32)job->cSize);
             ZSTD_pthread_cond_signal(&job->job_cond);   /* warns some more data is ready to be flushed */
             ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
-        assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
+        assert(chunkSize > 0);
+        assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
         if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
             size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
             size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
             size_t const cSize = (job->lastJob) ?
                  ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
-            /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-            job->cSize += cSize;
-            ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
+            lastCBlockSize = cSize;
     }   }

 _endJob:
     ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize);
     if (job->prefix.size > 0)
@@ -702 +741 @@
     /* release resources */
     ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
     /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-    job->consumed = job->src.size;
+    if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
+    job->cSize += lastCBlockSize;
+    job->consumed = job->src.size;  /* when job->consumed == job->src.size , compression job is presumed completed */
     ZSTD_pthread_cond_signal(&job->job_cond);
     ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }


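The new JOB_ERROR() macro centralizes the error path of ZSTDMT_compressionJob(): it publishes the error code in job->cSize under job_mutex and jumps to _endJob, replacing the repeated unlocked `job->cSize = e; goto _endJob;` pattern. The last block's size is likewise now folded into job->cSize only at _endJob, under the same mutex, via lastCBlockSize. For instance, the first resource check above expands as shown below; the expansion is written out only to illustrate the macro:

    /* illustration : what "if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));" expands to */
    if (cctx==NULL) {
        ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
        job->cSize = ERROR(memory_allocation);
        ZSTD_pthread_mutex_unlock(&job->job_mutex);
        goto _endJob;
    }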
@@ -743 +784 @@
     ZSTDMT_CCtxPool* cctxPool;
     ZSTDMT_seqPool* seqPool;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
+    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
+    inBuff_t inBuff;
     roundBuff_t roundBuff;
-    inBuff_t inBuff;
-    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
     serialState_t serial;
     unsigned singleBlockingThread;
     unsigned jobIDMask;
     unsigned doneJobID;
     unsigned nextJobID;
@@ -795 +836 @@
         ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);
         return NULL;
     }
     return jobTable;
 }
+
+static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
+    U32 nbJobs = nbWorkers + 2;
+    if (nbJobs > mtctx->jobIDMask+1) {  /* need more job capacity */
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
+        mtctx->jobIDMask = 0;
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
+        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
+        assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
+        mtctx->jobIDMask = nbJobs - 1;
+    }
+    return 0;
+}
+

 /* ZSTDMT_CCtxParam_setNbWorkers():
  * Internal use only */
 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
 {
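ZSTDMT_expandJobsTable() takes nbJobs by address so ZSTDMT_createJobsTable() can round the capacity up; the assert right after the call documents that the result is a power of two, which is what lets a monotonically increasing job ID be mapped to a slot with jobIDMask instead of a modulo. A small sketch of that indexing, assuming the mtctx fields used elsewhere in this file:

    /* sketch only : map a monotonically increasing job ID to its table slot */
    unsigned const slot = mtctx->nextJobID & mtctx->jobIDMask;   /* == nextJobID % (jobIDMask+1) */
    ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[slot];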
@@ -873 +928 @@
     DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
     while (mtctx->doneJobID < mtctx->nextJobID) {
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
         ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
+            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
             ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
         mtctx->doneJobID++;
     }
@@ -922 +977 @@
     case ZSTDMT_p_jobSize :
         DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %u", value);
         if ( (value > 0)  /* value==0 => automatic job size */
           & (value < ZSTDMT_JOBSIZE_MIN) )
            value = ZSTDMT_JOBSIZE_MIN;
+        if (value > ZSTDMT_JOBSIZE_MAX)
+            value = ZSTDMT_JOBSIZE_MAX;
        params->jobSize = value;
        return value;
    case ZSTDMT_p_overlapSectionLog :
        if (value > 9) value = 9;
        DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value);
@@ -948 +1005 @@
     default :
         return ERROR(parameter_unsupported);
     }
 }

+size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value)
+{
+    switch (parameter) {
+    case ZSTDMT_p_jobSize:
+        *value = mtctx->params.jobSize;
+        break;
+    case ZSTDMT_p_overlapSectionLog:
+        *value = mtctx->params.overlapSizeLog;
+        break;
+    default:
+        return ERROR(parameter_unsupported);
+    }
+    return 0;
+}
+
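ZSTDMT_getMTCtxParameter() is the read-side counterpart of the setter handled just above: it writes the current value and returns 0, or returns an error code for unsupported parameters. A minimal caller sketch (variable names are illustrative):

    /* sketch only : read back the currently configured job size */
    unsigned jobSize = 0;
    size_t const err = ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize);
    if (!ZSTD_isError(err)) DEBUGLOG(4, "current jobSize: %u", jobSize);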
@@ -953 +1025 @@
 /* Sets parameters relevant to the compression job,
  * initializing others to default values. */
 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
 {
     ZSTD_CCtx_params jobParams;
     memset(&jobParams, 0, sizeof(jobParams));

     jobParams.cParams = params.cParams;
     jobParams.fParams = params.fParams;
     jobParams.compressionLevel = params.compressionLevel;
-    jobParams.disableLiteralCompression = params.disableLiteralCompression;

     return jobParams;
 }

+
+/* ZSTDMT_resize() :
+ * @return : error code if fails, 0 on success */
+static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
+{
+    if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
+    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+    if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
+    mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
+    if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
+    mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
+    if (mtctx->seqPool == NULL) return ERROR(memory_allocation);
+    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
+    return 0;
+}
+
+
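ZSTDMT_resize() chains all the expand helpers introduced earlier in this changeset (thread pool, jobs table, buffer/cctx/seq pools) and finally records the new worker count in mtctx->params. Its intended use is shown by the pattern ZSTDMT_initCStream_internal() adopts later in this same changeset:

    /* sketch only : grow every shared resource when the requested worker count changes */
    if (params.nbWorkers != mtctx->params.nbWorkers)
        CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );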
@@ -968 +1057 @@
 /*! ZSTDMT_updateCParams_whileCompressing() :
- *  Updates only a selected set of compression parameters, to remain compatible with current frame.
+ *  Updates a selected set of compression parameters, remaining compatible with currently active frame.
  *  New parameters will be applied to next compression job. */
 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
 {
     U32 const saved_wlog = mtctx->params.cParams.windowLog;   /* Do not modify windowLog while compressing */
     int const compressionLevel = cctxParams->compressionLevel;
@@ -979 +1068 @@
         cParams.windowLog = saved_wlog;
         mtctx->params.cParams = cParams;
     }
 }

-/* ZSTDMT_getNbWorkers():
- * @return nb threads currently active in mtctx.
- * mtctx must be valid */
-unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
-{
-    assert(mtctx != NULL);
-    return mtctx->params.nbWorkers;
-}
-
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
  * able to count progression inside worker threads.
- * Note : mutex will be acquired during statistics collection. */
+ * Note : mutex will be acquired during statistics collection inside workers. */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
-    DEBUGLOG(6, "ZSTDMT_getFrameProgression");
+    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
+    fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
     fps.consumed = mtctx->consumed;
-    fps.produced = mtctx->produced;
-    fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
+    fps.produced = fps.flushed = mtctx->produced;
+    fps.currentJobID = mtctx->nextJobID;
+    fps.nbActiveWorkers = 0;
     {   unsigned jobNb;
         unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
         DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
                     mtctx->doneJobID, lastJobNb, mtctx->jobReady)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
-            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+            ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+            {   size_t const cResult = jobPtr->cSize;
                 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-                fps.consumed += mtctx->jobs[wJobID].consumed;
-                fps.ingested += mtctx->jobs[wJobID].src.size;
+                size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+                assert(flushed <= produced);
+                fps.ingested += jobPtr->src.size;
+                fps.consumed += jobPtr->consumed;
                 fps.produced += produced;
+                fps.flushed  += flushed;
+                fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
             }
             ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
     }
     return fps;
+}
+
+
+size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
+{
+    size_t toFlush;
+    unsigned const jobID = mtctx->doneJobID;
+    assert(jobID <= mtctx->nextJobID);
+    if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */
+
+    /* look into oldest non-fully-flushed job */
+    {   unsigned const wJobID = jobID & mtctx->jobIDMask;
+        ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
+        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+        {   size_t const cResult = jobPtr->cSize;
+            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+            size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+            assert(flushed <= produced);
+            toFlush = produced - flushed;
+            if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
+                /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
+                assert(jobPtr->consumed < jobPtr->src.size);
+            }
+        }
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+    }
+
+    return toFlush;
 }


 /* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
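ZSTDMT_getFrameProgression() now also reports flushed bytes, the current job ID and the number of still-active workers, and the new ZSTDMT_toFlushNow() tells how many produced-but-unflushed bytes the oldest job currently holds. A polling sketch from the thread that drives the ZSTDMT_* calls, assuming a valid ZSTDMT_CCtx* mtctx; every field used here is one populated by the function above:

    /* sketch only : observe compression progress without blocking the workers
     * (each per-job mutex is taken only briefly while the stats are read) */
    {   ZSTD_frameProgression const fp = ZSTDMT_getFrameProgression(mtctx);
        DEBUGLOG(4, "ingested=%u consumed=%u produced=%u flushed=%u activeWorkers=%u",
                 (U32)fp.ingested, (U32)fp.consumed, (U32)fp.produced,
                 (U32)fp.flushed, (U32)fp.nbActiveWorkers);
        if (ZSTDMT_toFlushNow(mtctx) > 0) {
            /* some compressed data from the oldest job is ready to be flushed */
        }
    }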
@@ -1085 +1200 @@
         return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
     }

     assert(avgJobSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
     ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
         return ERROR(memory_allocation);

-    if (nbJobs > mtctx->jobIDMask+1) {  /* enlarge job table */
-        U32 jobsTableSize = nbJobs;
-        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
-        mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem);
-        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
-        assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0));  /* ensure jobsTableSize is a power of 2 */
-        mtctx->jobIDMask = jobsTableSize - 1;
-    }
+    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */

     {   unsigned u;
         for (u=0; u<nbJobs; u++) {
             size_t const jobSize = MIN(remainingSrcSize, avgJobSize);
             size_t const dstBufferCapacity = ZSTD_compressBound(jobSize);
@@ -1219 +1326 @@
         ZSTDMT_CCtx* mtctx,
         const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
         const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
         unsigned long long pledgedSrcSize)
 {
-    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u, disableLiteralCompression=%i)",
-                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx, params.disableLiteralCompression);
-    /* params are supposed to be fully validated at this point */
+    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
+                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
+
+    /* params supposed partially fully validated at this point */
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
-    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);

     /* init */
-    if (params.jobSize == 0) {
-        params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
-    }
+    if (params.nbWorkers != mtctx->params.nbWorkers)
+        CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
+
+    if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
     if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;

     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
     if (mtctx->singleBlockingThread) {
         ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
@@ -1268 +1376 @@
     }

     mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
     DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
     mtctx->targetSectionSize = params.jobSize;
-    if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
+    if (mtctx->targetSectionSize == 0) {
+        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
+    }
     if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
     DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
     DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
     ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
     {
@@ -1310 +1420 @@
     mtctx->nextJobID = 0;
     mtctx->frameEnded = 0;
     mtctx->allJobsCompleted = 0;
     mtctx->consumed = 0;
     mtctx->produced = 0;
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
         return ERROR(memory_allocation);
     return 0;
 }

 size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
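This initialization path is what the public streaming API exercises when multithreading is requested. A self-contained caller sketch, assuming the advanced (ZSTD_STATIC_LINKING_ONLY) API of the zstd generation vendored by this changeset; enum and function names of that era may differ in later releases:

    #define ZSTD_STATIC_LINKING_ONLY   /* advanced API of this zstd generation */
    #include <zstd.h>

    /* sketch only : requesting more than one worker routes compression through
     * zstdmt_compress.c, which splits the input into jobs of ~targetSectionSize
     * bytes as initialized above. Assumes dstCapacity >= ZSTD_compressBound(srcSize)
     * so the flush loop terminates. */
    static size_t compress_mt(void* dst, size_t dstCapacity, const void* src, size_t srcSize)
    {
        ZSTD_CCtx* const cctx = ZSTD_createCCtx();
        ZSTD_outBuffer out = { dst, dstCapacity, 0 };
        ZSTD_inBuffer  in  = { src, srcSize, 0 };
        size_t ret;
        ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 5);
        ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, 4);   /* activates the zstdmt path */
        do {
            ret = ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end);
        } while (ret != 0 && !ZSTD_isError(ret));             /* 0 => frame fully flushed */
        ZSTD_freeCCtx(cctx);
        return ZSTD_isError(ret) ? ret : out.pos;
    }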
  1418         mtctx->jobs[jobID].seqPool = mtctx->seqPool;
  1528         mtctx->jobs[jobID].seqPool = mtctx->seqPool;
  1419         mtctx->jobs[jobID].serial = &mtctx->serial;
  1529         mtctx->jobs[jobID].serial = &mtctx->serial;
  1420         mtctx->jobs[jobID].jobID = mtctx->nextJobID;
  1530         mtctx->jobs[jobID].jobID = mtctx->nextJobID;
  1421         mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
  1531         mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
  1422         mtctx->jobs[jobID].lastJob = endFrame;
  1532         mtctx->jobs[jobID].lastJob = endFrame;
  1423         mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
  1533         mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
  1424         mtctx->jobs[jobID].dstFlushed = 0;
  1534         mtctx->jobs[jobID].dstFlushed = 0;
  1425 
  1535 
  1426         /* Update the round buffer pos and clear the input buffer to be reset */
  1536         /* Update the round buffer pos and clear the input buffer to be reset */
  1427         mtctx->roundBuff.pos += srcSize;
  1537         mtctx->roundBuff.pos += srcSize;
  1428         mtctx->inBuff.buffer = g_nullBuffer;
  1538         mtctx->inBuff.buffer = g_nullBuffer;
  1466     return 0;
  1576     return 0;
  1467 }
  1577 }
  1468 
  1578 
  1469 
  1579 
  1470 /*! ZSTDMT_flushProduced() :
  1580 /*! ZSTDMT_flushProduced() :
       
  1581  *  flush whatever data has been produced but not yet flushed in current job.
       
  1582  *  move to next job if current one is fully flushed.
  1471  * `output` : `pos` will be updated with amount of data flushed .
  1583  * `output` : `pos` will be updated with amount of data flushed .
  1472  * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
  1584  * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
  1473  * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
  1585  * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
  1474 static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
  1586 static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
  1475 {
  1587 {
  1494     }   }
  1606     }   }
  1495 
  1607 
  1496     /* try to flush something */
  1608     /* try to flush something */
  1497     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
  1609     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
  1498         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
  1610         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
  1499         size_t const srcSize = mtctx->jobs[wJobID].src.size;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
  1611         size_t const srcSize = mtctx->jobs[wJobID].src.size;       /* read-only, could be done after mutex lock, but no-declaration-after-statement */
  1500         ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
  1612         ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
  1501         if (ZSTD_isError(cSize)) {
  1613         if (ZSTD_isError(cSize)) {
  1502             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
  1614             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
  1503                         mtctx->doneJobID, ZSTD_getErrorName(cSize));
  1615                         mtctx->doneJobID, ZSTD_getErrorName(cSize));
  1504             ZSTDMT_waitForAllJobsCompleted(mtctx);
  1616             ZSTDMT_waitForAllJobsCompleted(mtctx);
  1514             MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);
  1626             MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);
  1515             cSize += 4;
  1627             cSize += 4;
  1516             mtctx->jobs[wJobID].cSize += 4;  /* can write this shared value, as worker is no longer active */
  1628             mtctx->jobs[wJobID].cSize += 4;  /* can write this shared value, as worker is no longer active */
  1517             mtctx->jobs[wJobID].frameChecksumNeeded = 0;
  1629             mtctx->jobs[wJobID].frameChecksumNeeded = 0;
  1518         }
  1630         }
       
  1631 
  1519         if (cSize > 0) {   /* compression is ongoing or completed */
  1632         if (cSize > 0) {   /* compression is ongoing or completed */
  1520             size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
  1633             size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
  1521             DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
  1634             DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
  1522                         (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize);
  1635                         (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize);
  1523             assert(mtctx->doneJobID < mtctx->nextJobID);
  1636             assert(mtctx->doneJobID < mtctx->nextJobID);
  1527                    (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,
  1640                    (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,
  1528                    toFlush);
  1641                    toFlush);
  1529             output->pos += toFlush;
  1642             output->pos += toFlush;
  1530             mtctx->jobs[wJobID].dstFlushed += toFlush;  /* can write : this value is only used by mtctx */
  1643             mtctx->jobs[wJobID].dstFlushed += toFlush;  /* can write : this value is only used by mtctx */
  1531 
  1644 
  1532             if ( (srcConsumed == srcSize)    /* job completed */
  1645             if ( (srcConsumed == srcSize)    /* job is completed */
  1533               && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {   /* output buffer fully flushed => free this job position */
  1646               && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {   /* output buffer fully flushed => free this job position */
  1534                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
  1647                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
  1535                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
  1648                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
  1536                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
  1649                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
       
  1650                 DEBUGLOG(5, "dstBuffer released");
  1537                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
  1651                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
  1538                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
  1652                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
  1539                 mtctx->consumed += srcSize;
  1653                 mtctx->consumed += srcSize;
  1540                 mtctx->produced += cSize;
  1654                 mtctx->produced += cSize;
  1541                 mtctx->doneJobID++;
  1655                 mtctx->doneJobID++;
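The bookkeeping in this hunk — flush as much of the produced data as the caller's output buffer can take, then retire the job once it is both fully consumed and fully flushed — can be modelled in isolation. The struct and field names below are simplified stand-ins, not the real ZSTDMT job descriptor:

    #include <string.h>

    #define MIN(a,b) ((a) < (b) ? (a) : (b))

    /* Simplified stand-ins for the job / output descriptors used above. */
    typedef struct { const char* dst; size_t cSize; size_t dstFlushed;
                     size_t consumed; size_t srcSize; } toy_job;
    typedef struct { char* dst; size_t size; size_t pos; } toy_out;

    /* Returns 1 when the job is fully consumed and fully flushed (slot can be reused). */
    static int toy_flushProduced(toy_job* job, toy_out* output)
    {
        size_t const toFlush = MIN(job->cSize - job->dstFlushed, output->size - output->pos);
        memcpy(output->dst + output->pos, job->dst + job->dstFlushed, toFlush);
        output->pos += toFlush;          /* report progress to the caller */
        job->dstFlushed += toFlush;      /* remember how much of this job was emitted */
        return (job->consumed == job->srcSize) && (job->dstFlushed == job->cSize);
    }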
  1608 static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
  1722 static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
  1609 {
  1723 {
  1610     range_t extDict;
  1724     range_t extDict;
  1611     range_t prefix;
  1725     range_t prefix;
  1612 
  1726 
       
  1727     DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
  1613     extDict.start = window.dictBase + window.lowLimit;
  1728     extDict.start = window.dictBase + window.lowLimit;
  1614     extDict.size = window.dictLimit - window.lowLimit;
  1729     extDict.size = window.dictLimit - window.lowLimit;
  1615 
  1730 
  1616     prefix.start = window.base + window.dictLimit;
  1731     prefix.start = window.base + window.dictLimit;
  1617     prefix.size = window.nextSrc - (window.base + window.dictLimit);
  1732     prefix.size = window.nextSrc - (window.base + window.dictLimit);
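ZSTDMT_doesOverlapWindow splits the LDM window into its extDict and prefix ranges and, in the lines elided by the diff, tests each against the candidate buffer. The test itself is a plain half-open interval intersection; a hedged, self-contained version with a simplified range type:

    #include <stddef.h>

    /* Simplified stand-in for the range_t used above: [start, start + size). */
    typedef struct { const char* start; size_t size; } toy_range;

    /* Two half-open byte ranges overlap iff each one starts before the other ends.
     * Empty ranges never overlap anything. */
    static int toy_rangesOverlap(toy_range a, toy_range b)
    {
        if (a.size == 0 || b.size == 0) return 0;
        return (a.start < b.start + b.size) && (b.start < a.start + a.size);
    }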
  1628 
  1743 
  1629 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
  1744 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
  1630 {
  1745 {
  1631     if (mtctx->params.ldmParams.enableLdm) {
  1746     if (mtctx->params.ldmParams.enableLdm) {
  1632         ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
  1747         ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
       
  1748         DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
  1633         DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
  1749         DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
  1634                     (size_t)buffer.start,
  1750                     (size_t)buffer.start,
  1635                     (size_t)buffer.start + buffer.capacity);
  1751                     (size_t)buffer.start + buffer.capacity);
  1636         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
  1752         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
  1637         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
  1753         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
  1638             DEBUGLOG(6, "Waiting for LDM to finish...");
  1754             DEBUGLOG(5, "Waiting for LDM to finish...");
  1639             ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
  1755             ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
  1640         }
  1756         }
  1641         DEBUGLOG(6, "Done waiting for LDM to finish");
  1757         DEBUGLOG(6, "Done waiting for LDM to finish");
  1642         ZSTD_pthread_mutex_unlock(mutex);
  1758         ZSTD_pthread_mutex_unlock(mutex);
  1643     }
  1759     }
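ZSTDMT_waitForLdmComplete is a standard condition-variable wait loop: take the mutex, re-check the predicate after every wakeup, and only proceed once the overlap has cleared. The same pattern in plain pthreads, with a made-up predicate standing in for ZSTDMT_doesOverlapWindow:

    #include <pthread.h>

    /* Hedged sketch of the wait pattern above, using raw pthreads instead of the
     * ZSTD_pthread_* wrappers. `still_busy` is an invented predicate. */
    static int still_busy;   /* protected by busy_mutex */
    static pthread_mutex_t busy_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  busy_cond  = PTHREAD_COND_INITIALIZER;

    static void wait_until_idle(void)
    {
        pthread_mutex_lock(&busy_mutex);
        while (still_busy)                         /* re-check after every wakeup */
            pthread_cond_wait(&busy_cond, &busy_mutex);
        pthread_mutex_unlock(&busy_mutex);
    }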
  1653     range_t const inUse = ZSTDMT_getInputDataInUse(mtctx);
  1769     range_t const inUse = ZSTDMT_getInputDataInUse(mtctx);
  1654     size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
  1770     size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
  1655     size_t const target = mtctx->targetSectionSize;
  1771     size_t const target = mtctx->targetSectionSize;
  1656     buffer_t buffer;
  1772     buffer_t buffer;
  1657 
  1773 
       
  1774     DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
  1658     assert(mtctx->inBuff.buffer.start == NULL);
  1775     assert(mtctx->inBuff.buffer.start == NULL);
  1659     assert(mtctx->roundBuff.capacity >= target);
  1776     assert(mtctx->roundBuff.capacity >= target);
  1660 
  1777 
  1661     if (spaceLeft < target) {
  1778     if (spaceLeft < target) {
  1662         /* ZSTD_invalidateRepCodes() doesn't work for extDict variants.
  1779         /* ZSTD_invalidateRepCodes() doesn't work for extDict variants.
  1666         size_t const prefixSize = mtctx->inBuff.prefix.size;
  1783         size_t const prefixSize = mtctx->inBuff.prefix.size;
  1667 
  1784 
  1668         buffer.start = start;
  1785         buffer.start = start;
  1669         buffer.capacity = prefixSize;
  1786         buffer.capacity = prefixSize;
  1670         if (ZSTDMT_isOverlapped(buffer, inUse)) {
  1787         if (ZSTDMT_isOverlapped(buffer, inUse)) {
  1671             DEBUGLOG(6, "Waiting for buffer...");
  1788             DEBUGLOG(5, "Waiting for buffer...");
  1672             return 0;
  1789             return 0;
  1673         }
  1790         }
  1674         ZSTDMT_waitForLdmComplete(mtctx, buffer);
  1791         ZSTDMT_waitForLdmComplete(mtctx, buffer);
  1675         memmove(start, mtctx->inBuff.prefix.start, prefixSize);
  1792         memmove(start, mtctx->inBuff.prefix.start, prefixSize);
  1676         mtctx->inBuff.prefix.start = start;
  1793         mtctx->inBuff.prefix.start = start;
  1678     }
  1795     }
  1679     buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos;
  1796     buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos;
  1680     buffer.capacity = target;
  1797     buffer.capacity = target;
  1681 
  1798 
  1682     if (ZSTDMT_isOverlapped(buffer, inUse)) {
  1799     if (ZSTDMT_isOverlapped(buffer, inUse)) {
  1683         DEBUGLOG(6, "Waiting for buffer...");
  1800         DEBUGLOG(5, "Waiting for buffer...");
  1684         return 0;
  1801         return 0;
  1685     }
  1802     }
  1686     assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
  1803     assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
  1687 
  1804 
  1688     ZSTDMT_waitForLdmComplete(mtctx, buffer);
  1805     ZSTDMT_waitForLdmComplete(mtctx, buffer);
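ZSTDMT_tryGetInputRange is essentially a wrap of the round buffer: when the tail no longer has `target` bytes free, the retained prefix is slid back to the start (with memmove, since the regions may overlap) and the next input range is carved out from the new position. A toy version of that slide, with invented struct and field names and the waiting/overlap checks omitted:

    #include <string.h>

    /* Toy round buffer: `buf` of `capacity` bytes, next free byte at `pos`,
     * plus a `prefixSize`-byte prefix that must stay contiguous with new input.
     * Assumes capacity >= prefixSize + target. */
    typedef struct { char* buf; size_t capacity; size_t pos;
                     char* prefixStart; size_t prefixSize; } toy_roundBuff;

    /* Make room for `target` bytes, sliding the prefix to the front if needed,
     * and return a pointer to the writable region. */
    static char* toy_getInputRange(toy_roundBuff* rb, size_t target)
    {
        if (rb->capacity - rb->pos < target) {
            memmove(rb->buf, rb->prefixStart, rb->prefixSize);  /* regions may overlap */
            rb->prefixStart = rb->buf;
            rb->pos = rb->prefixSize;
        }
        return rb->buf + rb->pos;
    }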
  1751             assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */
  1868             assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */
  1752             if (!ZSTDMT_tryGetInputRange(mtctx)) {
  1869             if (!ZSTDMT_tryGetInputRange(mtctx)) {
  1753                 /* It is only possible for this operation to fail if there are
  1870                 /* It is only possible for this operation to fail if there are
  1754                  * still compression jobs ongoing.
  1871                  * still compression jobs ongoing.
  1755                  */
  1872                  */
       
  1873                 DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");
  1756                 assert(mtctx->doneJobID != mtctx->nextJobID);
  1874                 assert(mtctx->doneJobID != mtctx->nextJobID);
  1757             }
  1875             } else
       
  1876                 DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
  1758         }
  1877         }
  1759         if (mtctx->inBuff.buffer.start != NULL) {
  1878         if (mtctx->inBuff.buffer.start != NULL) {
  1760             size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
  1879             size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
  1761             assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
  1880             assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
  1762             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
  1881             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
  1780     }
  1899     }
  1781 
  1900 
  1782     /* check for potential compressed data ready to be flushed */
  1901     /* check for potential compressed data ready to be flushed */
  1783     {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
  1902     {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
  1784         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
  1903         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
       
  1904         DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
  1785         return remainingToFlush;
  1905         return remainingToFlush;
  1786     }
  1906     }
  1787 }
  1907 }
  1788 
  1908 
  1789 
  1909
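From the outside, all of this is driven through the streaming entry point: the caller repeatedly offers input and drains output until the context reports that nothing is left to flush. A hedged driver sketch using the public API that fronts this code (spelled ZSTD_compressStream2 / ZSTD_c_nbWorkers in later zstd releases; the copy vendored here may still expose the older ZSTD_compress_generic / ZSTD_p_nbWorkers names). The helper name, level, and worker count are arbitrary, and the destination is assumed large enough (>= ZSTD_compressBound(srcSize)) so the loop always makes progress:

    #include <zstd.h>

    /* Hedged sketch: multi-threaded streaming compression of one in-memory buffer,
     * routed into the ZSTDMT path once nbWorkers >= 1. */
    static size_t compress_mt(void* dst, size_t dstCap, const void* src, size_t srcSize)
    {
        ZSTD_CCtx* const cctx = ZSTD_createCCtx();
        ZSTD_inBuffer  in  = { src, srcSize, 0 };
        ZSTD_outBuffer out = { dst, dstCap, 0 };
        size_t remaining;

        ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3);
        ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 2);   /* enable the MT path */

        do {   /* keep flushing until the context reports nothing left to emit */
            remaining = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
            if (ZSTD_isError(remaining)) break;
        } while (remaining != 0);

        ZSTD_freeCCtx(cctx);
        return ZSTD_isError(remaining) ? remaining : out.pos;
    }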