1234 self._q = queue.Queue() |
1234 self._q = queue.Queue() |
1235 self._abort = False |
1235 self._abort = False |
1236 self._memory_target = memory_target |
1236 self._memory_target = memory_target |
1237 if self._memory_target is not None and self._memory_target <= 0: |
1237 if self._memory_target is not None and self._memory_target <= 0: |
1238 raise error.ProgrammingError("memory target should be > 0") |
1238 raise error.ProgrammingError("memory target should be > 0") |
1239 self._memory_condition = threading.Condition() |
1239 |
|
1240 # the "_lock" protects manipulation of the "_current_used" variable |
|
1241 # the "_wait" is used to have the "reading" thread wait for the |
|
1242 # "using" thread when the buffer is full. |
|
1243 # |
|
1244 # This is similar to the "threading.Condition", but without the absurd |
|
1245 # slowness of the stdlib implementation. |
|
1246 # |
|
1247 # the "_wait" is always released while holding the "_lock". |
|
1248 self._lock = threading.Lock() |
|
1249 self._wait = threading.Lock() |
1240 # only the stream reader touches this, it is fine to touch without the lock |
1250 # only the stream reader touches this, it is fine to touch without the lock |
1241 self._current_read = 0 |
1251 self._current_read = 0 |
1242 # do not touch this without the lock |
1252 # do not touch this without the lock |
1243 self._current_used = 0 |
1253 self._current_used = 0 |
1244 |
1254 |
1245 def _has_free_space(self): |
1255 def _has_free_space(self): |
1246 """True if more data can be read without further exceeding memory target |
1256 """True if more data can be read without further exceeding memory target |
1247 |
1257 |
1248 Must be called under the condition lock. |
1258 Must be called under the lock. |
1249 """ |
1259 """ |
1250 if self._memory_target is None: |
1260 if self._memory_target is None: |
1251 # Ideally we should not even get into the locking business in that |
1261 # Ideally we should not even get into the locking business in that |
1252 # case, but we keep the implementation simple for now. |
1262 # case, but we keep the implementation simple for now. |
1253 return True |
1263 return True |
def mark_used(self, offset):
    """Notify that the buffer has been used up to "offset".

    This is meant to be called from a thread other than the one filling
    the queue.

    The recorded usage only moves forward: an "offset" that is not greater
    than the current value leaves it unchanged.  If the reading thread is
    parked on the "_wait" lock and there is free space again, it is
    unblocked.
    """
    with self._lock:
        if offset > self._current_used:
            self._current_used = offset
        # If the reader is waiting for room, unblock it.
        #
        # "_wait" is only ever released while "_lock" is held, so checking
        # locked() and then releasing cannot race with another release.
        if self._wait.locked() and self._has_free_space():
            self._wait.release()
1265 |
1277 |
1266 def fill_from(self, data): |
1278 def fill_from(self, data): |
1267 """fill the data queue from a bundle2 part object |
1279 """fill the data queue from a bundle2 part object |
1268 |
1280 |
1269 This is meant to be called by the data reading thread |
1281 This is meant to be called by the data reading thread |
1273 for item in data: |
1285 for item in data: |
1274 self._current_read += len(item) |
1286 self._current_read += len(item) |
1275 q.put(item) |
1287 q.put(item) |
1276 if self._abort: |
1288 if self._abort: |
1277 break |
1289 break |
1278 with self._memory_condition: |
1290 with self._lock: |
1279 self._memory_condition.wait_for( |
1291 while not self._has_free_space(): |
1280 self._has_free_space, |
1292 # make sure the _wait lock is locked |
1281 ) |
1293 # this is done under lock, so there can be no race with the release logic |
|
1294 self._wait.acquire(blocking=False) |
|
1295 self._lock.release() |
|
1296 # acquiring the lock will block until some other thread releases it. |
|
1297 self._wait.acquire() |
|
1298 # lets dive into the locked section again |
|
1299 self._lock.acquire() |
|
1300 # make sure we release the lock we just grabbed if |
|
1301 # needed. |
|
1302 if self._wait.locked(): |
|
1303 self._wait.release() |
1282 finally: |
1304 finally: |
1283 q.put(None) |
1305 q.put(None) |
1284 |
1306 |
1285 def __iter__(self): |
1307 def __iter__(self): |
1286 """Iterate over the bundle chunks |
1308 """Iterate over the bundle chunks |
1297 |
1319 |
1298 This is meant to be called on errors. |
1320 This is meant to be called on errors. |
1299 """ |
1321 """ |
1300 self._abort = True |
1322 self._abort = True |
1301 self._q.put(None) |
1323 self._q.put(None) |
1302 self._memory_condition.notify_all() |
1324 with self._lock: |
|
1325 # make sure we unblock the reader thread. |
|
1326 if self._wait.locked(): |
|
1327 self._wait.release() |
1303 |
1328 |
1304 |
1329 |
1305 class _FileInfoQueue: |
1330 class _FileInfoQueue: |
1306 """A thread-safe queue to pass parsed file information to the writers""" |
1331 """A thread-safe queue to pass parsed file information to the writers""" |
1307 |
1332 |