mercurial/streamclone.py
changeset 52921 363914ba328d
parent 52920 aee193b1c784
child 52922 0af8965b668a
equal deleted inserted replaced
52920:aee193b1c784 52921:363914ba328d
  1234         self._q = queue.Queue()
  1234         self._q = queue.Queue()
  1235         self._abort = False
  1235         self._abort = False
  1236         self._memory_target = memory_target
  1236         self._memory_target = memory_target
  1237         if self._memory_target is not None and self._memory_target <= 0:
  1237         if self._memory_target is not None and self._memory_target <= 0:
  1238             raise error.ProgrammingError("memory target should be > 0")
  1238             raise error.ProgrammingError("memory target should be > 0")
  1239         self._memory_condition = threading.Condition()
  1239 
       
  1240         # the "_lock" protects manipulation of the "_current_used" variable
       
  1241         # the "_wait" is used to make the "reading" thread wait for the
       
  1242         # "using" thread when the buffer is full.
       
  1243         #
       
  1244         # This is similar to the "threading.Condition", but without the absurd
       
  1245         # slowness of the stdlib implementation.
       
  1246         #
       
  1247         # the "_wait" is always released while holding the "_lock".
       
  1248         self._lock = threading.Lock()
       
  1249         self._wait = threading.Lock()
  1240         # only the stream reader touches this, it is fine to touch without the lock
  1250         # only the stream reader touches this, it is fine to touch without the lock
  1241         self._current_read = 0
  1251         self._current_read = 0
  1242         # do not touch this without the lock
  1252         # do not touch this without the lock
  1243         self._current_used = 0
  1253         self._current_used = 0
  1244 
  1254 
  1245     def _has_free_space(self):
  1255     def _has_free_space(self):
  1246         """True if more data can be read without further exceeding memory target
  1256         """True if more data can be read without further exceeding memory target
  1247 
  1257 
  1248         Must be called under the condition lock.
  1258         Must be called under the lock.
  1249         """
  1259         """
  1250         if self._memory_target is None:
  1260         if self._memory_target is None:
  1251             # Ideally we should not even get into the locking business in that
  1261             # Ideally we should not even get into the locking business in that
  1252             # case, but we keep the implementation simple for now.
  1262             # case, but we keep the implementation simple for now.
  1253             return True
  1263             return True
  1256     def mark_used(self, offset):
  1266     def mark_used(self, offset):
  1257         """Notify we have used the buffer up to "offset"
  1267         """Notify we have used the buffer up to "offset"
  1258 
  1268 
  1259         This is meant to be used from another thread than the one filling the queue.
  1269         This is meant to be used from another thread than the one filling the queue.
  1260         """
  1270         """
  1261         with self._memory_condition:
  1271         with self._lock:
  1262             if offset > self._current_used:
  1272             if offset > self._current_used:
  1263                 self._current_used = offset
  1273                 self._current_used = offset
  1264             self._memory_condition.notify()
  1274             # If the reader is waiting for room, unblock it.
       
  1275             if self._wait.locked() and self._has_free_space():
       
  1276                 self._wait.release()
  1265 
  1277 
  1266     def fill_from(self, data):
  1278     def fill_from(self, data):
  1267         """fill the data queue from a bundle2 part object
  1279         """fill the data queue from a bundle2 part object
  1268 
  1280 
  1269         This is meant to be called by the data reading thread
  1281         This is meant to be called by the data reading thread
  1273             for item in data:
  1285             for item in data:
  1274                 self._current_read += len(item)
  1286                 self._current_read += len(item)
  1275                 q.put(item)
  1287                 q.put(item)
  1276                 if self._abort:
  1288                 if self._abort:
  1277                     break
  1289                     break
  1278                 with self._memory_condition:
  1290                 with self._lock:
  1279                     self._memory_condition.wait_for(
  1291                     while not self._has_free_space():
  1280                         self._has_free_space,
  1292                         # make sure the _wait lock is locked
  1281                     )
  1293                         # this is done under lock, so there can be no race with the release logic
       
  1294                         self._wait.acquire(blocking=False)
       
  1295                         self._lock.release()
       
  1296                         # acquiring the lock will block until some other thread releases it.
       
  1297                         self._wait.acquire()
       
  1298                         # lets dive into the locked section again
       
  1299                         self._lock.acquire()
       
  1300                         # make sure we release the lock we just grabbed if
       
  1301                         # needed.
       
  1302                         if self._wait.locked():
       
  1303                             self._wait.release()
  1282         finally:
  1304         finally:
  1283             q.put(None)
  1305             q.put(None)
  1284 
  1306 
  1285     def __iter__(self):
  1307     def __iter__(self):
  1286         """Iterate over the bundle chunks
  1308         """Iterate over the bundle chunks
  1297 
  1319 
  1298         This is meant to be called on errors.
  1320         This is meant to be called on errors.
  1299         """
  1321         """
  1300         self._abort = True
  1322         self._abort = True
  1301         self._q.put(None)
  1323         self._q.put(None)
  1302         self._memory_condition.notify_all()
  1324         with self._lock:
       
  1325             # make sure we unblock the reader thread.
       
  1326             if self._wait.locked():
       
  1327                 self._wait.release()
  1303 
  1328 
  1304 
  1329 
  1305 class _FileInfoQueue:
  1330 class _FileInfoQueue:
  1306     """A thread-safe queue to pass parsed file information to the writers"""
  1331     """A thread-safe queue to pass parsed file information to the writers"""
  1307 
  1332