mercurial/bundle2.py
changeset 35113 8aa43ff9c12c
parent 35112 073eec083e25
child 35114 db5038525718
equal deleted inserted replaced
35112:073eec083e25 35113:8aa43ff9c12c
  1185         raise error.ProgrammingError('no repo access from stream interruption')
  1185         raise error.ProgrammingError('no repo access from stream interruption')
  1186 
  1186 
  1187     def gettransaction(self):
  1187     def gettransaction(self):
  1188         raise TransactionUnavailable('no repo access from stream interruption')
  1188         raise TransactionUnavailable('no repo access from stream interruption')
  1189 
  1189 
       
  1190 def decodepayloadchunks(ui, fh):
       
  1191     """Reads bundle2 part payload data into chunks.
       
  1192 
       
  1193     Part payload data consists of framed chunks. This function takes
       
  1194     a file handle and emits those chunks.
       
  1195     """
       
  1196     headersize = struct.calcsize(_fpayloadsize)
       
  1197     readexactly = changegroup.readexactly
       
  1198 
       
  1199     chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
       
  1200     indebug(ui, 'payload chunk size: %i' % chunksize)
       
  1201 
       
  1202     while chunksize:
       
  1203         if chunksize >= 0:
       
  1204             yield readexactly(fh, chunksize)
       
  1205         elif chunksize == flaginterrupt:
       
  1206             # Interrupt "signal" detected. The regular stream is interrupted
       
  1207             # and a bundle2 part follows. Consume it.
       
  1208             interrupthandler(ui, fh)()
       
  1209         else:
       
  1210             raise error.BundleValueError(
       
  1211                 'negative payload chunk size: %s' % chunksize)
       
  1212 
       
  1213         chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
       
  1214         indebug(ui, 'payload chunk size: %i' % chunksize)
       
  1215 
  1190 class unbundlepart(unpackermixin):
  1216 class unbundlepart(unpackermixin):
  1191     """a bundle part read from a bundle"""
  1217     """a bundle part read from a bundle"""
  1192 
  1218 
  1193     def __init__(self, ui, header, fp):
  1219     def __init__(self, ui, header, fp):
  1194         super(unbundlepart, self).__init__(fp)
  1220         super(unbundlepart, self).__init__(fp)
  1268         ## part payload
  1294         ## part payload
  1269         self._payloadstream = util.chunkbuffer(self._payloadchunks())
  1295         self._payloadstream = util.chunkbuffer(self._payloadchunks())
  1270         # we read the data, tell it
  1296         # we read the data, tell it
  1271         self._initialized = True
  1297         self._initialized = True
  1272 
  1298 
       
  1299     def _payloadchunks(self):
       
  1300         """Generator of decoded chunks in the payload."""
       
  1301         return decodepayloadchunks(self.ui, self._fp)
       
  1302 
  1273     def read(self, size=None):
  1303     def read(self, size=None):
  1274         """read payload data"""
  1304         """read payload data"""
  1275         if not self._initialized:
  1305         if not self._initialized:
  1276             self._readheader()
  1306             self._readheader()
  1277         if size is None:
  1307         if size is None:
  1318             assert chunknum < len(self._chunkindex), \
  1348             assert chunknum < len(self._chunkindex), \
  1319                    'Unknown chunk %d' % chunknum
  1349                    'Unknown chunk %d' % chunknum
  1320             self._seekfp(self._chunkindex[chunknum][1])
  1350             self._seekfp(self._chunkindex[chunknum][1])
  1321 
  1351 
  1322         pos = self._chunkindex[chunknum][0]
  1352         pos = self._chunkindex[chunknum][0]
  1323         payloadsize = self._unpack(_fpayloadsize)[0]
  1353 
  1324         indebug(self.ui, 'payload chunk size: %i' % payloadsize)
  1354         for chunk in decodepayloadchunks(self.ui, self._fp):
  1325         while payloadsize:
  1355             chunknum += 1
  1326             if payloadsize == flaginterrupt:
  1356             pos += len(chunk)
  1327                 # interruption detection, the handler will now read a
  1357             if chunknum == len(self._chunkindex):
  1328                 # single part and process it.
  1358                 self._chunkindex.append((pos, self._tellfp()))
  1329                 interrupthandler(self.ui, self._fp)()
  1359 
  1330             elif payloadsize < 0:
  1360             yield chunk
  1331                 msg = 'negative payload chunk size: %i' %  payloadsize
       
  1332                 raise error.BundleValueError(msg)
       
  1333             else:
       
  1334                 result = self._readexact(payloadsize)
       
  1335                 chunknum += 1
       
  1336                 pos += payloadsize
       
  1337                 if chunknum == len(self._chunkindex):
       
  1338                     self._chunkindex.append((pos, self._tellfp()))
       
  1339                 yield result
       
  1340             payloadsize = self._unpack(_fpayloadsize)[0]
       
  1341             indebug(self.ui, 'payload chunk size: %i' % payloadsize)
       
  1342 
  1361 
  1343     def _findchunk(self, pos):
  1362     def _findchunk(self, pos):
  1344         '''for a given payload position, return a chunk number and offset'''
  1363         '''for a given payload position, return a chunk number and offset'''
  1345         for chunk, (ppos, fpos) in enumerate(self._chunkindex):
  1364         for chunk, (ppos, fpos) in enumerate(self._chunkindex):
  1346             if ppos == pos:
  1365             if ppos == pos: