mercurial/utils/compression.py
changeset 43076 2372284d9457
parent 42044 bb271ec2fbfb
child 43077 687b865b95ad
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
    13 from .. import (
    13 from .. import (
    14     error,
    14     error,
    15     i18n,
    15     i18n,
    16     pycompat,
    16     pycompat,
    17 )
    17 )
    18 from . import (
    18 from . import stringutil
    19     stringutil,
       
    20 )
       
    21 
    19 
    22 safehasattr = pycompat.safehasattr
    20 safehasattr = pycompat.safehasattr
    23 
    21 
    24 
    22 
    25 _ = i18n._
    23 _ = i18n._
    27 # compression code
    25 # compression code
    28 
    26 
    29 SERVERROLE = 'server'
    27 SERVERROLE = 'server'
    30 CLIENTROLE = 'client'
    28 CLIENTROLE = 'client'
    31 
    29 
    32 compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
    30 compewireprotosupport = collections.namedtuple(
    33                                                (r'name', r'serverpriority',
    31     r'compenginewireprotosupport',
    34                                                 r'clientpriority'))
    32     (r'name', r'serverpriority', r'clientpriority'),
       
    33 )
       
    34 
    35 
    35 
    36 class propertycache(object):
    36 class propertycache(object):
    37     def __init__(self, func):
    37     def __init__(self, func):
    38         self.func = func
    38         self.func = func
    39         self.name = func.__name__
    39         self.name = func.__name__
       
    40 
    40     def __get__(self, obj, type=None):
    41     def __get__(self, obj, type=None):
    41         result = self.func(obj)
    42         result = self.func(obj)
    42         self.cachevalue(obj, result)
    43         self.cachevalue(obj, result)
    43         return result
    44         return result
    44 
    45 
    45     def cachevalue(self, obj, value):
    46     def cachevalue(self, obj, value):
    46         # __dict__ assignment required to bypass __setattr__ (eg: repoview)
    47         # __dict__ assignment required to bypass __setattr__ (eg: repoview)
    47         obj.__dict__[self.name] = value
    48         obj.__dict__[self.name] = value
    48 
    49 
       
    50 
    49 class compressormanager(object):
    51 class compressormanager(object):
    50     """Holds registrations of various compression engines.
    52     """Holds registrations of various compression engines.
    51 
    53 
    52     This class essentially abstracts the differences between compression
    54     This class essentially abstracts the differences between compression
    53     engines to allow new compression formats to be added easily, possibly from
    55     engines to allow new compression formats to be added easily, possibly from
    54     extensions.
    56     extensions.
    55 
    57 
    56     Compressors are registered against the global instance by calling its
    58     Compressors are registered against the global instance by calling its
    57     ``register()`` method.
    59     ``register()`` method.
    58     """
    60     """
       
    61 
    59     def __init__(self):
    62     def __init__(self):
    60         self._engines = {}
    63         self._engines = {}
    61         # Bundle spec human name to engine name.
    64         # Bundle spec human name to engine name.
    62         self._bundlenames = {}
    65         self._bundlenames = {}
    63         # Internal bundle identifier to engine name.
    66         # Internal bundle identifier to engine name.
    85             raise ValueError(_('argument must be a compressionengine'))
    88             raise ValueError(_('argument must be a compressionengine'))
    86 
    89 
    87         name = engine.name()
    90         name = engine.name()
    88 
    91 
    89         if name in self._engines:
    92         if name in self._engines:
    90             raise error.Abort(_('compression engine %s already registered') %
    93             raise error.Abort(
    91                               name)
    94                 _('compression engine %s already registered') % name
       
    95             )
    92 
    96 
    93         bundleinfo = engine.bundletype()
    97         bundleinfo = engine.bundletype()
    94         if bundleinfo:
    98         if bundleinfo:
    95             bundlename, bundletype = bundleinfo
    99             bundlename, bundletype = bundleinfo
    96 
   100 
    97             if bundlename in self._bundlenames:
   101             if bundlename in self._bundlenames:
    98                 raise error.Abort(_('bundle name %s already registered') %
   102                 raise error.Abort(
    99                                   bundlename)
   103                     _('bundle name %s already registered') % bundlename
       
   104                 )
   100             if bundletype in self._bundletypes:
   105             if bundletype in self._bundletypes:
   101                 raise error.Abort(_('bundle type %s already registered by %s') %
   106                 raise error.Abort(
   102                                   (bundletype, self._bundletypes[bundletype]))
   107                     _('bundle type %s already registered by %s')
       
   108                     % (bundletype, self._bundletypes[bundletype])
       
   109                 )
   103 
   110 
   104             # No external facing name declared.
   111             # No external facing name declared.
   105             if bundlename:
   112             if bundlename:
   106                 self._bundlenames[bundlename] = name
   113                 self._bundlenames[bundlename] = name
   107 
   114 
   109 
   116 
   110         wiresupport = engine.wireprotosupport()
   117         wiresupport = engine.wireprotosupport()
   111         if wiresupport:
   118         if wiresupport:
   112             wiretype = wiresupport.name
   119             wiretype = wiresupport.name
   113             if wiretype in self._wiretypes:
   120             if wiretype in self._wiretypes:
   114                 raise error.Abort(_('wire protocol compression %s already '
   121                 raise error.Abort(
   115                                     'registered by %s') %
   122                     _(
   116                                   (wiretype, self._wiretypes[wiretype]))
   123                         'wire protocol compression %s already '
       
   124                         'registered by %s'
       
   125                     )
       
   126                     % (wiretype, self._wiretypes[wiretype])
       
   127                 )
   117 
   128 
   118             self._wiretypes[wiretype] = name
   129             self._wiretypes[wiretype] = name
   119 
   130 
   120         revlogheader = engine.revlogheader()
   131         revlogheader = engine.revlogheader()
   121         if revlogheader and revlogheader in self._revlogheaders:
   132         if revlogheader and revlogheader in self._revlogheaders:
   122             raise error.Abort(_('revlog header %s already registered by %s') %
   133             raise error.Abort(
   123                               (revlogheader, self._revlogheaders[revlogheader]))
   134                 _('revlog header %s already registered by %s')
       
   135                 % (revlogheader, self._revlogheaders[revlogheader])
       
   136             )
   124 
   137 
   125         if revlogheader:
   138         if revlogheader:
   126             self._revlogheaders[revlogheader] = name
   139             self._revlogheaders[revlogheader] = name
   127 
   140 
   128         self._engines[name] = engine
   141         self._engines[name] = engine
   142 
   155 
   143         Will abort if the engine is known but not available.
   156         Will abort if the engine is known but not available.
   144         """
   157         """
   145         engine = self._engines[self._bundlenames[bundlename]]
   158         engine = self._engines[self._bundlenames[bundlename]]
   146         if not engine.available():
   159         if not engine.available():
   147             raise error.Abort(_('compression engine %s could not be loaded') %
   160             raise error.Abort(
   148                               engine.name())
   161                 _('compression engine %s could not be loaded') % engine.name()
       
   162             )
   149         return engine
   163         return engine
   150 
   164 
   151     def forbundletype(self, bundletype):
   165     def forbundletype(self, bundletype):
   152         """Obtain a compression engine registered to a bundle type.
   166         """Obtain a compression engine registered to a bundle type.
   153 
   167 
   155 
   169 
   156         Will abort if the engine is known but not available.
   170         Will abort if the engine is known but not available.
   157         """
   171         """
   158         engine = self._engines[self._bundletypes[bundletype]]
   172         engine = self._engines[self._bundletypes[bundletype]]
   159         if not engine.available():
   173         if not engine.available():
   160             raise error.Abort(_('compression engine %s could not be loaded') %
   174             raise error.Abort(
   161                               engine.name())
   175                 _('compression engine %s could not be loaded') % engine.name()
       
   176             )
   162         return engine
   177         return engine
   163 
   178 
   164     def supportedwireengines(self, role, onlyavailable=True):
   179     def supportedwireengines(self, role, onlyavailable=True):
   165         """Obtain compression engines that support the wire protocol.
   180         """Obtain compression engines that support the wire protocol.
   166 
   181 
   187         return list(sorted(engines, key=getkey))
   202         return list(sorted(engines, key=getkey))
   188 
   203 
   189     def forwiretype(self, wiretype):
   204     def forwiretype(self, wiretype):
   190         engine = self._engines[self._wiretypes[wiretype]]
   205         engine = self._engines[self._wiretypes[wiretype]]
   191         if not engine.available():
   206         if not engine.available():
   192             raise error.Abort(_('compression engine %s could not be loaded') %
   207             raise error.Abort(
   193                               engine.name())
   208                 _('compression engine %s could not be loaded') % engine.name()
       
   209             )
   194         return engine
   210         return engine
   195 
   211 
   196     def forrevlogheader(self, header):
   212     def forrevlogheader(self, header):
   197         """Obtain a compression engine registered to a revlog header.
   213         """Obtain a compression engine registered to a revlog header.
   198 
   214 
   199         Will raise KeyError if the revlog header value isn't registered.
   215         Will raise KeyError if the revlog header value isn't registered.
   200         """
   216         """
   201         return self._engines[self._revlogheaders[header]]
   217         return self._engines[self._revlogheaders[header]]
   202 
   218 
       
   219 
   203 compengines = compressormanager()
   220 compengines = compressormanager()
       
   221 
   204 
   222 
   205 class compressionengine(object):
   223 class compressionengine(object):
   206     """Base class for compression engines.
   224     """Base class for compression engines.
   207 
   225 
   208     Compression engines must implement the interface defined by this class.
   226     Compression engines must implement the interface defined by this class.
   209     """
   227     """
       
   228 
   210     def name(self):
   229     def name(self):
   211         """Returns the name of the compression engine.
   230         """Returns the name of the compression engine.
   212 
   231 
   213         This is the key the engine is registered under.
   232         This is the key the engine is registered under.
   214 
   233 
   316         data or raise a ``StorageError``.
   335         data or raise a ``StorageError``.
   317 
   336 
   318         The object is reusable but is not thread safe.
   337         The object is reusable but is not thread safe.
   319         """
   338         """
   320         raise NotImplementedError()
   339         raise NotImplementedError()
       
   340 
   321 
   341 
   322 class _CompressedStreamReader(object):
   342 class _CompressedStreamReader(object):
   323     def __init__(self, fh):
   343     def __init__(self, fh):
   324         if safehasattr(fh, 'unbufferedread'):
   344         if safehasattr(fh, 'unbufferedread'):
   325             self._reader = fh.unbufferedread
   345             self._reader = fh.unbufferedread
   336         buf = []
   356         buf = []
   337         while True:
   357         while True:
   338             while self._pending:
   358             while self._pending:
   339                 if len(self._pending[0]) > l + self._pos:
   359                 if len(self._pending[0]) > l + self._pos:
   340                     newbuf = self._pending[0]
   360                     newbuf = self._pending[0]
   341                     buf.append(newbuf[self._pos:self._pos + l])
   361                     buf.append(newbuf[self._pos : self._pos + l])
   342                     self._pos += l
   362                     self._pos += l
   343                     return ''.join(buf)
   363                     return ''.join(buf)
   344 
   364 
   345                 newbuf = self._pending.pop(0)
   365                 newbuf = self._pending.pop(0)
   346                 if self._pos:
   366                 if self._pos:
   347                     buf.append(newbuf[self._pos:])
   367                     buf.append(newbuf[self._pos :])
   348                     l -= len(newbuf) - self._pos
   368                     l -= len(newbuf) - self._pos
   349                 else:
   369                 else:
   350                     buf.append(newbuf)
   370                     buf.append(newbuf)
   351                     l -= len(newbuf)
   371                     l -= len(newbuf)
   352                 self._pos = 0
   372                 self._pos = 0
   357             self._decompress(chunk)
   377             self._decompress(chunk)
   358             if not chunk and not self._pending and not self._eof:
   378             if not chunk and not self._pending and not self._eof:
   359                 # No progress and no new data, bail out
   379                 # No progress and no new data, bail out
   360                 return ''.join(buf)
   380                 return ''.join(buf)
   361 
   381 
       
   382 
   362 class _GzipCompressedStreamReader(_CompressedStreamReader):
   383 class _GzipCompressedStreamReader(_CompressedStreamReader):
   363     def __init__(self, fh):
   384     def __init__(self, fh):
   364         super(_GzipCompressedStreamReader, self).__init__(fh)
   385         super(_GzipCompressedStreamReader, self).__init__(fh)
   365         self._decompobj = zlib.decompressobj()
   386         self._decompobj = zlib.decompressobj()
       
   387 
   366     def _decompress(self, chunk):
   388     def _decompress(self, chunk):
   367         newbuf = self._decompobj.decompress(chunk)
   389         newbuf = self._decompobj.decompress(chunk)
   368         if newbuf:
   390         if newbuf:
   369             self._pending.append(newbuf)
   391             self._pending.append(newbuf)
   370         d = self._decompobj.copy()
   392         d = self._decompobj.copy()
   374             if d.unused_data == 'x':
   396             if d.unused_data == 'x':
   375                 self._eof = True
   397                 self._eof = True
   376         except zlib.error:
   398         except zlib.error:
   377             pass
   399             pass
   378 
   400 
       
   401 
   379 class _BZ2CompressedStreamReader(_CompressedStreamReader):
   402 class _BZ2CompressedStreamReader(_CompressedStreamReader):
   380     def __init__(self, fh):
   403     def __init__(self, fh):
   381         super(_BZ2CompressedStreamReader, self).__init__(fh)
   404         super(_BZ2CompressedStreamReader, self).__init__(fh)
   382         self._decompobj = bz2.BZ2Decompressor()
   405         self._decompobj = bz2.BZ2Decompressor()
       
   406 
   383     def _decompress(self, chunk):
   407     def _decompress(self, chunk):
   384         newbuf = self._decompobj.decompress(chunk)
   408         newbuf = self._decompobj.decompress(chunk)
   385         if newbuf:
   409         if newbuf:
   386             self._pending.append(newbuf)
   410             self._pending.append(newbuf)
   387         try:
   411         try:
   392                 else:
   416                 else:
   393                     break
   417                     break
   394         except EOFError:
   418         except EOFError:
   395             self._eof = True
   419             self._eof = True
   396 
   420 
       
   421 
   397 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
   422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
   398     def __init__(self, fh):
   423     def __init__(self, fh):
   399         super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
   424         super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
   400         newbuf = self._decompobj.decompress('BZ')
   425         newbuf = self._decompobj.decompress('BZ')
   401         if newbuf:
   426         if newbuf:
   402             self._pending.append(newbuf)
   427             self._pending.append(newbuf)
   403 
   428 
       
   429 
   404 class _ZstdCompressedStreamReader(_CompressedStreamReader):
   430 class _ZstdCompressedStreamReader(_CompressedStreamReader):
   405     def __init__(self, fh, zstd):
   431     def __init__(self, fh, zstd):
   406         super(_ZstdCompressedStreamReader, self).__init__(fh)
   432         super(_ZstdCompressedStreamReader, self).__init__(fh)
   407         self._zstd = zstd
   433         self._zstd = zstd
   408         self._decompobj = zstd.ZstdDecompressor().decompressobj()
   434         self._decompobj = zstd.ZstdDecompressor().decompressobj()
       
   435 
   409     def _decompress(self, chunk):
   436     def _decompress(self, chunk):
   410         newbuf = self._decompobj.decompress(chunk)
   437         newbuf = self._decompobj.decompress(chunk)
   411         if newbuf:
   438         if newbuf:
   412             self._pending.append(newbuf)
   439             self._pending.append(newbuf)
   413         try:
   440         try:
   418                 else:
   445                 else:
   419                     break
   446                     break
   420         except self._zstd.ZstdError:
   447         except self._zstd.ZstdError:
   421             self._eof = True
   448             self._eof = True
   422 
   449 
       
   450 
   423 class _zlibengine(compressionengine):
   451 class _zlibengine(compressionengine):
   424     def name(self):
   452     def name(self):
   425         return 'zlib'
   453         return 'zlib'
   426 
   454 
   427     def bundletype(self):
   455     def bundletype(self):
   454 
   482 
   455     def decompressorreader(self, fh):
   483     def decompressorreader(self, fh):
   456         return _GzipCompressedStreamReader(fh)
   484         return _GzipCompressedStreamReader(fh)
   457 
   485 
   458     class zlibrevlogcompressor(object):
   486     class zlibrevlogcompressor(object):
   459 
       
   460         def __init__(self, level=None):
   487         def __init__(self, level=None):
   461             self._level = level
   488             self._level = level
   462 
   489 
   463         def compress(self, data):
   490         def compress(self, data):
   464             insize = len(data)
   491             insize = len(data)
   486                 else:
   513                 else:
   487                     z = zlib.compressobj(level=self._level)
   514                     z = zlib.compressobj(level=self._level)
   488                 parts = []
   515                 parts = []
   489                 pos = 0
   516                 pos = 0
   490                 while pos < insize:
   517                 while pos < insize:
   491                     pos2 = pos + 2**20
   518                     pos2 = pos + 2 ** 20
   492                     parts.append(z.compress(data[pos:pos2]))
   519                     parts.append(z.compress(data[pos:pos2]))
   493                     pos = pos2
   520                     pos = pos2
   494                 parts.append(z.flush())
   521                 parts.append(z.flush())
   495 
   522 
   496                 if sum(map(len, parts)) < insize:
   523                 if sum(map(len, parts)) < insize:
   499 
   526 
   500         def decompress(self, data):
   527         def decompress(self, data):
   501             try:
   528             try:
   502                 return zlib.decompress(data)
   529                 return zlib.decompress(data)
   503             except zlib.error as e:
   530             except zlib.error as e:
   504                 raise error.StorageError(_('revlog decompress error: %s') %
   531                 raise error.StorageError(
   505                                          stringutil.forcebytestr(e))
   532                     _('revlog decompress error: %s')
       
   533                     % stringutil.forcebytestr(e)
       
   534                 )
   506 
   535 
   507     def revlogcompressor(self, opts=None):
   536     def revlogcompressor(self, opts=None):
   508         level = None
   537         level = None
   509         if opts is not None:
   538         if opts is not None:
   510             level = opts.get('zlib.level')
   539             level = opts.get('zlib.level')
   511         return self.zlibrevlogcompressor(level)
   540         return self.zlibrevlogcompressor(level)
   512 
   541 
       
   542 
   513 compengines.register(_zlibengine())
   543 compengines.register(_zlibengine())
       
   544 
   514 
   545 
   515 class _bz2engine(compressionengine):
   546 class _bz2engine(compressionengine):
   516     def name(self):
   547     def name(self):
   517         return 'bz2'
   548         return 'bz2'
   518 
   549 
   546         yield z.flush()
   577         yield z.flush()
   547 
   578 
   548     def decompressorreader(self, fh):
   579     def decompressorreader(self, fh):
   549         return _BZ2CompressedStreamReader(fh)
   580         return _BZ2CompressedStreamReader(fh)
   550 
   581 
       
   582 
   551 compengines.register(_bz2engine())
   583 compengines.register(_bz2engine())
       
   584 
   552 
   585 
   553 class _truncatedbz2engine(compressionengine):
   586 class _truncatedbz2engine(compressionengine):
   554     def name(self):
   587     def name(self):
   555         return 'bz2truncated'
   588         return 'bz2truncated'
   556 
   589 
   560     # We don't implement compressstream because it is hackily handled elsewhere.
   593     # We don't implement compressstream because it is hackily handled elsewhere.
   561 
   594 
   562     def decompressorreader(self, fh):
   595     def decompressorreader(self, fh):
   563         return _TruncatedBZ2CompressedStreamReader(fh)
   596         return _TruncatedBZ2CompressedStreamReader(fh)
   564 
   597 
       
   598 
   565 compengines.register(_truncatedbz2engine())
   599 compengines.register(_truncatedbz2engine())
       
   600 
   566 
   601 
   567 class _noopengine(compressionengine):
   602 class _noopengine(compressionengine):
   568     def name(self):
   603     def name(self):
   569         return 'none'
   604         return 'none'
   570 
   605 
   595             return None
   630             return None
   596 
   631 
   597     def revlogcompressor(self, opts=None):
   632     def revlogcompressor(self, opts=None):
   598         return self.nooprevlogcompressor()
   633         return self.nooprevlogcompressor()
   599 
   634 
       
   635 
   600 compengines.register(_noopengine())
   636 compengines.register(_noopengine())
       
   637 
   601 
   638 
   602 class _zstdengine(compressionengine):
   639 class _zstdengine(compressionengine):
   603     def name(self):
   640     def name(self):
   604         return 'zstd'
   641         return 'zstd'
   605 
   642 
   607     def _module(self):
   644     def _module(self):
   608         # Not all installs have the zstd module available. So defer importing
   645         # Not all installs have the zstd module available. So defer importing
   609         # until first access.
   646         # until first access.
   610         try:
   647         try:
   611             from .. import zstd
   648             from .. import zstd
       
   649 
   612             # Force delayed import.
   650             # Force delayed import.
   613             zstd.__version__
   651             zstd.__version__
   614             return zstd
   652             return zstd
   615         except ImportError:
   653         except ImportError:
   616             return None
   654             return None
   714                     pos = pos2
   752                     pos = pos2
   715                 # Frame should be exhausted, so no finish() API.
   753                 # Frame should be exhausted, so no finish() API.
   716 
   754 
   717                 return ''.join(chunks)
   755                 return ''.join(chunks)
   718             except Exception as e:
   756             except Exception as e:
   719                 raise error.StorageError(_('revlog decompress error: %s') %
   757                 raise error.StorageError(
   720                                          stringutil.forcebytestr(e))
   758                     _('revlog decompress error: %s')
       
   759                     % stringutil.forcebytestr(e)
       
   760                 )
   721 
   761 
   722     def revlogcompressor(self, opts=None):
   762     def revlogcompressor(self, opts=None):
   723         opts = opts or {}
   763         opts = opts or {}
   724         level = opts.get('zstd.level')
   764         level = opts.get('zstd.level')
   725         if level is None:
   765         if level is None:
   726             level = opts.get('level')
   766             level = opts.get('level')
   727         if level is None:
   767         if level is None:
   728             level = 3
   768             level = 3
   729         return self.zstdrevlogcompressor(self._module, level=level)
   769         return self.zstdrevlogcompressor(self._module, level=level)
   730 
   770 
       
   771 
   731 compengines.register(_zstdengine())
   772 compengines.register(_zstdengine())
       
   773 
   732 
   774 
   733 def bundlecompressiontopics():
   775 def bundlecompressiontopics():
   734     """Obtains a list of available bundle compressions for use in help."""
   776     """Obtains a list of available bundle compressions for use in help."""
   735     # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
   777     # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
   736     items = {}
   778     items = {}
   759 
   801 
   760         items[bt[0]] = value
   802         items[bt[0]] = value
   761 
   803 
   762     return items
   804     return items
   763 
   805 
       
   806 
   764 i18nfunctions = bundlecompressiontopics().values()
   807 i18nfunctions = bundlecompressiontopics().values()