comparison mercurial/utils/compression.py @ 42041:3e47d1ec9da5

util: extract compression code in `mercurial.utils.compression` The code seems large enough to be worth extracting. This is similar to what was done for various modules in `mercurial/utils/`. Since none of the compression logic takes a `ui` object, issuing deprecation warnings is tricky. Luckily the logic does not seem to have many external users.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 27 Mar 2019 16:45:14 +0100
parents mercurial/util.py@7f63ec6969f3
children aaececb4b066
comparison
equal deleted inserted replaced
42040:02fa567f8a3c 42041:3e47d1ec9da5
1 # compression.py - Mercurial utility functions for compression
2 #
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
5
6
7 from __future__ import absolute_import, print_function
8
9 import bz2
10 import collections
11 import zlib
12
13 from .. import (
14 error,
15 i18n,
16 pycompat,
17 )
18 from . import (
19 stringutil,
20 )
21
22 safehasattr = pycompat.safehasattr
23
24
25 _ = i18n._
26
27 # compression code
28
29 SERVERROLE = 'server'
30 CLIENTROLE = 'client'
31
32 compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
33 (r'name', r'serverpriority',
34 r'clientpriority'))
35
class propertycache(object):
    """Descriptor that computes an attribute once per instance.

    The wrapped function runs on the first attribute access. Its result is
    then stored in the instance ``__dict__`` under the function's name, so
    subsequent lookups find the stored value directly (this class defines
    no ``__set__``, hence the instance dictionary takes precedence).
    """
    def __init__(self, func):
        self.func = func
        self.name = func.__name__

    def __get__(self, obj, type=None):
        if obj is None:
            # Looked up on the class itself: there is no instance to compute
            # or cache a value for, so return the descriptor rather than
            # calling ``func(None)``.
            return self
        result = self.func(obj)
        self.cachevalue(obj, result)
        return result

    def cachevalue(self, obj, value):
        """Store ``value`` as the cached result on ``obj``."""
        # __dict__ assignment required to bypass __setattr__ (eg: repoview)
        obj.__dict__[self.name] = value
48
class compressormanager(object):
    """Holds registrations of various compression engines.

    This class essentially abstracts the differences between compression
    engines to allow new compression formats to be added easily, possibly from
    extensions.

    Compressors are registered against the global instance by calling its
    ``register()`` method.
    """
    def __init__(self):
        self._engines = {}
        # Bundle spec human name to engine name.
        self._bundlenames = {}
        # Internal bundle identifier to engine name.
        self._bundletypes = {}
        # Revlog header to engine name.
        self._revlogheaders = {}
        # Wire proto identifier to engine name.
        self._wiretypes = {}

    def __getitem__(self, key):
        return self._engines[key]

    def __contains__(self, key):
        return key in self._engines

    def __iter__(self):
        return iter(self._engines.keys())

    def register(self, engine):
        """Register a compression engine with the manager.

        The argument must be a ``compressionengine`` instance.

        Raises ``ValueError`` for any other argument and aborts when the
        engine name, bundle name, bundle type, wire protocol identifier or
        revlog header is already taken. Every conflict is checked before
        anything is recorded, so a failed registration leaves the manager
        unchanged.
        """
        if not isinstance(engine, compressionengine):
            raise ValueError(_('argument must be a compressionengine'))

        name = engine.name()

        if name in self._engines:
            raise error.Abort(_('compression engine %s already registered') %
                              name)

        # Validation pass: only read from the lookup tables here.
        bundlename = bundletype = None
        bundleinfo = engine.bundletype()
        if bundleinfo:
            bundlename, bundletype = bundleinfo

            if bundlename in self._bundlenames:
                raise error.Abort(_('bundle name %s already registered') %
                                  bundlename)
            if bundletype in self._bundletypes:
                raise error.Abort(_('bundle type %s already registered by %s') %
                                  (bundletype, self._bundletypes[bundletype]))

        wiretype = None
        wiresupport = engine.wireprotosupport()
        if wiresupport:
            wiretype = wiresupport.name
            if wiretype in self._wiretypes:
                raise error.Abort(_('wire protocol compression %s already '
                                    'registered by %s') %
                                  (wiretype, self._wiretypes[wiretype]))

        revlogheader = engine.revlogheader()
        if revlogheader and revlogheader in self._revlogheaders:
            raise error.Abort(_('revlog header %s already registered by %s') %
                              (revlogheader, self._revlogheaders[revlogheader]))

        # Commit pass: nothing below can fail on a conflict.
        if bundleinfo:
            # No external facing name declared.
            if bundlename:
                self._bundlenames[bundlename] = name

            self._bundletypes[bundletype] = name

        if wiresupport:
            self._wiretypes[wiretype] = name

        if revlogheader:
            self._revlogheaders[revlogheader] = name

        self._engines[name] = engine

    @property
    def supportedbundlenames(self):
        """Set of the user-facing bundle names that have been registered."""
        return set(self._bundlenames.keys())

    @property
    def supportedbundletypes(self):
        """Set of the internal bundle identifiers that have been registered."""
        return set(self._bundletypes.keys())

    def _checkavailable(self, engine):
        """Return ``engine``, aborting if it reports itself unavailable."""
        if not engine.available():
            raise error.Abort(_('compression engine %s could not be loaded') %
                              engine.name())
        return engine

    def forbundlename(self, bundlename):
        """Obtain a compression engine registered to a bundle name.

        Will raise KeyError if the bundle type isn't registered.

        Will abort if the engine is known but not available.
        """
        return self._checkavailable(
            self._engines[self._bundlenames[bundlename]])

    def forbundletype(self, bundletype):
        """Obtain a compression engine registered to a bundle type.

        Will raise KeyError if the bundle type isn't registered.

        Will abort if the engine is known but not available.
        """
        return self._checkavailable(
            self._engines[self._bundletypes[bundletype]])

    def supportedwireengines(self, role, onlyavailable=True):
        """Obtain compression engines that support the wire protocol.

        Returns a list of engines in prioritized order, most desired first.

        If ``onlyavailable`` is set, filter out engines that can't be
        loaded.
        """
        assert role in (SERVERROLE, CLIENTROLE)

        attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'

        engines = [self._engines[e] for e in self._wiretypes.values()]
        if onlyavailable:
            engines = [e for e in engines if e.available()]

        def getkey(e):
            # Sort first by priority, highest first. In case of tie, sort
            # alphabetically. This is arbitrary, but ensures output is
            # stable.
            w = e.wireprotosupport()
            return -1 * getattr(w, attr), w.name

        return sorted(engines, key=getkey)

    def forwiretype(self, wiretype):
        """Obtain a compression engine registered to a wire protocol type.

        Will raise KeyError if the wire type isn't registered.

        Will abort if the engine is known but not available.
        """
        return self._checkavailable(
            self._engines[self._wiretypes[wiretype]])

    def forrevlogheader(self, header):
        """Obtain a compression engine registered to a revlog header.

        Will raise KeyError if the revlog header value isn't registered.
        """
        return self._engines[self._revlogheaders[header]]

# The global instance engines register themselves against.
compengines = compressormanager()
204
class compressionengine(object):
    """Interface that every compression engine implements.

    The defaults below describe an engine that is always available but takes
    part in neither bundles, the wire protocol nor revlogs. Subclasses
    override whatever they support.
    """
    def name(self):
        """Return the engine's name, which is also its registration key.

        Subclasses must override this method.
        """
        raise NotImplementedError()

    def available(self):
        """Report whether the engine can be used in this installation.

        This exists for optional engines, for example ones backed by a C
        extension that is not present everywhere.
        """
        return True

    def bundletype(self):
        """Describe the bundle identifiers of this engine.

        Returns ``None`` when the engine cannot be used for bundles.

        Otherwise returns a 2-tuple of strings: the user-facing "bundle spec"
        compression name, then the internal identifier that denotes the
        compression format inside bundles. Use ``None`` as first element to
        keep the name out of external usage.

        An engine supporting bundles must also implement ``compressstream``
        and ``decompressorreader``.

        The help system shows the docstring of this method to users as the
        description of the engine.
        """
        return None

    def wireprotosupport(self):
        """Declare wire protocol support for this compression format.

        Returns ``None`` when the engine cannot compress wire protocol
        payloads.

        Otherwise returns a ``compenginewireprotosupport`` holding:

        * String format identifier
        * Integer priority for the server
        * Integer priority for the client

        Server and client advertise formats ordered by these priorities,
        highest first. Formats with a non-positive priority are not
        advertised.

        The values are somewhat arbitrary and only drive the default
        ordering; config options can change the relative order.

        An engine supporting the wire protocol must also implement
        ``compressstream`` and ``decompressorreader``.
        """
        return None

    def revlogheader(self):
        """Return the header that marks revlog chunks of this engine.

        An engine usable for revlog compression returns the bytes that
        identify chunks it compressed. Any other engine returns ``None`` to
        signal that it does not take part in revlog compression.
        """
        return None

    def compressstream(self, it, opts=None):
        """Compress an iterator of chunks.

        ``it`` is an iterator (ideally a generator) over chunks of bytes to
        compress. The result is an iterator (ideally a generator) over
        chunks of bytes forming the compressed output.

        ``opts`` optionally tunes how compression is performed; its meaning
        is specific to each engine.
        """
        raise NotImplementedError()

    def decompressorreader(self, fh):
        """Wrap a file object so that reading from it decompresses.

        ``fh`` has a ``read(size)`` method returning compressed data. The
        returned object has a ``read(size)`` method returning uncompressed
        data.
        """
        raise NotImplementedError()

    def revlogcompressor(self, opts=None):
        """Return an object able to compress revlog entries.

        Its ``compress(data)`` method takes binary data and returns the
        compressed binary data, or ``None`` when the data could not be
        compressed (too small, not compressible, etc). Compressed data
        should start with a header uniquely identifying this compression
        format so that decompression can be routed back to this engine; that
        header should be the value returned by ``revlogheader()``.

        Its ``decompress(data)`` method is only called with ``data`` that
        begins with ``revlogheader()``. It should return the raw,
        uncompressed data or raise a ``StorageError``.

        The object is reusable but is not thread safe.
        """
        raise NotImplementedError()
321
322 class _CompressedStreamReader(object):
323 def __init__(self, fh):
324 if safehasattr(fh, 'unbufferedread'):
325 self._reader = fh.unbufferedread
326 else:
327 self._reader = fh.read
328 self._pending = []
329 self._pos = 0
330 self._eof = False
331
332 def _decompress(self, chunk):
333 raise NotImplementedError()
334
335 def read(self, l):
336 buf = []
337 while True:
338 while self._pending:
339 if len(self._pending[0]) > l + self._pos:
340 newbuf = self._pending[0]
341 buf.append(newbuf[self._pos:self._pos + l])
342 self._pos += l
343 return ''.join(buf)
344
345 newbuf = self._pending.pop(0)
346 if self._pos:
347 buf.append(newbuf[self._pos:])
348 l -= len(newbuf) - self._pos
349 else:
350 buf.append(newbuf)
351 l -= len(newbuf)
352 self._pos = 0
353
354 if self._eof:
355 return ''.join(buf)
356 chunk = self._reader(65536)
357 self._decompress(chunk)
358 if not chunk and not self._pending and not self._eof:
359 # No progress and no new data, bail out
360 return ''.join(buf)
361
class _GzipCompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing with a ``zlib`` decompression object."""
    def __init__(self, fh):
        super(_GzipCompressedStreamReader, self).__init__(fh)
        self._decompobj = zlib.decompressobj()

    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        # Detect end of stream on a throwaway copy of the decompressor: feed
        # it one sentinel byte and see whether that byte comes back untouched
        # in ``unused_data``, i.e. the stream had already ended before it.
        # The sentinel is an explicit bytes literal because zlib only accepts
        # bytes-like input on Python 3.
        d = self._decompobj.copy()
        try:
            d.decompress(b'x')
            d.flush()
            if d.unused_data == b'x':
                self._eof = True
        except zlib.error:
            # Presumably the stream is still incomplete and the sentinel was
            # consumed as (invalid) stream data; leave ``_eof`` unset.
            pass
378
class _BZ2CompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing with ``bz2.BZ2Decompressor``."""
    def __init__(self, fh):
        super(_BZ2CompressedStreamReader, self).__init__(fh)
        self._decompobj = bz2.BZ2Decompressor()

    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        try:
            # Keep asking for output with empty input until nothing more
            # comes out. The decompressor raises EOFError when called after
            # the end of the stream, which is how the end is detected. The
            # empty input is an explicit bytes literal because bz2 only
            # accepts bytes-like input on Python 3.
            while True:
                newbuf = self._decompobj.decompress(b'')
                if newbuf:
                    self._pending.append(newbuf)
                else:
                    break
        except EOFError:
            self._eof = True
396
class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
    """bz2 stream reader that feeds the ``BZ`` magic to the decompressor.

    Presumably used for streams stored without their leading ``BZ`` magic;
    confirm against the callers that select this reader.
    """
    def __init__(self, fh):
        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
        # Prime the decompressor before any data from ``fh`` is read. The
        # magic is an explicit bytes literal because bz2 only accepts
        # bytes-like input on Python 3.
        newbuf = self._decompobj.decompress(b'BZ')
        if newbuf:
            self._pending.append(newbuf)
403
class _ZstdCompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing with a zstd decompression object.

    ``zstd`` is the already imported zstd module; it is passed in rather
    than imported here.
    """
    def __init__(self, fh, zstd):
        super(_ZstdCompressedStreamReader, self).__init__(fh)
        self._zstd = zstd
        self._decompobj = zstd.ZstdDecompressor().decompressobj()

    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        try:
            # Drain remaining output with empty input until nothing more
            # comes out. The empty input is an explicit bytes literal so the
            # call does not depend on how native string literals are
            # interpreted.
            while True:
                newbuf = self._decompobj.decompress(b'')
                if newbuf:
                    self._pending.append(newbuf)
                else:
                    break
        except self._zstd.ZstdError:
            # NOTE(review): any ZstdError raised while draining is treated as
            # end of stream; confirm this cannot hide corrupted input.
            self._eof = True
422
class _zlibengine(compressionengine):
    """Compression engine backed by the ``zlib`` module."""
    def name(self):
        return 'zlib'

    def bundletype(self):
        """zlib compression using the DEFLATE algorithm.

        All Mercurial clients should support this format. The compression
        algorithm strikes a reasonable balance between compression ratio
        and size.
        """
        return 'gzip', 'GZ'

    def wireprotosupport(self):
        return compewireprotosupport('zlib', 20, 20)

    def revlogheader(self):
        return 'x'

    def compressstream(self, it, opts=None):
        """Yield the zlib-compressed form of the chunks in ``it``.

        ``opts`` may carry a ``level`` entry; -1 is used otherwise.
        """
        level = (opts or {}).get('level', -1)
        deflater = zlib.compressobj(level)
        for chunk in it:
            out = deflater.compress(chunk)
            # compress() does not emit data on every call; skipping the
            # empty results here is cheaper than passing them through the
            # generator.
            if out:
                yield out

        yield deflater.flush()

    def decompressorreader(self, fh):
        return _GzipCompressedStreamReader(fh)

    class zlibrevlogcompressor(object):
        """Compress and decompress individual revlog entries with zlib."""
        def compress(self, data):
            """Return ``data`` compressed, or None if that saves nothing."""
            insize = len(data)
            # Caller handles empty input case.
            assert insize > 0

            if insize < 44:
                return None

            if insize <= 1000000:
                compressed = zlib.compress(data)
                return compressed if len(compressed) < insize else None

            # zlib makes an internal copy of the input buffer, doubling
            # memory usage for large inputs. So do streaming compression
            # on large inputs.
            step = 2**20
            z = zlib.compressobj()
            parts = [z.compress(data[pos:pos + step])
                     for pos in range(0, insize, step)]
            parts.append(z.flush())

            if sum(map(len, parts)) >= insize:
                return None
            return ''.join(parts)

        def decompress(self, data):
            """Return ``data`` decompressed; zlib errors become StorageError."""
            try:
                return zlib.decompress(data)
            except zlib.error as e:
                raise error.StorageError(_('revlog decompress error: %s') %
                                         stringutil.forcebytestr(e))

    def revlogcompressor(self, opts=None):
        return self.zlibrevlogcompressor()

compengines.register(_zlibengine())
501
class _bz2engine(compressionengine):
    """Compression engine backed by the ``bz2`` module."""
    def name(self):
        return 'bz2'

    def bundletype(self):
        """An algorithm that produces smaller bundles than ``gzip``.

        All Mercurial clients should support this format.

        This engine will likely produce smaller bundles than ``gzip`` but
        will be significantly slower, both during compression and
        decompression.

        If available, the ``zstd`` engine can yield similar or better
        compression at much higher speeds.
        """
        return 'bzip2', 'BZ'

    # We declare a protocol name but don't advertise by default because
    # it is slow.
    def wireprotosupport(self):
        return compewireprotosupport('bzip2', 0, 0)

    def compressstream(self, it, opts=None):
        """Yield the bz2-compressed form of the chunks in ``it``.

        ``opts`` may carry a ``level`` entry; 9 is used otherwise.
        """
        level = (opts or {}).get('level', 9)
        compressor = bz2.BZ2Compressor(level)
        for chunk in it:
            out = compressor.compress(chunk)
            # Only pass on the calls that actually produced output.
            if out:
                yield out

        yield compressor.flush()

    def decompressorreader(self, fh):
        return _BZ2CompressedStreamReader(fh)

compengines.register(_bz2engine())
539
class _truncatedbz2engine(compressionengine):
    """Decompression-only engine for the ``_truncatedBZ`` bundle type."""
    def name(self):
        return 'bz2truncated'

    def bundletype(self):
        # The first element is None: this type has no user-facing name.
        return None, '_truncatedBZ'

    # We don't implement compressstream because it is hackily handled elsewhere.

    def decompressorreader(self, fh):
        return _TruncatedBZ2CompressedStreamReader(fh)

compengines.register(_truncatedbz2engine())
553
class _noopengine(compressionengine):
    """Engine that passes data through without compressing it."""
    def name(self):
        return 'none'

    def bundletype(self):
        """No compression is performed.

        Use this compression engine to explicitly disable compression.
        """
        return 'none', 'UN'

    # Clients always support uncompressed payloads. Servers don't because
    # unless you are on a fast network, uncompressed payloads can easily
    # saturate your network pipe.
    def wireprotosupport(self):
        return compewireprotosupport('none', 0, 10)

    # We don't implement revlogheader because it is handled specially
    # in the revlog class.

    def compressstream(self, it, opts=None):
        # Nothing to do: the input iterator is the output.
        return it

    def decompressorreader(self, fh):
        # Nothing to do: the file object already yields the final data.
        return fh

    class nooprevlogcompressor(object):
        """Revlog compressor that never compresses."""
        def compress(self, data):
            # None means "could not be compressed".
            return None

    def revlogcompressor(self, opts=None):
        return self.nooprevlogcompressor()

compengines.register(_noopengine())
588
class _zstdengine(compressionengine):
    """Compression engine backed by the optional ``zstd`` module."""
    def name(self):
        return 'zstd'

    @propertycache
    def _module(self):
        """The zstd module, or None when it cannot be imported."""
        # Not all installs have the zstd module available. So defer importing
        # until first access.
        try:
            from .. import zstd
            # Force delayed import.
            zstd.__version__
        except ImportError:
            return None
        return zstd

    def available(self):
        return bool(self._module)

    def bundletype(self):
        """A modern compression algorithm that is fast and highly flexible.

        Only supported by Mercurial 4.1 and newer clients.

        With the default settings, zstd compression is both faster and yields
        better compression than ``gzip``. It also frequently yields better
        compression than ``bzip2`` while operating at much higher speeds.

        If this engine is available and backwards compatibility is not a
        concern, it is likely the best available engine.
        """
        return 'zstd', 'ZS'

    def wireprotosupport(self):
        return compewireprotosupport('zstd', 50, 50)

    def revlogheader(self):
        return '\x28'

    def compressstream(self, it, opts=None):
        """Yield the zstd-compressed form of the chunks in ``it``.

        ``opts`` may carry a ``level`` entry; 3 is used otherwise.
        """
        # zstd level 3 is almost always significantly faster than zlib
        # while providing no worse compression. It strikes a good balance
        # between speed and compression.
        level = (opts or {}).get('level', 3)

        cobj = self._module.ZstdCompressor(level=level).compressobj()
        for chunk in it:
            out = cobj.compress(chunk)
            # Only pass on the calls that actually produced output.
            if out:
                yield out

        yield cobj.flush()

    def decompressorreader(self, fh):
        return _ZstdCompressedStreamReader(fh, self._module)

    class zstdrevlogcompressor(object):
        """Compress and decompress individual revlog entries with zstd."""
        def __init__(self, zstd, level=3):
            # TODO consider omitting frame magic to save 4 bytes.
            # This writes content sizes into the frame header. That is
            # extra storage. But it allows a correct size memory allocation
            # to hold the result.
            self._cctx = zstd.ZstdCompressor(level=level)
            self._dctx = zstd.ZstdDecompressor()
            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE

        def compress(self, data):
            """Return ``data`` compressed, or None if that saves nothing."""
            insize = len(data)
            # Caller handles empty input case.
            assert insize > 0

            if insize < 50:
                return None

            if insize <= 1000000:
                compressed = self._cctx.compress(data)
                return compressed if len(compressed) < insize else None

            # Larger inputs go through a compression object, one slice of
            # the recommended input size at a time.
            z = self._cctx.compressobj()
            step = self._compinsize
            chunks = []
            for pos in range(0, insize, step):
                chunk = z.compress(data[pos:pos + step])
                if chunk:
                    chunks.append(chunk)
            chunks.append(z.flush())

            if sum(map(len, chunks)) >= insize:
                return None
            return ''.join(chunks)

        def decompress(self, data):
            """Return ``data`` decompressed; failures become StorageError."""
            insize = len(data)

            try:
                # This was measured to be faster than other streaming
                # decompressors.
                dobj = self._dctx.decompressobj()
                step = self._decompinsize
                chunks = []
                for pos in range(0, insize, step):
                    chunk = dobj.decompress(data[pos:pos + step])
                    if chunk:
                        chunks.append(chunk)
                # Frame should be exhausted, so no finish() API.

                return ''.join(chunks)
            except Exception as e:
                raise error.StorageError(_('revlog decompress error: %s') %
                                         stringutil.forcebytestr(e))

    def revlogcompressor(self, opts=None):
        level = (opts or {}).get('level', 3)
        return self.zstdrevlogcompressor(self._module, level=level)

compengines.register(_zstdengine())
715
def bundlecompressiontopics():
    """Obtains a list of available bundle compressions for use in help.

    Returns a dict mapping each user-facing bundle name to an object whose
    ``__doc__`` holds the formatted description of the engine. Engines that
    are unavailable or have no user-facing bundle name are left out.
    """
    # We need to format the docstring. So use a dummy object/type to hold it
    # rather than mutating the original.
    class docobject(object):
        pass

    # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
    items = {}

    for name in compengines:
        engine = compengines[name]
        if not engine.available():
            continue

        bt = engine.bundletype()
        if not (bt and bt[0]):
            continue
        bundlename = bt[0]

        # NOTE(review): the whitespace after ``\n`` may have been collapsed
        # when this text was extracted; confirm against the original file.
        doc = b'``%s``\n %s' % (bundlename,
                                pycompat.getdoc(engine.bundletype))

        value = docobject()
        value.__doc__ = pycompat.sysstr(doc)
        value._origdoc = engine.bundletype.__doc__
        value._origfunc = engine.bundletype

        items[bundlename] = value

    return items

i18nfunctions = bundlecompressiontopics().values()