mercurial/bundle2.py
changeset 35113 8aa43ff9c12c
parent 35112 073eec083e25
child 35114 db5038525718
--- a/mercurial/bundle2.py	Mon Nov 13 19:22:11 2017 -0800
+++ b/mercurial/bundle2.py	Sun Nov 12 19:46:15 2017 -0800
@@ -1187,6 +1187,32 @@
     def gettransaction(self):
         raise TransactionUnavailable('no repo access from stream interruption')
 
+def decodepayloadchunks(ui, fh):
+    """Reads bundle2 part payload data into chunks.
+
+    Part payload data consists of framed chunks. This function takes
+    a file handle and emits those chunks.
+    """
+    headersize = struct.calcsize(_fpayloadsize)
+    readexactly = changegroup.readexactly
+
+    chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+    indebug(ui, 'payload chunk size: %i' % chunksize)
+
+    while chunksize:
+        if chunksize >= 0:
+            yield readexactly(fh, chunksize)
+        elif chunksize == flaginterrupt:
+            # Interrupt "signal" detected. The regular stream is interrupted
+            # and a bundle2 part follows. Consume it.
+            interrupthandler(ui, fh)()
+        else:
+            raise error.BundleValueError(
+                'negative payload chunk size: %s' % chunksize)
+
+        chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+        indebug(ui, 'payload chunk size: %i' % chunksize)
+
 class unbundlepart(unpackermixin):
     """a bundle part read from a bundle"""
 
@@ -1270,6 +1296,10 @@
         # we read the data, tell it
         self._initialized = True
 
+    def _payloadchunks(self):
+        """Generator of decoded chunks in the payload."""
+        return decodepayloadchunks(self.ui, self._fp)
+
     def read(self, size=None):
         """read payload data"""
         if not self._initialized:
@@ -1320,25 +1350,14 @@
             self._seekfp(self._chunkindex[chunknum][1])
 
         pos = self._chunkindex[chunknum][0]
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
-        while payloadsize:
-            if payloadsize == flaginterrupt:
-                # interruption detection, the handler will now read a
-                # single part and process it.
-                interrupthandler(self.ui, self._fp)()
-            elif payloadsize < 0:
-                msg = 'negative payload chunk size: %i' %  payloadsize
-                raise error.BundleValueError(msg)
-            else:
-                result = self._readexact(payloadsize)
-                chunknum += 1
-                pos += payloadsize
-                if chunknum == len(self._chunkindex):
-                    self._chunkindex.append((pos, self._tellfp()))
-                yield result
-            payloadsize = self._unpack(_fpayloadsize)[0]
-            indebug(self.ui, 'payload chunk size: %i' % payloadsize)
+
+        for chunk in decodepayloadchunks(self.ui, self._fp):
+            chunknum += 1
+            pos += len(chunk)
+            if chunknum == len(self._chunkindex):
+                self._chunkindex.append((pos, self._tellfp()))
+
+            yield chunk
 
     def _findchunk(self, pos):
         '''for a given payload position, return a chunk number and offset'''