Mercurial > public > mercurial-scm > hg-stable
diff mercurial/bundle2.py @ 21019:3dc09f831a2e
bundle2: lazy unbundle of part payload
The `unbundle` part gains a `read` method to retrieve payload content.
This method behaves like the `read` method of a Python file-like object.
The bundle-processing code is updated to make sure a part is fully consumed before
another one is extracted.
Test output changes because the debug output is even more interleaved now.
author | Pierre-Yves David <pierre-yves.david@fb.com> |
---|---|
date | Fri, 11 Apr 2014 16:05:22 -0400 |
parents | b477afb1c81e |
children | 5041163ee382 |
line wrap: on
line diff
```diff
--- a/mercurial/bundle2.py Thu Apr 10 22:10:26 2014 -0700
+++ b/mercurial/bundle2.py Fri Apr 11 16:05:22 2014 -0400
@@ -288,6 +288,7 @@
     # - exception catching
     unbundler.params
     iterparts = iter(unbundler)
+    part = None
     try:
         for part in iterparts:
             parttype = part.type
@@ -302,8 +303,8 @@
                     # - use a more precise exception
                     raise
                 op.ui.debug('ignoring unknown advisory part %r\n' % key)
-                # todo:
-                # - consume the part once we use streaming
+                # consuming the part
+                part.read()
                 continue
 
             # handler is called outside the above try block so that we don't
@@ -311,9 +312,14 @@
             # parthandlermapping lookup (any KeyError raised by handler()
             # itself represents a defect of a different variety).
             handler(op, part)
+            part.read()
     except Exception:
+        if part is not None:
+            # consume the bundle content
+            part.read()
         for part in iterparts:
-            pass # consume the bundle content
+            # consume the bundle content
+            part.read()
         raise
     return op
 
@@ -544,19 +550,21 @@
         # unbundle state attr
         self._headerdata = header
         self._headeroffset = 0
+        self._initialized = False
+        self.consumed = False
         # part data
         self.id = None
         self.type = None
         self.mandatoryparams = None
         self.advisoryparams = None
-        self.data = None
-        self._readdata()
+        self._payloadstream = None
+        self._readheader()
 
     def _fromheader(self, size):
         """return the next <size> byte from the header"""
         offset = self._headeroffset
         data = self._headerdata[offset:(offset + size)]
-        self._headeroffset += size
+        self._headeroffset = offset + size
         return data
 
     def _unpackheader(self, format):
@@ -566,10 +574,8 @@
         data = self._fromheader(struct.calcsize(format))
         return _unpack(format, data)
 
-    def _readdata(self):
+    def _readheader(self):
         """read the header and setup the object"""
-        # some utility to help reading from the header block
-
         typesize = self._unpackheader(_fparttypesize)[0]
         self.type = self._fromheader(typesize)
         self.ui.debug('part type: "%s"\n' % self.type)
@@ -597,14 +603,29 @@
         self.mandatoryparams = manparams
         self.advisoryparams = advparams
         ## part payload
-        payload = []
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        while payloadsize:
-            payload.append(self._readexact(payloadsize))
+        def payloadchunks():
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        self.data = ''.join(payload)
+            while payloadsize:
+                yield self._readexact(payloadsize)
+                payloadsize = self._unpack(_fpayloadsize)[0]
+                self.ui.debug('payload chunk size: %i\n' % payloadsize)
+        self._payloadstream = util.chunkbuffer(payloadchunks())
+        # we read the data, tell it
+        self._initialized = True
+
+    def read(self, size=None):
+        """read payload data"""
+        if not self._initialized:
+            self._readheader()
+        if size is None:
+            data = self._payloadstream.read()
+        else:
+            data = self._payloadstream.read(size)
+        if size is None or len(data) < size:
+            self.consumed = True
+        return data
+
 
 @parthandler('changegroup')
 def handlechangegroup(op, inpart):
@@ -619,7 +640,7 @@
     # we need to make sure we trigger the creation of a transaction object used
     # for the whole processing scope.
     op.gettransaction()
-    data = StringIO.StringIO(inpart.data)
+    data = StringIO.StringIO(inpart.read())
     data.seek(0)
     cg = changegroup.readbundle(data, 'bundle2part')
     ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@@ -631,6 +652,7 @@
                            [('in-reply-to', str(inpart.id)),
                             ('return', '%i' % ret)])
         op.reply.addpart(part)
+    assert not inpart.read()
 
 @parthandler('reply:changegroup')
 def handlechangegroup(op, inpart):
```