mercurial/utils/compression.py
changeset 43077 687b865b95ad
parent 43076 2372284d9457
child 43089 c59eb1560c44
equal deleted inserted replaced
43076:2372284d9457 43077:687b865b95ad
    22 
    22 
    23 _ = i18n._
    23 _ = i18n._
    24 
    24 
    25 # compression code
    25 # compression code
    26 
    26 
    27 SERVERROLE = 'server'
    27 SERVERROLE = b'server'
    28 CLIENTROLE = 'client'
    28 CLIENTROLE = b'client'
    29 
    29 
    30 compewireprotosupport = collections.namedtuple(
    30 compewireprotosupport = collections.namedtuple(
    31     r'compenginewireprotosupport',
    31     r'compenginewireprotosupport',
    32     (r'name', r'serverpriority', r'clientpriority'),
    32     (r'name', r'serverpriority', r'clientpriority'),
    33 )
    33 )
    83         """Register a compression engine with the manager.
    83         """Register a compression engine with the manager.
    84 
    84 
    85         The argument must be a ``compressionengine`` instance.
    85         The argument must be a ``compressionengine`` instance.
    86         """
    86         """
    87         if not isinstance(engine, compressionengine):
    87         if not isinstance(engine, compressionengine):
    88             raise ValueError(_('argument must be a compressionengine'))
    88             raise ValueError(_(b'argument must be a compressionengine'))
    89 
    89 
    90         name = engine.name()
    90         name = engine.name()
    91 
    91 
    92         if name in self._engines:
    92         if name in self._engines:
    93             raise error.Abort(
    93             raise error.Abort(
    94                 _('compression engine %s already registered') % name
    94                 _(b'compression engine %s already registered') % name
    95             )
    95             )
    96 
    96 
    97         bundleinfo = engine.bundletype()
    97         bundleinfo = engine.bundletype()
    98         if bundleinfo:
    98         if bundleinfo:
    99             bundlename, bundletype = bundleinfo
    99             bundlename, bundletype = bundleinfo
   100 
   100 
   101             if bundlename in self._bundlenames:
   101             if bundlename in self._bundlenames:
   102                 raise error.Abort(
   102                 raise error.Abort(
   103                     _('bundle name %s already registered') % bundlename
   103                     _(b'bundle name %s already registered') % bundlename
   104                 )
   104                 )
   105             if bundletype in self._bundletypes:
   105             if bundletype in self._bundletypes:
   106                 raise error.Abort(
   106                 raise error.Abort(
   107                     _('bundle type %s already registered by %s')
   107                     _(b'bundle type %s already registered by %s')
   108                     % (bundletype, self._bundletypes[bundletype])
   108                     % (bundletype, self._bundletypes[bundletype])
   109                 )
   109                 )
   110 
   110 
   111             # No external facing name declared.
   111             # No external facing name declared.
   112             if bundlename:
   112             if bundlename:
   118         if wiresupport:
   118         if wiresupport:
   119             wiretype = wiresupport.name
   119             wiretype = wiresupport.name
   120             if wiretype in self._wiretypes:
   120             if wiretype in self._wiretypes:
   121                 raise error.Abort(
   121                 raise error.Abort(
   122                     _(
   122                     _(
   123                         'wire protocol compression %s already '
   123                         b'wire protocol compression %s already '
   124                         'registered by %s'
   124                         b'registered by %s'
   125                     )
   125                     )
   126                     % (wiretype, self._wiretypes[wiretype])
   126                     % (wiretype, self._wiretypes[wiretype])
   127                 )
   127                 )
   128 
   128 
   129             self._wiretypes[wiretype] = name
   129             self._wiretypes[wiretype] = name
   130 
   130 
   131         revlogheader = engine.revlogheader()
   131         revlogheader = engine.revlogheader()
   132         if revlogheader and revlogheader in self._revlogheaders:
   132         if revlogheader and revlogheader in self._revlogheaders:
   133             raise error.Abort(
   133             raise error.Abort(
   134                 _('revlog header %s already registered by %s')
   134                 _(b'revlog header %s already registered by %s')
   135                 % (revlogheader, self._revlogheaders[revlogheader])
   135                 % (revlogheader, self._revlogheaders[revlogheader])
   136             )
   136             )
   137 
   137 
   138         if revlogheader:
   138         if revlogheader:
   139             self._revlogheaders[revlogheader] = name
   139             self._revlogheaders[revlogheader] = name
   156         Will abort if the engine is known but not available.
   156         Will abort if the engine is known but not available.
   157         """
   157         """
   158         engine = self._engines[self._bundlenames[bundlename]]
   158         engine = self._engines[self._bundlenames[bundlename]]
   159         if not engine.available():
   159         if not engine.available():
   160             raise error.Abort(
   160             raise error.Abort(
   161                 _('compression engine %s could not be loaded') % engine.name()
   161                 _(b'compression engine %s could not be loaded') % engine.name()
   162             )
   162             )
   163         return engine
   163         return engine
   164 
   164 
   165     def forbundletype(self, bundletype):
   165     def forbundletype(self, bundletype):
   166         """Obtain a compression engine registered to a bundle type.
   166         """Obtain a compression engine registered to a bundle type.
   170         Will abort if the engine is known but not available.
   170         Will abort if the engine is known but not available.
   171         """
   171         """
   172         engine = self._engines[self._bundletypes[bundletype]]
   172         engine = self._engines[self._bundletypes[bundletype]]
   173         if not engine.available():
   173         if not engine.available():
   174             raise error.Abort(
   174             raise error.Abort(
   175                 _('compression engine %s could not be loaded') % engine.name()
   175                 _(b'compression engine %s could not be loaded') % engine.name()
   176             )
   176             )
   177         return engine
   177         return engine
   178 
   178 
   179     def supportedwireengines(self, role, onlyavailable=True):
   179     def supportedwireengines(self, role, onlyavailable=True):
   180         """Obtain compression engines that support the wire protocol.
   180         """Obtain compression engines that support the wire protocol.
   184         If ``onlyavailable`` is set, filter out engines that can't be
   184         If ``onlyavailable`` is set, filter out engines that can't be
   185         loaded.
   185         loaded.
   186         """
   186         """
   187         assert role in (SERVERROLE, CLIENTROLE)
   187         assert role in (SERVERROLE, CLIENTROLE)
   188 
   188 
   189         attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
   189         attr = b'serverpriority' if role == SERVERROLE else b'clientpriority'
   190 
   190 
   191         engines = [self._engines[e] for e in self._wiretypes.values()]
   191         engines = [self._engines[e] for e in self._wiretypes.values()]
   192         if onlyavailable:
   192         if onlyavailable:
   193             engines = [e for e in engines if e.available()]
   193             engines = [e for e in engines if e.available()]
   194 
   194 
   203 
   203 
   204     def forwiretype(self, wiretype):
   204     def forwiretype(self, wiretype):
   205         engine = self._engines[self._wiretypes[wiretype]]
   205         engine = self._engines[self._wiretypes[wiretype]]
   206         if not engine.available():
   206         if not engine.available():
   207             raise error.Abort(
   207             raise error.Abort(
   208                 _('compression engine %s could not be loaded') % engine.name()
   208                 _(b'compression engine %s could not be loaded') % engine.name()
   209             )
   209             )
   210         return engine
   210         return engine
   211 
   211 
   212     def forrevlogheader(self, header):
   212     def forrevlogheader(self, header):
   213         """Obtain a compression engine registered to a revlog header.
   213         """Obtain a compression engine registered to a revlog header.
   358             while self._pending:
   358             while self._pending:
   359                 if len(self._pending[0]) > l + self._pos:
   359                 if len(self._pending[0]) > l + self._pos:
   360                     newbuf = self._pending[0]
   360                     newbuf = self._pending[0]
   361                     buf.append(newbuf[self._pos : self._pos + l])
   361                     buf.append(newbuf[self._pos : self._pos + l])
   362                     self._pos += l
   362                     self._pos += l
   363                     return ''.join(buf)
   363                     return b''.join(buf)
   364 
   364 
   365                 newbuf = self._pending.pop(0)
   365                 newbuf = self._pending.pop(0)
   366                 if self._pos:
   366                 if self._pos:
   367                     buf.append(newbuf[self._pos :])
   367                     buf.append(newbuf[self._pos :])
   368                     l -= len(newbuf) - self._pos
   368                     l -= len(newbuf) - self._pos
   370                     buf.append(newbuf)
   370                     buf.append(newbuf)
   371                     l -= len(newbuf)
   371                     l -= len(newbuf)
   372                 self._pos = 0
   372                 self._pos = 0
   373 
   373 
   374             if self._eof:
   374             if self._eof:
   375                 return ''.join(buf)
   375                 return b''.join(buf)
   376             chunk = self._reader(65536)
   376             chunk = self._reader(65536)
   377             self._decompress(chunk)
   377             self._decompress(chunk)
   378             if not chunk and not self._pending and not self._eof:
   378             if not chunk and not self._pending and not self._eof:
   379                 # No progress and no new data, bail out
   379                 # No progress and no new data, bail out
   380                 return ''.join(buf)
   380                 return b''.join(buf)
   381 
   381 
   382 
   382 
   383 class _GzipCompressedStreamReader(_CompressedStreamReader):
   383 class _GzipCompressedStreamReader(_CompressedStreamReader):
   384     def __init__(self, fh):
   384     def __init__(self, fh):
   385         super(_GzipCompressedStreamReader, self).__init__(fh)
   385         super(_GzipCompressedStreamReader, self).__init__(fh)
   389         newbuf = self._decompobj.decompress(chunk)
   389         newbuf = self._decompobj.decompress(chunk)
   390         if newbuf:
   390         if newbuf:
   391             self._pending.append(newbuf)
   391             self._pending.append(newbuf)
   392         d = self._decompobj.copy()
   392         d = self._decompobj.copy()
   393         try:
   393         try:
   394             d.decompress('x')
   394             d.decompress(b'x')
   395             d.flush()
   395             d.flush()
   396             if d.unused_data == 'x':
   396             if d.unused_data == b'x':
   397                 self._eof = True
   397                 self._eof = True
   398         except zlib.error:
   398         except zlib.error:
   399             pass
   399             pass
   400 
   400 
   401 
   401 
   408         newbuf = self._decompobj.decompress(chunk)
   408         newbuf = self._decompobj.decompress(chunk)
   409         if newbuf:
   409         if newbuf:
   410             self._pending.append(newbuf)
   410             self._pending.append(newbuf)
   411         try:
   411         try:
   412             while True:
   412             while True:
   413                 newbuf = self._decompobj.decompress('')
   413                 newbuf = self._decompobj.decompress(b'')
   414                 if newbuf:
   414                 if newbuf:
   415                     self._pending.append(newbuf)
   415                     self._pending.append(newbuf)
   416                 else:
   416                 else:
   417                     break
   417                     break
   418         except EOFError:
   418         except EOFError:
   420 
   420 
   421 
   421 
   422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
   422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
   423     def __init__(self, fh):
   423     def __init__(self, fh):
   424         super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
   424         super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
   425         newbuf = self._decompobj.decompress('BZ')
   425         newbuf = self._decompobj.decompress(b'BZ')
   426         if newbuf:
   426         if newbuf:
   427             self._pending.append(newbuf)
   427             self._pending.append(newbuf)
   428 
   428 
   429 
   429 
   430 class _ZstdCompressedStreamReader(_CompressedStreamReader):
   430 class _ZstdCompressedStreamReader(_CompressedStreamReader):
   437         newbuf = self._decompobj.decompress(chunk)
   437         newbuf = self._decompobj.decompress(chunk)
   438         if newbuf:
   438         if newbuf:
   439             self._pending.append(newbuf)
   439             self._pending.append(newbuf)
   440         try:
   440         try:
   441             while True:
   441             while True:
   442                 newbuf = self._decompobj.decompress('')
   442                 newbuf = self._decompobj.decompress(b'')
   443                 if newbuf:
   443                 if newbuf:
   444                     self._pending.append(newbuf)
   444                     self._pending.append(newbuf)
   445                 else:
   445                 else:
   446                     break
   446                     break
   447         except self._zstd.ZstdError:
   447         except self._zstd.ZstdError:
   448             self._eof = True
   448             self._eof = True
   449 
   449 
   450 
   450 
   451 class _zlibengine(compressionengine):
   451 class _zlibengine(compressionengine):
   452     def name(self):
   452     def name(self):
   453         return 'zlib'
   453         return b'zlib'
   454 
   454 
   455     def bundletype(self):
   455     def bundletype(self):
   456         """zlib compression using the DEFLATE algorithm.
   456         """zlib compression using the DEFLATE algorithm.
   457 
   457 
   458         All Mercurial clients should support this format. The compression
   458         All Mercurial clients should support this format. The compression
   459         algorithm strikes a reasonable balance between compression ratio
   459         algorithm strikes a reasonable balance between compression ratio
   460         and size.
   460         and size.
   461         """
   461         """
   462         return 'gzip', 'GZ'
   462         return b'gzip', b'GZ'
   463 
   463 
   464     def wireprotosupport(self):
   464     def wireprotosupport(self):
   465         return compewireprotosupport('zlib', 20, 20)
   465         return compewireprotosupport(b'zlib', 20, 20)
   466 
   466 
   467     def revlogheader(self):
   467     def revlogheader(self):
   468         return 'x'
   468         return b'x'
   469 
   469 
   470     def compressstream(self, it, opts=None):
   470     def compressstream(self, it, opts=None):
   471         opts = opts or {}
   471         opts = opts or {}
   472 
   472 
   473         z = zlib.compressobj(opts.get('level', -1))
   473         z = zlib.compressobj(opts.get(b'level', -1))
   474         for chunk in it:
   474         for chunk in it:
   475             data = z.compress(chunk)
   475             data = z.compress(chunk)
   476             # Not all calls to compress emit data. It is cheaper to inspect
   476             # Not all calls to compress emit data. It is cheaper to inspect
   477             # here than to feed empty chunks through generator.
   477             # here than to feed empty chunks through generator.
   478             if data:
   478             if data:
   519                     parts.append(z.compress(data[pos:pos2]))
   519                     parts.append(z.compress(data[pos:pos2]))
   520                     pos = pos2
   520                     pos = pos2
   521                 parts.append(z.flush())
   521                 parts.append(z.flush())
   522 
   522 
   523                 if sum(map(len, parts)) < insize:
   523                 if sum(map(len, parts)) < insize:
   524                     return ''.join(parts)
   524                     return b''.join(parts)
   525                 return None
   525                 return None
   526 
   526 
   527         def decompress(self, data):
   527         def decompress(self, data):
   528             try:
   528             try:
   529                 return zlib.decompress(data)
   529                 return zlib.decompress(data)
   530             except zlib.error as e:
   530             except zlib.error as e:
   531                 raise error.StorageError(
   531                 raise error.StorageError(
   532                     _('revlog decompress error: %s')
   532                     _(b'revlog decompress error: %s')
   533                     % stringutil.forcebytestr(e)
   533                     % stringutil.forcebytestr(e)
   534                 )
   534                 )
   535 
   535 
   536     def revlogcompressor(self, opts=None):
   536     def revlogcompressor(self, opts=None):
   537         level = None
   537         level = None
   538         if opts is not None:
   538         if opts is not None:
   539             level = opts.get('zlib.level')
   539             level = opts.get(b'zlib.level')
   540         return self.zlibrevlogcompressor(level)
   540         return self.zlibrevlogcompressor(level)
   541 
   541 
   542 
   542 
   543 compengines.register(_zlibengine())
   543 compengines.register(_zlibengine())
   544 
   544 
   545 
   545 
   546 class _bz2engine(compressionengine):
   546 class _bz2engine(compressionengine):
   547     def name(self):
   547     def name(self):
   548         return 'bz2'
   548         return b'bz2'
   549 
   549 
   550     def bundletype(self):
   550     def bundletype(self):
   551         """An algorithm that produces smaller bundles than ``gzip``.
   551         """An algorithm that produces smaller bundles than ``gzip``.
   552 
   552 
   553         All Mercurial clients should support this format.
   553         All Mercurial clients should support this format.
   557         decompression.
   557         decompression.
   558 
   558 
   559         If available, the ``zstd`` engine can yield similar or better
   559         If available, the ``zstd`` engine can yield similar or better
   560         compression at much higher speeds.
   560         compression at much higher speeds.
   561         """
   561         """
   562         return 'bzip2', 'BZ'
   562         return b'bzip2', b'BZ'
   563 
   563 
   564     # We declare a protocol name but don't advertise by default because
   564     # We declare a protocol name but don't advertise by default because
   565     # it is slow.
   565     # it is slow.
   566     def wireprotosupport(self):
   566     def wireprotosupport(self):
   567         return compewireprotosupport('bzip2', 0, 0)
   567         return compewireprotosupport(b'bzip2', 0, 0)
   568 
   568 
   569     def compressstream(self, it, opts=None):
   569     def compressstream(self, it, opts=None):
   570         opts = opts or {}
   570         opts = opts or {}
   571         z = bz2.BZ2Compressor(opts.get('level', 9))
   571         z = bz2.BZ2Compressor(opts.get(b'level', 9))
   572         for chunk in it:
   572         for chunk in it:
   573             data = z.compress(chunk)
   573             data = z.compress(chunk)
   574             if data:
   574             if data:
   575                 yield data
   575                 yield data
   576 
   576 
   583 compengines.register(_bz2engine())
   583 compengines.register(_bz2engine())
   584 
   584 
   585 
   585 
   586 class _truncatedbz2engine(compressionengine):
   586 class _truncatedbz2engine(compressionengine):
   587     def name(self):
   587     def name(self):
   588         return 'bz2truncated'
   588         return b'bz2truncated'
   589 
   589 
   590     def bundletype(self):
   590     def bundletype(self):
   591         return None, '_truncatedBZ'
   591         return None, b'_truncatedBZ'
   592 
   592 
   593     # We don't implement compressstream because it is hackily handled elsewhere.
   593     # We don't implement compressstream because it is hackily handled elsewhere.
   594 
   594 
   595     def decompressorreader(self, fh):
   595     def decompressorreader(self, fh):
   596         return _TruncatedBZ2CompressedStreamReader(fh)
   596         return _TruncatedBZ2CompressedStreamReader(fh)
   599 compengines.register(_truncatedbz2engine())
   599 compengines.register(_truncatedbz2engine())
   600 
   600 
   601 
   601 
   602 class _noopengine(compressionengine):
   602 class _noopengine(compressionengine):
   603     def name(self):
   603     def name(self):
   604         return 'none'
   604         return b'none'
   605 
   605 
   606     def bundletype(self):
   606     def bundletype(self):
   607         """No compression is performed.
   607         """No compression is performed.
   608 
   608 
   609         Use this compression engine to explicitly disable compression.
   609         Use this compression engine to explicitly disable compression.
   610         """
   610         """
   611         return 'none', 'UN'
   611         return b'none', b'UN'
   612 
   612 
   613     # Clients always support uncompressed payloads. Servers don't because
   613     # Clients always support uncompressed payloads. Servers don't because
   614     # unless you are on a fast network, uncompressed payloads can easily
   614     # unless you are on a fast network, uncompressed payloads can easily
   615     # saturate your network pipe.
   615     # saturate your network pipe.
   616     def wireprotosupport(self):
   616     def wireprotosupport(self):
   617         return compewireprotosupport('none', 0, 10)
   617         return compewireprotosupport(b'none', 0, 10)
   618 
   618 
   619     # We don't implement revlogheader because it is handled specially
   619     # We don't implement revlogheader because it is handled specially
   620     # in the revlog class.
   620     # in the revlog class.
   621 
   621 
   622     def compressstream(self, it, opts=None):
   622     def compressstream(self, it, opts=None):
   636 compengines.register(_noopengine())
   636 compengines.register(_noopengine())
   637 
   637 
   638 
   638 
   639 class _zstdengine(compressionengine):
   639 class _zstdengine(compressionengine):
   640     def name(self):
   640     def name(self):
   641         return 'zstd'
   641         return b'zstd'
   642 
   642 
   643     @propertycache
   643     @propertycache
   644     def _module(self):
   644     def _module(self):
   645         # Not all installs have the zstd module available. So defer importing
   645         # Not all installs have the zstd module available. So defer importing
   646         # until first access.
   646         # until first access.
   666         compression than ``bzip2`` while operating at much higher speeds.
   666         compression than ``bzip2`` while operating at much higher speeds.
   667 
   667 
   668         If this engine is available and backwards compatibility is not a
   668         If this engine is available and backwards compatibility is not a
   669         concern, it is likely the best available engine.
   669         concern, it is likely the best available engine.
   670         """
   670         """
   671         return 'zstd', 'ZS'
   671         return b'zstd', b'ZS'
   672 
   672 
   673     def wireprotosupport(self):
   673     def wireprotosupport(self):
   674         return compewireprotosupport('zstd', 50, 50)
   674         return compewireprotosupport(b'zstd', 50, 50)
   675 
   675 
   676     def revlogheader(self):
   676     def revlogheader(self):
   677         return '\x28'
   677         return b'\x28'
   678 
   678 
   679     def compressstream(self, it, opts=None):
   679     def compressstream(self, it, opts=None):
   680         opts = opts or {}
   680         opts = opts or {}
   681         # zstd level 3 is almost always significantly faster than zlib
   681         # zstd level 3 is almost always significantly faster than zlib
   682         # while providing no worse compression. It strikes a good balance
   682         # while providing no worse compression. It strikes a good balance
   683         # between speed and compression.
   683         # between speed and compression.
   684         level = opts.get('level', 3)
   684         level = opts.get(b'level', 3)
   685 
   685 
   686         zstd = self._module
   686         zstd = self._module
   687         z = zstd.ZstdCompressor(level=level).compressobj()
   687         z = zstd.ZstdCompressor(level=level).compressobj()
   688         for chunk in it:
   688         for chunk in it:
   689             data = z.compress(chunk)
   689             data = z.compress(chunk)
   730                         chunks.append(chunk)
   730                         chunks.append(chunk)
   731                     pos = pos2
   731                     pos = pos2
   732                 chunks.append(z.flush())
   732                 chunks.append(z.flush())
   733 
   733 
   734                 if sum(map(len, chunks)) < insize:
   734                 if sum(map(len, chunks)) < insize:
   735                     return ''.join(chunks)
   735                     return b''.join(chunks)
   736                 return None
   736                 return None
   737 
   737 
   738         def decompress(self, data):
   738         def decompress(self, data):
   739             insize = len(data)
   739             insize = len(data)
   740 
   740 
   750                     if chunk:
   750                     if chunk:
   751                         chunks.append(chunk)
   751                         chunks.append(chunk)
   752                     pos = pos2
   752                     pos = pos2
   753                 # Frame should be exhausted, so no finish() API.
   753                 # Frame should be exhausted, so no finish() API.
   754 
   754 
   755                 return ''.join(chunks)
   755                 return b''.join(chunks)
   756             except Exception as e:
   756             except Exception as e:
   757                 raise error.StorageError(
   757                 raise error.StorageError(
   758                     _('revlog decompress error: %s')
   758                     _(b'revlog decompress error: %s')
   759                     % stringutil.forcebytestr(e)
   759                     % stringutil.forcebytestr(e)
   760                 )
   760                 )
   761 
   761 
   762     def revlogcompressor(self, opts=None):
   762     def revlogcompressor(self, opts=None):
   763         opts = opts or {}
   763         opts = opts or {}
   764         level = opts.get('zstd.level')
   764         level = opts.get(b'zstd.level')
   765         if level is None:
   765         if level is None:
   766             level = opts.get('level')
   766             level = opts.get(b'level')
   767         if level is None:
   767         if level is None:
   768             level = 3
   768             level = 3
   769         return self.zstdrevlogcompressor(self._module, level=level)
   769         return self.zstdrevlogcompressor(self._module, level=level)
   770 
   770 
   771 
   771