--- a/mercurial/bundle2.py Tue Oct 21 12:38:28 2014 -0700
+++ b/mercurial/bundle2.py Tue Oct 14 10:47:47 2014 -0700
@@ -695,6 +695,61 @@
elif len(self.data):
yield self.data
+
+flaginterrupt = -1
+
+class interrupthandler(unpackermixin):
+ """read one part and process it with restricted capability
+
+ This allows to transmit exception raised on the producer size during part
+ iteration while the consumer is reading a part.
+
+ Part processed in this manner only have access to a ui object,"""
+
+ def __init__(self, ui, fp):
+ super(interrupthandler, self).__init__(fp)
+ self.ui = ui
+
+ def _readpartheader(self):
+ """reads a part header size and return the bytes blob
+
+ returns None if empty"""
+ headersize = self._unpack(_fpartheadersize)[0]
+ if headersize < 0:
+ raise error.BundleValueError('negative part header size: %i'
+ % headersize)
+ self.ui.debug('part header size: %i\n' % headersize)
+ if headersize:
+ return self._readexact(headersize)
+ return None
+
+ def __call__(self):
+ self.ui.debug('bundle2 stream interruption, looking for a part.\n')
+ headerblock = self._readpartheader()
+ if headerblock is None:
+ self.ui.debug('no part found during iterruption.\n')
+ return
+ part = unbundlepart(self.ui, headerblock, self._fp)
+ op = interruptoperation(self.ui)
+ _processpart(op, part)
+
+class interruptoperation(object):
+ """A limited operation to be use by part handler during interruption
+
+ It only have access to an ui object.
+ """
+
+ def __init__(self, ui):
+ self.ui = ui
+ self.reply = None
+
+ @property
+ def repo(self):
+ raise RuntimeError('no repo access from stream interruption')
+
+ def gettransaction(self):
+ raise TransactionUnavailable('no repo access from stream interruption')
+
class unbundlepart(unpackermixin):
"""a bundle part read from a bundle"""
@@ -772,10 +827,15 @@
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
while payloadsize:
- if payloadsize < 0:
- msg = 'negative payload chunk size: %i' % payloadsize
+ if payloadsize == flaginterrupt:
+ # interruption detection, the handler will now read a
+ # single part and process it.
+ interrupthandler(self.ui, self._fp)()
+ elif payloadsize < 0:
+ msg = 'negative payload chunk size: %i' % payloadsize
raise error.BundleValueError(msg)
- yield self._readexact(payloadsize)
+ else:
+ yield self._readexact(payloadsize)
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
self._payloadstream = util.chunkbuffer(payloadchunks())