Mercurial > public > mercurial-scm > hg-stable
diff mercurial/utils/compression.py @ 42041:3e47d1ec9da5
util: extract compression code in `mercurial.utils.compression`
The code seems large enough to be worth extracting. This is similar to what was
done for various modules in `mercurial/utils/`.
Since none of the compression logic takes a `ui` object, issuing a deprecation
warning is tricky. Luckily, the logic does not seem to have many external users.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 27 Mar 2019 16:45:14 +0100 |
parents | mercurial/util.py@7f63ec6969f3 |
children | aaececb4b066 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mercurial/utils/compression.py Wed Mar 27 16:45:14 2019 +0100 @@ -0,0 +1,747 @@ +# compression.py - Mercurial utility functions for compression +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + + +from __future__ import absolute_import, print_function + +import bz2 +import collections +import zlib + +from .. import ( + error, + i18n, + pycompat, +) +from . import ( + stringutil, +) + +safehasattr = pycompat.safehasattr + + +_ = i18n._ + +# compression code + +SERVERROLE = 'server' +CLIENTROLE = 'client' + +compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport', + (r'name', r'serverpriority', + r'clientpriority')) + +class propertycache(object): + def __init__(self, func): + self.func = func + self.name = func.__name__ + def __get__(self, obj, type=None): + result = self.func(obj) + self.cachevalue(obj, result) + return result + + def cachevalue(self, obj, value): + # __dict__ assignment required to bypass __setattr__ (eg: repoview) + obj.__dict__[self.name] = value + +class compressormanager(object): + """Holds registrations of various compression engines. + + This class essentially abstracts the differences between compression + engines to allow new compression formats to be added easily, possibly from + extensions. + + Compressors are registered against the global instance by calling its + ``register()`` method. + """ + def __init__(self): + self._engines = {} + # Bundle spec human name to engine name. + self._bundlenames = {} + # Internal bundle identifier to engine name. + self._bundletypes = {} + # Revlog header to engine name. + self._revlogheaders = {} + # Wire proto identifier to engine name. 
+ self._wiretypes = {} + + def __getitem__(self, key): + return self._engines[key] + + def __contains__(self, key): + return key in self._engines + + def __iter__(self): + return iter(self._engines.keys()) + + def register(self, engine): + """Register a compression engine with the manager. + + The argument must be a ``compressionengine`` instance. + """ + if not isinstance(engine, compressionengine): + raise ValueError(_('argument must be a compressionengine')) + + name = engine.name() + + if name in self._engines: + raise error.Abort(_('compression engine %s already registered') % + name) + + bundleinfo = engine.bundletype() + if bundleinfo: + bundlename, bundletype = bundleinfo + + if bundlename in self._bundlenames: + raise error.Abort(_('bundle name %s already registered') % + bundlename) + if bundletype in self._bundletypes: + raise error.Abort(_('bundle type %s already registered by %s') % + (bundletype, self._bundletypes[bundletype])) + + # No external facing name declared. + if bundlename: + self._bundlenames[bundlename] = name + + self._bundletypes[bundletype] = name + + wiresupport = engine.wireprotosupport() + if wiresupport: + wiretype = wiresupport.name + if wiretype in self._wiretypes: + raise error.Abort(_('wire protocol compression %s already ' + 'registered by %s') % + (wiretype, self._wiretypes[wiretype])) + + self._wiretypes[wiretype] = name + + revlogheader = engine.revlogheader() + if revlogheader and revlogheader in self._revlogheaders: + raise error.Abort(_('revlog header %s already registered by %s') % + (revlogheader, self._revlogheaders[revlogheader])) + + if revlogheader: + self._revlogheaders[revlogheader] = name + + self._engines[name] = engine + + @property + def supportedbundlenames(self): + return set(self._bundlenames.keys()) + + @property + def supportedbundletypes(self): + return set(self._bundletypes.keys()) + + def forbundlename(self, bundlename): + """Obtain a compression engine registered to a bundle name. 
+ + Will raise KeyError if the bundle type isn't registered. + + Will abort if the engine is known but not available. + """ + engine = self._engines[self._bundlenames[bundlename]] + if not engine.available(): + raise error.Abort(_('compression engine %s could not be loaded') % + engine.name()) + return engine + + def forbundletype(self, bundletype): + """Obtain a compression engine registered to a bundle type. + + Will raise KeyError if the bundle type isn't registered. + + Will abort if the engine is known but not available. + """ + engine = self._engines[self._bundletypes[bundletype]] + if not engine.available(): + raise error.Abort(_('compression engine %s could not be loaded') % + engine.name()) + return engine + + def supportedwireengines(self, role, onlyavailable=True): + """Obtain compression engines that support the wire protocol. + + Returns a list of engines in prioritized order, most desired first. + + If ``onlyavailable`` is set, filter out engines that can't be + loaded. + """ + assert role in (SERVERROLE, CLIENTROLE) + + attr = 'serverpriority' if role == SERVERROLE else 'clientpriority' + + engines = [self._engines[e] for e in self._wiretypes.values()] + if onlyavailable: + engines = [e for e in engines if e.available()] + + def getkey(e): + # Sort first by priority, highest first. In case of tie, sort + # alphabetically. This is arbitrary, but ensures output is + # stable. + w = e.wireprotosupport() + return -1 * getattr(w, attr), w.name + + return list(sorted(engines, key=getkey)) + + def forwiretype(self, wiretype): + engine = self._engines[self._wiretypes[wiretype]] + if not engine.available(): + raise error.Abort(_('compression engine %s could not be loaded') % + engine.name()) + return engine + + def forrevlogheader(self, header): + """Obtain a compression engine registered to a revlog header. + + Will raise KeyError if the revlog header value isn't registered. 
+ """ + return self._engines[self._revlogheaders[header]] + +compengines = compressormanager() + +class compressionengine(object): + """Base class for compression engines. + + Compression engines must implement the interface defined by this class. + """ + def name(self): + """Returns the name of the compression engine. + + This is the key the engine is registered under. + + This method must be implemented. + """ + raise NotImplementedError() + + def available(self): + """Whether the compression engine is available. + + The intent of this method is to allow optional compression engines + that may not be available in all installations (such as engines relying + on C extensions that may not be present). + """ + return True + + def bundletype(self): + """Describes bundle identifiers for this engine. + + If this compression engine isn't supported for bundles, returns None. + + If this engine can be used for bundles, returns a 2-tuple of strings of + the user-facing "bundle spec" compression name and an internal + identifier used to denote the compression format within bundles. To + exclude the name from external usage, set the first element to ``None``. + + If bundle compression is supported, the class must also implement + ``compressstream`` and `decompressorreader``. + + The docstring of this method is used in the help system to tell users + about this engine. + """ + return None + + def wireprotosupport(self): + """Declare support for this compression format on the wire protocol. + + If this compression engine isn't supported for compressing wire + protocol payloads, returns None. + + Otherwise, returns ``compenginewireprotosupport`` with the following + fields: + + * String format identifier + * Integer priority for the server + * Integer priority for the client + + The integer priorities are used to order the advertisement of format + support by server and client. The highest integer is advertised + first. Integers with non-positive values aren't advertised. 
+ + The priority values are somewhat arbitrary and only used for default + ordering. The relative order can be changed via config options. + + If wire protocol compression is supported, the class must also implement + ``compressstream`` and ``decompressorreader``. + """ + return None + + def revlogheader(self): + """Header added to revlog chunks that identifies this engine. + + If this engine can be used to compress revlogs, this method should + return the bytes used to identify chunks compressed with this engine. + Else, the method should return ``None`` to indicate it does not + participate in revlog compression. + """ + return None + + def compressstream(self, it, opts=None): + """Compress an iterator of chunks. + + The method receives an iterator (ideally a generator) of chunks of + bytes to be compressed. It returns an iterator (ideally a generator) + of bytes of chunks representing the compressed output. + + Optionally accepts an argument defining how to perform compression. + Each engine treats this argument differently. + """ + raise NotImplementedError() + + def decompressorreader(self, fh): + """Perform decompression on a file object. + + Argument is an object with a ``read(size)`` method that returns + compressed data. Return value is an object with a ``read(size)`` that + returns uncompressed data. + """ + raise NotImplementedError() + + def revlogcompressor(self, opts=None): + """Obtain an object that can be used to compress revlog entries. + + The object has a ``compress(data)`` method that compresses binary + data. This method returns compressed binary data or ``None`` if + the data could not be compressed (too small, not compressible, etc). + The returned data should have a header uniquely identifying this + compression format so decompression can be routed to this engine. + This header should be identified by the ``revlogheader()`` return + value. + + The object has a ``decompress(data)`` method that decompresses + data. 
The method will only be called if ``data`` begins with + ``revlogheader()``. The method should return the raw, uncompressed + data or raise a ``StorageError``. + + The object is reusable but is not thread safe. + """ + raise NotImplementedError() + +class _CompressedStreamReader(object): + def __init__(self, fh): + if safehasattr(fh, 'unbufferedread'): + self._reader = fh.unbufferedread + else: + self._reader = fh.read + self._pending = [] + self._pos = 0 + self._eof = False + + def _decompress(self, chunk): + raise NotImplementedError() + + def read(self, l): + buf = [] + while True: + while self._pending: + if len(self._pending[0]) > l + self._pos: + newbuf = self._pending[0] + buf.append(newbuf[self._pos:self._pos + l]) + self._pos += l + return ''.join(buf) + + newbuf = self._pending.pop(0) + if self._pos: + buf.append(newbuf[self._pos:]) + l -= len(newbuf) - self._pos + else: + buf.append(newbuf) + l -= len(newbuf) + self._pos = 0 + + if self._eof: + return ''.join(buf) + chunk = self._reader(65536) + self._decompress(chunk) + if not chunk and not self._pending and not self._eof: + # No progress and no new data, bail out + return ''.join(buf) + +class _GzipCompressedStreamReader(_CompressedStreamReader): + def __init__(self, fh): + super(_GzipCompressedStreamReader, self).__init__(fh) + self._decompobj = zlib.decompressobj() + def _decompress(self, chunk): + newbuf = self._decompobj.decompress(chunk) + if newbuf: + self._pending.append(newbuf) + d = self._decompobj.copy() + try: + d.decompress('x') + d.flush() + if d.unused_data == 'x': + self._eof = True + except zlib.error: + pass + +class _BZ2CompressedStreamReader(_CompressedStreamReader): + def __init__(self, fh): + super(_BZ2CompressedStreamReader, self).__init__(fh) + self._decompobj = bz2.BZ2Decompressor() + def _decompress(self, chunk): + newbuf = self._decompobj.decompress(chunk) + if newbuf: + self._pending.append(newbuf) + try: + while True: + newbuf = self._decompobj.decompress('') + if newbuf: + 
self._pending.append(newbuf) + else: + break + except EOFError: + self._eof = True + +class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader): + def __init__(self, fh): + super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh) + newbuf = self._decompobj.decompress('BZ') + if newbuf: + self._pending.append(newbuf) + +class _ZstdCompressedStreamReader(_CompressedStreamReader): + def __init__(self, fh, zstd): + super(_ZstdCompressedStreamReader, self).__init__(fh) + self._zstd = zstd + self._decompobj = zstd.ZstdDecompressor().decompressobj() + def _decompress(self, chunk): + newbuf = self._decompobj.decompress(chunk) + if newbuf: + self._pending.append(newbuf) + try: + while True: + newbuf = self._decompobj.decompress('') + if newbuf: + self._pending.append(newbuf) + else: + break + except self._zstd.ZstdError: + self._eof = True + +class _zlibengine(compressionengine): + def name(self): + return 'zlib' + + def bundletype(self): + """zlib compression using the DEFLATE algorithm. + + All Mercurial clients should support this format. The compression + algorithm strikes a reasonable balance between compression ratio + and size. + """ + return 'gzip', 'GZ' + + def wireprotosupport(self): + return compewireprotosupport('zlib', 20, 20) + + def revlogheader(self): + return 'x' + + def compressstream(self, it, opts=None): + opts = opts or {} + + z = zlib.compressobj(opts.get('level', -1)) + for chunk in it: + data = z.compress(chunk) + # Not all calls to compress emit data. It is cheaper to inspect + # here than to feed empty chunks through generator. + if data: + yield data + + yield z.flush() + + def decompressorreader(self, fh): + return _GzipCompressedStreamReader(fh) + + class zlibrevlogcompressor(object): + def compress(self, data): + insize = len(data) + # Caller handles empty input case. 
+ assert insize > 0 + + if insize < 44: + return None + + elif insize <= 1000000: + compressed = zlib.compress(data) + if len(compressed) < insize: + return compressed + return None + + # zlib makes an internal copy of the input buffer, doubling + # memory usage for large inputs. So do streaming compression + # on large inputs. + else: + z = zlib.compressobj() + parts = [] + pos = 0 + while pos < insize: + pos2 = pos + 2**20 + parts.append(z.compress(data[pos:pos2])) + pos = pos2 + parts.append(z.flush()) + + if sum(map(len, parts)) < insize: + return ''.join(parts) + return None + + def decompress(self, data): + try: + return zlib.decompress(data) + except zlib.error as e: + raise error.StorageError(_('revlog decompress error: %s') % + stringutil.forcebytestr(e)) + + def revlogcompressor(self, opts=None): + return self.zlibrevlogcompressor() + +compengines.register(_zlibengine()) + +class _bz2engine(compressionengine): + def name(self): + return 'bz2' + + def bundletype(self): + """An algorithm that produces smaller bundles than ``gzip``. + + All Mercurial clients should support this format. + + This engine will likely produce smaller bundles than ``gzip`` but + will be significantly slower, both during compression and + decompression. + + If available, the ``zstd`` engine can yield similar or better + compression at much higher speeds. + """ + return 'bzip2', 'BZ' + + # We declare a protocol name but don't advertise by default because + # it is slow. 
+ def wireprotosupport(self): + return compewireprotosupport('bzip2', 0, 0) + + def compressstream(self, it, opts=None): + opts = opts or {} + z = bz2.BZ2Compressor(opts.get('level', 9)) + for chunk in it: + data = z.compress(chunk) + if data: + yield data + + yield z.flush() + + def decompressorreader(self, fh): + return _BZ2CompressedStreamReader(fh) + +compengines.register(_bz2engine()) + +class _truncatedbz2engine(compressionengine): + def name(self): + return 'bz2truncated' + + def bundletype(self): + return None, '_truncatedBZ' + + # We don't implement compressstream because it is hackily handled elsewhere. + + def decompressorreader(self, fh): + return _TruncatedBZ2CompressedStreamReader(fh) + +compengines.register(_truncatedbz2engine()) + +class _noopengine(compressionengine): + def name(self): + return 'none' + + def bundletype(self): + """No compression is performed. + + Use this compression engine to explicitly disable compression. + """ + return 'none', 'UN' + + # Clients always support uncompressed payloads. Servers don't because + # unless you are on a fast network, uncompressed payloads can easily + # saturate your network pipe. + def wireprotosupport(self): + return compewireprotosupport('none', 0, 10) + + # We don't implement revlogheader because it is handled specially + # in the revlog class. + + def compressstream(self, it, opts=None): + return it + + def decompressorreader(self, fh): + return fh + + class nooprevlogcompressor(object): + def compress(self, data): + return None + + def revlogcompressor(self, opts=None): + return self.nooprevlogcompressor() + +compengines.register(_noopengine()) + +class _zstdengine(compressionengine): + def name(self): + return 'zstd' + + @propertycache + def _module(self): + # Not all installs have the zstd module available. So defer importing + # until first access. + try: + from .. import zstd + # Force delayed import. 
+ zstd.__version__ + return zstd + except ImportError: + return None + + def available(self): + return bool(self._module) + + def bundletype(self): + """A modern compression algorithm that is fast and highly flexible. + + Only supported by Mercurial 4.1 and newer clients. + + With the default settings, zstd compression is both faster and yields + better compression than ``gzip``. It also frequently yields better + compression than ``bzip2`` while operating at much higher speeds. + + If this engine is available and backwards compatibility is not a + concern, it is likely the best available engine. + """ + return 'zstd', 'ZS' + + def wireprotosupport(self): + return compewireprotosupport('zstd', 50, 50) + + def revlogheader(self): + return '\x28' + + def compressstream(self, it, opts=None): + opts = opts or {} + # zstd level 3 is almost always significantly faster than zlib + # while providing no worse compression. It strikes a good balance + # between speed and compression. + level = opts.get('level', 3) + + zstd = self._module + z = zstd.ZstdCompressor(level=level).compressobj() + for chunk in it: + data = z.compress(chunk) + if data: + yield data + + yield z.flush() + + def decompressorreader(self, fh): + return _ZstdCompressedStreamReader(fh, self._module) + + class zstdrevlogcompressor(object): + def __init__(self, zstd, level=3): + # TODO consider omitting frame magic to save 4 bytes. + # This writes content sizes into the frame header. That is + # extra storage. But it allows a correct size memory allocation + # to hold the result. + self._cctx = zstd.ZstdCompressor(level=level) + self._dctx = zstd.ZstdDecompressor() + self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE + + def compress(self, data): + insize = len(data) + # Caller handles empty input case. 
+ assert insize > 0 + + if insize < 50: + return None + + elif insize <= 1000000: + compressed = self._cctx.compress(data) + if len(compressed) < insize: + return compressed + return None + else: + z = self._cctx.compressobj() + chunks = [] + pos = 0 + while pos < insize: + pos2 = pos + self._compinsize + chunk = z.compress(data[pos:pos2]) + if chunk: + chunks.append(chunk) + pos = pos2 + chunks.append(z.flush()) + + if sum(map(len, chunks)) < insize: + return ''.join(chunks) + return None + + def decompress(self, data): + insize = len(data) + + try: + # This was measured to be faster than other streaming + # decompressors. + dobj = self._dctx.decompressobj() + chunks = [] + pos = 0 + while pos < insize: + pos2 = pos + self._decompinsize + chunk = dobj.decompress(data[pos:pos2]) + if chunk: + chunks.append(chunk) + pos = pos2 + # Frame should be exhausted, so no finish() API. + + return ''.join(chunks) + except Exception as e: + raise error.StorageError(_('revlog decompress error: %s') % + stringutil.forcebytestr(e)) + + def revlogcompressor(self, opts=None): + opts = opts or {} + return self.zstdrevlogcompressor(self._module, + level=opts.get('level', 3)) + +compengines.register(_zstdengine()) + +def bundlecompressiontopics(): + """Obtains a list of available bundle compressions for use in help.""" + # help.makeitemsdocs() expects a dict of names to items with a .__doc__. + items = {} + + # We need to format the docstring. So use a dummy object/type to hold it + # rather than mutating the original. 
+ class docobject(object): + pass + + for name in compengines: + engine = compengines[name] + + if not engine.available(): + continue + + bt = engine.bundletype() + if not bt or not bt[0]: + continue + + doc = b'``%s``\n %s' % (bt[0], pycompat.getdoc(engine.bundletype)) + + value = docobject() + value.__doc__ = pycompat.sysstr(doc) + value._origdoc = engine.bundletype.__doc__ + value._origfunc = engine.bundletype + + items[bt[0]] = value + + return items + +i18nfunctions = bundlecompressiontopics().values()