693 yield chunk |
693 yield chunk |
694 chunk = buff.read(preferedchunksize) |
694 chunk = buff.read(preferedchunksize) |
695 elif len(self.data): |
695 elif len(self.data): |
696 yield self.data |
696 yield self.data |
697 |
697 |
|
698 |
|
# Sentinel payload chunk size: when a payload chunk size read from the
# stream equals this value, the reader hands control to an interrupt
# handler that reads and processes a single part before payload reading
# resumes.
flaginterrupt = -1
|
700 |
|
class interrupthandler(unpackermixin):
    """Read a single part and process it with restricted capabilities.

    This makes it possible to transmit an exception raised on the producer
    side during part iteration while the consumer is still reading a part.

    Parts processed in this manner only have access to a ui object.
    """

    def __init__(self, ui, fp):
        """Wrap the stream `fp` and keep `ui` for debug output."""
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """Read a part header size, then return the header bytes blob.

        Returns None when the announced header size is zero.
        Raises error.BundleValueError when the announced size is negative.
        """
        size = self._unpack(_fpartheadersize)[0]
        if size < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % size)
        self.ui.debug('part header size: %i\n' % size)
        if not size:
            # a zero size means there is no header to read
            return None
        return self._readexact(size)

    def __call__(self):
        """Look for one part on the stream and process it if present."""
        self.ui.debug('bundle2 stream interruption, looking for a part.\n')
        header = self._readpartheader()
        if header is not None:
            interruptpart = unbundlepart(self.ui, header, self._fp)
            # the part handler only gets a ui-only operation object
            limitedop = interruptoperation(self.ui)
            _processpart(limitedop, interruptpart)
        else:
            self.ui.debug('no part found during iterruption.\n')
|
735 |
|
class interruptoperation(object):
    """Limited operation object handed to part handlers during interruption.

    It only exposes a ui object: `reply` is always None, and any attempt to
    reach the repository or a transaction raises.
    """

    def __init__(self, ui):
        """Store `ui`; no reply bundle is available in this context."""
        self.reply = None
        self.ui = ui

    @property
    def repo(self):
        """Always raise RuntimeError: the repository is not reachable here."""
        message = 'no repo access from stream interruption'
        raise RuntimeError(message)

    def gettransaction(self):
        """Always raise TransactionUnavailable: no transaction can be opened."""
        message = 'no repo access from stream interruption'
        raise TransactionUnavailable(message)
|
752 |
698 class unbundlepart(unpackermixin): |
753 class unbundlepart(unpackermixin): |
699 """a bundle part read from a bundle""" |
754 """a bundle part read from a bundle""" |
700 |
755 |
701 def __init__(self, ui, header, fp): |
756 def __init__(self, ui, header, fp): |
702 super(unbundlepart, self).__init__(fp) |
757 super(unbundlepart, self).__init__(fp) |
770 ## part payload |
825 ## part payload |
771 def payloadchunks(): |
826 def payloadchunks(): |
772 payloadsize = self._unpack(_fpayloadsize)[0] |
827 payloadsize = self._unpack(_fpayloadsize)[0] |
773 self.ui.debug('payload chunk size: %i\n' % payloadsize) |
828 self.ui.debug('payload chunk size: %i\n' % payloadsize) |
774 while payloadsize: |
829 while payloadsize: |
775 if payloadsize < 0: |
830 if payloadsize == flaginterrupt: |
776 msg = 'negative payload chunk size: %i' % payloadsize |
831 # interruption detection, the handler will now read a |
|
832 # single part and process it. |
|
833 interrupthandler(self.ui, self._fp)() |
|
834 elif payloadsize < 0: |
|
835 msg = 'negative payload chunk size: %i' % payloadsize |
777 raise error.BundleValueError(msg) |
836 raise error.BundleValueError(msg) |
778 yield self._readexact(payloadsize) |
837 else: |
|
838 yield self._readexact(payloadsize) |
779 payloadsize = self._unpack(_fpayloadsize)[0] |
839 payloadsize = self._unpack(_fpayloadsize)[0] |
780 self.ui.debug('payload chunk size: %i\n' % payloadsize) |
840 self.ui.debug('payload chunk size: %i\n' % payloadsize) |
781 self._payloadstream = util.chunkbuffer(payloadchunks()) |
841 self._payloadstream = util.chunkbuffer(payloadchunks()) |
782 # we read the data, tell it |
842 # we read the data, tell it |
783 self._initialized = True |
843 self._initialized = True |