|
1 /** |
|
2 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. |
|
3 * All rights reserved. |
|
4 * |
|
5 * This source code is licensed under the BSD-style license found in the |
|
6 * LICENSE file in the root directory of this source tree. An additional grant |
|
7 * of patent rights can be found in the PATENTS file in the same directory. |
|
8 */ |
|
9 |
|
10 |
|
/* ====== Tuning parameters ====== */
/* Largest worker count accepted by ZSTDMT_createCCtx() */
#define ZSTDMT_NBTHREADS_MAX 128


/* ====== Compiler specifics ====== */
#ifdef _MSC_VER
#  pragma warning(disable : 4204)   /* C4204 : non-constant aggregate initializer (e.g. buffer_t dstAsBuffer = { dst, dstCapacity }) */
#endif
|
19 |
|
20 |
|
21 /* ====== Dependencies ====== */ |
|
22 #include <stdlib.h> /* malloc */ |
|
23 #include <string.h> /* memcpy */ |
|
24 #include "pool.h" /* threadpool */ |
|
25 #include "threading.h" /* mutex */ |
|
26 #include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ |
|
27 #include "zstdmt_compress.h" |
|
28 #define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ |
|
29 #include "xxhash.h" |
|
30 |
|
31 |
|
/* ====== Debug ====== */
/* Every statement-like macro expands to `do { ... } while (0)`, so that
 * `if (c) DEBUGLOG(...); else ...;` stays valid in both configurations. */
#if 0

#  include <stdio.h>
#  include <unistd.h>
#  include <sys/times.h>
static unsigned g_debugLevel = 3;
#  define DEBUGLOGRAW(l, ...) do { if ((l)<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); } } while (0)
#  define DEBUGLOG(l, ...) do { if ((l)<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); } } while (0)

#  define DEBUG_PRINTHEX(l,p,n) do { \
    unsigned debug_u; \
    for (debug_u=0; debug_u<(n); debug_u++) \
        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
    DEBUGLOGRAW(l, " \n"); \
} while (0)

/* Process CPU-tick clock converted to microseconds (times() / sysconf(_SC_CLK_TCK)). */
static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
    static clock_t ticksPerSecond = 0;
    struct tms junk;
    clock_t newTicks;
    if (ticksPerSecond <= 0) ticksPerSecond = sysconf(_SC_CLK_TCK);
    newTicks = (clock_t) times(&junk);
    return ((((unsigned long long)newTicks)*(1000000))/ticksPerSecond);
}

#  define MUTEX_WAIT_TIME_DLEVEL 5
/* Locks `mutex`; at high debug level, also reports lock waits longer than 1 ms. */
#  define PTHREAD_MUTEX_LOCK(mutex) do { \
    if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
        unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
        unsigned long long afterTime, elapsedTime; \
        pthread_mutex_lock(mutex); \
        afterTime = GetCurrentClockTimeMicroseconds(); \
        elapsedTime = (afterTime-beforeTime); \
        if (elapsedTime > 1000) {  /* threshold : 1 millisecond */ \
            DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
                     elapsedTime, #mutex); \
        } \
    } else { \
        pthread_mutex_lock(mutex); \
    } \
} while (0)

#else

#  define DEBUGLOG(l, ...)       do { } while (0)   /* disabled */
#  define PTHREAD_MUTEX_LOCK(m)  pthread_mutex_lock(m)
#  define DEBUG_PRINTHEX(l,p,n)  do { } while (0)   /* disabled */

#endif
|
78 |
|
79 |
|
80 /* ===== Buffer Pool ===== */ |
|
81 |
|
/* (pointer, capacity) pair describing a heap buffer, owned by whoever currently holds it */
typedef struct buffer_s {
    void* start;
    size_t size;
} buffer_t;

static const buffer_t g_nullBuffer = { NULL, 0 };

/* LIFO store of released buffers, kept for re-use.
 * Only slots [0, nbBuffers) belong to the pool. Slots above that index may still hold
 * stale copies of buffers that were handed out (and possibly freed since). */
typedef struct ZSTDMT_bufferPool_s {
    unsigned totalBuffers;   /* capacity of bTable[] */
    unsigned nbBuffers;      /* number of buffers currently stored in bTable[] */
    buffer_t bTable[1];      /* variable size */
} ZSTDMT_bufferPool;

/* Creates an empty pool able to store 2*nbThreads+2 buffers.
 * @return : the pool, or NULL on allocation failure */
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads)
{
    unsigned const maxNbBuffers = 2*nbThreads + 2;
    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)calloc(1, sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t));
    if (bufPool==NULL) return NULL;
    bufPool->totalBuffers = maxNbBuffers;
    bufPool->nbBuffers = 0;
    return bufPool;
}

/* ZSTDMT_freeBufferPool() :
 * frees every buffer currently stored in the pool, then the pool itself.
 * Buffers still held by callers are not the pool's property, and must be released or freed by their holder.
 * note : only slots [0, nbBuffers) are freed. ZSTDMT_getBuffer() does not clear the slot it pops,
 *        and may free() the popped buffer, so slots above nbBuffers can hold dangling pointers. */
static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{
    unsigned u;
    if (!bufPool) return;   /* compatibility with free on NULL */
    for (u=0; u<bufPool->nbBuffers; u++)
        free(bufPool->bTable[u].start);
    free(bufPool);
}

/* ZSTDMT_getBuffer() :
 * returns the most recently stored buffer if its capacity is within [bSize, 10*bSize].
 * Otherwise that stored buffer is freed, and a new one of exactly bSize bytes is allocated.
 * note : on allocation failure, returns a buffer with start==NULL and size==0.
 * assumption : invocation from main thread only ! */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
{
    if (pool->nbBuffers) {   /* try to use an existing buffer */
        buffer_t const buf = pool->bTable[--(pool->nbBuffers)];
        size_t const availBufferSize = buf.size;
        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
            return buf;
        free(buf.start);   /* size conditions not respected : scratch this buffer and create a new one */
    }
    /* create new buffer */
    {   buffer_t buffer;
        void* const start = malloc(bSize);
        if (start==NULL) bSize = 0;
        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
        buffer.size = bSize;
        return buffer;
    }
}

/* ZSTDMT_releaseBuffer() :
 * stores buf for later re-use, up to pool capacity; beyond capacity, buf is freed.
 * Releasing a buffer with start==NULL is a no-op. */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
{
    if (buf.start == NULL) return;   /* release on NULL */
    if (pool->nbBuffers < pool->totalBuffers) {
        pool->bTable[pool->nbBuffers++] = buf;   /* store for later re-use */
        return;
    }
    /* Reached bufferPool capacity (should not happen) */
    free(buf.start);
}
|
145 |
|
146 |
|
147 /* ===== CCtx Pool ===== */ |
|
148 |
|
149 typedef struct { |
|
150 unsigned totalCCtx; |
|
151 unsigned availCCtx; |
|
152 ZSTD_CCtx* cctx[1]; /* variable size */ |
|
153 } ZSTDMT_CCtxPool; |
|
154 |
|
155 /* assumption : CCtxPool invocation only from main thread */ |
|
156 |
|
157 /* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */ |
|
158 static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) |
|
159 { |
|
160 unsigned u; |
|
161 for (u=0; u<pool->totalCCtx; u++) |
|
162 ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */ |
|
163 free(pool); |
|
164 } |
|
165 |
|
166 /* ZSTDMT_createCCtxPool() : |
|
167 * implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx() */ |
|
168 static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads) |
|
169 { |
|
170 ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*)); |
|
171 if (!cctxPool) return NULL; |
|
172 cctxPool->totalCCtx = nbThreads; |
|
173 cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ |
|
174 cctxPool->cctx[0] = ZSTD_createCCtx(); |
|
175 if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } |
|
176 DEBUGLOG(1, "cctxPool created, with %u threads", nbThreads); |
|
177 return cctxPool; |
|
178 } |
|
179 |
|
180 static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool) |
|
181 { |
|
182 if (pool->availCCtx) { |
|
183 pool->availCCtx--; |
|
184 return pool->cctx[pool->availCCtx]; |
|
185 } |
|
186 return ZSTD_createCCtx(); /* note : can be NULL, when creation fails ! */ |
|
187 } |
|
188 |
|
189 static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) |
|
190 { |
|
191 if (cctx==NULL) return; /* compatibility with release on NULL */ |
|
192 if (pool->availCCtx < pool->totalCCtx) |
|
193 pool->cctx[pool->availCCtx++] = cctx; |
|
194 else |
|
195 /* pool overflow : should not happen, since totalCCtx==nbThreads */ |
|
196 ZSTD_freeCCtx(cctx); |
|
197 } |
|
198 |
|
199 |
|
200 /* ===== Thread worker ===== */ |
|
201 |
|
202 typedef struct { |
|
203 buffer_t buffer; |
|
204 size_t filled; |
|
205 } inBuff_t; |
|
206 |
|
207 typedef struct { |
|
208 ZSTD_CCtx* cctx; |
|
209 buffer_t src; |
|
210 const void* srcStart; |
|
211 size_t srcSize; |
|
212 size_t dictSize; |
|
213 buffer_t dstBuff; |
|
214 size_t cSize; |
|
215 size_t dstFlushed; |
|
216 unsigned firstChunk; |
|
217 unsigned lastChunk; |
|
218 unsigned jobCompleted; |
|
219 unsigned jobScanned; |
|
220 pthread_mutex_t* jobCompleted_mutex; |
|
221 pthread_cond_t* jobCompleted_cond; |
|
222 ZSTD_parameters params; |
|
223 ZSTD_CDict* cdict; |
|
224 unsigned long long fullFrameSize; |
|
225 } ZSTDMT_jobDescription; |
|
226 |
|
227 /* ZSTDMT_compressChunk() : POOL_function type */ |
|
228 void ZSTDMT_compressChunk(void* jobDescription) |
|
229 { |
|
230 ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; |
|
231 const void* const src = (const char*)job->srcStart + job->dictSize; |
|
232 buffer_t const dstBuff = job->dstBuff; |
|
233 DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); |
|
234 if (job->cdict) { |
|
235 size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize); |
|
236 if (job->cdict) DEBUGLOG(3, "using CDict "); |
|
237 if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } |
|
238 } else { |
|
239 size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); |
|
240 if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } |
|
241 ZSTD_setCCtxParameter(job->cctx, ZSTD_p_forceWindow, 1); |
|
242 } |
|
243 if (!job->firstChunk) { /* flush frame header */ |
|
244 size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0); |
|
245 if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } |
|
246 ZSTD_invalidateRepCodes(job->cctx); |
|
247 } |
|
248 |
|
249 DEBUGLOG(4, "Compressing : "); |
|
250 DEBUG_PRINTHEX(4, job->srcStart, 12); |
|
251 job->cSize = (job->lastChunk) ? /* last chunk signal */ |
|
252 ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : |
|
253 ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); |
|
254 DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); |
|
255 |
|
256 _endJob: |
|
257 PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); |
|
258 job->jobCompleted = 1; |
|
259 job->jobScanned = 0; |
|
260 pthread_cond_signal(job->jobCompleted_cond); |
|
261 pthread_mutex_unlock(job->jobCompleted_mutex); |
|
262 } |
|
263 |
|
264 |
|
265 /* ------------------------------------------ */ |
|
266 /* ===== Multi-threaded compression ===== */ |
|
267 /* ------------------------------------------ */ |
|
268 |
|
269 struct ZSTDMT_CCtx_s { |
|
270 POOL_ctx* factory; |
|
271 ZSTDMT_bufferPool* buffPool; |
|
272 ZSTDMT_CCtxPool* cctxPool; |
|
273 pthread_mutex_t jobCompleted_mutex; |
|
274 pthread_cond_t jobCompleted_cond; |
|
275 size_t targetSectionSize; |
|
276 size_t marginSize; |
|
277 size_t inBuffSize; |
|
278 size_t dictSize; |
|
279 size_t targetDictSize; |
|
280 inBuff_t inBuff; |
|
281 ZSTD_parameters params; |
|
282 XXH64_state_t xxhState; |
|
283 unsigned nbThreads; |
|
284 unsigned jobIDMask; |
|
285 unsigned doneJobID; |
|
286 unsigned nextJobID; |
|
287 unsigned frameEnded; |
|
288 unsigned allJobsCompleted; |
|
289 unsigned overlapRLog; |
|
290 unsigned long long frameContentSize; |
|
291 size_t sectionSize; |
|
292 ZSTD_CDict* cdict; |
|
293 ZSTD_CStream* cstream; |
|
294 ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ |
|
295 }; |
|
296 |
|
297 ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) |
|
298 { |
|
299 ZSTDMT_CCtx* cctx; |
|
300 U32 const minNbJobs = nbThreads + 2; |
|
301 U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; |
|
302 U32 const nbJobs = 1 << nbJobsLog2; |
|
303 DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", |
|
304 nbThreads, minNbJobs, nbJobsLog2, nbJobs); |
|
305 if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; |
|
306 cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription)); |
|
307 if (!cctx) return NULL; |
|
308 cctx->nbThreads = nbThreads; |
|
309 cctx->jobIDMask = nbJobs - 1; |
|
310 cctx->allJobsCompleted = 1; |
|
311 cctx->sectionSize = 0; |
|
312 cctx->overlapRLog = 3; |
|
313 cctx->factory = POOL_create(nbThreads, 1); |
|
314 cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); |
|
315 cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); |
|
316 if (!cctx->factory | !cctx->buffPool | !cctx->cctxPool) { /* one object was not created */ |
|
317 ZSTDMT_freeCCtx(cctx); |
|
318 return NULL; |
|
319 } |
|
320 if (nbThreads==1) { |
|
321 cctx->cstream = ZSTD_createCStream(); |
|
322 if (!cctx->cstream) { |
|
323 ZSTDMT_freeCCtx(cctx); return NULL; |
|
324 } } |
|
325 pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); /* Todo : check init function return */ |
|
326 pthread_cond_init(&cctx->jobCompleted_cond, NULL); |
|
327 DEBUGLOG(4, "mt_cctx created, for %u threads \n", nbThreads); |
|
328 return cctx; |
|
329 } |
|
330 |
|
331 /* ZSTDMT_releaseAllJobResources() : |
|
332 * Ensure all workers are killed first. */ |
|
333 static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) |
|
334 { |
|
335 unsigned jobID; |
|
336 for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { |
|
337 ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff); |
|
338 mtctx->jobs[jobID].dstBuff = g_nullBuffer; |
|
339 ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src); |
|
340 mtctx->jobs[jobID].src = g_nullBuffer; |
|
341 ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx); |
|
342 mtctx->jobs[jobID].cctx = NULL; |
|
343 } |
|
344 memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); |
|
345 ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); |
|
346 mtctx->inBuff.buffer = g_nullBuffer; |
|
347 mtctx->allJobsCompleted = 1; |
|
348 } |
|
349 |
|
350 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) |
|
351 { |
|
352 if (mtctx==NULL) return 0; /* compatible with free on NULL */ |
|
353 POOL_free(mtctx->factory); |
|
354 if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */ |
|
355 ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ |
|
356 ZSTDMT_freeCCtxPool(mtctx->cctxPool); |
|
357 ZSTD_freeCDict(mtctx->cdict); |
|
358 ZSTD_freeCStream(mtctx->cstream); |
|
359 pthread_mutex_destroy(&mtctx->jobCompleted_mutex); |
|
360 pthread_cond_destroy(&mtctx->jobCompleted_cond); |
|
361 free(mtctx); |
|
362 return 0; |
|
363 } |
|
364 |
|
365 size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) |
|
366 { |
|
367 switch(parameter) |
|
368 { |
|
369 case ZSTDMT_p_sectionSize : |
|
370 mtctx->sectionSize = value; |
|
371 return 0; |
|
372 case ZSTDMT_p_overlapSectionLog : |
|
373 DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); |
|
374 mtctx->overlapRLog = (value >= 9) ? 0 : 9 - value; |
|
375 return 0; |
|
376 default : |
|
377 return ERROR(compressionParameter_unsupported); |
|
378 } |
|
379 } |
|
380 |
|
381 |
|
382 /* ------------------------------------------ */ |
|
383 /* ===== Multi-threaded compression ===== */ |
|
384 /* ------------------------------------------ */ |
|
385 |
|
386 size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, |
|
387 void* dst, size_t dstCapacity, |
|
388 const void* src, size_t srcSize, |
|
389 int compressionLevel) |
|
390 { |
|
391 ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); |
|
392 size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); |
|
393 unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + (srcSize < chunkTargetSize) /* min 1 */; |
|
394 unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads); |
|
395 size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; |
|
396 size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ |
|
397 size_t remainingSrcSize = srcSize; |
|
398 const char* const srcStart = (const char*)src; |
|
399 size_t frameStartPos = 0; |
|
400 |
|
401 DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); |
|
402 DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); |
|
403 params.fParams.contentSizeFlag = 1; |
|
404 |
|
405 if (nbChunks==1) { /* fallback to single-thread mode */ |
|
406 ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; |
|
407 return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel); |
|
408 } |
|
409 |
|
410 { unsigned u; |
|
411 for (u=0; u<nbChunks; u++) { |
|
412 size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize); |
|
413 size_t const dstBufferCapacity = u ? ZSTD_compressBound(chunkSize) : dstCapacity; |
|
414 buffer_t const dstAsBuffer = { dst, dstCapacity }; |
|
415 buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : dstAsBuffer; |
|
416 ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool); |
|
417 |
|
418 if ((cctx==NULL) || (dstBuffer.start==NULL)) { |
|
419 mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */ |
|
420 mtctx->jobs[u].jobCompleted = 1; |
|
421 nbChunks = u+1; |
|
422 break; /* let's wait for previous jobs to complete, but don't start new ones */ |
|
423 } |
|
424 |
|
425 mtctx->jobs[u].srcStart = srcStart + frameStartPos; |
|
426 mtctx->jobs[u].srcSize = chunkSize; |
|
427 mtctx->jobs[u].fullFrameSize = srcSize; |
|
428 mtctx->jobs[u].params = params; |
|
429 mtctx->jobs[u].dstBuff = dstBuffer; |
|
430 mtctx->jobs[u].cctx = cctx; |
|
431 mtctx->jobs[u].firstChunk = (u==0); |
|
432 mtctx->jobs[u].lastChunk = (u==nbChunks-1); |
|
433 mtctx->jobs[u].jobCompleted = 0; |
|
434 mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; |
|
435 mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; |
|
436 |
|
437 DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize); |
|
438 DEBUG_PRINTHEX(3, mtctx->jobs[u].srcStart, 12); |
|
439 POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); |
|
440 |
|
441 frameStartPos += chunkSize; |
|
442 remainingSrcSize -= chunkSize; |
|
443 } } |
|
444 /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */ |
|
445 |
|
446 { unsigned chunkID; |
|
447 size_t error = 0, dstPos = 0; |
|
448 for (chunkID=0; chunkID<nbChunks; chunkID++) { |
|
449 DEBUGLOG(3, "waiting for chunk %u ", chunkID); |
|
450 PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); |
|
451 while (mtctx->jobs[chunkID].jobCompleted==0) { |
|
452 DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID); |
|
453 pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); |
|
454 } |
|
455 pthread_mutex_unlock(&mtctx->jobCompleted_mutex); |
|
456 DEBUGLOG(3, "ready to write chunk %u ", chunkID); |
|
457 |
|
458 ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx); |
|
459 mtctx->jobs[chunkID].cctx = NULL; |
|
460 mtctx->jobs[chunkID].srcStart = NULL; |
|
461 { size_t const cSize = mtctx->jobs[chunkID].cSize; |
|
462 if (ZSTD_isError(cSize)) error = cSize; |
|
463 if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); |
|
464 if (chunkID) { /* note : chunk 0 is already written directly into dst */ |
|
465 if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); |
|
466 ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); |
|
467 mtctx->jobs[chunkID].dstBuff = g_nullBuffer; |
|
468 } |
|
469 dstPos += cSize ; |
|
470 } |
|
471 } |
|
472 if (!error) DEBUGLOG(3, "compressed size : %u ", (U32)dstPos); |
|
473 return error ? error : dstPos; |
|
474 } |
|
475 |
|
476 } |
|
477 |
|
478 |
|
479 /* ====================================== */ |
|
480 /* ======= Streaming API ======= */ |
|
481 /* ====================================== */ |
|
482 |
|
483 static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { |
|
484 while (zcs->doneJobID < zcs->nextJobID) { |
|
485 unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; |
|
486 PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); |
|
487 while (zcs->jobs[jobID].jobCompleted==0) { |
|
488 DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ |
|
489 pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); |
|
490 } |
|
491 pthread_mutex_unlock(&zcs->jobCompleted_mutex); |
|
492 zcs->doneJobID++; |
|
493 } |
|
494 } |
|
495 |
|
496 |
|
497 static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, |
|
498 const void* dict, size_t dictSize, unsigned updateDict, |
|
499 ZSTD_parameters params, unsigned long long pledgedSrcSize) |
|
500 { |
|
501 ZSTD_customMem const cmem = { NULL, NULL, NULL }; |
|
502 DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog); |
|
503 if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize); |
|
504 if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ |
|
505 ZSTDMT_waitForAllJobsCompleted(zcs); |
|
506 ZSTDMT_releaseAllJobResources(zcs); |
|
507 zcs->allJobsCompleted = 1; |
|
508 } |
|
509 zcs->params = params; |
|
510 if (updateDict) { |
|
511 ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL; |
|
512 if (dict && dictSize) { |
|
513 zcs->cdict = ZSTD_createCDict_advanced(dict, dictSize, 0, params, cmem); |
|
514 if (zcs->cdict == NULL) return ERROR(memory_allocation); |
|
515 } } |
|
516 zcs->frameContentSize = pledgedSrcSize; |
|
517 zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog); |
|
518 DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog); |
|
519 DEBUGLOG(3, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); |
|
520 zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); |
|
521 zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); |
|
522 zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); |
|
523 DEBUGLOG(3, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); |
|
524 zcs->marginSize = zcs->targetSectionSize >> 2; |
|
525 zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize; |
|
526 zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); |
|
527 if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); |
|
528 zcs->inBuff.filled = 0; |
|
529 zcs->dictSize = 0; |
|
530 zcs->doneJobID = 0; |
|
531 zcs->nextJobID = 0; |
|
532 zcs->frameEnded = 0; |
|
533 zcs->allJobsCompleted = 0; |
|
534 if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); |
|
535 return 0; |
|
536 } |
|
537 |
|
538 size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, |
|
539 const void* dict, size_t dictSize, |
|
540 ZSTD_parameters params, unsigned long long pledgedSrcSize) |
|
541 { |
|
542 return ZSTDMT_initCStream_internal(zcs, dict, dictSize, 1, params, pledgedSrcSize); |
|
543 } |
|
544 |
|
545 /* ZSTDMT_resetCStream() : |
|
546 * pledgedSrcSize is optional and can be zero == unknown */ |
|
547 size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) |
|
548 { |
|
549 if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize); |
|
550 return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize); |
|
551 } |
|
552 |
|
553 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { |
|
554 ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); |
|
555 return ZSTDMT_initCStream_internal(zcs, NULL, 0, 1, params, 0); |
|
556 } |
|
557 |
|
558 |
|
559 static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame) |
|
560 { |
|
561 size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); |
|
562 buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); |
|
563 ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); |
|
564 unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; |
|
565 |
|
566 if ((cctx==NULL) || (dstBuffer.start==NULL)) { |
|
567 zcs->jobs[jobID].jobCompleted = 1; |
|
568 zcs->nextJobID++; |
|
569 ZSTDMT_waitForAllJobsCompleted(zcs); |
|
570 ZSTDMT_releaseAllJobResources(zcs); |
|
571 return ERROR(memory_allocation); |
|
572 } |
|
573 |
|
574 DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); |
|
575 zcs->jobs[jobID].src = zcs->inBuff.buffer; |
|
576 zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; |
|
577 zcs->jobs[jobID].srcSize = srcSize; |
|
578 zcs->jobs[jobID].dictSize = zcs->dictSize; /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */ |
|
579 zcs->jobs[jobID].params = zcs->params; |
|
580 if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ |
|
581 zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; |
|
582 zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; |
|
583 zcs->jobs[jobID].dstBuff = dstBuffer; |
|
584 zcs->jobs[jobID].cctx = cctx; |
|
585 zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); |
|
586 zcs->jobs[jobID].lastChunk = endFrame; |
|
587 zcs->jobs[jobID].jobCompleted = 0; |
|
588 zcs->jobs[jobID].dstFlushed = 0; |
|
589 zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; |
|
590 zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; |
|
591 |
|
592 /* get a new buffer for next input */ |
|
593 if (!endFrame) { |
|
594 size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); |
|
595 zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); |
|
596 if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ |
|
597 zcs->jobs[jobID].jobCompleted = 1; |
|
598 zcs->nextJobID++; |
|
599 ZSTDMT_waitForAllJobsCompleted(zcs); |
|
600 ZSTDMT_releaseAllJobResources(zcs); |
|
601 return ERROR(memory_allocation); |
|
602 } |
|
603 DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled); |
|
604 zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; |
|
605 DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize)); |
|
606 memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled); |
|
607 DEBUGLOG(5, "new inBuff pre-filled"); |
|
608 zcs->dictSize = newDictSize; |
|
609 } else { |
|
610 zcs->inBuff.buffer = g_nullBuffer; |
|
611 zcs->inBuff.filled = 0; |
|
612 zcs->dictSize = 0; |
|
613 zcs->frameEnded = 1; |
|
614 if (zcs->nextJobID == 0) |
|
615 zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ |
|
616 } |
|
617 |
|
618 DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); |
|
619 POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ |
|
620 zcs->nextJobID++; |
|
621 return 0; |
|
622 } |
|
623 |
|
624 |
|
/* ZSTDMT_flushNextJob() :
 * copies compressed data of the oldest unflushed job (slot doneJobID & jobIDMask) into output.
 * output : will be updated with amount of data flushed .
 * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
 *                if 0 and the oldest job is not complete, returns 0 immediately
 *                (in that case 0 does not mean everything was flushed).
 * On the first visit of a completed job (jobScanned==0), this function :
 * - reports a job error after waiting for, and releasing, all jobs ;
 * - returns the job's CCtx and input buffer to their pools ;
 * - updates the frame checksum, and appends it after the last section.
 * note : `job` is a local copy of the slot; changes that must persist are written back to zcs->jobs[wJobID] explicitly.
 * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
    unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
    PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
    while (zcs->jobs[wJobID].jobCompleted==0) {
        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
        if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }   /* nothing ready to be flushed => skip */
        pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);   /* block when nothing available to flush */
    }
    pthread_mutex_unlock(&zcs->jobCompleted_mutex);
    /* compression job completed : output can be flushed */
    {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
        if (!job.jobScanned) {   /* first visit of this completed job */
            if (ZSTD_isError(job.cSize)) {
                DEBUGLOG(5, "compression error detected ");
                ZSTDMT_waitForAllJobsCompleted(zcs);
                ZSTDMT_releaseAllJobResources(zcs);
                return job.cSize;
            }
            ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
            zcs->jobs[wJobID].cctx = NULL;
            DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
            if (zcs->params.fParams.checksumFlag) {
                /* checksum covers only the new bytes of each section, not the overlap prefix */
                XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
                if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {   /* write checksum at end of last section */
                    U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
                    DEBUGLOG(4, "writing checksum : %08X \n", checksum);
                    /* NOTE(review) : no check here that dstBuff has 4 spare bytes after cSize ;
                     * relies on ZSTD_compressBound() headroom - TODO confirm */
                    MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
                    job.cSize += 4;
                    zcs->jobs[wJobID].cSize += 4;   /* persist into the slot : `job` is only a copy */
            }   }
            ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
            zcs->jobs[wJobID].srcStart = NULL;
            zcs->jobs[wJobID].src = g_nullBuffer;
            zcs->jobs[wJobID].jobScanned = 1;
        }
        {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
            DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
            output->pos += toWrite;
            job.dstFlushed += toWrite;
        }
        if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
            ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
            zcs->jobs[wJobID].dstBuff = g_nullBuffer;
            zcs->jobs[wJobID].jobCompleted = 0;
            zcs->doneJobID++;
        } else {
            zcs->jobs[wJobID].dstFlushed = job.dstFlushed;   /* resume from here on next call */
        }
        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
        zcs->allJobsCompleted = zcs->frameEnded;   /* frame completed and entirely flushed */
        return 0;   /* everything flushed */
}   }
|
686 |
|
687 |
|
688 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) |
|
689 { |
|
690 size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; |
|
691 if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ |
|
692 if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input); |
|
693 |
|
694 /* fill input buffer */ |
|
695 { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); |
|
696 memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad); |
|
697 input->pos += toLoad; |
|
698 zcs->inBuff.filled += toLoad; |
|
699 } |
|
700 |
|
701 if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ |
|
702 && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ |
|
703 CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) ); |
|
704 } |
|
705 |
|
706 /* check for data to flush */ |
|
707 CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */ |
|
708 |
|
709 /* recommended next input size : fill current input buffer */ |
|
710 return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ |
|
711 } |
|
712 |
|
713 |
|
714 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame) |
|
715 { |
|
716 size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; |
|
717 |
|
718 if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); |
|
719 if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) |
|
720 && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { |
|
721 CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); |
|
722 } |
|
723 |
|
724 /* check if there is any data available to flush */ |
|
725 DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID); |
|
726 return ZSTDMT_flushNextJob(zcs, output, 1); |
|
727 } |
|
728 |
|
729 |
|
730 size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) |
|
731 { |
|
732 if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output); |
|
733 return ZSTDMT_flushStream_internal(zcs, output, 0); |
|
734 } |
|
735 |
|
736 size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) |
|
737 { |
|
738 if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output); |
|
739 return ZSTDMT_flushStream_internal(zcs, output, 1); |
|
740 } |