--- a/mercurial/bundle2.py Mon Nov 13 19:22:11 2017 -0800
+++ b/mercurial/bundle2.py Sun Nov 12 19:46:15 2017 -0800
@@ -1187,6 +1187,32 @@
def gettransaction(self):
raise TransactionUnavailable('no repo access from stream interruption')
+def decodepayloadchunks(ui, fh):
+ """Reads bundle2 part payload data into chunks.
+
+ Part payload data consists of framed chunks. This function takes
+ a file handle and emits those chunks.
+ """
+ headersize = struct.calcsize(_fpayloadsize)
+ readexactly = changegroup.readexactly
+
+ chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+ indebug(ui, 'payload chunk size: %i' % chunksize)
+
+ while chunksize:
+ if chunksize >= 0:
+ yield readexactly(fh, chunksize)
+ elif chunksize == flaginterrupt:
+ # Interrupt "signal" detected. The regular stream is interrupted
+ # and a bundle2 part follows. Consume it.
+ interrupthandler(ui, fh)()
+ else:
+ raise error.BundleValueError(
+ 'negative payload chunk size: %s' % chunksize)
+
+ chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+ indebug(ui, 'payload chunk size: %i' % chunksize)
+
class unbundlepart(unpackermixin):
"""a bundle part read from a bundle"""
@@ -1270,6 +1296,10 @@
# we read the data, tell it
self._initialized = True
+ def _payloadchunks(self):
+ """Generator of decoded chunks in the payload."""
+ return decodepayloadchunks(self.ui, self._fp)
+
def read(self, size=None):
"""read payload data"""
if not self._initialized:
@@ -1320,25 +1350,14 @@
self._seekfp(self._chunkindex[chunknum][1])
pos = self._chunkindex[chunknum][0]
- payloadsize = self._unpack(_fpayloadsize)[0]
- indebug(self.ui, 'payload chunk size: %i' % payloadsize)
- while payloadsize:
- if payloadsize == flaginterrupt:
- # interruption detection, the handler will now read a
- # single part and process it.
- interrupthandler(self.ui, self._fp)()
- elif payloadsize < 0:
- msg = 'negative payload chunk size: %i' % payloadsize
- raise error.BundleValueError(msg)
- else:
- result = self._readexact(payloadsize)
- chunknum += 1
- pos += payloadsize
- if chunknum == len(self._chunkindex):
- self._chunkindex.append((pos, self._tellfp()))
- yield result
- payloadsize = self._unpack(_fpayloadsize)[0]
- indebug(self.ui, 'payload chunk size: %i' % payloadsize)
+
+ for chunk in decodepayloadchunks(self.ui, self._fp):
+ chunknum += 1
+ pos += len(chunk)
+ if chunknum == len(self._chunkindex):
+ self._chunkindex.append((pos, self._tellfp()))
+
+ yield chunk
def _findchunk(self, pos):
'''for a given payload position, return a chunk number and offset'''