diff mercurial/utils/compression.py @ 42041:3e47d1ec9da5

util: extract compression code in `mercurial.utils.compression` The code seems large enough to be worth extracting. This is similar to what was done for various module in `mercurial/utils/`. Since None of the compression logic takes a `ui` objet, issuing deprecation warning is tricky. Luckly the logic does not seems to have many external users.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 27 Mar 2019 16:45:14 +0100
parents mercurial/util.py@7f63ec6969f3
children aaececb4b066
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/utils/compression.py	Wed Mar 27 16:45:14 2019 +0100
@@ -0,0 +1,747 @@
+# compression.py - Mercurial utility functions for compression
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+
+from __future__ import absolute_import, print_function
+
+import bz2
+import collections
+import zlib
+
+from .. import (
+    error,
+    i18n,
+    pycompat,
+)
+from . import (
+    stringutil,
+)
+
+safehasattr = pycompat.safehasattr
+
+
+_ = i18n._
+
+# compression code
+
+SERVERROLE = 'server'
+CLIENTROLE = 'client'
+
+compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
+                                               (r'name', r'serverpriority',
+                                                r'clientpriority'))
+
+class propertycache(object):
+    def __init__(self, func):
+        self.func = func
+        self.name = func.__name__
+    def __get__(self, obj, type=None):
+        result = self.func(obj)
+        self.cachevalue(obj, result)
+        return result
+
+    def cachevalue(self, obj, value):
+        # __dict__ assignment required to bypass __setattr__ (eg: repoview)
+        obj.__dict__[self.name] = value
+
+class compressormanager(object):
+    """Holds registrations of various compression engines.
+
+    This class essentially abstracts the differences between compression
+    engines to allow new compression formats to be added easily, possibly from
+    extensions.
+
+    Compressors are registered against the global instance by calling its
+    ``register()`` method.
+    """
+    def __init__(self):
+        self._engines = {}
+        # Bundle spec human name to engine name.
+        self._bundlenames = {}
+        # Internal bundle identifier to engine name.
+        self._bundletypes = {}
+        # Revlog header to engine name.
+        self._revlogheaders = {}
+        # Wire proto identifier to engine name.
+        self._wiretypes = {}
+
+    def __getitem__(self, key):
+        return self._engines[key]
+
+    def __contains__(self, key):
+        return key in self._engines
+
+    def __iter__(self):
+        return iter(self._engines.keys())
+
+    def register(self, engine):
+        """Register a compression engine with the manager.
+
+        The argument must be a ``compressionengine`` instance.
+        """
+        if not isinstance(engine, compressionengine):
+            raise ValueError(_('argument must be a compressionengine'))
+
+        name = engine.name()
+
+        if name in self._engines:
+            raise error.Abort(_('compression engine %s already registered') %
+                              name)
+
+        bundleinfo = engine.bundletype()
+        if bundleinfo:
+            bundlename, bundletype = bundleinfo
+
+            if bundlename in self._bundlenames:
+                raise error.Abort(_('bundle name %s already registered') %
+                                  bundlename)
+            if bundletype in self._bundletypes:
+                raise error.Abort(_('bundle type %s already registered by %s') %
+                                  (bundletype, self._bundletypes[bundletype]))
+
+            # No external facing name declared.
+            if bundlename:
+                self._bundlenames[bundlename] = name
+
+            self._bundletypes[bundletype] = name
+
+        wiresupport = engine.wireprotosupport()
+        if wiresupport:
+            wiretype = wiresupport.name
+            if wiretype in self._wiretypes:
+                raise error.Abort(_('wire protocol compression %s already '
+                                    'registered by %s') %
+                                  (wiretype, self._wiretypes[wiretype]))
+
+            self._wiretypes[wiretype] = name
+
+        revlogheader = engine.revlogheader()
+        if revlogheader and revlogheader in self._revlogheaders:
+            raise error.Abort(_('revlog header %s already registered by %s') %
+                              (revlogheader, self._revlogheaders[revlogheader]))
+
+        if revlogheader:
+            self._revlogheaders[revlogheader] = name
+
+        self._engines[name] = engine
+
+    @property
+    def supportedbundlenames(self):
+        return set(self._bundlenames.keys())
+
+    @property
+    def supportedbundletypes(self):
+        return set(self._bundletypes.keys())
+
+    def forbundlename(self, bundlename):
+        """Obtain a compression engine registered to a bundle name.
+
+        Will raise KeyError if the bundle type isn't registered.
+
+        Will abort if the engine is known but not available.
+        """
+        engine = self._engines[self._bundlenames[bundlename]]
+        if not engine.available():
+            raise error.Abort(_('compression engine %s could not be loaded') %
+                              engine.name())
+        return engine
+
+    def forbundletype(self, bundletype):
+        """Obtain a compression engine registered to a bundle type.
+
+        Will raise KeyError if the bundle type isn't registered.
+
+        Will abort if the engine is known but not available.
+        """
+        engine = self._engines[self._bundletypes[bundletype]]
+        if not engine.available():
+            raise error.Abort(_('compression engine %s could not be loaded') %
+                              engine.name())
+        return engine
+
+    def supportedwireengines(self, role, onlyavailable=True):
+        """Obtain compression engines that support the wire protocol.
+
+        Returns a list of engines in prioritized order, most desired first.
+
+        If ``onlyavailable`` is set, filter out engines that can't be
+        loaded.
+        """
+        assert role in (SERVERROLE, CLIENTROLE)
+
+        attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
+
+        engines = [self._engines[e] for e in self._wiretypes.values()]
+        if onlyavailable:
+            engines = [e for e in engines if e.available()]
+
+        def getkey(e):
+            # Sort first by priority, highest first. In case of tie, sort
+            # alphabetically. This is arbitrary, but ensures output is
+            # stable.
+            w = e.wireprotosupport()
+            return -1 * getattr(w, attr), w.name
+
+        return list(sorted(engines, key=getkey))
+
+    def forwiretype(self, wiretype):
+        engine = self._engines[self._wiretypes[wiretype]]
+        if not engine.available():
+            raise error.Abort(_('compression engine %s could not be loaded') %
+                              engine.name())
+        return engine
+
+    def forrevlogheader(self, header):
+        """Obtain a compression engine registered to a revlog header.
+
+        Will raise KeyError if the revlog header value isn't registered.
+        """
+        return self._engines[self._revlogheaders[header]]
+
+compengines = compressormanager()
+
+class compressionengine(object):
+    """Base class for compression engines.
+
+    Compression engines must implement the interface defined by this class.
+    """
+    def name(self):
+        """Returns the name of the compression engine.
+
+        This is the key the engine is registered under.
+
+        This method must be implemented.
+        """
+        raise NotImplementedError()
+
+    def available(self):
+        """Whether the compression engine is available.
+
+        The intent of this method is to allow optional compression engines
+        that may not be available in all installations (such as engines relying
+        on C extensions that may not be present).
+        """
+        return True
+
+    def bundletype(self):
+        """Describes bundle identifiers for this engine.
+
+        If this compression engine isn't supported for bundles, returns None.
+
+        If this engine can be used for bundles, returns a 2-tuple of strings of
+        the user-facing "bundle spec" compression name and an internal
+        identifier used to denote the compression format within bundles. To
+        exclude the name from external usage, set the first element to ``None``.
+
+        If bundle compression is supported, the class must also implement
+        ``compressstream`` and `decompressorreader``.
+
+        The docstring of this method is used in the help system to tell users
+        about this engine.
+        """
+        return None
+
+    def wireprotosupport(self):
+        """Declare support for this compression format on the wire protocol.
+
+        If this compression engine isn't supported for compressing wire
+        protocol payloads, returns None.
+
+        Otherwise, returns ``compenginewireprotosupport`` with the following
+        fields:
+
+        * String format identifier
+        * Integer priority for the server
+        * Integer priority for the client
+
+        The integer priorities are used to order the advertisement of format
+        support by server and client. The highest integer is advertised
+        first. Integers with non-positive values aren't advertised.
+
+        The priority values are somewhat arbitrary and only used for default
+        ordering. The relative order can be changed via config options.
+
+        If wire protocol compression is supported, the class must also implement
+        ``compressstream`` and ``decompressorreader``.
+        """
+        return None
+
+    def revlogheader(self):
+        """Header added to revlog chunks that identifies this engine.
+
+        If this engine can be used to compress revlogs, this method should
+        return the bytes used to identify chunks compressed with this engine.
+        Else, the method should return ``None`` to indicate it does not
+        participate in revlog compression.
+        """
+        return None
+
+    def compressstream(self, it, opts=None):
+        """Compress an iterator of chunks.
+
+        The method receives an iterator (ideally a generator) of chunks of
+        bytes to be compressed. It returns an iterator (ideally a generator)
+        of bytes of chunks representing the compressed output.
+
+        Optionally accepts an argument defining how to perform compression.
+        Each engine treats this argument differently.
+        """
+        raise NotImplementedError()
+
+    def decompressorreader(self, fh):
+        """Perform decompression on a file object.
+
+        Argument is an object with a ``read(size)`` method that returns
+        compressed data. Return value is an object with a ``read(size)`` that
+        returns uncompressed data.
+        """
+        raise NotImplementedError()
+
+    def revlogcompressor(self, opts=None):
+        """Obtain an object that can be used to compress revlog entries.
+
+        The object has a ``compress(data)`` method that compresses binary
+        data. This method returns compressed binary data or ``None`` if
+        the data could not be compressed (too small, not compressible, etc).
+        The returned data should have a header uniquely identifying this
+        compression format so decompression can be routed to this engine.
+        This header should be identified by the ``revlogheader()`` return
+        value.
+
+        The object has a ``decompress(data)`` method that decompresses
+        data. The method will only be called if ``data`` begins with
+        ``revlogheader()``. The method should return the raw, uncompressed
+        data or raise a ``StorageError``.
+
+        The object is reusable but is not thread safe.
+        """
+        raise NotImplementedError()
+
+class _CompressedStreamReader(object):
+    def __init__(self, fh):
+        if safehasattr(fh, 'unbufferedread'):
+            self._reader = fh.unbufferedread
+        else:
+            self._reader = fh.read
+        self._pending = []
+        self._pos = 0
+        self._eof = False
+
+    def _decompress(self, chunk):
+        raise NotImplementedError()
+
+    def read(self, l):
+        buf = []
+        while True:
+            while self._pending:
+                if len(self._pending[0]) > l + self._pos:
+                    newbuf = self._pending[0]
+                    buf.append(newbuf[self._pos:self._pos + l])
+                    self._pos += l
+                    return ''.join(buf)
+
+                newbuf = self._pending.pop(0)
+                if self._pos:
+                    buf.append(newbuf[self._pos:])
+                    l -= len(newbuf) - self._pos
+                else:
+                    buf.append(newbuf)
+                    l -= len(newbuf)
+                self._pos = 0
+
+            if self._eof:
+                return ''.join(buf)
+            chunk = self._reader(65536)
+            self._decompress(chunk)
+            if not chunk and not self._pending and not self._eof:
+                # No progress and no new data, bail out
+                return ''.join(buf)
+
+class _GzipCompressedStreamReader(_CompressedStreamReader):
+    def __init__(self, fh):
+        super(_GzipCompressedStreamReader, self).__init__(fh)
+        self._decompobj = zlib.decompressobj()
+    def _decompress(self, chunk):
+        newbuf = self._decompobj.decompress(chunk)
+        if newbuf:
+            self._pending.append(newbuf)
+        d = self._decompobj.copy()
+        try:
+            d.decompress('x')
+            d.flush()
+            if d.unused_data == 'x':
+                self._eof = True
+        except zlib.error:
+            pass
+
+class _BZ2CompressedStreamReader(_CompressedStreamReader):
+    def __init__(self, fh):
+        super(_BZ2CompressedStreamReader, self).__init__(fh)
+        self._decompobj = bz2.BZ2Decompressor()
+    def _decompress(self, chunk):
+        newbuf = self._decompobj.decompress(chunk)
+        if newbuf:
+            self._pending.append(newbuf)
+        try:
+            while True:
+                newbuf = self._decompobj.decompress('')
+                if newbuf:
+                    self._pending.append(newbuf)
+                else:
+                    break
+        except EOFError:
+            self._eof = True
+
+class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
+    def __init__(self, fh):
+        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
+        newbuf = self._decompobj.decompress('BZ')
+        if newbuf:
+            self._pending.append(newbuf)
+
+class _ZstdCompressedStreamReader(_CompressedStreamReader):
+    def __init__(self, fh, zstd):
+        super(_ZstdCompressedStreamReader, self).__init__(fh)
+        self._zstd = zstd
+        self._decompobj = zstd.ZstdDecompressor().decompressobj()
+    def _decompress(self, chunk):
+        newbuf = self._decompobj.decompress(chunk)
+        if newbuf:
+            self._pending.append(newbuf)
+        try:
+            while True:
+                newbuf = self._decompobj.decompress('')
+                if newbuf:
+                    self._pending.append(newbuf)
+                else:
+                    break
+        except self._zstd.ZstdError:
+            self._eof = True
+
+class _zlibengine(compressionengine):
+    def name(self):
+        return 'zlib'
+
+    def bundletype(self):
+        """zlib compression using the DEFLATE algorithm.
+
+        All Mercurial clients should support this format. The compression
+        algorithm strikes a reasonable balance between compression ratio
+        and size.
+        """
+        return 'gzip', 'GZ'
+
+    def wireprotosupport(self):
+        return compewireprotosupport('zlib', 20, 20)
+
+    def revlogheader(self):
+        return 'x'
+
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', -1))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
+    def decompressorreader(self, fh):
+        return _GzipCompressedStreamReader(fh)
+
+    class zlibrevlogcompressor(object):
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 44:
+                return None
+
+            elif insize <= 1000000:
+                compressed = zlib.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+
+            # zlib makes an internal copy of the input buffer, doubling
+            # memory usage for large inputs. So do streaming compression
+            # on large inputs.
+            else:
+                z = zlib.compressobj()
+                parts = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + 2**20
+                    parts.append(z.compress(data[pos:pos2]))
+                    pos = pos2
+                parts.append(z.flush())
+
+                if sum(map(len, parts)) < insize:
+                    return ''.join(parts)
+                return None
+
+        def decompress(self, data):
+            try:
+                return zlib.decompress(data)
+            except zlib.error as e:
+                raise error.StorageError(_('revlog decompress error: %s') %
+                                         stringutil.forcebytestr(e))
+
+    def revlogcompressor(self, opts=None):
+        return self.zlibrevlogcompressor()
+
+compengines.register(_zlibengine())
+
+class _bz2engine(compressionengine):
+    def name(self):
+        return 'bz2'
+
+    def bundletype(self):
+        """An algorithm that produces smaller bundles than ``gzip``.
+
+        All Mercurial clients should support this format.
+
+        This engine will likely produce smaller bundles than ``gzip`` but
+        will be significantly slower, both during compression and
+        decompression.
+
+        If available, the ``zstd`` engine can yield similar or better
+        compression at much higher speeds.
+        """
+        return 'bzip2', 'BZ'
+
+    # We declare a protocol name but don't advertise by default because
+    # it is slow.
+    def wireprotosupport(self):
+        return compewireprotosupport('bzip2', 0, 0)
+
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
+    def decompressorreader(self, fh):
+        return _BZ2CompressedStreamReader(fh)
+
+compengines.register(_bz2engine())
+
+class _truncatedbz2engine(compressionengine):
+    def name(self):
+        return 'bz2truncated'
+
+    def bundletype(self):
+        return None, '_truncatedBZ'
+
+    # We don't implement compressstream because it is hackily handled elsewhere.
+
+    def decompressorreader(self, fh):
+        return _TruncatedBZ2CompressedStreamReader(fh)
+
+compengines.register(_truncatedbz2engine())
+
+class _noopengine(compressionengine):
+    def name(self):
+        return 'none'
+
+    def bundletype(self):
+        """No compression is performed.
+
+        Use this compression engine to explicitly disable compression.
+        """
+        return 'none', 'UN'
+
+    # Clients always support uncompressed payloads. Servers don't because
+    # unless you are on a fast network, uncompressed payloads can easily
+    # saturate your network pipe.
+    def wireprotosupport(self):
+        return compewireprotosupport('none', 0, 10)
+
+    # We don't implement revlogheader because it is handled specially
+    # in the revlog class.
+
+    def compressstream(self, it, opts=None):
+        return it
+
+    def decompressorreader(self, fh):
+        return fh
+
+    class nooprevlogcompressor(object):
+        def compress(self, data):
+            return None
+
+    def revlogcompressor(self, opts=None):
+        return self.nooprevlogcompressor()
+
+compengines.register(_noopengine())
+
+class _zstdengine(compressionengine):
+    def name(self):
+        return 'zstd'
+
+    @propertycache
+    def _module(self):
+        # Not all installs have the zstd module available. So defer importing
+        # until first access.
+        try:
+            from .. import zstd
+            # Force delayed import.
+            zstd.__version__
+            return zstd
+        except ImportError:
+            return None
+
+    def available(self):
+        return bool(self._module)
+
+    def bundletype(self):
+        """A modern compression algorithm that is fast and highly flexible.
+
+        Only supported by Mercurial 4.1 and newer clients.
+
+        With the default settings, zstd compression is both faster and yields
+        better compression than ``gzip``. It also frequently yields better
+        compression than ``bzip2`` while operating at much higher speeds.
+
+        If this engine is available and backwards compatibility is not a
+        concern, it is likely the best available engine.
+        """
+        return 'zstd', 'ZS'
+
+    def wireprotosupport(self):
+        return compewireprotosupport('zstd', 50, 50)
+
+    def revlogheader(self):
+        return '\x28'
+
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        # zstd level 3 is almost always significantly faster than zlib
+        # while providing no worse compression. It strikes a good balance
+        # between speed and compression.
+        level = opts.get('level', 3)
+
+        zstd = self._module
+        z = zstd.ZstdCompressor(level=level).compressobj()
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
+    def decompressorreader(self, fh):
+        return _ZstdCompressedStreamReader(fh, self._module)
+
+    class zstdrevlogcompressor(object):
+        def __init__(self, zstd, level=3):
+            # TODO consider omitting frame magic to save 4 bytes.
+            # This writes content sizes into the frame header. That is
+            # extra storage. But it allows a correct size memory allocation
+            # to hold the result.
+            self._cctx = zstd.ZstdCompressor(level=level)
+            self._dctx = zstd.ZstdDecompressor()
+            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
+
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 50:
+                return None
+
+            elif insize <= 1000000:
+                compressed = self._cctx.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+            else:
+                z = self._cctx.compressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._compinsize
+                    chunk = z.compress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                chunks.append(z.flush())
+
+                if sum(map(len, chunks)) < insize:
+                    return ''.join(chunks)
+                return None
+
+        def decompress(self, data):
+            insize = len(data)
+
+            try:
+                # This was measured to be faster than other streaming
+                # decompressors.
+                dobj = self._dctx.decompressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._decompinsize
+                    chunk = dobj.decompress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                # Frame should be exhausted, so no finish() API.
+
+                return ''.join(chunks)
+            except Exception as e:
+                raise error.StorageError(_('revlog decompress error: %s') %
+                                         stringutil.forcebytestr(e))
+
+    def revlogcompressor(self, opts=None):
+        opts = opts or {}
+        return self.zstdrevlogcompressor(self._module,
+                                         level=opts.get('level', 3))
+
+compengines.register(_zstdengine())
+
+def bundlecompressiontopics():
+    """Obtains a list of available bundle compressions for use in help."""
+    # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
+    items = {}
+
+    # We need to format the docstring. So use a dummy object/type to hold it
+    # rather than mutating the original.
+    class docobject(object):
+        pass
+
+    for name in compengines:
+        engine = compengines[name]
+
+        if not engine.available():
+            continue
+
+        bt = engine.bundletype()
+        if not bt or not bt[0]:
+            continue
+
+        doc = b'``%s``\n    %s' % (bt[0], pycompat.getdoc(engine.bundletype))
+
+        value = docobject()
+        value.__doc__ = pycompat.sysstr(doc)
+        value._origdoc = engine.bundletype.__doc__
+        value._origfunc = engine.bundletype
+
+        items[bt[0]] = value
+
+    return items
+
+i18nfunctions = bundlecompressiontopics().values()