 * Until then, comment the code out since it is unused.
 */
#define ZSTD_RESIZE_SEQPOOL 0
38 |
38 |
39 /* ====== Debug ====== */ |
39 /* ====== Debug ====== */ |
40 #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) |
40 #if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \ |
|
41 && !defined(_MSC_VER) \ |
|
42 && !defined(__MINGW32__) |
41 |
43 |
42 # include <stdio.h> |
44 # include <stdio.h> |
43 # include <unistd.h> |
45 # include <unistd.h> |
44 # include <sys/times.h> |
46 # include <sys/times.h> |
45 # define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); } |
|
46 |
47 |
/* DEBUG_PRINTHEX() :
 * print the first n bytes at address p as hexadecimal, at debug level l.
 * Relies on the project-wide RAWLOG() macro. */
#  define DEBUG_PRINTHEX(l,p,n) {                                 \
    unsigned debug_u;                                             \
    for (debug_u=0; debug_u<(n); debug_u++)                       \
        RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
    RAWLOG(l, " \n");                                             \
}
53 |
54 |
54 static unsigned long long GetCurrentClockTimeMicroseconds(void) |
55 static unsigned long long GetCurrentClockTimeMicroseconds(void) |
55 { |
56 { |
56 static clock_t _ticksPerSecond = 0; |
57 static clock_t _ticksPerSecond = 0; |
156 { |
157 { |
157 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); |
158 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); |
158 DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize); |
159 DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize); |
159 bufPool->bufferSize = bSize; |
160 bufPool->bufferSize = bSize; |
160 ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); |
161 ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); |
|
162 } |
|
163 |
|
164 |
|
165 static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers) |
|
166 { |
|
167 unsigned const maxNbBuffers = 2*nbWorkers + 3; |
|
168 if (srcBufPool==NULL) return NULL; |
|
169 if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */ |
|
170 return srcBufPool; |
|
171 /* need a larger buffer pool */ |
|
172 { ZSTD_customMem const cMem = srcBufPool->cMem; |
|
173 size_t const bSize = srcBufPool->bufferSize; /* forward parameters */ |
|
174 ZSTDMT_bufferPool* newBufPool; |
|
175 ZSTDMT_freeBufferPool(srcBufPool); |
|
176 newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); |
|
177 if (newBufPool==NULL) return newBufPool; |
|
178 ZSTDMT_setBufferSize(newBufPool, bSize); |
|
179 return newBufPool; |
|
180 } |
161 } |
181 } |
162 |
182 |
163 /** ZSTDMT_getBuffer() : |
183 /** ZSTDMT_getBuffer() : |
164 * assumption : bufPool must be valid |
184 * assumption : bufPool must be valid |
165 * @return : a buffer, with start pointer and size |
185 * @return : a buffer, with start pointer and size |
227 #endif |
247 #endif |
228 |
248 |
229 /* store buffer for later re-use, up to pool capacity */ |
249 /* store buffer for later re-use, up to pool capacity */ |
230 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) |
250 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) |
231 { |
251 { |
|
252 DEBUGLOG(5, "ZSTDMT_releaseBuffer"); |
232 if (buf.start == NULL) return; /* compatible with release on NULL */ |
253 if (buf.start == NULL) return; /* compatible with release on NULL */ |
233 DEBUGLOG(5, "ZSTDMT_releaseBuffer"); |
|
234 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); |
254 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); |
235 if (bufPool->nbBuffers < bufPool->totalBuffers) { |
255 if (bufPool->nbBuffers < bufPool->totalBuffers) { |
236 bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ |
256 bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ |
237 DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", |
257 DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", |
238 (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); |
258 (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); |
351 cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ |
376 cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ |
352 cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); |
377 cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); |
353 if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } |
378 if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } |
354 DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers); |
379 DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers); |
355 return cctxPool; |
380 return cctxPool; |
|
381 } |
|
382 |
|
383 static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool, |
|
384 unsigned nbWorkers) |
|
385 { |
|
386 if (srcPool==NULL) return NULL; |
|
387 if (nbWorkers <= srcPool->totalCCtx) return srcPool; /* good enough */ |
|
388 /* need a larger cctx pool */ |
|
389 { ZSTD_customMem const cMem = srcPool->cMem; |
|
390 ZSTDMT_freeCCtxPool(srcPool); |
|
391 return ZSTDMT_createCCtxPool(nbWorkers, cMem); |
|
392 } |
356 } |
393 } |
357 |
394 |
358 /* only works during initialization phase, not during compression */ |
395 /* only works during initialization phase, not during compression */ |
359 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) |
396 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) |
360 { |
397 { |
423 ZSTD_pthread_mutex_t ldmWindowMutex; |
460 ZSTD_pthread_mutex_t ldmWindowMutex; |
424 ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is udpated */ |
461 ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is udpated */ |
425 ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ |
462 ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ |
426 } serialState_t; |
463 } serialState_t; |
427 |
464 |
428 static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params) |
465 static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize) |
429 { |
466 { |
430 /* Adjust parameters */ |
467 /* Adjust parameters */ |
431 if (params.ldmParams.enableLdm) { |
468 if (params.ldmParams.enableLdm) { |
432 DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10); |
469 DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10); |
433 params.ldmParams.windowLog = params.cParams.windowLog; |
|
434 ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); |
470 ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); |
435 assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); |
471 assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); |
436 assert(params.ldmParams.hashEveryLog < 32); |
472 assert(params.ldmParams.hashEveryLog < 32); |
437 serialState->ldmState.hashPower = |
473 serialState->ldmState.hashPower = |
438 ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); |
474 ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); |
451 size_t const bucketSize = (size_t)1 << bucketLog; |
487 size_t const bucketSize = (size_t)1 << bucketLog; |
452 unsigned const prevBucketLog = |
488 unsigned const prevBucketLog = |
453 serialState->params.ldmParams.hashLog - |
489 serialState->params.ldmParams.hashLog - |
454 serialState->params.ldmParams.bucketSizeLog; |
490 serialState->params.ldmParams.bucketSizeLog; |
455 /* Size the seq pool tables */ |
491 /* Size the seq pool tables */ |
456 ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize)); |
492 ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize)); |
457 /* Reset the window */ |
493 /* Reset the window */ |
458 ZSTD_window_clear(&serialState->ldmState.window); |
494 ZSTD_window_clear(&serialState->ldmState.window); |
459 serialState->ldmWindow = serialState->ldmState.window; |
495 serialState->ldmWindow = serialState->ldmState.window; |
460 /* Resize tables and output space if necessary. */ |
496 /* Resize tables and output space if necessary. */ |
461 if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { |
497 if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { |
503 range_t src, unsigned jobID) |
540 range_t src, unsigned jobID) |
504 { |
541 { |
505 /* Wait for our turn */ |
542 /* Wait for our turn */ |
506 ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); |
543 ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); |
507 while (serialState->nextJobID < jobID) { |
544 while (serialState->nextJobID < jobID) { |
|
545 DEBUGLOG(5, "wait for serialState->cond"); |
508 ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex); |
546 ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex); |
509 } |
547 } |
510 /* A future job may error and skip our job */ |
548 /* A future job may error and skip our job */ |
511 if (serialState->nextJobID == jobID) { |
549 if (serialState->nextJobID == jobID) { |
512 /* It is now our turn, do any processing necessary */ |
550 /* It is now our turn, do any processing necessary */ |
513 if (serialState->params.ldmParams.enableLdm) { |
551 if (serialState->params.ldmParams.enableLdm) { |
514 size_t error; |
552 size_t error; |
515 assert(seqStore.seq != NULL && seqStore.pos == 0 && |
553 assert(seqStore.seq != NULL && seqStore.pos == 0 && |
516 seqStore.size == 0 && seqStore.capacity > 0); |
554 seqStore.size == 0 && seqStore.capacity > 0); |
|
555 assert(src.size <= serialState->params.jobSize); |
517 ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); |
556 ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); |
518 error = ZSTD_ldm_generateSequences( |
557 error = ZSTD_ldm_generateSequences( |
519 &serialState->ldmState, &seqStore, |
558 &serialState->ldmState, &seqStore, |
520 &serialState->params.ldmParams, src.start, src.size); |
559 &serialState->params.ldmParams, src.start, src.size); |
521 /* We provide a large enough buffer to never fail. */ |
560 /* We provide a large enough buffer to never fail. */ |
591 unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ |
630 unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ |
592 size_t dstFlushed; /* used only by mtctx */ |
631 size_t dstFlushed; /* used only by mtctx */ |
593 unsigned frameChecksumNeeded; /* used only by mtctx */ |
632 unsigned frameChecksumNeeded; /* used only by mtctx */ |
594 } ZSTDMT_jobDescription; |
633 } ZSTDMT_jobDescription; |
595 |
634 |
|
/* JOB_ERROR() :
 * record error code `e` into job->cSize under job_mutex,
 * then jump to the enclosing function's cleanup label.
 * Only usable inside a function that declares `job` and a `_endJob:` label. */
#define JOB_ERROR(e) {                          \
    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
    job->cSize = e;                             \
    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
    goto _endJob;                               \
}
|
641 |
596 /* ZSTDMT_compressionJob() is a POOL_function type */ |
642 /* ZSTDMT_compressionJob() is a POOL_function type */ |
597 void ZSTDMT_compressionJob(void* jobDescription) |
643 static void ZSTDMT_compressionJob(void* jobDescription) |
598 { |
644 { |
599 ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; |
645 ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; |
600 ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ |
646 ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ |
601 ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); |
647 ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); |
602 rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); |
648 rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); |
603 buffer_t dstBuff = job->dstBuff; |
649 buffer_t dstBuff = job->dstBuff; |
|
650 size_t lastCBlockSize = 0; |
|
651 |
|
652 /* ressources */ |
|
653 if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation)); |
|
654 if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ |
|
655 dstBuff = ZSTDMT_getBuffer(job->bufPool); |
|
656 if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation)); |
|
657 job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ |
|
658 } |
|
659 if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) |
|
660 JOB_ERROR(ERROR(memory_allocation)); |
604 |
661 |
605 /* Don't compute the checksum for chunks, since we compute it externally, |
662 /* Don't compute the checksum for chunks, since we compute it externally, |
606 * but write it in the header. |
663 * but write it in the header. |
607 */ |
664 */ |
608 if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; |
665 if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; |
609 /* Don't run LDM for the chunks, since we handle it externally */ |
666 /* Don't run LDM for the chunks, since we handle it externally */ |
610 jobParams.ldmParams.enableLdm = 0; |
667 jobParams.ldmParams.enableLdm = 0; |
611 |
668 |
612 /* ressources */ |
|
613 if (cctx==NULL) { |
|
614 job->cSize = ERROR(memory_allocation); |
|
615 goto _endJob; |
|
616 } |
|
617 if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ |
|
618 dstBuff = ZSTDMT_getBuffer(job->bufPool); |
|
619 if (dstBuff.start==NULL) { |
|
620 job->cSize = ERROR(memory_allocation); |
|
621 goto _endJob; |
|
622 } |
|
623 job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ |
|
624 } |
|
625 |
669 |
626 /* init */ |
670 /* init */ |
627 if (job->cdict) { |
671 if (job->cdict) { |
628 size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, job->cdict, jobParams, job->fullFrameSize); |
672 size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize); |
629 assert(job->firstJob); /* only allowed for first job */ |
673 assert(job->firstJob); /* only allowed for first job */ |
630 if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } |
674 if (ZSTD_isError(initError)) JOB_ERROR(initError); |
631 } else { /* srcStart points at reloaded section */ |
675 } else { /* srcStart points at reloaded section */ |
632 U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; |
676 U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; |
633 { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); |
677 { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); |
634 if (ZSTD_isError(forceWindowError)) { |
678 if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError); |
635 job->cSize = forceWindowError; |
679 } |
636 goto _endJob; |
|
637 } } |
|
638 { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, |
680 { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, |
639 job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ |
681 job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ |
|
682 ZSTD_dtlm_fast, |
640 NULL, /*cdict*/ |
683 NULL, /*cdict*/ |
641 jobParams, pledgedSrcSize); |
684 jobParams, pledgedSrcSize); |
642 if (ZSTD_isError(initError)) { |
685 if (ZSTD_isError(initError)) JOB_ERROR(initError); |
643 job->cSize = initError; |
686 } } |
644 goto _endJob; |
|
645 } } } |
|
646 |
687 |
647 /* Perform serial step as early as possible, but after CCtx initialization */ |
688 /* Perform serial step as early as possible, but after CCtx initialization */ |
648 ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); |
689 ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); |
649 |
690 |
650 if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ |
691 if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ |
651 size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); |
692 size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); |
652 if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } |
693 if (ZSTD_isError(hSize)) JOB_ERROR(hSize); |
653 DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); |
694 DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); |
654 ZSTD_invalidateRepCodes(cctx); |
695 ZSTD_invalidateRepCodes(cctx); |
655 } |
696 } |
656 |
697 |
657 /* compress */ |
698 /* compress */ |
678 (U32)cSize, (U32)job->cSize); |
719 (U32)cSize, (U32)job->cSize); |
679 ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ |
720 ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ |
680 ZSTD_pthread_mutex_unlock(&job->job_mutex); |
721 ZSTD_pthread_mutex_unlock(&job->job_mutex); |
681 } |
722 } |
682 /* last block */ |
723 /* last block */ |
683 assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ |
724 assert(chunkSize > 0); |
|
725 assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ |
684 if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { |
726 if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { |
685 size_t const lastBlockSize1 = job->src.size & (chunkSize-1); |
727 size_t const lastBlockSize1 = job->src.size & (chunkSize-1); |
686 size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1; |
728 size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1; |
687 size_t const cSize = (job->lastJob) ? |
729 size_t const cSize = (job->lastJob) ? |
688 ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : |
730 ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : |
689 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); |
731 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); |
690 if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } |
732 if (ZSTD_isError(cSize)) JOB_ERROR(cSize); |
691 /* stats */ |
733 lastCBlockSize = cSize; |
692 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); |
|
693 job->cSize += cSize; |
|
694 ZSTD_pthread_mutex_unlock(&job->job_mutex); |
|
695 } } |
734 } } |
696 |
735 |
697 _endJob: |
736 _endJob: |
698 ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize); |
737 ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize); |
699 if (job->prefix.size > 0) |
738 if (job->prefix.size > 0) |
948 default : |
1005 default : |
949 return ERROR(parameter_unsupported); |
1006 return ERROR(parameter_unsupported); |
950 } |
1007 } |
951 } |
1008 } |
952 |
1009 |
|
1010 size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value) |
|
1011 { |
|
1012 switch (parameter) { |
|
1013 case ZSTDMT_p_jobSize: |
|
1014 *value = mtctx->params.jobSize; |
|
1015 break; |
|
1016 case ZSTDMT_p_overlapSectionLog: |
|
1017 *value = mtctx->params.overlapSizeLog; |
|
1018 break; |
|
1019 default: |
|
1020 return ERROR(parameter_unsupported); |
|
1021 } |
|
1022 return 0; |
|
1023 } |
|
1024 |
953 /* Sets parameters relevant to the compression job, |
1025 /* Sets parameters relevant to the compression job, |
954 * initializing others to default values. */ |
1026 * initializing others to default values. */ |
955 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) |
1027 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) |
956 { |
1028 { |
957 ZSTD_CCtx_params jobParams; |
1029 ZSTD_CCtx_params jobParams; |
958 memset(&jobParams, 0, sizeof(jobParams)); |
1030 memset(&jobParams, 0, sizeof(jobParams)); |
959 |
1031 |
960 jobParams.cParams = params.cParams; |
1032 jobParams.cParams = params.cParams; |
961 jobParams.fParams = params.fParams; |
1033 jobParams.fParams = params.fParams; |
962 jobParams.compressionLevel = params.compressionLevel; |
1034 jobParams.compressionLevel = params.compressionLevel; |
963 jobParams.disableLiteralCompression = params.disableLiteralCompression; |
|
964 |
1035 |
965 return jobParams; |
1036 return jobParams; |
966 } |
1037 } |
967 |
1038 |
|
1039 |
|
1040 /* ZSTDMT_resize() : |
|
1041 * @return : error code if fails, 0 on success */ |
|
1042 static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) |
|
1043 { |
|
1044 if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation); |
|
1045 CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) ); |
|
1046 mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers); |
|
1047 if (mtctx->bufPool == NULL) return ERROR(memory_allocation); |
|
1048 mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers); |
|
1049 if (mtctx->cctxPool == NULL) return ERROR(memory_allocation); |
|
1050 mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers); |
|
1051 if (mtctx->seqPool == NULL) return ERROR(memory_allocation); |
|
1052 ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers); |
|
1053 return 0; |
|
1054 } |
|
1055 |
|
1056 |
968 /*! ZSTDMT_updateCParams_whileCompressing() : |
1057 /*! ZSTDMT_updateCParams_whileCompressing() : |
969 * Updates only a selected set of compression parameters, to remain compatible with current frame. |
1058 * Updates a selected set of compression parameters, remaining compatible with currently active frame. |
970 * New parameters will be applied to next compression job. */ |
1059 * New parameters will be applied to next compression job. */ |
971 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams) |
1060 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams) |
972 { |
1061 { |
973 U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ |
1062 U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ |
974 int const compressionLevel = cctxParams->compressionLevel; |
1063 int const compressionLevel = cctxParams->compressionLevel; |
979 cParams.windowLog = saved_wlog; |
1068 cParams.windowLog = saved_wlog; |
980 mtctx->params.cParams = cParams; |
1069 mtctx->params.cParams = cParams; |
981 } |
1070 } |
982 } |
1071 } |
983 |
1072 |
984 /* ZSTDMT_getNbWorkers(): |
|
985 * @return nb threads currently active in mtctx. |
|
986 * mtctx must be valid */ |
|
987 unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx) |
|
988 { |
|
989 assert(mtctx != NULL); |
|
990 return mtctx->params.nbWorkers; |
|
991 } |
|
992 |
|
993 /* ZSTDMT_getFrameProgression(): |
1073 /* ZSTDMT_getFrameProgression(): |
994 * tells how much data has been consumed (input) and produced (output) for current frame. |
1074 * tells how much data has been consumed (input) and produced (output) for current frame. |
995 * able to count progression inside worker threads. |
1075 * able to count progression inside worker threads. |
996 * Note : mutex will be acquired during statistics collection. */ |
1076 * Note : mutex will be acquired during statistics collection inside workers. */ |
997 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) |
1077 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) |
998 { |
1078 { |
999 ZSTD_frameProgression fps; |
1079 ZSTD_frameProgression fps; |
1000 DEBUGLOG(6, "ZSTDMT_getFrameProgression"); |
1080 DEBUGLOG(5, "ZSTDMT_getFrameProgression"); |
|
1081 fps.ingested = mtctx->consumed + mtctx->inBuff.filled; |
1001 fps.consumed = mtctx->consumed; |
1082 fps.consumed = mtctx->consumed; |
1002 fps.produced = mtctx->produced; |
1083 fps.produced = fps.flushed = mtctx->produced; |
1003 fps.ingested = mtctx->consumed + mtctx->inBuff.filled; |
1084 fps.currentJobID = mtctx->nextJobID; |
|
1085 fps.nbActiveWorkers = 0; |
1004 { unsigned jobNb; |
1086 { unsigned jobNb; |
1005 unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); |
1087 unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); |
1006 DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", |
1088 DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", |
1007 mtctx->doneJobID, lastJobNb, mtctx->jobReady) |
1089 mtctx->doneJobID, lastJobNb, mtctx->jobReady) |
1008 for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { |
1090 for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { |
1009 unsigned const wJobID = jobNb & mtctx->jobIDMask; |
1091 unsigned const wJobID = jobNb & mtctx->jobIDMask; |
1010 ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex); |
1092 ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; |
1011 { size_t const cResult = mtctx->jobs[wJobID].cSize; |
1093 ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); |
|
1094 { size_t const cResult = jobPtr->cSize; |
1012 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; |
1095 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; |
1013 fps.consumed += mtctx->jobs[wJobID].consumed; |
1096 size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; |
1014 fps.ingested += mtctx->jobs[wJobID].src.size; |
1097 assert(flushed <= produced); |
|
1098 fps.ingested += jobPtr->src.size; |
|
1099 fps.consumed += jobPtr->consumed; |
1015 fps.produced += produced; |
1100 fps.produced += produced; |
|
1101 fps.flushed += flushed; |
|
1102 fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size); |
1016 } |
1103 } |
1017 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); |
1104 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); |
1018 } |
1105 } |
1019 } |
1106 } |
1020 return fps; |
1107 return fps; |
|
1108 } |
|
1109 |
|
1110 |
|
1111 size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx) |
|
1112 { |
|
1113 size_t toFlush; |
|
1114 unsigned const jobID = mtctx->doneJobID; |
|
1115 assert(jobID <= mtctx->nextJobID); |
|
1116 if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */ |
|
1117 |
|
1118 /* look into oldest non-fully-flushed job */ |
|
1119 { unsigned const wJobID = jobID & mtctx->jobIDMask; |
|
1120 ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID]; |
|
1121 ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); |
|
1122 { size_t const cResult = jobPtr->cSize; |
|
1123 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; |
|
1124 size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; |
|
1125 assert(flushed <= produced); |
|
1126 toFlush = produced - flushed; |
|
1127 if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) { |
|
1128 /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */ |
|
1129 assert(jobPtr->consumed < jobPtr->src.size); |
|
1130 } |
|
1131 } |
|
1132 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); |
|
1133 } |
|
1134 |
|
1135 return toFlush; |
1021 } |
1136 } |
1022 |
1137 |
1023 |
1138 |
/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
1085 return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); |
1200 return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); |
1086 } |
1201 } |
1087 |
1202 |
1088 assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ |
1203 assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ |
1089 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); |
1204 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); |
1090 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) |
1205 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize)) |
1091 return ERROR(memory_allocation); |
1206 return ERROR(memory_allocation); |
1092 |
1207 |
1093 if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ |
1208 CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */ |
1094 U32 jobsTableSize = nbJobs; |
|
1095 ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); |
|
1096 mtctx->jobIDMask = 0; |
|
1097 mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem); |
|
1098 if (mtctx->jobs==NULL) return ERROR(memory_allocation); |
|
1099 assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0)); /* ensure jobsTableSize is a power of 2 */ |
|
1100 mtctx->jobIDMask = jobsTableSize - 1; |
|
1101 } |
|
1102 |
1209 |
1103 { unsigned u; |
1210 { unsigned u; |
1104 for (u=0; u<nbJobs; u++) { |
1211 for (u=0; u<nbJobs; u++) { |
1105 size_t const jobSize = MIN(remainingSrcSize, avgJobSize); |
1212 size_t const jobSize = MIN(remainingSrcSize, avgJobSize); |
1106 size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); |
1213 size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); |
1219 ZSTDMT_CCtx* mtctx, |
1326 ZSTDMT_CCtx* mtctx, |
1220 const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, |
1327 const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, |
1221 const ZSTD_CDict* cdict, ZSTD_CCtx_params params, |
1328 const ZSTD_CDict* cdict, ZSTD_CCtx_params params, |
1222 unsigned long long pledgedSrcSize) |
1329 unsigned long long pledgedSrcSize) |
1223 { |
1330 { |
1224 DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u, disableLiteralCompression=%i)", |
1331 DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)", |
1225 (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx, params.disableLiteralCompression); |
1332 (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx); |
1226 /* params are supposed to be fully validated at this point */ |
1333 |
|
1334 /* params supposed partially fully validated at this point */ |
1227 assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); |
1335 assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); |
1228 assert(!((dict) && (cdict))); /* either dict or cdict, not both */ |
1336 assert(!((dict) && (cdict))); /* either dict or cdict, not both */ |
1229 assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); |
|
1230 |
1337 |
1231 /* init */ |
1338 /* init */ |
1232 if (params.jobSize == 0) { |
1339 if (params.nbWorkers != mtctx->params.nbWorkers) |
1233 params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); |
1340 CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) ); |
1234 } |
1341 |
|
1342 if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; |
1235 if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; |
1343 if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; |
1236 |
1344 |
1237 mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ |
1345 mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ |
1238 if (mtctx->singleBlockingThread) { |
1346 if (mtctx->singleBlockingThread) { |
1239 ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); |
1347 ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); |
1268 } |
1376 } |
1269 |
1377 |
1270 mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); |
1378 mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); |
1271 DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); |
1379 DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); |
1272 mtctx->targetSectionSize = params.jobSize; |
1380 mtctx->targetSectionSize = params.jobSize; |
1273 if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; |
1381 if (mtctx->targetSectionSize == 0) { |
|
1382 mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params); |
|
1383 } |
1274 if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ |
1384 if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ |
1275 DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); |
1385 DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); |
1276 DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); |
1386 DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); |
1277 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); |
1387 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); |
1278 { |
1388 { |
1514 MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); |
1626 MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); |
1515 cSize += 4; |
1627 cSize += 4; |
1516 mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ |
1628 mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ |
1517 mtctx->jobs[wJobID].frameChecksumNeeded = 0; |
1629 mtctx->jobs[wJobID].frameChecksumNeeded = 0; |
1518 } |
1630 } |
|
1631 |
1519 if (cSize > 0) { /* compression is ongoing or completed */ |
1632 if (cSize > 0) { /* compression is ongoing or completed */ |
1520 size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); |
1633 size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); |
1521 DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", |
1634 DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", |
1522 (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); |
1635 (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); |
1523 assert(mtctx->doneJobID < mtctx->nextJobID); |
1636 assert(mtctx->doneJobID < mtctx->nextJobID); |
1527 (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, |
1640 (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, |
1528 toFlush); |
1641 toFlush); |
1529 output->pos += toFlush; |
1642 output->pos += toFlush; |
1530 mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ |
1643 mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ |
1531 |
1644 |
1532 if ( (srcConsumed == srcSize) /* job completed */ |
1645 if ( (srcConsumed == srcSize) /* job is completed */ |
1533 && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ |
1646 && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ |
1534 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", |
1647 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", |
1535 mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); |
1648 mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); |
1536 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); |
1649 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); |
|
1650 DEBUGLOG(5, "dstBuffer released"); |
1537 mtctx->jobs[wJobID].dstBuff = g_nullBuffer; |
1651 mtctx->jobs[wJobID].dstBuff = g_nullBuffer; |
1538 mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ |
1652 mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ |
1539 mtctx->consumed += srcSize; |
1653 mtctx->consumed += srcSize; |
1540 mtctx->produced += cSize; |
1654 mtctx->produced += cSize; |
1541 mtctx->doneJobID++; |
1655 mtctx->doneJobID++; |
1628 |
1743 |
1629 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) |
1744 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) |
1630 { |
1745 { |
1631 if (mtctx->params.ldmParams.enableLdm) { |
1746 if (mtctx->params.ldmParams.enableLdm) { |
1632 ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; |
1747 ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; |
|
1748 DEBUGLOG(5, "ZSTDMT_waitForLdmComplete"); |
1633 DEBUGLOG(5, "source [0x%zx, 0x%zx)", |
1749 DEBUGLOG(5, "source [0x%zx, 0x%zx)", |
1634 (size_t)buffer.start, |
1750 (size_t)buffer.start, |
1635 (size_t)buffer.start + buffer.capacity); |
1751 (size_t)buffer.start + buffer.capacity); |
1636 ZSTD_PTHREAD_MUTEX_LOCK(mutex); |
1752 ZSTD_PTHREAD_MUTEX_LOCK(mutex); |
1637 while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { |
1753 while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { |
1638 DEBUGLOG(6, "Waiting for LDM to finish..."); |
1754 DEBUGLOG(5, "Waiting for LDM to finish..."); |
1639 ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); |
1755 ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); |
1640 } |
1756 } |
1641 DEBUGLOG(6, "Done waiting for LDM to finish"); |
1757 DEBUGLOG(6, "Done waiting for LDM to finish"); |
1642 ZSTD_pthread_mutex_unlock(mutex); |
1758 ZSTD_pthread_mutex_unlock(mutex); |
1643 } |
1759 } |
1751 assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ |
1868 assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ |
1752 if (!ZSTDMT_tryGetInputRange(mtctx)) { |
1869 if (!ZSTDMT_tryGetInputRange(mtctx)) { |
1753 /* It is only possible for this operation to fail if there are |
1870 /* It is only possible for this operation to fail if there are |
1754 * still compression jobs ongoing. |
1871 * still compression jobs ongoing. |
1755 */ |
1872 */ |
|
1873 DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed"); |
1756 assert(mtctx->doneJobID != mtctx->nextJobID); |
1874 assert(mtctx->doneJobID != mtctx->nextJobID); |
1757 } |
1875 } else |
|
1876 DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); |
1758 } |
1877 } |
1759 if (mtctx->inBuff.buffer.start != NULL) { |
1878 if (mtctx->inBuff.buffer.start != NULL) { |
1760 size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); |
1879 size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); |
1761 assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); |
1880 assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); |
1762 DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", |
1881 DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", |