comparison contrib/python-zstandard/zstd/compress/zstdmt_compress.c @ 40121:73fef626dae3
zstandard: vendor python-zstandard 0.10.1
This was just released.
The upstream source distribution from PyPI was extracted. Unwanted
files were removed.
The clang-format ignore list was updated to reflect the new source
of files.
setup.py was updated to pass a new argument to python-zstandard's
function for returning an Extension instance. Upstream had to change
to use relative paths because Python 3.7's packaging doesn't
seem to like absolute paths when defining sources, includes, etc.
The default relative path calculation is relative to setup_zstd.py,
which lives in a different directory than Mercurial's setup.py.
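As a concrete illustration, here is a minimal sketch of the setup.py
hook-up this describes. The helper name (setup_zstd.get_c_extension) and
its root argument come from python-zstandard's setup_zstd.py; the exact
Mercurial wiring shown is an assumption, not a verbatim copy of this
change.

```python
import os
import sys

# Resolve everything against the directory containing Mercurial's
# setup.py, not against the vendored setup_zstd.py.
HERE = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, os.path.join(HERE, 'contrib', 'python-zstandard'))

import setup_zstd

# `root` anchors the helper's relative source/include paths, so Python
# 3.7's packaging sees relative paths computed from the right base
# directory.
extension = setup_zstd.get_c_extension(name='mercurial.zstd', root=HERE)
```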
The project contains a vendored copy of zstandard 1.3.6. The old
version was 1.3.4.
The API should be backwards compatible and nothing in core should
need to be adjusted. However, there is a new "chunker" API that we
may find useful in places where we want to emit compressed chunks
of a fixed size.
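A minimal sketch of that chunker API, based on the python-zstandard 0.10
documentation (the standalone package imports as `zstandard`; treat this
as an illustration rather than Mercurial code):

```python
import io
import zstandard as zstd

def compress_fixed_chunks(fh, chunk_size=32768):
    """Yield compressed output in chunks of exactly chunk_size bytes;
    only the final chunk from finish() may be shorter."""
    cctx = zstd.ZstdCompressor()
    chunker = cctx.chunker(chunk_size=chunk_size)
    while True:
        data = fh.read(131072)
        if not data:
            break
        # compress() buffers internally and only emits full-size chunks.
        for out in chunker.compress(data):
            yield out
    # finish() ends the frame and drains whatever remains buffered.
    for out in chunker.finish():
        yield out

compressed = b''.join(compress_fixed_chunks(io.BytesIO(b'data' * 100000)))
```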
There is a pair of bug fixes in 0.10.0 regarding
compressobj() and decompressobj() when block flushing is used. I
actually found these bugs when introducing these APIs in Mercurial!
But existing Mercurial code is not affected because we don't
perform block flushing.
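For context, here is a hedged sketch of the block-flushing mode those
fixes concern, using the documented COMPRESSOBJ_FLUSH_BLOCK and
COMPRESSOBJ_FLUSH_FINISH constants (again illustrative; core Mercurial
does not exercise this path):

```python
import zstandard as zstd

cctx = zstd.ZstdCompressor()
cobj = cctx.compressobj()

out = cobj.compress(b'chunk of data')
# Emit everything buffered so far as a complete, decodable block
# without ending the frame; this is the mode fixed in 0.10.0.
out += cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
out += cobj.compress(b'more data')
# End the frame.
out += cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)

dobj = zstd.ZstdDecompressor().decompressobj()
assert dobj.decompress(out) == b'chunk of datamore data'
```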
# no-check-commit because 3rd party code has different style guidelines
Differential Revision: https://phab.mercurial-scm.org/D4911
author | Gregory Szorc <gregory.szorc@gmail.com>
date | Mon, 08 Oct 2018 16:27:40 -0700
parents | b1fb341d8a61
children | 675775c33ab6
40120:89742f1fa6cb | 40121:73fef626dae3
35 * Until then, comment the code out since it is unused. | 35 * Until then, comment the code out since it is unused. |
36 */ | 36 */ |
37 #define ZSTD_RESIZE_SEQPOOL 0 | 37 #define ZSTD_RESIZE_SEQPOOL 0 |
38 | 38 |
39 /* ====== Debug ====== */ | 39 /* ====== Debug ====== */ |
40 #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) | 40 #if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \ |
41 && !defined(_MSC_VER) \ | |
42 && !defined(__MINGW32__) | |
41 | 43 |
42 # include <stdio.h> | 44 # include <stdio.h> |
43 # include <unistd.h> | 45 # include <unistd.h> |
44 # include <sys/times.h> | 46 # include <sys/times.h> |
45 # define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); } | |
46 | 47 |
47 # define DEBUG_PRINTHEX(l,p,n) { \ | 48 # define DEBUG_PRINTHEX(l,p,n) { \ |
48 unsigned debug_u; \ | 49 unsigned debug_u; \ |
49 for (debug_u=0; debug_u<(n); debug_u++) \ | 50 for (debug_u=0; debug_u<(n); debug_u++) \ |
50 DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \ | 51 RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \ |
51 DEBUGLOGRAW(l, " \n"); \ | 52 RAWLOG(l, " \n"); \ |
52 } | 53 } |
53 | 54 |
54 static unsigned long long GetCurrentClockTimeMicroseconds(void) | 55 static unsigned long long GetCurrentClockTimeMicroseconds(void) |
55 { | 56 { |
56 static clock_t _ticksPerSecond = 0; | 57 static clock_t _ticksPerSecond = 0; |
60 return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); } | 61 return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); } |
61 } | 62 } |
62 | 63 |
63 #define MUTEX_WAIT_TIME_DLEVEL 6 | 64 #define MUTEX_WAIT_TIME_DLEVEL 6 |
64 #define ZSTD_PTHREAD_MUTEX_LOCK(mutex) { \ | 65 #define ZSTD_PTHREAD_MUTEX_LOCK(mutex) { \ |
65 if (ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL) { \ | 66 if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \ |
66 unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \ | 67 unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \ |
67 ZSTD_pthread_mutex_lock(mutex); \ | 68 ZSTD_pthread_mutex_lock(mutex); \ |
68 { unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \ | 69 { unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \ |
69 unsigned long long const elapsedTime = (afterTime-beforeTime); \ | 70 unsigned long long const elapsedTime = (afterTime-beforeTime); \ |
70 if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ | 71 if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ |
156 { | 157 { |
157 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); | 158 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); |
158 DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize); | 159 DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize); |
159 bufPool->bufferSize = bSize; | 160 bufPool->bufferSize = bSize; |
160 ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); | 161 ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); |
162 } | |
163 | |
164 | |
165 static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers) | |
166 { | |
167 unsigned const maxNbBuffers = 2*nbWorkers + 3; | |
168 if (srcBufPool==NULL) return NULL; | |
169 if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */ | |
170 return srcBufPool; | |
171 /* need a larger buffer pool */ | |
172 { ZSTD_customMem const cMem = srcBufPool->cMem; | |
173 size_t const bSize = srcBufPool->bufferSize; /* forward parameters */ | |
174 ZSTDMT_bufferPool* newBufPool; | |
175 ZSTDMT_freeBufferPool(srcBufPool); | |
176 newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); | |
177 if (newBufPool==NULL) return newBufPool; | |
178 ZSTDMT_setBufferSize(newBufPool, bSize); | |
179 return newBufPool; | |
180 } | |
161 } | 181 } |
162 | 182 |
163 /** ZSTDMT_getBuffer() : | 183 /** ZSTDMT_getBuffer() : |
164 * assumption : bufPool must be valid | 184 * assumption : bufPool must be valid |
165 * @return : a buffer, with start pointer and size | 185 * @return : a buffer, with start pointer and size |
227 #endif | 247 #endif |
228 | 248 |
229 /* store buffer for later re-use, up to pool capacity */ | 249 /* store buffer for later re-use, up to pool capacity */ |
230 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) | 250 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) |
231 { | 251 { |
252 DEBUGLOG(5, "ZSTDMT_releaseBuffer"); | |
232 if (buf.start == NULL) return; /* compatible with release on NULL */ | 253 if (buf.start == NULL) return; /* compatible with release on NULL */ |
233 DEBUGLOG(5, "ZSTDMT_releaseBuffer"); | |
234 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); | 254 ZSTD_pthread_mutex_lock(&bufPool->poolMutex); |
235 if (bufPool->nbBuffers < bufPool->totalBuffers) { | 255 if (bufPool->nbBuffers < bufPool->totalBuffers) { |
236 bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ | 256 bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ |
237 DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", | 257 DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", |
238 (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); | 258 (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); |
298 ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq)); | 318 ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq)); |
299 } | 319 } |
300 | 320 |
301 static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem) | 321 static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem) |
302 { | 322 { |
303 ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem); | 323 ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem); |
324 if (seqPool == NULL) return NULL; | |
304 ZSTDMT_setNbSeq(seqPool, 0); | 325 ZSTDMT_setNbSeq(seqPool, 0); |
305 return seqPool; | 326 return seqPool; |
306 } | 327 } |
307 | 328 |
308 static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool) | 329 static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool) |
309 { | 330 { |
310 ZSTDMT_freeBufferPool(seqPool); | 331 ZSTDMT_freeBufferPool(seqPool); |
311 } | 332 } |
312 | 333 |
334 static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers) | |
335 { | |
336 return ZSTDMT_expandBufferPool(pool, nbWorkers); | |
337 } | |
313 | 338 |
314 | 339 |
315 /* ===== CCtx Pool ===== */ | 340 /* ===== CCtx Pool ===== */ |
316 /* a single CCtx Pool can be invoked from multiple threads in parallel */ | 341 /* a single CCtx Pool can be invoked from multiple threads in parallel */ |
317 | 342 |
351 cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ | 376 cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ |
352 cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); | 377 cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); |
353 if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } | 378 if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } |
354 DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers); | 379 DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers); |
355 return cctxPool; | 380 return cctxPool; |
381 } | |
382 | |
383 static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool, | |
384 unsigned nbWorkers) | |
385 { | |
386 if (srcPool==NULL) return NULL; | |
387 if (nbWorkers <= srcPool->totalCCtx) return srcPool; /* good enough */ | |
388 /* need a larger cctx pool */ | |
389 { ZSTD_customMem const cMem = srcPool->cMem; | |
390 ZSTDMT_freeCCtxPool(srcPool); | |
391 return ZSTDMT_createCCtxPool(nbWorkers, cMem); | |
392 } | |
356 } | 393 } |
357 | 394 |
358 /* only works during initialization phase, not during compression */ | 395 /* only works during initialization phase, not during compression */ |
359 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) | 396 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) |
360 { | 397 { |
423 ZSTD_pthread_mutex_t ldmWindowMutex; | 460 ZSTD_pthread_mutex_t ldmWindowMutex; |
424 ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */ | 461 ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */ |
425 ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ | 462 ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ |
426 } serialState_t; | 463 } serialState_t; |
427 | 464 |
428 static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params) | 465 static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize) |
429 { | 466 { |
430 /* Adjust parameters */ | 467 /* Adjust parameters */ |
431 if (params.ldmParams.enableLdm) { | 468 if (params.ldmParams.enableLdm) { |
432 DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10); | 469 DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10); |
433 params.ldmParams.windowLog = params.cParams.windowLog; | |
434 ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); | 470 ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); |
435 assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); | 471 assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); |
436 assert(params.ldmParams.hashEveryLog < 32); | 472 assert(params.ldmParams.hashEveryLog < 32); |
437 serialState->ldmState.hashPower = | 473 serialState->ldmState.hashPower = |
438 ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); | 474 ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); |
451 size_t const bucketSize = (size_t)1 << bucketLog; | 487 size_t const bucketSize = (size_t)1 << bucketLog; |
452 unsigned const prevBucketLog = | 488 unsigned const prevBucketLog = |
453 serialState->params.ldmParams.hashLog - | 489 serialState->params.ldmParams.hashLog - |
454 serialState->params.ldmParams.bucketSizeLog; | 490 serialState->params.ldmParams.bucketSizeLog; |
455 /* Size the seq pool tables */ | 491 /* Size the seq pool tables */ |
456 ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize)); | 492 ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize)); |
457 /* Reset the window */ | 493 /* Reset the window */ |
458 ZSTD_window_clear(&serialState->ldmState.window); | 494 ZSTD_window_clear(&serialState->ldmState.window); |
459 serialState->ldmWindow = serialState->ldmState.window; | 495 serialState->ldmWindow = serialState->ldmState.window; |
460 /* Resize tables and output space if necessary. */ | 496 /* Resize tables and output space if necessary. */ |
461 if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { | 497 if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { |
471 /* Zero the tables */ | 507 /* Zero the tables */ |
472 memset(serialState->ldmState.hashTable, 0, hashSize); | 508 memset(serialState->ldmState.hashTable, 0, hashSize); |
473 memset(serialState->ldmState.bucketOffsets, 0, bucketSize); | 509 memset(serialState->ldmState.bucketOffsets, 0, bucketSize); |
474 } | 510 } |
475 serialState->params = params; | 511 serialState->params = params; |
512 serialState->params.jobSize = (U32)jobSize; | |
476 return 0; | 513 return 0; |
477 } | 514 } |
478 | 515 |
479 static int ZSTDMT_serialState_init(serialState_t* serialState) | 516 static int ZSTDMT_serialState_init(serialState_t* serialState) |
480 { | 517 { |
503 range_t src, unsigned jobID) | 540 range_t src, unsigned jobID) |
504 { | 541 { |
505 /* Wait for our turn */ | 542 /* Wait for our turn */ |
506 ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); | 543 ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); |
507 while (serialState->nextJobID < jobID) { | 544 while (serialState->nextJobID < jobID) { |
545 DEBUGLOG(5, "wait for serialState->cond"); | |
508 ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex); | 546 ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex); |
509 } | 547 } |
510 /* A future job may error and skip our job */ | 548 /* A future job may error and skip our job */ |
511 if (serialState->nextJobID == jobID) { | 549 if (serialState->nextJobID == jobID) { |
512 /* It is now our turn, do any processing necessary */ | 550 /* It is now our turn, do any processing necessary */ |
513 if (serialState->params.ldmParams.enableLdm) { | 551 if (serialState->params.ldmParams.enableLdm) { |
514 size_t error; | 552 size_t error; |
515 assert(seqStore.seq != NULL && seqStore.pos == 0 && | 553 assert(seqStore.seq != NULL && seqStore.pos == 0 && |
516 seqStore.size == 0 && seqStore.capacity > 0); | 554 seqStore.size == 0 && seqStore.capacity > 0); |
555 assert(src.size <= serialState->params.jobSize); | |
517 ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); | 556 ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); |
518 error = ZSTD_ldm_generateSequences( | 557 error = ZSTD_ldm_generateSequences( |
519 &serialState->ldmState, &seqStore, | 558 &serialState->ldmState, &seqStore, |
520 &serialState->params.ldmParams, src.start, src.size); | 559 &serialState->params.ldmParams, src.start, src.size); |
521 /* We provide a large enough buffer to never fail. */ | 560 /* We provide a large enough buffer to never fail. */ |
591 unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ | 630 unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ |
592 size_t dstFlushed; /* used only by mtctx */ | 631 size_t dstFlushed; /* used only by mtctx */ |
593 unsigned frameChecksumNeeded; /* used only by mtctx */ | 632 unsigned frameChecksumNeeded; /* used only by mtctx */ |
594 } ZSTDMT_jobDescription; | 633 } ZSTDMT_jobDescription; |
595 | 634 |
635 #define JOB_ERROR(e) { \ | |
636 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); \ | |
637 job->cSize = e; \ | |
638 ZSTD_pthread_mutex_unlock(&job->job_mutex); \ | |
639 goto _endJob; \ | |
640 } | |
641 | |
596 /* ZSTDMT_compressionJob() is a POOL_function type */ | 642 /* ZSTDMT_compressionJob() is a POOL_function type */ |
597 void ZSTDMT_compressionJob(void* jobDescription) | 643 static void ZSTDMT_compressionJob(void* jobDescription) |
598 { | 644 { |
599 ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; | 645 ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; |
600 ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ | 646 ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ |
601 ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); | 647 ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); |
602 rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); | 648 rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); |
603 buffer_t dstBuff = job->dstBuff; | 649 buffer_t dstBuff = job->dstBuff; |
650 size_t lastCBlockSize = 0; | |
651 | |
652 /* resources */ |
653 if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation)); | |
654 if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ | |
655 dstBuff = ZSTDMT_getBuffer(job->bufPool); | |
656 if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation)); | |
657 job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ | |
658 } | |
659 if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) | |
660 JOB_ERROR(ERROR(memory_allocation)); | |
604 | 661 |
605 /* Don't compute the checksum for chunks, since we compute it externally, | 662 /* Don't compute the checksum for chunks, since we compute it externally, |
606 * but write it in the header. | 663 * but write it in the header. |
607 */ | 664 */ |
608 if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; | 665 if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; |
609 /* Don't run LDM for the chunks, since we handle it externally */ | 666 /* Don't run LDM for the chunks, since we handle it externally */ |
610 jobParams.ldmParams.enableLdm = 0; | 667 jobParams.ldmParams.enableLdm = 0; |
611 | 668 |
612 /* resources */ |
613 if (cctx==NULL) { | |
614 job->cSize = ERROR(memory_allocation); | |
615 goto _endJob; | |
616 } | |
617 if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ | |
618 dstBuff = ZSTDMT_getBuffer(job->bufPool); | |
619 if (dstBuff.start==NULL) { | |
620 job->cSize = ERROR(memory_allocation); | |
621 goto _endJob; | |
622 } | |
623 job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ | |
624 } | |
625 | 669 |
626 /* init */ | 670 /* init */ |
627 if (job->cdict) { | 671 if (job->cdict) { |
628 size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, job->cdict, jobParams, job->fullFrameSize); | 672 size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize); |
629 assert(job->firstJob); /* only allowed for first job */ | 673 assert(job->firstJob); /* only allowed for first job */ |
630 if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } | 674 if (ZSTD_isError(initError)) JOB_ERROR(initError); |
631 } else { /* srcStart points at reloaded section */ | 675 } else { /* srcStart points at reloaded section */ |
632 U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; | 676 U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; |
633 { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); | 677 { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); |
634 if (ZSTD_isError(forceWindowError)) { | 678 if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError); |
635 job->cSize = forceWindowError; | 679 } |
636 goto _endJob; | |
637 } } | |
638 { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, | 680 { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, |
639 job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ | 681 job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ |
682 ZSTD_dtlm_fast, | |
640 NULL, /*cdict*/ | 683 NULL, /*cdict*/ |
641 jobParams, pledgedSrcSize); | 684 jobParams, pledgedSrcSize); |
642 if (ZSTD_isError(initError)) { | 685 if (ZSTD_isError(initError)) JOB_ERROR(initError); |
643 job->cSize = initError; | 686 } } |
644 goto _endJob; | |
645 } } } | |
646 | 687 |
647 /* Perform serial step as early as possible, but after CCtx initialization */ | 688 /* Perform serial step as early as possible, but after CCtx initialization */ |
648 ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); | 689 ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); |
649 | 690 |
650 if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ | 691 if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ |
651 size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); | 692 size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); |
652 if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } | 693 if (ZSTD_isError(hSize)) JOB_ERROR(hSize); |
653 DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); | 694 DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); |
654 ZSTD_invalidateRepCodes(cctx); | 695 ZSTD_invalidateRepCodes(cctx); |
655 } | 696 } |
656 | 697 |
657 /* compress */ | 698 /* compress */ |
665 if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize); /* check overflow */ | 706 if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize); /* check overflow */ |
666 DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks); | 707 DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks); |
667 assert(job->cSize == 0); | 708 assert(job->cSize == 0); |
668 for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { | 709 for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { |
669 size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); | 710 size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); |
670 if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } | 711 if (ZSTD_isError(cSize)) JOB_ERROR(cSize); |
671 ip += chunkSize; | 712 ip += chunkSize; |
672 op += cSize; assert(op < oend); | 713 op += cSize; assert(op < oend); |
673 /* stats */ | 714 /* stats */ |
674 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); | 715 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); |
675 job->cSize += cSize; | 716 job->cSize += cSize; |
678 (U32)cSize, (U32)job->cSize); | 719 (U32)cSize, (U32)job->cSize); |
679 ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ | 720 ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ |
680 ZSTD_pthread_mutex_unlock(&job->job_mutex); | 721 ZSTD_pthread_mutex_unlock(&job->job_mutex); |
681 } | 722 } |
682 /* last block */ | 723 /* last block */ |
683 assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ | 724 assert(chunkSize > 0); |
725 assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ | |
684 if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { | 726 if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { |
685 size_t const lastBlockSize1 = job->src.size & (chunkSize-1); | 727 size_t const lastBlockSize1 = job->src.size & (chunkSize-1); |
686 size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1; | 728 size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1; |
687 size_t const cSize = (job->lastJob) ? | 729 size_t const cSize = (job->lastJob) ? |
688 ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : | 730 ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : |
689 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); | 731 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); |
690 if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } | 732 if (ZSTD_isError(cSize)) JOB_ERROR(cSize); |
691 /* stats */ | 733 lastCBlockSize = cSize; |
692 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); | |
693 job->cSize += cSize; | |
694 ZSTD_pthread_mutex_unlock(&job->job_mutex); | |
695 } } | 734 } } |
696 | 735 |
697 _endJob: | 736 _endJob: |
698 ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize); | 737 ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize); |
699 if (job->prefix.size > 0) | 738 if (job->prefix.size > 0) |
702 /* release resources */ | 741 /* release resources */ |
703 ZSTDMT_releaseSeq(job->seqPool, rawSeqStore); | 742 ZSTDMT_releaseSeq(job->seqPool, rawSeqStore); |
704 ZSTDMT_releaseCCtx(job->cctxPool, cctx); | 743 ZSTDMT_releaseCCtx(job->cctxPool, cctx); |
705 /* report */ | 744 /* report */ |
706 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); | 745 ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); |
707 job->consumed = job->src.size; | 746 if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0); |
747 job->cSize += lastCBlockSize; | |
748 job->consumed = job->src.size; /* when job->consumed == job->src.size , compression job is presumed completed */ | |
708 ZSTD_pthread_cond_signal(&job->job_cond); | 749 ZSTD_pthread_cond_signal(&job->job_cond); |
709 ZSTD_pthread_mutex_unlock(&job->job_mutex); | 750 ZSTD_pthread_mutex_unlock(&job->job_mutex); |
710 } | 751 } |
711 | 752 |
712 | 753 |
743 ZSTDMT_CCtxPool* cctxPool; | 784 ZSTDMT_CCtxPool* cctxPool; |
744 ZSTDMT_seqPool* seqPool; | 785 ZSTDMT_seqPool* seqPool; |
745 ZSTD_CCtx_params params; | 786 ZSTD_CCtx_params params; |
746 size_t targetSectionSize; | 787 size_t targetSectionSize; |
747 size_t targetPrefixSize; | 788 size_t targetPrefixSize; |
789 int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */ | |
790 inBuff_t inBuff; | |
748 roundBuff_t roundBuff; | 791 roundBuff_t roundBuff; |
749 inBuff_t inBuff; | |
750 int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */ | |
751 serialState_t serial; | 792 serialState_t serial; |
752 unsigned singleBlockingThread; | 793 unsigned singleBlockingThread; |
753 unsigned jobIDMask; | 794 unsigned jobIDMask; |
754 unsigned doneJobID; | 795 unsigned doneJobID; |
755 unsigned nextJobID; | 796 unsigned nextJobID; |
795 ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem); | 836 ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem); |
796 return NULL; | 837 return NULL; |
797 } | 838 } |
798 return jobTable; | 839 return jobTable; |
799 } | 840 } |
841 | |
842 static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) { | |
843 U32 nbJobs = nbWorkers + 2; | |
844 if (nbJobs > mtctx->jobIDMask+1) { /* need more job capacity */ | |
845 ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); | |
846 mtctx->jobIDMask = 0; | |
847 mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem); | |
848 if (mtctx->jobs==NULL) return ERROR(memory_allocation); | |
849 assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */ | |
850 mtctx->jobIDMask = nbJobs - 1; | |
851 } | |
852 return 0; | |
853 } | |
854 | |
800 | 855 |
801 /* ZSTDMT_CCtxParam_setNbWorkers(): | 856 /* ZSTDMT_CCtxParam_setNbWorkers(): |
802 * Internal use only */ | 857 * Internal use only */ |
803 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) | 858 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) |
804 { | 859 { |
873 DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); | 928 DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); |
874 while (mtctx->doneJobID < mtctx->nextJobID) { | 929 while (mtctx->doneJobID < mtctx->nextJobID) { |
875 unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; | 930 unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; |
876 ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); | 931 ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); |
877 while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { | 932 while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { |
878 DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ | 933 DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ |
879 ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); | 934 ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); |
880 } | 935 } |
881 ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); | 936 ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); |
882 mtctx->doneJobID++; | 937 mtctx->doneJobID++; |
883 } | 938 } |
922 case ZSTDMT_p_jobSize : | 977 case ZSTDMT_p_jobSize : |
923 DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %u", value); | 978 DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %u", value); |
924 if ( (value > 0) /* value==0 => automatic job size */ | 979 if ( (value > 0) /* value==0 => automatic job size */ |
925 & (value < ZSTDMT_JOBSIZE_MIN) ) | 980 & (value < ZSTDMT_JOBSIZE_MIN) ) |
926 value = ZSTDMT_JOBSIZE_MIN; | 981 value = ZSTDMT_JOBSIZE_MIN; |
982 if (value > ZSTDMT_JOBSIZE_MAX) | |
983 value = ZSTDMT_JOBSIZE_MAX; | |
927 params->jobSize = value; | 984 params->jobSize = value; |
928 return value; | 985 return value; |
929 case ZSTDMT_p_overlapSectionLog : | 986 case ZSTDMT_p_overlapSectionLog : |
930 if (value > 9) value = 9; | 987 if (value > 9) value = 9; |
931 DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); | 988 DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); |
948 default : | 1005 default : |
949 return ERROR(parameter_unsupported); | 1006 return ERROR(parameter_unsupported); |
950 } | 1007 } |
951 } | 1008 } |
952 | 1009 |
1010 size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value) | |
1011 { | |
1012 switch (parameter) { | |
1013 case ZSTDMT_p_jobSize: | |
1014 *value = mtctx->params.jobSize; | |
1015 break; | |
1016 case ZSTDMT_p_overlapSectionLog: | |
1017 *value = mtctx->params.overlapSizeLog; | |
1018 break; | |
1019 default: | |
1020 return ERROR(parameter_unsupported); | |
1021 } | |
1022 return 0; | |
1023 } | |
1024 | |
953 /* Sets parameters relevant to the compression job, | 1025 /* Sets parameters relevant to the compression job, |
954 * initializing others to default values. */ | 1026 * initializing others to default values. */ |
955 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) | 1027 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) |
956 { | 1028 { |
957 ZSTD_CCtx_params jobParams; | 1029 ZSTD_CCtx_params jobParams; |
958 memset(&jobParams, 0, sizeof(jobParams)); | 1030 memset(&jobParams, 0, sizeof(jobParams)); |
959 | 1031 |
960 jobParams.cParams = params.cParams; | 1032 jobParams.cParams = params.cParams; |
961 jobParams.fParams = params.fParams; | 1033 jobParams.fParams = params.fParams; |
962 jobParams.compressionLevel = params.compressionLevel; | 1034 jobParams.compressionLevel = params.compressionLevel; |
963 jobParams.disableLiteralCompression = params.disableLiteralCompression; | |
964 | 1035 |
965 return jobParams; | 1036 return jobParams; |
966 } | 1037 } |
967 | 1038 |
1039 | |
1040 /* ZSTDMT_resize() : | |
1041 * @return : error code if fails, 0 on success */ | |
1042 static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) | |
1043 { | |
1044 if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation); | |
1045 CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) ); | |
1046 mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers); | |
1047 if (mtctx->bufPool == NULL) return ERROR(memory_allocation); | |
1048 mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers); | |
1049 if (mtctx->cctxPool == NULL) return ERROR(memory_allocation); | |
1050 mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers); | |
1051 if (mtctx->seqPool == NULL) return ERROR(memory_allocation); | |
1052 ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers); | |
1053 return 0; | |
1054 } | |
1055 | |
1056 | |
968 /*! ZSTDMT_updateCParams_whileCompressing() : | 1057 /*! ZSTDMT_updateCParams_whileCompressing() : |
969 * Updates only a selected set of compression parameters, to remain compatible with current frame. | 1058 * Updates a selected set of compression parameters, remaining compatible with currently active frame. |
970 * New parameters will be applied to next compression job. */ | 1059 * New parameters will be applied to next compression job. */ |
971 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams) | 1060 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams) |
972 { | 1061 { |
973 U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ | 1062 U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ |
974 int const compressionLevel = cctxParams->compressionLevel; | 1063 int const compressionLevel = cctxParams->compressionLevel; |
979 cParams.windowLog = saved_wlog; | 1068 cParams.windowLog = saved_wlog; |
980 mtctx->params.cParams = cParams; | 1069 mtctx->params.cParams = cParams; |
981 } | 1070 } |
982 } | 1071 } |
983 | 1072 |
984 /* ZSTDMT_getNbWorkers(): | |
985 * @return nb threads currently active in mtctx. | |
986 * mtctx must be valid */ | |
987 unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx) | |
988 { | |
989 assert(mtctx != NULL); | |
990 return mtctx->params.nbWorkers; | |
991 } | |
992 | |
993 /* ZSTDMT_getFrameProgression(): | 1073 /* ZSTDMT_getFrameProgression(): |
994 * tells how much data has been consumed (input) and produced (output) for current frame. | 1074 * tells how much data has been consumed (input) and produced (output) for current frame. |
995 * able to count progression inside worker threads. | 1075 * able to count progression inside worker threads. |
996 * Note : mutex will be acquired during statistics collection. */ | 1076 * Note : mutex will be acquired during statistics collection inside workers. */ |
997 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) | 1077 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) |
998 { | 1078 { |
999 ZSTD_frameProgression fps; | 1079 ZSTD_frameProgression fps; |
1000 DEBUGLOG(6, "ZSTDMT_getFrameProgression"); | 1080 DEBUGLOG(5, "ZSTDMT_getFrameProgression"); |
1081 fps.ingested = mtctx->consumed + mtctx->inBuff.filled; | |
1001 fps.consumed = mtctx->consumed; | 1082 fps.consumed = mtctx->consumed; |
1002 fps.produced = mtctx->produced; | 1083 fps.produced = fps.flushed = mtctx->produced; |
1003 fps.ingested = mtctx->consumed + mtctx->inBuff.filled; | 1084 fps.currentJobID = mtctx->nextJobID; |
1085 fps.nbActiveWorkers = 0; | |
1004 { unsigned jobNb; | 1086 { unsigned jobNb; |
1005 unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); | 1087 unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); |
1006 DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", | 1088 DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", |
1007 mtctx->doneJobID, lastJobNb, mtctx->jobReady) | 1089 mtctx->doneJobID, lastJobNb, mtctx->jobReady) |
1008 for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { | 1090 for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { |
1009 unsigned const wJobID = jobNb & mtctx->jobIDMask; | 1091 unsigned const wJobID = jobNb & mtctx->jobIDMask; |
1010 ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex); | 1092 ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; |
1011 { size_t const cResult = mtctx->jobs[wJobID].cSize; | 1093 ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); |
1094 { size_t const cResult = jobPtr->cSize; | |
1012 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; | 1095 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; |
1013 fps.consumed += mtctx->jobs[wJobID].consumed; | 1096 size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; |
1014 fps.ingested += mtctx->jobs[wJobID].src.size; | 1097 assert(flushed <= produced); |
1098 fps.ingested += jobPtr->src.size; | |
1099 fps.consumed += jobPtr->consumed; | |
1015 fps.produced += produced; | 1100 fps.produced += produced; |
1101 fps.flushed += flushed; | |
1102 fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size); | |
1016 } | 1103 } |
1017 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); | 1104 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); |
1018 } | 1105 } |
1019 } | 1106 } |
1020 return fps; | 1107 return fps; |
1108 } | |
1109 | |
1110 | |
1111 size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx) | |
1112 { | |
1113 size_t toFlush; | |
1114 unsigned const jobID = mtctx->doneJobID; | |
1115 assert(jobID <= mtctx->nextJobID); | |
1116 if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */ | |
1117 | |
1118 /* look into oldest non-fully-flushed job */ | |
1119 { unsigned const wJobID = jobID & mtctx->jobIDMask; | |
1120 ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID]; | |
1121 ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); | |
1122 { size_t const cResult = jobPtr->cSize; | |
1123 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; | |
1124 size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; | |
1125 assert(flushed <= produced); | |
1126 toFlush = produced - flushed; | |
1127 if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) { | |
1128 /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */ | |
1129 assert(jobPtr->consumed < jobPtr->src.size); | |
1130 } | |
1131 } | |
1132 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); | |
1133 } | |
1134 | |
1135 return toFlush; | |
1021 } | 1136 } |
1022 | 1137 |
1023 | 1138 |
1024 /* ------------------------------------------ */ | 1139 /* ------------------------------------------ */ |
1025 /* ===== Multi-threaded compression ===== */ | 1140 /* ===== Multi-threaded compression ===== */ |
1085 return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); | 1200 return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); |
1086 } | 1201 } |
1087 | 1202 |
1088 assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ | 1203 assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ |
1089 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); | 1204 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); |
1090 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) | 1205 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize)) |
1091 return ERROR(memory_allocation); | 1206 return ERROR(memory_allocation); |
1092 | 1207 |
1093 if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ | 1208 CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */ |
1094 U32 jobsTableSize = nbJobs; | |
1095 ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); | |
1096 mtctx->jobIDMask = 0; | |
1097 mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem); | |
1098 if (mtctx->jobs==NULL) return ERROR(memory_allocation); | |
1099 assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0)); /* ensure jobsTableSize is a power of 2 */ | |
1100 mtctx->jobIDMask = jobsTableSize - 1; | |
1101 } | |
1102 | 1209 |
1103 { unsigned u; | 1210 { unsigned u; |
1104 for (u=0; u<nbJobs; u++) { | 1211 for (u=0; u<nbJobs; u++) { |
1105 size_t const jobSize = MIN(remainingSrcSize, avgJobSize); | 1212 size_t const jobSize = MIN(remainingSrcSize, avgJobSize); |
1106 size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); | 1213 size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); |
1219 ZSTDMT_CCtx* mtctx, | 1326 ZSTDMT_CCtx* mtctx, |
1220 const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, | 1327 const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, |
1221 const ZSTD_CDict* cdict, ZSTD_CCtx_params params, | 1328 const ZSTD_CDict* cdict, ZSTD_CCtx_params params, |
1222 unsigned long long pledgedSrcSize) | 1329 unsigned long long pledgedSrcSize) |
1223 { | 1330 { |
1224 DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u, disableLiteralCompression=%i)", | 1331 DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)", |
1225 (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx, params.disableLiteralCompression); | 1332 (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx); |
1226 /* params are supposed to be fully validated at this point */ | 1333 |
1334 /* params are supposed to be partially validated at this point */ |
1227 assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); | 1335 assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); |
1228 assert(!((dict) && (cdict))); /* either dict or cdict, not both */ | 1336 assert(!((dict) && (cdict))); /* either dict or cdict, not both */ |
1229 assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); | |
1230 | 1337 |
1231 /* init */ | 1338 /* init */ |
1232 if (params.jobSize == 0) { | 1339 if (params.nbWorkers != mtctx->params.nbWorkers) |
1233 params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); | 1340 CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) ); |
1234 } | 1341 |
1342 if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; | |
1235 if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; | 1343 if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; |
1236 | 1344 |
1237 mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ | 1345 mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ |
1238 if (mtctx->singleBlockingThread) { | 1346 if (mtctx->singleBlockingThread) { |
1239 ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); | 1347 ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); |
1268 } | 1376 } |
1269 | 1377 |
1270 mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); | 1378 mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); |
1271 DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); | 1379 DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); |
1272 mtctx->targetSectionSize = params.jobSize; | 1380 mtctx->targetSectionSize = params.jobSize; |
1273 if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; | 1381 if (mtctx->targetSectionSize == 0) { |
1382 mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params); | |
1383 } | |
1274 if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ | 1384 if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ |
1275 DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); | 1385 DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); |
1276 DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); | 1386 DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); |
1277 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); | 1387 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); |
1278 { | 1388 { |
1310 mtctx->nextJobID = 0; | 1420 mtctx->nextJobID = 0; |
1311 mtctx->frameEnded = 0; | 1421 mtctx->frameEnded = 0; |
1312 mtctx->allJobsCompleted = 0; | 1422 mtctx->allJobsCompleted = 0; |
1313 mtctx->consumed = 0; | 1423 mtctx->consumed = 0; |
1314 mtctx->produced = 0; | 1424 mtctx->produced = 0; |
1315 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) | 1425 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize)) |
1316 return ERROR(memory_allocation); | 1426 return ERROR(memory_allocation); |
1317 return 0; | 1427 return 0; |
1318 } | 1428 } |
1319 | 1429 |
1320 size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, | 1430 size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, |
1418 mtctx->jobs[jobID].seqPool = mtctx->seqPool; | 1528 mtctx->jobs[jobID].seqPool = mtctx->seqPool; |
1419 mtctx->jobs[jobID].serial = &mtctx->serial; | 1529 mtctx->jobs[jobID].serial = &mtctx->serial; |
1420 mtctx->jobs[jobID].jobID = mtctx->nextJobID; | 1530 mtctx->jobs[jobID].jobID = mtctx->nextJobID; |
1421 mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); | 1531 mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); |
1422 mtctx->jobs[jobID].lastJob = endFrame; | 1532 mtctx->jobs[jobID].lastJob = endFrame; |
1423 mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag; | 1533 mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0); |
1424 mtctx->jobs[jobID].dstFlushed = 0; | 1534 mtctx->jobs[jobID].dstFlushed = 0; |
1425 | 1535 |
1426 /* Update the round buffer pos and clear the input buffer to be reset */ | 1536 /* Update the round buffer pos and clear the input buffer to be reset */ |
1427 mtctx->roundBuff.pos += srcSize; | 1537 mtctx->roundBuff.pos += srcSize; |
1428 mtctx->inBuff.buffer = g_nullBuffer; | 1538 mtctx->inBuff.buffer = g_nullBuffer; |
1466 return 0; | 1576 return 0; |
1467 } | 1577 } |
1468 | 1578 |
1469 | 1579 |
1470 /*! ZSTDMT_flushProduced() : | 1580 /*! ZSTDMT_flushProduced() : |
1581 * flush whatever data has been produced but not yet flushed in current job. | |
1582 * move to next job if current one is fully flushed. | |
1471 * `output` : `pos` will be updated with amount of data flushed . | 1583 * `output` : `pos` will be updated with amount of data flushed . |
1472 * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . | 1584 * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . |
1473 * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ | 1585 * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ |
1474 static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) | 1586 static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) |
1475 { | 1587 { |
1494 } } | 1606 } } |
1495 | 1607 |
1496 /* try to flush something */ | 1608 /* try to flush something */ |
1497 { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ | 1609 { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ |
1498 size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ | 1610 size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ |
1499 size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ | 1611 size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ |
1500 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); | 1612 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); |
1501 if (ZSTD_isError(cSize)) { | 1613 if (ZSTD_isError(cSize)) { |
1502 DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", | 1614 DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", |
1503 mtctx->doneJobID, ZSTD_getErrorName(cSize)); | 1615 mtctx->doneJobID, ZSTD_getErrorName(cSize)); |
1504 ZSTDMT_waitForAllJobsCompleted(mtctx); | 1616 ZSTDMT_waitForAllJobsCompleted(mtctx); |
1514 MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); | 1626 MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); |
1515 cSize += 4; | 1627 cSize += 4; |
1516 mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ | 1628 mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ |
1517 mtctx->jobs[wJobID].frameChecksumNeeded = 0; | 1629 mtctx->jobs[wJobID].frameChecksumNeeded = 0; |
1518 } | 1630 } |
1631 | |
1519 if (cSize > 0) { /* compression is ongoing or completed */ | 1632 if (cSize > 0) { /* compression is ongoing or completed */ |
1520 size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); | 1633 size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); |
1521 DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", | 1634 DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", |
1522 (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); | 1635 (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); |
1523 assert(mtctx->doneJobID < mtctx->nextJobID); | 1636 assert(mtctx->doneJobID < mtctx->nextJobID); |
1527 (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, | 1640 (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, |
1528 toFlush); | 1641 toFlush); |
1529 output->pos += toFlush; | 1642 output->pos += toFlush; |
1530 mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ | 1643 mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ |
1531 | 1644 |
1532 if ( (srcConsumed == srcSize) /* job completed */ | 1645 if ( (srcConsumed == srcSize) /* job is completed */ |
1533 && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ | 1646 && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ |
1534 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", | 1647 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", |
1535 mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); | 1648 mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); |
1536 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); | 1649 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); |
1650 DEBUGLOG(5, "dstBuffer released"); | |
1537 mtctx->jobs[wJobID].dstBuff = g_nullBuffer; | 1651 mtctx->jobs[wJobID].dstBuff = g_nullBuffer; |
1538 mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ | 1652 mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ |
1539 mtctx->consumed += srcSize; | 1653 mtctx->consumed += srcSize; |
1540 mtctx->produced += cSize; | 1654 mtctx->produced += cSize; |
1541 mtctx->doneJobID++; | 1655 mtctx->doneJobID++; |
1608 static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) | 1722 static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) |
1609 { | 1723 { |
1610 range_t extDict; | 1724 range_t extDict; |
1611 range_t prefix; | 1725 range_t prefix; |
1612 | 1726 |
1727 DEBUGLOG(5, "ZSTDMT_doesOverlapWindow"); | |
1613 extDict.start = window.dictBase + window.lowLimit; | 1728 extDict.start = window.dictBase + window.lowLimit; |
1614 extDict.size = window.dictLimit - window.lowLimit; | 1729 extDict.size = window.dictLimit - window.lowLimit; |
1615 | 1730 |
1616 prefix.start = window.base + window.dictLimit; | 1731 prefix.start = window.base + window.dictLimit; |
1617 prefix.size = window.nextSrc - (window.base + window.dictLimit); | 1732 prefix.size = window.nextSrc - (window.base + window.dictLimit); |
1628 | 1743 |
1629 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) | 1744 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) |
1630 { | 1745 { |
1631 if (mtctx->params.ldmParams.enableLdm) { | 1746 if (mtctx->params.ldmParams.enableLdm) { |
1632 ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; | 1747 ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; |
1748 DEBUGLOG(5, "ZSTDMT_waitForLdmComplete"); | |
1633 DEBUGLOG(5, "source [0x%zx, 0x%zx)", | 1749 DEBUGLOG(5, "source [0x%zx, 0x%zx)", |
1634 (size_t)buffer.start, | 1750 (size_t)buffer.start, |
1635 (size_t)buffer.start + buffer.capacity); | 1751 (size_t)buffer.start + buffer.capacity); |
1636 ZSTD_PTHREAD_MUTEX_LOCK(mutex); | 1752 ZSTD_PTHREAD_MUTEX_LOCK(mutex); |
1637 while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { | 1753 while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { |
1638 DEBUGLOG(6, "Waiting for LDM to finish..."); | 1754 DEBUGLOG(5, "Waiting for LDM to finish..."); |
1639 ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); | 1755 ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); |
1640 } | 1756 } |
1641 DEBUGLOG(6, "Done waiting for LDM to finish"); | 1757 DEBUGLOG(6, "Done waiting for LDM to finish"); |
1642 ZSTD_pthread_mutex_unlock(mutex); | 1758 ZSTD_pthread_mutex_unlock(mutex); |
1643 } | 1759 } |
1653 range_t const inUse = ZSTDMT_getInputDataInUse(mtctx); | 1769 range_t const inUse = ZSTDMT_getInputDataInUse(mtctx); |
1654 size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos; | 1770 size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos; |
1655 size_t const target = mtctx->targetSectionSize; | 1771 size_t const target = mtctx->targetSectionSize; |
1656 buffer_t buffer; | 1772 buffer_t buffer; |
1657 | 1773 |
1774 DEBUGLOG(5, "ZSTDMT_tryGetInputRange"); | |
1658 assert(mtctx->inBuff.buffer.start == NULL); | 1775 assert(mtctx->inBuff.buffer.start == NULL); |
1659 assert(mtctx->roundBuff.capacity >= target); | 1776 assert(mtctx->roundBuff.capacity >= target); |
1660 | 1777 |
1661 if (spaceLeft < target) { | 1778 if (spaceLeft < target) { |
1662 /* ZSTD_invalidateRepCodes() doesn't work for extDict variants. | 1779 /* ZSTD_invalidateRepCodes() doesn't work for extDict variants. |
1666 size_t const prefixSize = mtctx->inBuff.prefix.size; | 1783 size_t const prefixSize = mtctx->inBuff.prefix.size; |
1667 | 1784 |
1668 buffer.start = start; | 1785 buffer.start = start; |
1669 buffer.capacity = prefixSize; | 1786 buffer.capacity = prefixSize; |
1670 if (ZSTDMT_isOverlapped(buffer, inUse)) { | 1787 if (ZSTDMT_isOverlapped(buffer, inUse)) { |
1671 DEBUGLOG(6, "Waiting for buffer..."); | 1788 DEBUGLOG(5, "Waiting for buffer..."); |
1672 return 0; | 1789 return 0; |
1673 } | 1790 } |
1674 ZSTDMT_waitForLdmComplete(mtctx, buffer); | 1791 ZSTDMT_waitForLdmComplete(mtctx, buffer); |
1675 memmove(start, mtctx->inBuff.prefix.start, prefixSize); | 1792 memmove(start, mtctx->inBuff.prefix.start, prefixSize); |
1676 mtctx->inBuff.prefix.start = start; | 1793 mtctx->inBuff.prefix.start = start; |
1678 } | 1795 } |
1679 buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos; | 1796 buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos; |
1680 buffer.capacity = target; | 1797 buffer.capacity = target; |
1681 | 1798 |
1682 if (ZSTDMT_isOverlapped(buffer, inUse)) { | 1799 if (ZSTDMT_isOverlapped(buffer, inUse)) { |
1683 DEBUGLOG(6, "Waiting for buffer..."); | 1800 DEBUGLOG(5, "Waiting for buffer..."); |
1684 return 0; | 1801 return 0; |
1685 } | 1802 } |
1686 assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); | 1803 assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); |
1687 | 1804 |
1688 ZSTDMT_waitForLdmComplete(mtctx, buffer); | 1805 ZSTDMT_waitForLdmComplete(mtctx, buffer); |
1751 assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ | 1868 assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ |
1752 if (!ZSTDMT_tryGetInputRange(mtctx)) { | 1869 if (!ZSTDMT_tryGetInputRange(mtctx)) { |
1753 /* It is only possible for this operation to fail if there are | 1870 /* It is only possible for this operation to fail if there are |
1754 * still compression jobs ongoing. | 1871 * still compression jobs ongoing. |
1755 */ | 1872 */ |
1873 DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed"); | |
1756 assert(mtctx->doneJobID != mtctx->nextJobID); | 1874 assert(mtctx->doneJobID != mtctx->nextJobID); |
1757 } | 1875 } else |
1876 DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); | |
1758 } | 1877 } |
1759 if (mtctx->inBuff.buffer.start != NULL) { | 1878 if (mtctx->inBuff.buffer.start != NULL) { |
1760 size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); | 1879 size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); |
1761 assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); | 1880 assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); |
1762 DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", | 1881 DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", |
1780 } | 1899 } |
1781 | 1900 |
1782 /* check for potential compressed data ready to be flushed */ | 1901 /* check for potential compressed data ready to be flushed */ |
1783 { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ | 1902 { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ |
1784 if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ | 1903 if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ |
1904 DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush); | |
1785 return remainingToFlush; | 1905 return remainingToFlush; |
1786 } | 1906 } |
1787 } | 1907 } |
1788 | 1908 |
1789 | 1909 |