diff mercurial/bundle2.py @ 21019:3dc09f831a2e

bundle2: lazy unbundle of part payload The `unbundle` part gains a `read` method to retrieve payload content. This method behaves as a python file-like read method. The bundle-processing code is updated to make sure a part is fully consumed before another one is extracted. Test output changes because the debug output is even more interleaved now.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Fri, 11 Apr 2014 16:05:22 -0400
parents b477afb1c81e
children 5041163ee382
line wrap: on
line diff
--- a/mercurial/bundle2.py	Thu Apr 10 22:10:26 2014 -0700
+++ b/mercurial/bundle2.py	Fri Apr 11 16:05:22 2014 -0400
@@ -288,6 +288,7 @@
     # - exception catching
     unbundler.params
     iterparts = iter(unbundler)
+    part = None
     try:
         for part in iterparts:
             parttype = part.type
@@ -302,8 +303,8 @@
                     # - use a more precise exception
                     raise
                 op.ui.debug('ignoring unknown advisory part %r\n' % key)
-                # todo:
-                # - consume the part once we use streaming
+                # consuming the part
+                part.read()
                 continue
 
             # handler is called outside the above try block so that we don't
@@ -311,9 +312,14 @@
             # parthandlermapping lookup (any KeyError raised by handler()
             # itself represents a defect of a different variety).
             handler(op, part)
+            part.read()
     except Exception:
+        if part is not None:
+            # consume the bundle content
+            part.read()
         for part in iterparts:
-            pass # consume the bundle content
+            # consume the bundle content
+            part.read()
         raise
     return op
 
@@ -544,19 +550,21 @@
         # unbundle state attr
         self._headerdata = header
         self._headeroffset = 0
+        self._initialized = False
+        self.consumed = False
         # part data
         self.id = None
         self.type = None
         self.mandatoryparams = None
         self.advisoryparams = None
-        self.data = None
-        self._readdata()
+        self._payloadstream = None
+        self._readheader()
 
     def _fromheader(self, size):
         """return the next <size> byte from the header"""
         offset = self._headeroffset
         data = self._headerdata[offset:(offset + size)]
-        self._headeroffset += size
+        self._headeroffset = offset + size
         return data
 
     def _unpackheader(self, format):
@@ -566,10 +574,8 @@
         data = self._fromheader(struct.calcsize(format))
         return _unpack(format, data)
 
-    def _readdata(self):
+    def _readheader(self):
         """read the header and setup the object"""
-        # some utility to help reading from the header block
-
         typesize = self._unpackheader(_fparttypesize)[0]
         self.type = self._fromheader(typesize)
         self.ui.debug('part type: "%s"\n' % self.type)
@@ -597,14 +603,29 @@
         self.mandatoryparams = manparams
         self.advisoryparams  = advparams
         ## part payload
-        payload = []
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        while payloadsize:
-            payload.append(self._readexact(payloadsize))
+        def payloadchunks():
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        self.data = ''.join(payload)
+            while payloadsize:
+                yield self._readexact(payloadsize)
+                payloadsize = self._unpack(_fpayloadsize)[0]
+                self.ui.debug('payload chunk size: %i\n' % payloadsize)
+        self._payloadstream = util.chunkbuffer(payloadchunks())
+        # we read the data, tell it
+        self._initialized = True
+
+    def read(self, size=None):
+        """read payload data"""
+        if not self._initialized:
+            self._readheader()
+        if size is None:
+            data = self._payloadstream.read()
+        else:
+            data = self._payloadstream.read(size)
+        if size is None or len(data) < size:
+            self.consumed = True
+        return data
+
 
 @parthandler('changegroup')
 def handlechangegroup(op, inpart):
@@ -619,7 +640,7 @@
     # we need to make sure we trigger the creation of a transaction object used
     # for the whole processing scope.
     op.gettransaction()
-    data = StringIO.StringIO(inpart.data)
+    data = StringIO.StringIO(inpart.read())
     data.seek(0)
     cg = changegroup.readbundle(data, 'bundle2part')
     ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@@ -631,6 +652,7 @@
                            [('in-reply-to', str(inpart.id)),
                             ('return', '%i' % ret)])
         op.reply.addpart(part)
+    assert not inpart.read()
 
 @parthandler('reply:changegroup')
 def handlechangegroup(op, inpart):