Mercurial > public > mercurial-scm > hg
comparison mercurial/bundle2.py @ 35112:073eec083e25
bundle2: extract logic for seeking bundle2 part into own class
Currently, unbundlepart classes support bi-directional seeking.
Most consumers of unbundlepart only ever seek forward - typically
as part of moving to the end of the bundle part so they can move
on to the next one. But regardless of the actual usage of the
part, instances maintain an index mapping offsets within the
underlying raw payload to offsets within the decoded payload.
Maintaining the mapping of offset data can be expensive in terms of
memory use. Furthermore, many bundle2 consumers don't have access
to an underlying seekable stream. This includes all compressed
bundles. So maintaining offset data when the underlying stream
isn't seekable anyway is wasteful. And since many bundle2 streams
aren't seekable, it seems like a bad idea to expose a seek API
in bundle2 parts by default. If you provide them, people will
attempt to use them.
Seekable bundle2 parts should be the exception, not the rule. This
commit starts the process of dividing unbundlepart into 2 classes: a
base class that supports linear, one-time reads and a child class
that supports bi-directional seeking. In this first commit, we
split various methods and attributes out into a new
"seekableunbundlepart" class. Previous instantiators of "unbundlepart"
now instantiate "seekableunbundlepart." This preserves backwards
compatibility. The coupling between the classes is still tight:
"unbundlepart" cannot be used on its own. This will be addressed
in subsequent commits.
Differential Revision: https://phab.mercurial-scm.org/D1386
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 13 Nov 2017 19:22:11 -0800 |
parents | 241d9caca11e |
children | 8aa43ff9c12c |
comparison
equal
deleted
inserted
replaced
35109:e96613048bdd | 35112:073eec083e25 |
---|---|
852 # From there, payload need to be decompressed | 852 # From there, payload need to be decompressed |
853 self._fp = self._compengine.decompressorreader(self._fp) | 853 self._fp = self._compengine.decompressorreader(self._fp) |
854 indebug(self.ui, 'start extraction of bundle2 parts') | 854 indebug(self.ui, 'start extraction of bundle2 parts') |
855 headerblock = self._readpartheader() | 855 headerblock = self._readpartheader() |
856 while headerblock is not None: | 856 while headerblock is not None: |
857 part = unbundlepart(self.ui, headerblock, self._fp) | 857 part = seekableunbundlepart(self.ui, headerblock, self._fp) |
858 yield part | 858 yield part |
859 # Seek to the end of the part to force it's consumption so the next | 859 # Seek to the end of the part to force it's consumption so the next |
860 # part can be read. But then seek back to the beginning so the | 860 # part can be read. But then seek back to the beginning so the |
861 # code consuming this generator has a part that starts at 0. | 861 # code consuming this generator has a part that starts at 0. |
862 part.seek(0, os.SEEK_END) | 862 part.seek(0, os.SEEK_END) |
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.') | 1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.') |
1154 headerblock = self._readpartheader() | 1154 headerblock = self._readpartheader() |
1155 if headerblock is None: | 1155 if headerblock is None: |
1156 indebug(self.ui, 'no part found during interruption.') | 1156 indebug(self.ui, 'no part found during interruption.') |
1157 return | 1157 return |
1158 part = unbundlepart(self.ui, headerblock, self._fp) | 1158 part = seekableunbundlepart(self.ui, headerblock, self._fp) |
1159 op = interruptoperation(self.ui) | 1159 op = interruptoperation(self.ui) |
1160 hardabort = False | 1160 hardabort = False |
1161 try: | 1161 try: |
1162 _processpart(op, part) | 1162 _processpart(op, part) |
1163 except (SystemExit, KeyboardInterrupt): | 1163 except (SystemExit, KeyboardInterrupt): |
1205 self.type = None | 1205 self.type = None |
1206 self.mandatoryparams = None | 1206 self.mandatoryparams = None |
1207 self.advisoryparams = None | 1207 self.advisoryparams = None |
1208 self.params = None | 1208 self.params = None |
1209 self.mandatorykeys = () | 1209 self.mandatorykeys = () |
1210 self._payloadstream = None | |
1211 self._readheader() | 1210 self._readheader() |
1212 self._mandatory = None | 1211 self._mandatory = None |
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts | |
1214 self._pos = 0 | 1212 self._pos = 0 |
1215 | 1213 |
1216 def _fromheader(self, size): | 1214 def _fromheader(self, size): |
1217 """return the next <size> byte from the header""" | 1215 """return the next <size> byte from the header""" |
1218 offset = self._headeroffset | 1216 offset = self._headeroffset |
1234 self.advisoryparams = tuple(advisoryparams) | 1232 self.advisoryparams = tuple(advisoryparams) |
1235 # user friendly UI | 1233 # user friendly UI |
1236 self.params = util.sortdict(self.mandatoryparams) | 1234 self.params = util.sortdict(self.mandatoryparams) |
1237 self.params.update(self.advisoryparams) | 1235 self.params.update(self.advisoryparams) |
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams) | 1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams) |
1239 | |
1240 def _payloadchunks(self, chunknum=0): | |
1241 '''seek to specified chunk and start yielding data''' | |
1242 if len(self._chunkindex) == 0: | |
1243 assert chunknum == 0, 'Must start with chunk 0' | |
1244 self._chunkindex.append((0, self._tellfp())) | |
1245 else: | |
1246 assert chunknum < len(self._chunkindex), \ | |
1247 'Unknown chunk %d' % chunknum | |
1248 self._seekfp(self._chunkindex[chunknum][1]) | |
1249 | |
1250 pos = self._chunkindex[chunknum][0] | |
1251 payloadsize = self._unpack(_fpayloadsize)[0] | |
1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize) | |
1253 while payloadsize: | |
1254 if payloadsize == flaginterrupt: | |
1255 # interruption detection, the handler will now read a | |
1256 # single part and process it. | |
1257 interrupthandler(self.ui, self._fp)() | |
1258 elif payloadsize < 0: | |
1259 msg = 'negative payload chunk size: %i' % payloadsize | |
1260 raise error.BundleValueError(msg) | |
1261 else: | |
1262 result = self._readexact(payloadsize) | |
1263 chunknum += 1 | |
1264 pos += payloadsize | |
1265 if chunknum == len(self._chunkindex): | |
1266 self._chunkindex.append((pos, self._tellfp())) | |
1267 yield result | |
1268 payloadsize = self._unpack(_fpayloadsize)[0] | |
1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize) | |
1270 | |
1271 def _findchunk(self, pos): | |
1272 '''for a given payload position, return a chunk number and offset''' | |
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex): | |
1274 if ppos == pos: | |
1275 return chunk, 0 | |
1276 elif ppos > pos: | |
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0] | |
1278 raise ValueError('Unknown chunk') | |
1279 | 1237 |
1280 def _readheader(self): | 1238 def _readheader(self): |
1281 """read the header and setup the object""" | 1239 """read the header and setup the object""" |
1282 typesize = self._unpackheader(_fparttypesize)[0] | 1240 typesize = self._unpackheader(_fparttypesize)[0] |
1283 self.type = self._fromheader(typesize) | 1241 self.type = self._fromheader(typesize) |
1325 if not self.consumed and self._pos: | 1283 if not self.consumed and self._pos: |
1326 self.ui.debug('bundle2-input-part: total payload size %i\n' | 1284 self.ui.debug('bundle2-input-part: total payload size %i\n' |
1327 % self._pos) | 1285 % self._pos) |
1328 self.consumed = True | 1286 self.consumed = True |
1329 return data | 1287 return data |
1288 | |
1289 class seekableunbundlepart(unbundlepart): | |
1290 """A bundle2 part in a bundle that is seekable. | |
1291 | |
1292 Regular ``unbundlepart`` instances can only be read once. This class | |
1293 extends ``unbundlepart`` to enable bi-directional seeking within the | |
1294 part. | |
1295 | |
1296 Bundle2 part data consists of framed chunks. Offsets when seeking | |
1297 refer to the decoded data, not the offsets in the underlying bundle2 | |
1298 stream. | |
1299 | |
1300 To facilitate quickly seeking within the decoded data, instances of this | |
1301 class maintain a mapping between offsets in the underlying stream and | |
1302 the decoded payload. This mapping will consume memory in proportion | |
1303 to the number of chunks within the payload (which almost certainly | |
1304 increases in proportion with the size of the part). | |
1305 """ | |
1306 def __init__(self, ui, header, fp): | |
1307 # (payload, file) offsets for chunk starts. | |
1308 self._chunkindex = [] | |
1309 | |
1310 super(seekableunbundlepart, self).__init__(ui, header, fp) | |
1311 | |
1312 def _payloadchunks(self, chunknum=0): | |
1313 '''seek to specified chunk and start yielding data''' | |
1314 if len(self._chunkindex) == 0: | |
1315 assert chunknum == 0, 'Must start with chunk 0' | |
1316 self._chunkindex.append((0, self._tellfp())) | |
1317 else: | |
1318 assert chunknum < len(self._chunkindex), \ | |
1319 'Unknown chunk %d' % chunknum | |
1320 self._seekfp(self._chunkindex[chunknum][1]) | |
1321 | |
1322 pos = self._chunkindex[chunknum][0] | |
1323 payloadsize = self._unpack(_fpayloadsize)[0] | |
1324 indebug(self.ui, 'payload chunk size: %i' % payloadsize) | |
1325 while payloadsize: | |
1326 if payloadsize == flaginterrupt: | |
1327 # interruption detection, the handler will now read a | |
1328 # single part and process it. | |
1329 interrupthandler(self.ui, self._fp)() | |
1330 elif payloadsize < 0: | |
1331 msg = 'negative payload chunk size: %i' % payloadsize | |
1332 raise error.BundleValueError(msg) | |
1333 else: | |
1334 result = self._readexact(payloadsize) | |
1335 chunknum += 1 | |
1336 pos += payloadsize | |
1337 if chunknum == len(self._chunkindex): | |
1338 self._chunkindex.append((pos, self._tellfp())) | |
1339 yield result | |
1340 payloadsize = self._unpack(_fpayloadsize)[0] | |
1341 indebug(self.ui, 'payload chunk size: %i' % payloadsize) | |
1342 | |
1343 def _findchunk(self, pos): | |
1344 '''for a given payload position, return a chunk number and offset''' | |
1345 for chunk, (ppos, fpos) in enumerate(self._chunkindex): | |
1346 if ppos == pos: | |
1347 return chunk, 0 | |
1348 elif ppos > pos: | |
1349 return chunk - 1, pos - self._chunkindex[chunk - 1][0] | |
1350 raise ValueError('Unknown chunk') | |
1330 | 1351 |
1331 def tell(self): | 1352 def tell(self): |
1332 return self._pos | 1353 return self._pos |
1333 | 1354 |
1334 def seek(self, offset, whence=os.SEEK_SET): | 1355 def seek(self, offset, whence=os.SEEK_SET): |