Mercurial > public > mercurial-scm > hg
comparison mercurial/bundle2.py @ 35114:db5038525718
bundle2: implement consume() API on unbundlepart
We want bundle parts to not be seekable by default. That means
eliminating the generic seek() method.
A common pattern in bundle2.py is to seek to the end of the part
data. This is mainly used by the part iteration code to ensure
the underlying stream is advanced to the next bundle part.
In this commit, we establish a dedicated API for consuming a
bundle2 part data. We switch users of seek() to it.
The old implementation of seek(0, os.SEEK_END) would effectively
call self.read(). The new implementation calls self.read(32768)
in a loop. The old implementation would therefore assemble a
buffer to hold all remaining data being seeked over. For seeking
over large bundle parts, this would involve a large allocation and
a lot of overhead to collect intermediate data! This overhead can
be seen in the results for `hg perfbundleread`:
! bundle2 iterparts()
! wall 10.891305 comb 10.820000 user 7.990000 sys 2.830000 (best of 3)
! wall 8.070791 comb 8.060000 user 7.180000 sys 0.880000 (best of 3)
! bundle2 part seek()
! wall 12.991478 comb 10.390000 user 7.720000 sys 2.670000 (best of 3)
! wall 10.370142 comb 10.350000 user 7.430000 sys 2.920000 (best of 3)
Of course, skipping over large payload data isn't likely very common.
So I doubt the performance wins will be observed in the wild.
Differential Revision: https://phab.mercurial-scm.org/D1388
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 13 Nov 2017 20:03:02 -0800 |
parents | 8aa43ff9c12c |
children | 2b72bc88043f |
comparison
equal
deleted
inserted
replaced
35113:8aa43ff9c12c | 35114:db5038525718 |
---|---|
361 itr = enumerate(self.unbundler.iterparts()) | 361 itr = enumerate(self.unbundler.iterparts()) |
362 for count, p in itr: | 362 for count, p in itr: |
363 self.count = count | 363 self.count = count |
364 self.current = p | 364 self.current = p |
365 yield p | 365 yield p |
366 p.seek(0, os.SEEK_END) | 366 p.consume() |
367 self.current = None | 367 self.current = None |
368 self.iterator = func() | 368 self.iterator = func() |
369 return self.iterator | 369 return self.iterator |
370 | 370 |
371 def __exit__(self, type, exc, tb): | 371 def __exit__(self, type, exc, tb): |
383 # re-raise the original error. | 383 # re-raise the original error. |
384 seekerror = False | 384 seekerror = False |
385 try: | 385 try: |
386 if self.current: | 386 if self.current: |
387 # consume the part content to not corrupt the stream. | 387 # consume the part content to not corrupt the stream. |
388 self.current.seek(0, os.SEEK_END) | 388 self.current.consume() |
389 | 389 |
390 for part in self.iterator: | 390 for part in self.iterator: |
391 # consume the bundle content | 391 # consume the bundle content |
392 part.seek(0, os.SEEK_END) | 392 part.consume() |
393 except Exception: | 393 except Exception: |
394 seekerror = True | 394 seekerror = True |
395 | 395 |
396 # Small hack to let caller code distinguish exceptions from bundle2 | 396 # Small hack to let caller code distinguish exceptions from bundle2 |
397 # processing from processing the old format. This is mostly needed | 397 # processing from processing the old format. This is mostly needed |
854 indebug(self.ui, 'start extraction of bundle2 parts') | 854 indebug(self.ui, 'start extraction of bundle2 parts') |
855 headerblock = self._readpartheader() | 855 headerblock = self._readpartheader() |
856 while headerblock is not None: | 856 while headerblock is not None: |
857 part = seekableunbundlepart(self.ui, headerblock, self._fp) | 857 part = seekableunbundlepart(self.ui, headerblock, self._fp) |
858 yield part | 858 yield part |
859 # Seek to the end of the part to force it's consumption so the next | 859 # Ensure part is fully consumed so we can start reading the next |
860 # part can be read. But then seek back to the beginning so the | 860 # part. |
861 # code consuming this generator has a part that starts at 0. | 861 part.consume() |
862 part.seek(0, os.SEEK_END) | 862 # But then seek back to the beginning so the code consuming this |
863 # generator has a part that starts at 0. | |
863 part.seek(0, os.SEEK_SET) | 864 part.seek(0, os.SEEK_SET) |
864 headerblock = self._readpartheader() | 865 headerblock = self._readpartheader() |
865 indebug(self.ui, 'end of bundle2 stream') | 866 indebug(self.ui, 'end of bundle2 stream') |
866 | 867 |
867 def _readpartheader(self): | 868 def _readpartheader(self): |
1163 except (SystemExit, KeyboardInterrupt): | 1164 except (SystemExit, KeyboardInterrupt): |
1164 hardabort = True | 1165 hardabort = True |
1165 raise | 1166 raise |
1166 finally: | 1167 finally: |
1167 if not hardabort: | 1168 if not hardabort: |
1168 part.seek(0, os.SEEK_END) | 1169 part.consume() |
1169 self.ui.debug('bundle2-input-stream-interrupt:' | 1170 self.ui.debug('bundle2-input-stream-interrupt:' |
1170 ' closing out of band context\n') | 1171 ' closing out of band context\n') |
1171 | 1172 |
1172 class interruptoperation(object): | 1173 class interruptoperation(object): |
1173 """A limited operation to be use by part handler during interruption | 1174 """A limited operation to be use by part handler during interruption |
1297 self._initialized = True | 1298 self._initialized = True |
1298 | 1299 |
1299 def _payloadchunks(self): | 1300 def _payloadchunks(self): |
1300 """Generator of decoded chunks in the payload.""" | 1301 """Generator of decoded chunks in the payload.""" |
1301 return decodepayloadchunks(self.ui, self._fp) | 1302 return decodepayloadchunks(self.ui, self._fp) |
1303 | |
1304 def consume(self): | |
1305 """Read the part payload until completion. | |
1306 | |
1307 By consuming the part data, the underlying stream read offset will | |
1308 be advanced to the next part (or end of stream). | |
1309 """ | |
1310 if self.consumed: | |
1311 return | |
1312 | |
1313 chunk = self.read(32768) | |
1314 while chunk: | |
1315 self._pos += len(chunk) | |
1316 chunk = self.read(32768) | |
1302 | 1317 |
1303 def read(self, size=None): | 1318 def read(self, size=None): |
1304 """read payload data""" | 1319 """read payload data""" |
1305 if not self._initialized: | 1320 if not self._initialized: |
1306 self._readheader() | 1321 self._readheader() |