mercurial/bundle2.py
branch stable
changeset 23066 ad144882318d
parent 23029 149fc8a44184
child 23067 420a051616ce
equal deleted inserted replaced
23065:963f311e3a81 23066:ad144882318d
   693                 yield chunk
   693                 yield chunk
   694                 chunk = buff.read(preferedchunksize)
   694                 chunk = buff.read(preferedchunksize)
   695         elif len(self.data):
   695         elif len(self.data):
   696             yield self.data
   696             yield self.data
   697 
   697 
       
   698 
       
   699 flaginterrupt = -1
       
   700 
       
   701 class interrupthandler(unpackermixin):
       
   702     """read one part and process it with restricted capability
       
   703 
       
   704     This allows to transmit exception raised on the producer size during part
       
   705     iteration while the consumer is reading a part.
       
   706 
       
   707     Part processed in this manner only have access to a ui object,"""
       
   708 
       
   709     def __init__(self, ui, fp):
       
   710         super(interrupthandler, self).__init__(fp)
       
   711         self.ui = ui
       
   712 
       
   713     def _readpartheader(self):
       
   714         """reads a part header size and return the bytes blob
       
   715 
       
   716         returns None if empty"""
       
   717         headersize = self._unpack(_fpartheadersize)[0]
       
   718         if headersize < 0:
       
   719             raise error.BundleValueError('negative part header size: %i'
       
   720                                          % headersize)
       
   721         self.ui.debug('part header size: %i\n' % headersize)
       
   722         if headersize:
       
   723             return self._readexact(headersize)
       
   724         return None
       
   725 
       
   726     def __call__(self):
       
   727         self.ui.debug('bundle2 stream interruption, looking for a part.\n')
       
   728         headerblock = self._readpartheader()
       
   729         if headerblock is None:
       
   730             self.ui.debug('no part found during iterruption.\n')
       
   731             return
       
   732         part = unbundlepart(self.ui, headerblock, self._fp)
       
   733         op = interruptoperation(self.ui)
       
   734         _processpart(op, part)
       
   735 
       
   736 class interruptoperation(object):
       
   737     """A limited operation to be use by part handler during interruption
       
   738 
       
   739     It only have access to an ui object.
       
   740     """
       
   741 
       
   742     def __init__(self, ui):
       
   743         self.ui = ui
       
   744         self.reply = None
       
   745 
       
   746     @property
       
   747     def repo(self):
       
   748         raise RuntimeError('no repo access from stream interruption')
       
   749 
       
   750     def gettransaction(self):
       
   751         raise TransactionUnavailable('no repo access from stream interruption')
       
   752 
   698 class unbundlepart(unpackermixin):
   753 class unbundlepart(unpackermixin):
   699     """a bundle part read from a bundle"""
   754     """a bundle part read from a bundle"""
   700 
   755 
   701     def __init__(self, ui, header, fp):
   756     def __init__(self, ui, header, fp):
   702         super(unbundlepart, self).__init__(fp)
   757         super(unbundlepart, self).__init__(fp)
   770         ## part payload
   825         ## part payload
   771         def payloadchunks():
   826         def payloadchunks():
   772             payloadsize = self._unpack(_fpayloadsize)[0]
   827             payloadsize = self._unpack(_fpayloadsize)[0]
   773             self.ui.debug('payload chunk size: %i\n' % payloadsize)
   828             self.ui.debug('payload chunk size: %i\n' % payloadsize)
   774             while payloadsize:
   829             while payloadsize:
   775                 if payloadsize < 0:
   830                 if payloadsize == flaginterrupt:
   776                     msg = 'negative payload chunk size: %i' % payloadsize
   831                     # interruption detection, the handler will now read a
       
   832                     # single part and process it.
       
   833                     interrupthandler(self.ui, self._fp)()
       
   834                 elif payloadsize < 0:
       
   835                     msg = 'negative payload chunk size: %i' %  payloadsize
   777                     raise error.BundleValueError(msg)
   836                     raise error.BundleValueError(msg)
   778                 yield self._readexact(payloadsize)
   837                 else:
       
   838                     yield self._readexact(payloadsize)
   779                 payloadsize = self._unpack(_fpayloadsize)[0]
   839                 payloadsize = self._unpack(_fpayloadsize)[0]
   780                 self.ui.debug('payload chunk size: %i\n' % payloadsize)
   840                 self.ui.debug('payload chunk size: %i\n' % payloadsize)
   781         self._payloadstream = util.chunkbuffer(payloadchunks())
   841         self._payloadstream = util.chunkbuffer(payloadchunks())
   782         # we read the data, tell it
   842         # we read the data, tell it
   783         self._initialized = True
   843         self._initialized = True