mercurial/bundle2.py
changeset 35112 073eec083e25
parent 35046 241d9caca11e
child 35113 8aa43ff9c12c
equal deleted inserted replaced
35109:e96613048bdd 35112:073eec083e25
   852         # From there, payload needs to be decompressed
   852         # From there, payload needs to be decompressed
   853         self._fp = self._compengine.decompressorreader(self._fp)
   853         self._fp = self._compengine.decompressorreader(self._fp)
   854         indebug(self.ui, 'start extraction of bundle2 parts')
   854         indebug(self.ui, 'start extraction of bundle2 parts')
   855         headerblock = self._readpartheader()
   855         headerblock = self._readpartheader()
   856         while headerblock is not None:
   856         while headerblock is not None:
   857             part = unbundlepart(self.ui, headerblock, self._fp)
   857             part = seekableunbundlepart(self.ui, headerblock, self._fp)
   858             yield part
   858             yield part
   859             # Seek to the end of the part to force its consumption so the next
   859             # Seek to the end of the part to force its consumption so the next
   860             # part can be read. But then seek back to the beginning so the
   860             # part can be read. But then seek back to the beginning so the
   861             # code consuming this generator has a part that starts at 0.
   861             # code consuming this generator has a part that starts at 0.
   862             part.seek(0, os.SEEK_END)
   862             part.seek(0, os.SEEK_END)
  1153         indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
  1153         indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
  1154         headerblock = self._readpartheader()
  1154         headerblock = self._readpartheader()
  1155         if headerblock is None:
  1155         if headerblock is None:
  1156             indebug(self.ui, 'no part found during interruption.')
  1156             indebug(self.ui, 'no part found during interruption.')
  1157             return
  1157             return
  1158         part = unbundlepart(self.ui, headerblock, self._fp)
  1158         part = seekableunbundlepart(self.ui, headerblock, self._fp)
  1159         op = interruptoperation(self.ui)
  1159         op = interruptoperation(self.ui)
  1160         hardabort = False
  1160         hardabort = False
  1161         try:
  1161         try:
  1162             _processpart(op, part)
  1162             _processpart(op, part)
  1163         except (SystemExit, KeyboardInterrupt):
  1163         except (SystemExit, KeyboardInterrupt):
  1205         self.type = None
  1205         self.type = None
  1206         self.mandatoryparams = None
  1206         self.mandatoryparams = None
  1207         self.advisoryparams = None
  1207         self.advisoryparams = None
  1208         self.params = None
  1208         self.params = None
  1209         self.mandatorykeys = ()
  1209         self.mandatorykeys = ()
  1210         self._payloadstream = None
       
  1211         self._readheader()
  1210         self._readheader()
  1212         self._mandatory = None
  1211         self._mandatory = None
  1213         self._chunkindex = [] #(payload, file) position tuples for chunk starts
       
  1214         self._pos = 0
  1212         self._pos = 0
  1215 
  1213 
  1216     def _fromheader(self, size):
  1214     def _fromheader(self, size):
  1217         """return the next <size> bytes from the header"""
  1215         """return the next <size> bytes from the header"""
  1218         offset = self._headeroffset
  1216         offset = self._headeroffset
  1234         self.advisoryparams  = tuple(advisoryparams)
  1232         self.advisoryparams  = tuple(advisoryparams)
  1235         # user friendly UI
  1233         # user friendly UI
  1236         self.params = util.sortdict(self.mandatoryparams)
  1234         self.params = util.sortdict(self.mandatoryparams)
  1237         self.params.update(self.advisoryparams)
  1235         self.params.update(self.advisoryparams)
  1238         self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
  1236         self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
  1239 
       
  1240     def _payloadchunks(self, chunknum=0):
       
  1241         '''seek to specified chunk and start yielding data'''
       
  1242         if len(self._chunkindex) == 0:
       
  1243             assert chunknum == 0, 'Must start with chunk 0'
       
  1244             self._chunkindex.append((0, self._tellfp()))
       
  1245         else:
       
  1246             assert chunknum < len(self._chunkindex), \
       
  1247                    'Unknown chunk %d' % chunknum
       
  1248             self._seekfp(self._chunkindex[chunknum][1])
       
  1249 
       
  1250         pos = self._chunkindex[chunknum][0]
       
  1251         payloadsize = self._unpack(_fpayloadsize)[0]
       
  1252         indebug(self.ui, 'payload chunk size: %i' % payloadsize)
       
  1253         while payloadsize:
       
  1254             if payloadsize == flaginterrupt:
       
  1255                 # interruption detection, the handler will now read a
       
  1256                 # single part and process it.
       
  1257                 interrupthandler(self.ui, self._fp)()
       
  1258             elif payloadsize < 0:
       
  1259                 msg = 'negative payload chunk size: %i' %  payloadsize
       
  1260                 raise error.BundleValueError(msg)
       
  1261             else:
       
  1262                 result = self._readexact(payloadsize)
       
  1263                 chunknum += 1
       
  1264                 pos += payloadsize
       
  1265                 if chunknum == len(self._chunkindex):
       
  1266                     self._chunkindex.append((pos, self._tellfp()))
       
  1267                 yield result
       
  1268             payloadsize = self._unpack(_fpayloadsize)[0]
       
  1269             indebug(self.ui, 'payload chunk size: %i' % payloadsize)
       
  1270 
       
  1271     def _findchunk(self, pos):
       
  1272         '''for a given payload position, return a chunk number and offset'''
       
  1273         for chunk, (ppos, fpos) in enumerate(self._chunkindex):
       
  1274             if ppos == pos:
       
  1275                 return chunk, 0
       
  1276             elif ppos > pos:
       
  1277                 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
       
  1278         raise ValueError('Unknown chunk')
       
  1279 
  1237 
  1280     def _readheader(self):
  1238     def _readheader(self):
  1281         """read the header and setup the object"""
  1239         """read the header and setup the object"""
  1282         typesize = self._unpackheader(_fparttypesize)[0]
  1240         typesize = self._unpackheader(_fparttypesize)[0]
  1283         self.type = self._fromheader(typesize)
  1241         self.type = self._fromheader(typesize)
  1325             if not self.consumed and self._pos:
  1283             if not self.consumed and self._pos:
  1326                 self.ui.debug('bundle2-input-part: total payload size %i\n'
  1284                 self.ui.debug('bundle2-input-part: total payload size %i\n'
  1327                               % self._pos)
  1285                               % self._pos)
  1328             self.consumed = True
  1286             self.consumed = True
  1329         return data
  1287         return data
       
  1288 
       
  1289 class seekableunbundlepart(unbundlepart):
       
  1290     """A bundle2 part in a bundle that is seekable.
       
  1291 
       
  1292     Regular ``unbundlepart`` instances can only be read once. This class
       
  1293     extends ``unbundlepart`` to enable bi-directional seeking within the
       
  1294     part.
       
  1295 
       
  1296     Bundle2 part data consists of framed chunks. Offsets when seeking
       
  1297     refer to the decoded data, not the offsets in the underlying bundle2
       
  1298     stream.
       
  1299 
       
  1300     To facilitate quickly seeking within the decoded data, instances of this
       
  1301     class maintain a mapping between offsets in the underlying stream and
       
  1302     the decoded payload. This mapping will consume memory in proportion
       
  1303     to the number of chunks within the payload (which almost certainly
       
  1304     increases in proportion with the size of the part).
       
  1305     """
       
  1306     def __init__(self, ui, header, fp):
       
  1307         # (payload, file) offsets for chunk starts.
       
  1308         self._chunkindex = []
       
  1309 
       
  1310         super(seekableunbundlepart, self).__init__(ui, header, fp)
       
  1311 
       
  1312     def _payloadchunks(self, chunknum=0):
       
  1313         '''seek to specified chunk and start yielding data'''
       
  1314         if len(self._chunkindex) == 0:
       
  1315             assert chunknum == 0, 'Must start with chunk 0'
       
  1316             self._chunkindex.append((0, self._tellfp()))
       
  1317         else:
       
  1318             assert chunknum < len(self._chunkindex), \
       
  1319                    'Unknown chunk %d' % chunknum
       
  1320             self._seekfp(self._chunkindex[chunknum][1])
       
  1321 
       
  1322         pos = self._chunkindex[chunknum][0]
       
  1323         payloadsize = self._unpack(_fpayloadsize)[0]
       
  1324         indebug(self.ui, 'payload chunk size: %i' % payloadsize)
       
  1325         while payloadsize:
       
  1326             if payloadsize == flaginterrupt:
       
  1327                 # interruption detection, the handler will now read a
       
  1328                 # single part and process it.
       
  1329                 interrupthandler(self.ui, self._fp)()
       
  1330             elif payloadsize < 0:
       
  1331                 msg = 'negative payload chunk size: %i' %  payloadsize
       
  1332                 raise error.BundleValueError(msg)
       
  1333             else:
       
  1334                 result = self._readexact(payloadsize)
       
  1335                 chunknum += 1
       
  1336                 pos += payloadsize
       
  1337                 if chunknum == len(self._chunkindex):
       
  1338                     self._chunkindex.append((pos, self._tellfp()))
       
  1339                 yield result
       
  1340             payloadsize = self._unpack(_fpayloadsize)[0]
       
  1341             indebug(self.ui, 'payload chunk size: %i' % payloadsize)
       
  1342 
       
  1343     def _findchunk(self, pos):
       
  1344         '''for a given payload position, return a chunk number and offset'''
       
  1345         for chunk, (ppos, fpos) in enumerate(self._chunkindex):
       
  1346             if ppos == pos:
       
  1347                 return chunk, 0
       
  1348             elif ppos > pos:
       
  1349                 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
       
  1350         raise ValueError('Unknown chunk')
  1330 
  1351 
  1331     def tell(self):
  1352     def tell(self):
  1332         return self._pos
  1353         return self._pos
  1333 
  1354 
  1334     def seek(self, offset, whence=os.SEEK_SET):
  1355     def seek(self, offset, whence=os.SEEK_SET):