mercurial/utils/compression.py
changeset 42041 3e47d1ec9da5
parent 41834 7f63ec6969f3
child 42042 aaececb4b066
equal deleted inserted replaced
42040:02fa567f8a3c 42041:3e47d1ec9da5
       
     1 # compression.py - Mercurial utility functions for compression
       
     2 #
       
     3 # This software may be used and distributed according to the terms of the
       
     4 # GNU General Public License version 2 or any later version.
       
     5 
       
     6 
       
     7 from __future__ import absolute_import, print_function
       
     8 
       
     9 import bz2
       
    10 import collections
       
    11 import zlib
       
    12 
       
    13 from .. import (
       
    14     error,
       
    15     i18n,
       
    16     pycompat,
       
    17 )
       
    18 from . import (
       
    19     stringutil,
       
    20 )
       
    21 
       
    22 safehasattr = pycompat.safehasattr
       
    23 
       
    24 
       
    25 _ = i18n._
       
    26 
       
    27 # compression code
       
    28 
       
    29 SERVERROLE = 'server'
       
    30 CLIENTROLE = 'client'
       
    31 
       
    32 compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
       
    33                                                (r'name', r'serverpriority',
       
    34                                                 r'clientpriority'))
       
    35 
       
    36 class propertycache(object):
       
    37     def __init__(self, func):
       
    38         self.func = func
       
    39         self.name = func.__name__
       
    40     def __get__(self, obj, type=None):
       
    41         result = self.func(obj)
       
    42         self.cachevalue(obj, result)
       
    43         return result
       
    44 
       
    45     def cachevalue(self, obj, value):
       
    46         # __dict__ assignment required to bypass __setattr__ (eg: repoview)
       
    47         obj.__dict__[self.name] = value
       
    48 
       
    49 class compressormanager(object):
       
    50     """Holds registrations of various compression engines.
       
    51 
       
    52     This class essentially abstracts the differences between compression
       
    53     engines to allow new compression formats to be added easily, possibly from
       
    54     extensions.
       
    55 
       
    56     Compressors are registered against the global instance by calling its
       
    57     ``register()`` method.
       
    58     """
       
    59     def __init__(self):
       
    60         self._engines = {}
       
    61         # Bundle spec human name to engine name.
       
    62         self._bundlenames = {}
       
    63         # Internal bundle identifier to engine name.
       
    64         self._bundletypes = {}
       
    65         # Revlog header to engine name.
       
    66         self._revlogheaders = {}
       
    67         # Wire proto identifier to engine name.
       
    68         self._wiretypes = {}
       
    69 
       
    70     def __getitem__(self, key):
       
    71         return self._engines[key]
       
    72 
       
    73     def __contains__(self, key):
       
    74         return key in self._engines
       
    75 
       
    76     def __iter__(self):
       
    77         return iter(self._engines.keys())
       
    78 
       
    79     def register(self, engine):
       
    80         """Register a compression engine with the manager.
       
    81 
       
    82         The argument must be a ``compressionengine`` instance.
       
    83         """
       
    84         if not isinstance(engine, compressionengine):
       
    85             raise ValueError(_('argument must be a compressionengine'))
       
    86 
       
    87         name = engine.name()
       
    88 
       
    89         if name in self._engines:
       
    90             raise error.Abort(_('compression engine %s already registered') %
       
    91                               name)
       
    92 
       
    93         bundleinfo = engine.bundletype()
       
    94         if bundleinfo:
       
    95             bundlename, bundletype = bundleinfo
       
    96 
       
    97             if bundlename in self._bundlenames:
       
    98                 raise error.Abort(_('bundle name %s already registered') %
       
    99                                   bundlename)
       
   100             if bundletype in self._bundletypes:
       
   101                 raise error.Abort(_('bundle type %s already registered by %s') %
       
   102                                   (bundletype, self._bundletypes[bundletype]))
       
   103 
       
   104             # No external facing name declared.
       
   105             if bundlename:
       
   106                 self._bundlenames[bundlename] = name
       
   107 
       
   108             self._bundletypes[bundletype] = name
       
   109 
       
   110         wiresupport = engine.wireprotosupport()
       
   111         if wiresupport:
       
   112             wiretype = wiresupport.name
       
   113             if wiretype in self._wiretypes:
       
   114                 raise error.Abort(_('wire protocol compression %s already '
       
   115                                     'registered by %s') %
       
   116                                   (wiretype, self._wiretypes[wiretype]))
       
   117 
       
   118             self._wiretypes[wiretype] = name
       
   119 
       
   120         revlogheader = engine.revlogheader()
       
   121         if revlogheader and revlogheader in self._revlogheaders:
       
   122             raise error.Abort(_('revlog header %s already registered by %s') %
       
   123                               (revlogheader, self._revlogheaders[revlogheader]))
       
   124 
       
   125         if revlogheader:
       
   126             self._revlogheaders[revlogheader] = name
       
   127 
       
   128         self._engines[name] = engine
       
   129 
       
   130     @property
       
   131     def supportedbundlenames(self):
       
   132         return set(self._bundlenames.keys())
       
   133 
       
   134     @property
       
   135     def supportedbundletypes(self):
       
   136         return set(self._bundletypes.keys())
       
   137 
       
   138     def forbundlename(self, bundlename):
       
   139         """Obtain a compression engine registered to a bundle name.
       
   140 
       
   141         Will raise KeyError if the bundle type isn't registered.
       
   142 
       
   143         Will abort if the engine is known but not available.
       
   144         """
       
   145         engine = self._engines[self._bundlenames[bundlename]]
       
   146         if not engine.available():
       
   147             raise error.Abort(_('compression engine %s could not be loaded') %
       
   148                               engine.name())
       
   149         return engine
       
   150 
       
   151     def forbundletype(self, bundletype):
       
   152         """Obtain a compression engine registered to a bundle type.
       
   153 
       
   154         Will raise KeyError if the bundle type isn't registered.
       
   155 
       
   156         Will abort if the engine is known but not available.
       
   157         """
       
   158         engine = self._engines[self._bundletypes[bundletype]]
       
   159         if not engine.available():
       
   160             raise error.Abort(_('compression engine %s could not be loaded') %
       
   161                               engine.name())
       
   162         return engine
       
   163 
       
   164     def supportedwireengines(self, role, onlyavailable=True):
       
   165         """Obtain compression engines that support the wire protocol.
       
   166 
       
   167         Returns a list of engines in prioritized order, most desired first.
       
   168 
       
   169         If ``onlyavailable`` is set, filter out engines that can't be
       
   170         loaded.
       
   171         """
       
   172         assert role in (SERVERROLE, CLIENTROLE)
       
   173 
       
   174         attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
       
   175 
       
   176         engines = [self._engines[e] for e in self._wiretypes.values()]
       
   177         if onlyavailable:
       
   178             engines = [e for e in engines if e.available()]
       
   179 
       
   180         def getkey(e):
       
   181             # Sort first by priority, highest first. In case of tie, sort
       
   182             # alphabetically. This is arbitrary, but ensures output is
       
   183             # stable.
       
   184             w = e.wireprotosupport()
       
   185             return -1 * getattr(w, attr), w.name
       
   186 
       
   187         return list(sorted(engines, key=getkey))
       
   188 
       
   189     def forwiretype(self, wiretype):
       
   190         engine = self._engines[self._wiretypes[wiretype]]
       
   191         if not engine.available():
       
   192             raise error.Abort(_('compression engine %s could not be loaded') %
       
   193                               engine.name())
       
   194         return engine
       
   195 
       
   196     def forrevlogheader(self, header):
       
   197         """Obtain a compression engine registered to a revlog header.
       
   198 
       
   199         Will raise KeyError if the revlog header value isn't registered.
       
   200         """
       
   201         return self._engines[self._revlogheaders[header]]
       
   202 
       
   203 compengines = compressormanager()
       
   204 
       
   205 class compressionengine(object):
       
   206     """Base class for compression engines.
       
   207 
       
   208     Compression engines must implement the interface defined by this class.
       
   209     """
       
   210     def name(self):
       
   211         """Returns the name of the compression engine.
       
   212 
       
   213         This is the key the engine is registered under.
       
   214 
       
   215         This method must be implemented.
       
   216         """
       
   217         raise NotImplementedError()
       
   218 
       
   219     def available(self):
       
   220         """Whether the compression engine is available.
       
   221 
       
   222         The intent of this method is to allow optional compression engines
       
   223         that may not be available in all installations (such as engines relying
       
   224         on C extensions that may not be present).
       
   225         """
       
   226         return True
       
   227 
       
   228     def bundletype(self):
       
   229         """Describes bundle identifiers for this engine.
       
   230 
       
   231         If this compression engine isn't supported for bundles, returns None.
       
   232 
       
   233         If this engine can be used for bundles, returns a 2-tuple of strings of
       
   234         the user-facing "bundle spec" compression name and an internal
       
   235         identifier used to denote the compression format within bundles. To
       
   236         exclude the name from external usage, set the first element to ``None``.
       
   237 
       
   238         If bundle compression is supported, the class must also implement
       
   239         ``compressstream`` and `decompressorreader``.
       
   240 
       
   241         The docstring of this method is used in the help system to tell users
       
   242         about this engine.
       
   243         """
       
   244         return None
       
   245 
       
   246     def wireprotosupport(self):
       
   247         """Declare support for this compression format on the wire protocol.
       
   248 
       
   249         If this compression engine isn't supported for compressing wire
       
   250         protocol payloads, returns None.
       
   251 
       
   252         Otherwise, returns ``compenginewireprotosupport`` with the following
       
   253         fields:
       
   254 
       
   255         * String format identifier
       
   256         * Integer priority for the server
       
   257         * Integer priority for the client
       
   258 
       
   259         The integer priorities are used to order the advertisement of format
       
   260         support by server and client. The highest integer is advertised
       
   261         first. Integers with non-positive values aren't advertised.
       
   262 
       
   263         The priority values are somewhat arbitrary and only used for default
       
   264         ordering. The relative order can be changed via config options.
       
   265 
       
   266         If wire protocol compression is supported, the class must also implement
       
   267         ``compressstream`` and ``decompressorreader``.
       
   268         """
       
   269         return None
       
   270 
       
   271     def revlogheader(self):
       
   272         """Header added to revlog chunks that identifies this engine.
       
   273 
       
   274         If this engine can be used to compress revlogs, this method should
       
   275         return the bytes used to identify chunks compressed with this engine.
       
   276         Else, the method should return ``None`` to indicate it does not
       
   277         participate in revlog compression.
       
   278         """
       
   279         return None
       
   280 
       
   281     def compressstream(self, it, opts=None):
       
   282         """Compress an iterator of chunks.
       
   283 
       
   284         The method receives an iterator (ideally a generator) of chunks of
       
   285         bytes to be compressed. It returns an iterator (ideally a generator)
       
   286         of bytes of chunks representing the compressed output.
       
   287 
       
   288         Optionally accepts an argument defining how to perform compression.
       
   289         Each engine treats this argument differently.
       
   290         """
       
   291         raise NotImplementedError()
       
   292 
       
   293     def decompressorreader(self, fh):
       
   294         """Perform decompression on a file object.
       
   295 
       
   296         Argument is an object with a ``read(size)`` method that returns
       
   297         compressed data. Return value is an object with a ``read(size)`` that
       
   298         returns uncompressed data.
       
   299         """
       
   300         raise NotImplementedError()
       
   301 
       
   302     def revlogcompressor(self, opts=None):
       
   303         """Obtain an object that can be used to compress revlog entries.
       
   304 
       
   305         The object has a ``compress(data)`` method that compresses binary
       
   306         data. This method returns compressed binary data or ``None`` if
       
   307         the data could not be compressed (too small, not compressible, etc).
       
   308         The returned data should have a header uniquely identifying this
       
   309         compression format so decompression can be routed to this engine.
       
   310         This header should be identified by the ``revlogheader()`` return
       
   311         value.
       
   312 
       
   313         The object has a ``decompress(data)`` method that decompresses
       
   314         data. The method will only be called if ``data`` begins with
       
   315         ``revlogheader()``. The method should return the raw, uncompressed
       
   316         data or raise a ``StorageError``.
       
   317 
       
   318         The object is reusable but is not thread safe.
       
   319         """
       
   320         raise NotImplementedError()
       
   321 
       
   322 class _CompressedStreamReader(object):
       
   323     def __init__(self, fh):
       
   324         if safehasattr(fh, 'unbufferedread'):
       
   325             self._reader = fh.unbufferedread
       
   326         else:
       
   327             self._reader = fh.read
       
   328         self._pending = []
       
   329         self._pos = 0
       
   330         self._eof = False
       
   331 
       
   332     def _decompress(self, chunk):
       
   333         raise NotImplementedError()
       
   334 
       
   335     def read(self, l):
       
   336         buf = []
       
   337         while True:
       
   338             while self._pending:
       
   339                 if len(self._pending[0]) > l + self._pos:
       
   340                     newbuf = self._pending[0]
       
   341                     buf.append(newbuf[self._pos:self._pos + l])
       
   342                     self._pos += l
       
   343                     return ''.join(buf)
       
   344 
       
   345                 newbuf = self._pending.pop(0)
       
   346                 if self._pos:
       
   347                     buf.append(newbuf[self._pos:])
       
   348                     l -= len(newbuf) - self._pos
       
   349                 else:
       
   350                     buf.append(newbuf)
       
   351                     l -= len(newbuf)
       
   352                 self._pos = 0
       
   353 
       
   354             if self._eof:
       
   355                 return ''.join(buf)
       
   356             chunk = self._reader(65536)
       
   357             self._decompress(chunk)
       
   358             if not chunk and not self._pending and not self._eof:
       
   359                 # No progress and no new data, bail out
       
   360                 return ''.join(buf)
       
   361 
       
   362 class _GzipCompressedStreamReader(_CompressedStreamReader):
       
   363     def __init__(self, fh):
       
   364         super(_GzipCompressedStreamReader, self).__init__(fh)
       
   365         self._decompobj = zlib.decompressobj()
       
   366     def _decompress(self, chunk):
       
   367         newbuf = self._decompobj.decompress(chunk)
       
   368         if newbuf:
       
   369             self._pending.append(newbuf)
       
   370         d = self._decompobj.copy()
       
   371         try:
       
   372             d.decompress('x')
       
   373             d.flush()
       
   374             if d.unused_data == 'x':
       
   375                 self._eof = True
       
   376         except zlib.error:
       
   377             pass
       
   378 
       
   379 class _BZ2CompressedStreamReader(_CompressedStreamReader):
       
   380     def __init__(self, fh):
       
   381         super(_BZ2CompressedStreamReader, self).__init__(fh)
       
   382         self._decompobj = bz2.BZ2Decompressor()
       
   383     def _decompress(self, chunk):
       
   384         newbuf = self._decompobj.decompress(chunk)
       
   385         if newbuf:
       
   386             self._pending.append(newbuf)
       
   387         try:
       
   388             while True:
       
   389                 newbuf = self._decompobj.decompress('')
       
   390                 if newbuf:
       
   391                     self._pending.append(newbuf)
       
   392                 else:
       
   393                     break
       
   394         except EOFError:
       
   395             self._eof = True
       
   396 
       
   397 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
       
   398     def __init__(self, fh):
       
   399         super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
       
   400         newbuf = self._decompobj.decompress('BZ')
       
   401         if newbuf:
       
   402             self._pending.append(newbuf)
       
   403 
       
   404 class _ZstdCompressedStreamReader(_CompressedStreamReader):
       
   405     def __init__(self, fh, zstd):
       
   406         super(_ZstdCompressedStreamReader, self).__init__(fh)
       
   407         self._zstd = zstd
       
   408         self._decompobj = zstd.ZstdDecompressor().decompressobj()
       
   409     def _decompress(self, chunk):
       
   410         newbuf = self._decompobj.decompress(chunk)
       
   411         if newbuf:
       
   412             self._pending.append(newbuf)
       
   413         try:
       
   414             while True:
       
   415                 newbuf = self._decompobj.decompress('')
       
   416                 if newbuf:
       
   417                     self._pending.append(newbuf)
       
   418                 else:
       
   419                     break
       
   420         except self._zstd.ZstdError:
       
   421             self._eof = True
       
   422 
       
   423 class _zlibengine(compressionengine):
       
   424     def name(self):
       
   425         return 'zlib'
       
   426 
       
   427     def bundletype(self):
       
   428         """zlib compression using the DEFLATE algorithm.
       
   429 
       
   430         All Mercurial clients should support this format. The compression
       
   431         algorithm strikes a reasonable balance between compression ratio
       
   432         and size.
       
   433         """
       
   434         return 'gzip', 'GZ'
       
   435 
       
   436     def wireprotosupport(self):
       
   437         return compewireprotosupport('zlib', 20, 20)
       
   438 
       
   439     def revlogheader(self):
       
   440         return 'x'
       
   441 
       
   442     def compressstream(self, it, opts=None):
       
   443         opts = opts or {}
       
   444 
       
   445         z = zlib.compressobj(opts.get('level', -1))
       
   446         for chunk in it:
       
   447             data = z.compress(chunk)
       
   448             # Not all calls to compress emit data. It is cheaper to inspect
       
   449             # here than to feed empty chunks through generator.
       
   450             if data:
       
   451                 yield data
       
   452 
       
   453         yield z.flush()
       
   454 
       
   455     def decompressorreader(self, fh):
       
   456         return _GzipCompressedStreamReader(fh)
       
   457 
       
   458     class zlibrevlogcompressor(object):
       
   459         def compress(self, data):
       
   460             insize = len(data)
       
   461             # Caller handles empty input case.
       
   462             assert insize > 0
       
   463 
       
   464             if insize < 44:
       
   465                 return None
       
   466 
       
   467             elif insize <= 1000000:
       
   468                 compressed = zlib.compress(data)
       
   469                 if len(compressed) < insize:
       
   470                     return compressed
       
   471                 return None
       
   472 
       
   473             # zlib makes an internal copy of the input buffer, doubling
       
   474             # memory usage for large inputs. So do streaming compression
       
   475             # on large inputs.
       
   476             else:
       
   477                 z = zlib.compressobj()
       
   478                 parts = []
       
   479                 pos = 0
       
   480                 while pos < insize:
       
   481                     pos2 = pos + 2**20
       
   482                     parts.append(z.compress(data[pos:pos2]))
       
   483                     pos = pos2
       
   484                 parts.append(z.flush())
       
   485 
       
   486                 if sum(map(len, parts)) < insize:
       
   487                     return ''.join(parts)
       
   488                 return None
       
   489 
       
   490         def decompress(self, data):
       
   491             try:
       
   492                 return zlib.decompress(data)
       
   493             except zlib.error as e:
       
   494                 raise error.StorageError(_('revlog decompress error: %s') %
       
   495                                          stringutil.forcebytestr(e))
       
   496 
       
   497     def revlogcompressor(self, opts=None):
       
   498         return self.zlibrevlogcompressor()
       
   499 
       
   500 compengines.register(_zlibengine())
       
   501 
       
   502 class _bz2engine(compressionengine):
       
   503     def name(self):
       
   504         return 'bz2'
       
   505 
       
   506     def bundletype(self):
       
   507         """An algorithm that produces smaller bundles than ``gzip``.
       
   508 
       
   509         All Mercurial clients should support this format.
       
   510 
       
   511         This engine will likely produce smaller bundles than ``gzip`` but
       
   512         will be significantly slower, both during compression and
       
   513         decompression.
       
   514 
       
   515         If available, the ``zstd`` engine can yield similar or better
       
   516         compression at much higher speeds.
       
   517         """
       
   518         return 'bzip2', 'BZ'
       
   519 
       
   520     # We declare a protocol name but don't advertise by default because
       
   521     # it is slow.
       
   522     def wireprotosupport(self):
       
   523         return compewireprotosupport('bzip2', 0, 0)
       
   524 
       
   525     def compressstream(self, it, opts=None):
       
   526         opts = opts or {}
       
   527         z = bz2.BZ2Compressor(opts.get('level', 9))
       
   528         for chunk in it:
       
   529             data = z.compress(chunk)
       
   530             if data:
       
   531                 yield data
       
   532 
       
   533         yield z.flush()
       
   534 
       
   535     def decompressorreader(self, fh):
       
   536         return _BZ2CompressedStreamReader(fh)
       
   537 
       
   538 compengines.register(_bz2engine())
       
   539 
       
   540 class _truncatedbz2engine(compressionengine):
       
   541     def name(self):
       
   542         return 'bz2truncated'
       
   543 
       
   544     def bundletype(self):
       
   545         return None, '_truncatedBZ'
       
   546 
       
   547     # We don't implement compressstream because it is hackily handled elsewhere.
       
   548 
       
   549     def decompressorreader(self, fh):
       
   550         return _TruncatedBZ2CompressedStreamReader(fh)
       
   551 
       
   552 compengines.register(_truncatedbz2engine())
       
   553 
       
   554 class _noopengine(compressionengine):
       
   555     def name(self):
       
   556         return 'none'
       
   557 
       
   558     def bundletype(self):
       
   559         """No compression is performed.
       
   560 
       
   561         Use this compression engine to explicitly disable compression.
       
   562         """
       
   563         return 'none', 'UN'
       
   564 
       
   565     # Clients always support uncompressed payloads. Servers don't because
       
   566     # unless you are on a fast network, uncompressed payloads can easily
       
   567     # saturate your network pipe.
       
   568     def wireprotosupport(self):
       
   569         return compewireprotosupport('none', 0, 10)
       
   570 
       
   571     # We don't implement revlogheader because it is handled specially
       
   572     # in the revlog class.
       
   573 
       
   574     def compressstream(self, it, opts=None):
       
   575         return it
       
   576 
       
   577     def decompressorreader(self, fh):
       
   578         return fh
       
   579 
       
   580     class nooprevlogcompressor(object):
       
   581         def compress(self, data):
       
   582             return None
       
   583 
       
   584     def revlogcompressor(self, opts=None):
       
   585         return self.nooprevlogcompressor()
       
   586 
       
   587 compengines.register(_noopengine())
       
   588 
       
   589 class _zstdengine(compressionengine):
       
   590     def name(self):
       
   591         return 'zstd'
       
   592 
       
   593     @propertycache
       
   594     def _module(self):
       
   595         # Not all installs have the zstd module available. So defer importing
       
   596         # until first access.
       
   597         try:
       
   598             from .. import zstd
       
   599             # Force delayed import.
       
   600             zstd.__version__
       
   601             return zstd
       
   602         except ImportError:
       
   603             return None
       
   604 
       
   605     def available(self):
       
   606         return bool(self._module)
       
   607 
       
   608     def bundletype(self):
       
   609         """A modern compression algorithm that is fast and highly flexible.
       
   610 
       
   611         Only supported by Mercurial 4.1 and newer clients.
       
   612 
       
   613         With the default settings, zstd compression is both faster and yields
       
   614         better compression than ``gzip``. It also frequently yields better
       
   615         compression than ``bzip2`` while operating at much higher speeds.
       
   616 
       
   617         If this engine is available and backwards compatibility is not a
       
   618         concern, it is likely the best available engine.
       
   619         """
       
   620         return 'zstd', 'ZS'
       
   621 
       
   622     def wireprotosupport(self):
       
   623         return compewireprotosupport('zstd', 50, 50)
       
   624 
       
   625     def revlogheader(self):
       
   626         return '\x28'
       
   627 
       
   628     def compressstream(self, it, opts=None):
       
   629         opts = opts or {}
       
   630         # zstd level 3 is almost always significantly faster than zlib
       
   631         # while providing no worse compression. It strikes a good balance
       
   632         # between speed and compression.
       
   633         level = opts.get('level', 3)
       
   634 
       
   635         zstd = self._module
       
   636         z = zstd.ZstdCompressor(level=level).compressobj()
       
   637         for chunk in it:
       
   638             data = z.compress(chunk)
       
   639             if data:
       
   640                 yield data
       
   641 
       
   642         yield z.flush()
       
   643 
       
   644     def decompressorreader(self, fh):
       
   645         return _ZstdCompressedStreamReader(fh, self._module)
       
   646 
       
   647     class zstdrevlogcompressor(object):
       
   648         def __init__(self, zstd, level=3):
       
   649             # TODO consider omitting frame magic to save 4 bytes.
       
   650             # This writes content sizes into the frame header. That is
       
   651             # extra storage. But it allows a correct size memory allocation
       
   652             # to hold the result.
       
   653             self._cctx = zstd.ZstdCompressor(level=level)
       
   654             self._dctx = zstd.ZstdDecompressor()
       
   655             self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
       
   656             self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
       
   657 
       
   658         def compress(self, data):
       
   659             insize = len(data)
       
   660             # Caller handles empty input case.
       
   661             assert insize > 0
       
   662 
       
   663             if insize < 50:
       
   664                 return None
       
   665 
       
   666             elif insize <= 1000000:
       
   667                 compressed = self._cctx.compress(data)
       
   668                 if len(compressed) < insize:
       
   669                     return compressed
       
   670                 return None
       
   671             else:
       
   672                 z = self._cctx.compressobj()
       
   673                 chunks = []
       
   674                 pos = 0
       
   675                 while pos < insize:
       
   676                     pos2 = pos + self._compinsize
       
   677                     chunk = z.compress(data[pos:pos2])
       
   678                     if chunk:
       
   679                         chunks.append(chunk)
       
   680                     pos = pos2
       
   681                 chunks.append(z.flush())
       
   682 
       
   683                 if sum(map(len, chunks)) < insize:
       
   684                     return ''.join(chunks)
       
   685                 return None
       
   686 
       
   687         def decompress(self, data):
       
   688             insize = len(data)
       
   689 
       
   690             try:
       
   691                 # This was measured to be faster than other streaming
       
   692                 # decompressors.
       
   693                 dobj = self._dctx.decompressobj()
       
   694                 chunks = []
       
   695                 pos = 0
       
   696                 while pos < insize:
       
   697                     pos2 = pos + self._decompinsize
       
   698                     chunk = dobj.decompress(data[pos:pos2])
       
   699                     if chunk:
       
   700                         chunks.append(chunk)
       
   701                     pos = pos2
       
   702                 # Frame should be exhausted, so no finish() API.
       
   703 
       
   704                 return ''.join(chunks)
       
   705             except Exception as e:
       
   706                 raise error.StorageError(_('revlog decompress error: %s') %
       
   707                                          stringutil.forcebytestr(e))
       
   708 
       
   709     def revlogcompressor(self, opts=None):
       
   710         opts = opts or {}
       
   711         return self.zstdrevlogcompressor(self._module,
       
   712                                          level=opts.get('level', 3))
       
   713 
       
# Instantiate the zstd engine and add it to the compengines registry at
# import time.
compengines.register(_zstdengine())
       
   715 
       
   716 def bundlecompressiontopics():
       
   717     """Obtains a list of available bundle compressions for use in help."""
       
   718     # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
       
   719     items = {}
       
   720 
       
   721     # We need to format the docstring. So use a dummy object/type to hold it
       
   722     # rather than mutating the original.
       
   723     class docobject(object):
       
   724         pass
       
   725 
       
   726     for name in compengines:
       
   727         engine = compengines[name]
       
   728 
       
   729         if not engine.available():
       
   730             continue
       
   731 
       
   732         bt = engine.bundletype()
       
   733         if not bt or not bt[0]:
       
   734             continue
       
   735 
       
   736         doc = b'``%s``\n    %s' % (bt[0], pycompat.getdoc(engine.bundletype))
       
   737 
       
   738         value = docobject()
       
   739         value.__doc__ = pycompat.sysstr(doc)
       
   740         value._origdoc = engine.bundletype.__doc__
       
   741         value._origfunc = engine.bundletype
       
   742 
       
   743         items[bt[0]] = value
       
   744 
       
   745     return items
       
   746 
       
# Doc-carrying objects for every documented bundle compression, computed
# once when this module is imported.
# NOTE(review): presumably consumed by the i18n tooling to collect
# translatable docstrings -- confirm against callers.
i18nfunctions = bundlecompressiontopics().values()