1185 raise error.ProgrammingError('no repo access from stream interruption') |
1185 raise error.ProgrammingError('no repo access from stream interruption') |
1186 |
1186 |
1187 def gettransaction(self): |
1187 def gettransaction(self): |
1188 raise TransactionUnavailable('no repo access from stream interruption') |
1188 raise TransactionUnavailable('no repo access from stream interruption') |
1189 |
1189 |
|
1190 def decodepayloadchunks(ui, fh): |
|
1191 """Reads bundle2 part payload data into chunks. |
|
1192 |
|
1193 Part payload data consists of framed chunks. This function takes |
|
1194 a file handle and emits those chunks. |
|
1195 """ |
|
1196 headersize = struct.calcsize(_fpayloadsize) |
|
1197 readexactly = changegroup.readexactly |
|
1198 |
|
1199 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] |
|
1200 indebug(ui, 'payload chunk size: %i' % chunksize) |
|
1201 |
|
1202 while chunksize: |
|
1203 if chunksize >= 0: |
|
1204 yield readexactly(fh, chunksize) |
|
1205 elif chunksize == flaginterrupt: |
|
1206 # Interrupt "signal" detected. The regular stream is interrupted |
|
1207 # and a bundle2 part follows. Consume it. |
|
1208 interrupthandler(ui, fh)() |
|
1209 else: |
|
1210 raise error.BundleValueError( |
|
1211 'negative payload chunk size: %s' % chunksize) |
|
1212 |
|
1213 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] |
|
1214 indebug(ui, 'payload chunk size: %i' % chunksize) |
|
1215 |
1190 class unbundlepart(unpackermixin): |
1216 class unbundlepart(unpackermixin): |
1191 """a bundle part read from a bundle""" |
1217 """a bundle part read from a bundle""" |
1192 |
1218 |
1193 def __init__(self, ui, header, fp): |
1219 def __init__(self, ui, header, fp): |
1194 super(unbundlepart, self).__init__(fp) |
1220 super(unbundlepart, self).__init__(fp) |
1268 ## part payload |
1294 ## part payload |
1269 self._payloadstream = util.chunkbuffer(self._payloadchunks()) |
1295 self._payloadstream = util.chunkbuffer(self._payloadchunks()) |
1270 # we read the data, tell it |
1296 # we read the data, tell it |
1271 self._initialized = True |
1297 self._initialized = True |
1272 |
1298 |
|
1299 def _payloadchunks(self): |
|
1300 """Generator of decoded chunks in the payload.""" |
|
1301 return decodepayloadchunks(self.ui, self._fp) |
|
1302 |
1273 def read(self, size=None): |
1303 def read(self, size=None): |
1274 """read payload data""" |
1304 """read payload data""" |
1275 if not self._initialized: |
1305 if not self._initialized: |
1276 self._readheader() |
1306 self._readheader() |
1277 if size is None: |
1307 if size is None: |
1318 assert chunknum < len(self._chunkindex), \ |
1348 assert chunknum < len(self._chunkindex), \ |
1319 'Unknown chunk %d' % chunknum |
1349 'Unknown chunk %d' % chunknum |
1320 self._seekfp(self._chunkindex[chunknum][1]) |
1350 self._seekfp(self._chunkindex[chunknum][1]) |
1321 |
1351 |
1322 pos = self._chunkindex[chunknum][0] |
1352 pos = self._chunkindex[chunknum][0] |
1323 payloadsize = self._unpack(_fpayloadsize)[0] |
1353 |
1324 indebug(self.ui, 'payload chunk size: %i' % payloadsize) |
1354 for chunk in decodepayloadchunks(self.ui, self._fp): |
1325 while payloadsize: |
1355 chunknum += 1 |
1326 if payloadsize == flaginterrupt: |
1356 pos += len(chunk) |
1327 # interruption detection, the handler will now read a |
1357 if chunknum == len(self._chunkindex): |
1328 # single part and process it. |
1358 self._chunkindex.append((pos, self._tellfp())) |
1329 interrupthandler(self.ui, self._fp)() |
1359 |
1330 elif payloadsize < 0: |
1360 yield chunk |
1331 msg = 'negative payload chunk size: %i' % payloadsize |
|
1332 raise error.BundleValueError(msg) |
|
1333 else: |
|
1334 result = self._readexact(payloadsize) |
|
1335 chunknum += 1 |
|
1336 pos += payloadsize |
|
1337 if chunknum == len(self._chunkindex): |
|
1338 self._chunkindex.append((pos, self._tellfp())) |
|
1339 yield result |
|
1340 payloadsize = self._unpack(_fpayloadsize)[0] |
|
1341 indebug(self.ui, 'payload chunk size: %i' % payloadsize) |
|
1342 |
1361 |
1343 def _findchunk(self, pos): |
1362 def _findchunk(self, pos): |
1344 '''for a given payload position, return a chunk number and offset''' |
1363 '''for a given payload position, return a chunk number and offset''' |
1345 for chunk, (ppos, fpos) in enumerate(self._chunkindex): |
1364 for chunk, (ppos, fpos) in enumerate(self._chunkindex): |
1346 if ppos == pos: |
1365 if ppos == pos: |