852 # From there, the payload needs to be decompressed |
852 # From there, the payload needs to be decompressed |
853 self._fp = self._compengine.decompressorreader(self._fp) |
853 self._fp = self._compengine.decompressorreader(self._fp) |
854 indebug(self.ui, 'start extraction of bundle2 parts') |
854 indebug(self.ui, 'start extraction of bundle2 parts') |
855 headerblock = self._readpartheader() |
855 headerblock = self._readpartheader() |
856 while headerblock is not None: |
856 while headerblock is not None: |
857 part = unbundlepart(self.ui, headerblock, self._fp) |
857 part = seekableunbundlepart(self.ui, headerblock, self._fp) |
858 yield part |
858 yield part |
859 # Seek to the end of the part to force its consumption so the next |
859 # Seek to the end of the part to force its consumption so the next |
860 # part can be read. But then seek back to the beginning so the |
860 # part can be read. But then seek back to the beginning so the |
861 # code consuming this generator has a part that starts at 0. |
861 # code consuming this generator has a part that starts at 0. |
862 part.seek(0, os.SEEK_END) |
862 part.seek(0, os.SEEK_END) |
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.') |
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.') |
1154 headerblock = self._readpartheader() |
1154 headerblock = self._readpartheader() |
1155 if headerblock is None: |
1155 if headerblock is None: |
1156 indebug(self.ui, 'no part found during interruption.') |
1156 indebug(self.ui, 'no part found during interruption.') |
1157 return |
1157 return |
1158 part = unbundlepart(self.ui, headerblock, self._fp) |
1158 part = seekableunbundlepart(self.ui, headerblock, self._fp) |
1159 op = interruptoperation(self.ui) |
1159 op = interruptoperation(self.ui) |
1160 hardabort = False |
1160 hardabort = False |
1161 try: |
1161 try: |
1162 _processpart(op, part) |
1162 _processpart(op, part) |
1163 except (SystemExit, KeyboardInterrupt): |
1163 except (SystemExit, KeyboardInterrupt): |
1205 self.type = None |
1205 self.type = None |
1206 self.mandatoryparams = None |
1206 self.mandatoryparams = None |
1207 self.advisoryparams = None |
1207 self.advisoryparams = None |
1208 self.params = None |
1208 self.params = None |
1209 self.mandatorykeys = () |
1209 self.mandatorykeys = () |
1210 self._payloadstream = None |
|
1211 self._readheader() |
1210 self._readheader() |
1212 self._mandatory = None |
1211 self._mandatory = None |
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts |
|
1214 self._pos = 0 |
1212 self._pos = 0 |
1215 |
1213 |
1216 def _fromheader(self, size): |
1214 def _fromheader(self, size): |
1217 """return the next <size> byte from the header""" |
1215 """return the next <size> byte from the header""" |
1218 offset = self._headeroffset |
1216 offset = self._headeroffset |
1234 self.advisoryparams = tuple(advisoryparams) |
1232 self.advisoryparams = tuple(advisoryparams) |
1235 # user friendly UI |
1233 # user friendly UI |
1236 self.params = util.sortdict(self.mandatoryparams) |
1234 self.params = util.sortdict(self.mandatoryparams) |
1237 self.params.update(self.advisoryparams) |
1235 self.params.update(self.advisoryparams) |
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams) |
1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams) |
1239 |
|
def _payloadchunks(self, chunknum=0):
    '''seek to specified chunk and start yielding data

    Generator over the payload chunks of this part, beginning with chunk
    number ``chunknum``. On the very first call the chunk index is empty,
    so only ``chunknum == 0`` is accepted and the current file position is
    recorded as the start of chunk 0. On later calls the underlying file is
    repositioned to the recorded start of the requested chunk.

    While iterating, every newly discovered chunk boundary is appended to
    ``self._chunkindex`` as a ``(payload position, file position)`` tuple.

    Raises ``error.BundleValueError`` when a negative chunk size other than
    ``flaginterrupt`` is read. Iteration stops when a chunk size of zero is
    read.
    '''
    if self._chunkindex:
        assert chunknum < len(self._chunkindex), (
            'Unknown chunk %d' % chunknum)
        self._seekfp(self._chunkindex[chunknum][1])
    else:
        assert chunknum == 0, 'Must start with chunk 0'
        self._chunkindex.append((0, self._tellfp()))

    # payload offset at which the requested chunk starts
    pos = self._chunkindex[chunknum][0]
    while True:
        size = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % size)
        if not size:
            # a zero size ends the payload
            break
        if size == flaginterrupt:
            # interruption detection, the handler will now read a
            # single part and process it. This test must stay ahead of
            # the generic negative-size check below.
            interrupthandler(self.ui, self._fp)()
            continue
        if size < 0:
            msg = 'negative payload chunk size: %i' % size
            raise error.BundleValueError(msg)

        data = self._readexact(size)
        chunknum += 1
        pos += size
        # Record the boundary before yielding so the index is up to date
        # even if the consumer never resumes this generator.
        if chunknum == len(self._chunkindex):
            self._chunkindex.append((pos, self._tellfp()))
        yield data
|
1270 |
|
1271 def _findchunk(self, pos): |
|
1272 '''for a given payload position, return a chunk number and offset''' |
|
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex): |
|
1274 if ppos == pos: |
|
1275 return chunk, 0 |
|
1276 elif ppos > pos: |
|
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0] |
|
1278 raise ValueError('Unknown chunk') |
|
1279 |
1237 |
1280 def _readheader(self): |
1238 def _readheader(self): |
1281 """read the header and setup the object""" |
1239 """read the header and setup the object""" |
1282 typesize = self._unpackheader(_fparttypesize)[0] |
1240 typesize = self._unpackheader(_fparttypesize)[0] |
1283 self.type = self._fromheader(typesize) |
1241 self.type = self._fromheader(typesize) |
1325 if not self.consumed and self._pos: |
1283 if not self.consumed and self._pos: |
1326 self.ui.debug('bundle2-input-part: total payload size %i\n' |
1284 self.ui.debug('bundle2-input-part: total payload size %i\n' |
1327 % self._pos) |
1285 % self._pos) |
1328 self.consumed = True |
1286 self.consumed = True |
1329 return data |
1287 return data |
|
1288 |
|
1289 class seekableunbundlepart(unbundlepart): |
|
1290 """A bundle2 part in a bundle that is seekable. |
|
1291 |
|
1292 Regular ``unbundlepart`` instances can only be read once. This class |
|
1293 extends ``unbundlepart`` to enable bi-directional seeking within the |
|
1294 part. |
|
1295 |
|
1296 Bundle2 part data consists of framed chunks. Offsets when seeking |
|
1297 refer to the decoded data, not the offsets in the underlying bundle2 |
|
1298 stream. |
|
1299 |
|
1300 To facilitate quickly seeking within the decoded data, instances of this |
|
1301 class maintain a mapping between offsets in the underlying stream and |
|
1302 the decoded payload. This mapping will consume memory in proportion |
|
1303 to the number of chunks within the payload (which almost certainly |
|
1304 increases in proportion with the size of the part). |
|
1305 """ |
|
def __init__(self, ui, header, fp):
    """Create the empty chunk index, then run the parent initializer.

    ``ui``, ``header`` and ``fp`` are forwarded unchanged to the parent
    class ``__init__``.
    """
    # (payload, file) offsets for chunk starts.
    # NOTE(review): the index is created before the parent initializer
    # runs; confirm whether the parent relies on it already existing.
    self._chunkindex = []

    super(seekableunbundlepart, self).__init__(ui, header, fp)
|
1311 |
|
def _payloadchunks(self, chunknum=0):
    '''seek to specified chunk and start yielding data

    Generator over the payload chunks of this part, beginning with chunk
    number ``chunknum``. On the very first call the chunk index is empty,
    so only ``chunknum == 0`` is accepted and the current file position is
    recorded as the start of chunk 0. On later calls the underlying file is
    repositioned to the recorded start of the requested chunk.

    While iterating, every newly discovered chunk boundary is appended to
    ``self._chunkindex`` as a ``(payload position, file position)`` tuple.

    Raises ``error.BundleValueError`` when a negative chunk size other than
    ``flaginterrupt`` is read. Iteration stops when a chunk size of zero is
    read.
    '''
    if self._chunkindex:
        assert chunknum < len(self._chunkindex), (
            'Unknown chunk %d' % chunknum)
        self._seekfp(self._chunkindex[chunknum][1])
    else:
        assert chunknum == 0, 'Must start with chunk 0'
        self._chunkindex.append((0, self._tellfp()))

    # payload offset at which the requested chunk starts
    pos = self._chunkindex[chunknum][0]
    while True:
        size = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % size)
        if not size:
            # a zero size ends the payload
            break
        if size == flaginterrupt:
            # interruption detection, the handler will now read a
            # single part and process it. This test must stay ahead of
            # the generic negative-size check below.
            interrupthandler(self.ui, self._fp)()
            continue
        if size < 0:
            msg = 'negative payload chunk size: %i' % size
            raise error.BundleValueError(msg)

        data = self._readexact(size)
        chunknum += 1
        pos += size
        # Record the boundary before yielding so the index is up to date
        # even if the consumer never resumes this generator.
        if chunknum == len(self._chunkindex):
            self._chunkindex.append((pos, self._tellfp()))
        yield data
|
1342 |
|
1343 def _findchunk(self, pos): |
|
1344 '''for a given payload position, return a chunk number and offset''' |
|
1345 for chunk, (ppos, fpos) in enumerate(self._chunkindex): |
|
1346 if ppos == pos: |
|
1347 return chunk, 0 |
|
1348 elif ppos > pos: |
|
1349 return chunk - 1, pos - self._chunkindex[chunk - 1][0] |
|
1350 raise ValueError('Unknown chunk') |
1330 |
1351 |
def tell(self):
    """Return ``self._pos``.

    NOTE(review): presumably the current offset within the decoded
    payload -- confirm against ``read``/``seek``.
    """
    return self._pos
1333 |
1354 |
1334 def seek(self, offset, whence=os.SEEK_SET): |
1355 def seek(self, offset, whence=os.SEEK_SET): |