comparison mercurial/utils/compression.py @ 43076:2372284d9457

formatting: blacken the codebase

This is using my patch to black (https://github.com/psf/black/pull/826) so we don't un-wrap collection literals.

Done with:

  hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S

# skip-blame mass-reformatting only

# no-check-commit reformats foo_bar functions

Differential Revision: https://phab.mercurial-scm.org/D6971
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:45:02 -0400
parents bb271ec2fbfb
children 687b865b95ad
comparison
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
13 from .. import ( 13 from .. import (
14 error, 14 error,
15 i18n, 15 i18n,
16 pycompat, 16 pycompat,
17 ) 17 )
18 from . import ( 18 from . import stringutil
19 stringutil,
20 )
21 19
22 safehasattr = pycompat.safehasattr 20 safehasattr = pycompat.safehasattr
23 21
24 22
25 _ = i18n._ 23 _ = i18n._
27 # compression code 25 # compression code
28 26
29 SERVERROLE = 'server' 27 SERVERROLE = 'server'
30 CLIENTROLE = 'client' 28 CLIENTROLE = 'client'
31 29
32 compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport', 30 compewireprotosupport = collections.namedtuple(
33 (r'name', r'serverpriority', 31 r'compenginewireprotosupport',
34 r'clientpriority')) 32 (r'name', r'serverpriority', r'clientpriority'),
33 )
34
35 35
36 class propertycache(object): 36 class propertycache(object):
37 def __init__(self, func): 37 def __init__(self, func):
38 self.func = func 38 self.func = func
39 self.name = func.__name__ 39 self.name = func.__name__
40
40 def __get__(self, obj, type=None): 41 def __get__(self, obj, type=None):
41 result = self.func(obj) 42 result = self.func(obj)
42 self.cachevalue(obj, result) 43 self.cachevalue(obj, result)
43 return result 44 return result
44 45
45 def cachevalue(self, obj, value): 46 def cachevalue(self, obj, value):
46 # __dict__ assignment required to bypass __setattr__ (eg: repoview) 47 # __dict__ assignment required to bypass __setattr__ (eg: repoview)
47 obj.__dict__[self.name] = value 48 obj.__dict__[self.name] = value
48 49
50
49 class compressormanager(object): 51 class compressormanager(object):
50 """Holds registrations of various compression engines. 52 """Holds registrations of various compression engines.
51 53
52 This class essentially abstracts the differences between compression 54 This class essentially abstracts the differences between compression
53 engines to allow new compression formats to be added easily, possibly from 55 engines to allow new compression formats to be added easily, possibly from
54 extensions. 56 extensions.
55 57
56 Compressors are registered against the global instance by calling its 58 Compressors are registered against the global instance by calling its
57 ``register()`` method. 59 ``register()`` method.
58 """ 60 """
61
59 def __init__(self): 62 def __init__(self):
60 self._engines = {} 63 self._engines = {}
61 # Bundle spec human name to engine name. 64 # Bundle spec human name to engine name.
62 self._bundlenames = {} 65 self._bundlenames = {}
63 # Internal bundle identifier to engine name. 66 # Internal bundle identifier to engine name.
85 raise ValueError(_('argument must be a compressionengine')) 88 raise ValueError(_('argument must be a compressionengine'))
86 89
87 name = engine.name() 90 name = engine.name()
88 91
89 if name in self._engines: 92 if name in self._engines:
90 raise error.Abort(_('compression engine %s already registered') % 93 raise error.Abort(
91 name) 94 _('compression engine %s already registered') % name
95 )
92 96
93 bundleinfo = engine.bundletype() 97 bundleinfo = engine.bundletype()
94 if bundleinfo: 98 if bundleinfo:
95 bundlename, bundletype = bundleinfo 99 bundlename, bundletype = bundleinfo
96 100
97 if bundlename in self._bundlenames: 101 if bundlename in self._bundlenames:
98 raise error.Abort(_('bundle name %s already registered') % 102 raise error.Abort(
99 bundlename) 103 _('bundle name %s already registered') % bundlename
104 )
100 if bundletype in self._bundletypes: 105 if bundletype in self._bundletypes:
101 raise error.Abort(_('bundle type %s already registered by %s') % 106 raise error.Abort(
102 (bundletype, self._bundletypes[bundletype])) 107 _('bundle type %s already registered by %s')
108 % (bundletype, self._bundletypes[bundletype])
109 )
103 110
104 # No external facing name declared. 111 # No external facing name declared.
105 if bundlename: 112 if bundlename:
106 self._bundlenames[bundlename] = name 113 self._bundlenames[bundlename] = name
107 114
109 116
110 wiresupport = engine.wireprotosupport() 117 wiresupport = engine.wireprotosupport()
111 if wiresupport: 118 if wiresupport:
112 wiretype = wiresupport.name 119 wiretype = wiresupport.name
113 if wiretype in self._wiretypes: 120 if wiretype in self._wiretypes:
114 raise error.Abort(_('wire protocol compression %s already ' 121 raise error.Abort(
115 'registered by %s') % 122 _(
116 (wiretype, self._wiretypes[wiretype])) 123 'wire protocol compression %s already '
124 'registered by %s'
125 )
126 % (wiretype, self._wiretypes[wiretype])
127 )
117 128
118 self._wiretypes[wiretype] = name 129 self._wiretypes[wiretype] = name
119 130
120 revlogheader = engine.revlogheader() 131 revlogheader = engine.revlogheader()
121 if revlogheader and revlogheader in self._revlogheaders: 132 if revlogheader and revlogheader in self._revlogheaders:
122 raise error.Abort(_('revlog header %s already registered by %s') % 133 raise error.Abort(
123 (revlogheader, self._revlogheaders[revlogheader])) 134 _('revlog header %s already registered by %s')
135 % (revlogheader, self._revlogheaders[revlogheader])
136 )
124 137
125 if revlogheader: 138 if revlogheader:
126 self._revlogheaders[revlogheader] = name 139 self._revlogheaders[revlogheader] = name
127 140
128 self._engines[name] = engine 141 self._engines[name] = engine
142 155
143 Will abort if the engine is known but not available. 156 Will abort if the engine is known but not available.
144 """ 157 """
145 engine = self._engines[self._bundlenames[bundlename]] 158 engine = self._engines[self._bundlenames[bundlename]]
146 if not engine.available(): 159 if not engine.available():
147 raise error.Abort(_('compression engine %s could not be loaded') % 160 raise error.Abort(
148 engine.name()) 161 _('compression engine %s could not be loaded') % engine.name()
162 )
149 return engine 163 return engine
150 164
151 def forbundletype(self, bundletype): 165 def forbundletype(self, bundletype):
152 """Obtain a compression engine registered to a bundle type. 166 """Obtain a compression engine registered to a bundle type.
153 167
155 169
156 Will abort if the engine is known but not available. 170 Will abort if the engine is known but not available.
157 """ 171 """
158 engine = self._engines[self._bundletypes[bundletype]] 172 engine = self._engines[self._bundletypes[bundletype]]
159 if not engine.available(): 173 if not engine.available():
160 raise error.Abort(_('compression engine %s could not be loaded') % 174 raise error.Abort(
161 engine.name()) 175 _('compression engine %s could not be loaded') % engine.name()
176 )
162 return engine 177 return engine
163 178
164 def supportedwireengines(self, role, onlyavailable=True): 179 def supportedwireengines(self, role, onlyavailable=True):
165 """Obtain compression engines that support the wire protocol. 180 """Obtain compression engines that support the wire protocol.
166 181
187 return list(sorted(engines, key=getkey)) 202 return list(sorted(engines, key=getkey))
188 203
189 def forwiretype(self, wiretype): 204 def forwiretype(self, wiretype):
190 engine = self._engines[self._wiretypes[wiretype]] 205 engine = self._engines[self._wiretypes[wiretype]]
191 if not engine.available(): 206 if not engine.available():
192 raise error.Abort(_('compression engine %s could not be loaded') % 207 raise error.Abort(
193 engine.name()) 208 _('compression engine %s could not be loaded') % engine.name()
209 )
194 return engine 210 return engine
195 211
196 def forrevlogheader(self, header): 212 def forrevlogheader(self, header):
197 """Obtain a compression engine registered to a revlog header. 213 """Obtain a compression engine registered to a revlog header.
198 214
199 Will raise KeyError if the revlog header value isn't registered. 215 Will raise KeyError if the revlog header value isn't registered.
200 """ 216 """
201 return self._engines[self._revlogheaders[header]] 217 return self._engines[self._revlogheaders[header]]
202 218
219
203 compengines = compressormanager() 220 compengines = compressormanager()
221
204 222
205 class compressionengine(object): 223 class compressionengine(object):
206 """Base class for compression engines. 224 """Base class for compression engines.
207 225
208 Compression engines must implement the interface defined by this class. 226 Compression engines must implement the interface defined by this class.
209 """ 227 """
228
210 def name(self): 229 def name(self):
211 """Returns the name of the compression engine. 230 """Returns the name of the compression engine.
212 231
213 This is the key the engine is registered under. 232 This is the key the engine is registered under.
214 233
316 data or raise a ``StorageError``. 335 data or raise a ``StorageError``.
317 336
318 The object is reusable but is not thread safe. 337 The object is reusable but is not thread safe.
319 """ 338 """
320 raise NotImplementedError() 339 raise NotImplementedError()
340
321 341
322 class _CompressedStreamReader(object): 342 class _CompressedStreamReader(object):
323 def __init__(self, fh): 343 def __init__(self, fh):
324 if safehasattr(fh, 'unbufferedread'): 344 if safehasattr(fh, 'unbufferedread'):
325 self._reader = fh.unbufferedread 345 self._reader = fh.unbufferedread
336 buf = [] 356 buf = []
337 while True: 357 while True:
338 while self._pending: 358 while self._pending:
339 if len(self._pending[0]) > l + self._pos: 359 if len(self._pending[0]) > l + self._pos:
340 newbuf = self._pending[0] 360 newbuf = self._pending[0]
341 buf.append(newbuf[self._pos:self._pos + l]) 361 buf.append(newbuf[self._pos : self._pos + l])
342 self._pos += l 362 self._pos += l
343 return ''.join(buf) 363 return ''.join(buf)
344 364
345 newbuf = self._pending.pop(0) 365 newbuf = self._pending.pop(0)
346 if self._pos: 366 if self._pos:
347 buf.append(newbuf[self._pos:]) 367 buf.append(newbuf[self._pos :])
348 l -= len(newbuf) - self._pos 368 l -= len(newbuf) - self._pos
349 else: 369 else:
350 buf.append(newbuf) 370 buf.append(newbuf)
351 l -= len(newbuf) 371 l -= len(newbuf)
352 self._pos = 0 372 self._pos = 0
357 self._decompress(chunk) 377 self._decompress(chunk)
358 if not chunk and not self._pending and not self._eof: 378 if not chunk and not self._pending and not self._eof:
359 # No progress and no new data, bail out 379 # No progress and no new data, bail out
360 return ''.join(buf) 380 return ''.join(buf)
361 381
382
362 class _GzipCompressedStreamReader(_CompressedStreamReader): 383 class _GzipCompressedStreamReader(_CompressedStreamReader):
363 def __init__(self, fh): 384 def __init__(self, fh):
364 super(_GzipCompressedStreamReader, self).__init__(fh) 385 super(_GzipCompressedStreamReader, self).__init__(fh)
365 self._decompobj = zlib.decompressobj() 386 self._decompobj = zlib.decompressobj()
387
366 def _decompress(self, chunk): 388 def _decompress(self, chunk):
367 newbuf = self._decompobj.decompress(chunk) 389 newbuf = self._decompobj.decompress(chunk)
368 if newbuf: 390 if newbuf:
369 self._pending.append(newbuf) 391 self._pending.append(newbuf)
370 d = self._decompobj.copy() 392 d = self._decompobj.copy()
374 if d.unused_data == 'x': 396 if d.unused_data == 'x':
375 self._eof = True 397 self._eof = True
376 except zlib.error: 398 except zlib.error:
377 pass 399 pass
378 400
401
379 class _BZ2CompressedStreamReader(_CompressedStreamReader): 402 class _BZ2CompressedStreamReader(_CompressedStreamReader):
380 def __init__(self, fh): 403 def __init__(self, fh):
381 super(_BZ2CompressedStreamReader, self).__init__(fh) 404 super(_BZ2CompressedStreamReader, self).__init__(fh)
382 self._decompobj = bz2.BZ2Decompressor() 405 self._decompobj = bz2.BZ2Decompressor()
406
383 def _decompress(self, chunk): 407 def _decompress(self, chunk):
384 newbuf = self._decompobj.decompress(chunk) 408 newbuf = self._decompobj.decompress(chunk)
385 if newbuf: 409 if newbuf:
386 self._pending.append(newbuf) 410 self._pending.append(newbuf)
387 try: 411 try:
392 else: 416 else:
393 break 417 break
394 except EOFError: 418 except EOFError:
395 self._eof = True 419 self._eof = True
396 420
421
397 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader): 422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
398 def __init__(self, fh): 423 def __init__(self, fh):
399 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh) 424 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
400 newbuf = self._decompobj.decompress('BZ') 425 newbuf = self._decompobj.decompress('BZ')
401 if newbuf: 426 if newbuf:
402 self._pending.append(newbuf) 427 self._pending.append(newbuf)
403 428
429
404 class _ZstdCompressedStreamReader(_CompressedStreamReader): 430 class _ZstdCompressedStreamReader(_CompressedStreamReader):
405 def __init__(self, fh, zstd): 431 def __init__(self, fh, zstd):
406 super(_ZstdCompressedStreamReader, self).__init__(fh) 432 super(_ZstdCompressedStreamReader, self).__init__(fh)
407 self._zstd = zstd 433 self._zstd = zstd
408 self._decompobj = zstd.ZstdDecompressor().decompressobj() 434 self._decompobj = zstd.ZstdDecompressor().decompressobj()
435
409 def _decompress(self, chunk): 436 def _decompress(self, chunk):
410 newbuf = self._decompobj.decompress(chunk) 437 newbuf = self._decompobj.decompress(chunk)
411 if newbuf: 438 if newbuf:
412 self._pending.append(newbuf) 439 self._pending.append(newbuf)
413 try: 440 try:
418 else: 445 else:
419 break 446 break
420 except self._zstd.ZstdError: 447 except self._zstd.ZstdError:
421 self._eof = True 448 self._eof = True
422 449
450
423 class _zlibengine(compressionengine): 451 class _zlibengine(compressionengine):
424 def name(self): 452 def name(self):
425 return 'zlib' 453 return 'zlib'
426 454
427 def bundletype(self): 455 def bundletype(self):
454 482
455 def decompressorreader(self, fh): 483 def decompressorreader(self, fh):
456 return _GzipCompressedStreamReader(fh) 484 return _GzipCompressedStreamReader(fh)
457 485
458 class zlibrevlogcompressor(object): 486 class zlibrevlogcompressor(object):
459
460 def __init__(self, level=None): 487 def __init__(self, level=None):
461 self._level = level 488 self._level = level
462 489
463 def compress(self, data): 490 def compress(self, data):
464 insize = len(data) 491 insize = len(data)
486 else: 513 else:
487 z = zlib.compressobj(level=self._level) 514 z = zlib.compressobj(level=self._level)
488 parts = [] 515 parts = []
489 pos = 0 516 pos = 0
490 while pos < insize: 517 while pos < insize:
491 pos2 = pos + 2**20 518 pos2 = pos + 2 ** 20
492 parts.append(z.compress(data[pos:pos2])) 519 parts.append(z.compress(data[pos:pos2]))
493 pos = pos2 520 pos = pos2
494 parts.append(z.flush()) 521 parts.append(z.flush())
495 522
496 if sum(map(len, parts)) < insize: 523 if sum(map(len, parts)) < insize:
499 526
500 def decompress(self, data): 527 def decompress(self, data):
501 try: 528 try:
502 return zlib.decompress(data) 529 return zlib.decompress(data)
503 except zlib.error as e: 530 except zlib.error as e:
504 raise error.StorageError(_('revlog decompress error: %s') % 531 raise error.StorageError(
505 stringutil.forcebytestr(e)) 532 _('revlog decompress error: %s')
533 % stringutil.forcebytestr(e)
534 )
506 535
507 def revlogcompressor(self, opts=None): 536 def revlogcompressor(self, opts=None):
508 level = None 537 level = None
509 if opts is not None: 538 if opts is not None:
510 level = opts.get('zlib.level') 539 level = opts.get('zlib.level')
511 return self.zlibrevlogcompressor(level) 540 return self.zlibrevlogcompressor(level)
512 541
542
513 compengines.register(_zlibengine()) 543 compengines.register(_zlibengine())
544
514 545
515 class _bz2engine(compressionengine): 546 class _bz2engine(compressionengine):
516 def name(self): 547 def name(self):
517 return 'bz2' 548 return 'bz2'
518 549
546 yield z.flush() 577 yield z.flush()
547 578
548 def decompressorreader(self, fh): 579 def decompressorreader(self, fh):
549 return _BZ2CompressedStreamReader(fh) 580 return _BZ2CompressedStreamReader(fh)
550 581
582
551 compengines.register(_bz2engine()) 583 compengines.register(_bz2engine())
584
552 585
553 class _truncatedbz2engine(compressionengine): 586 class _truncatedbz2engine(compressionengine):
554 def name(self): 587 def name(self):
555 return 'bz2truncated' 588 return 'bz2truncated'
556 589
560 # We don't implement compressstream because it is hackily handled elsewhere. 593 # We don't implement compressstream because it is hackily handled elsewhere.
561 594
562 def decompressorreader(self, fh): 595 def decompressorreader(self, fh):
563 return _TruncatedBZ2CompressedStreamReader(fh) 596 return _TruncatedBZ2CompressedStreamReader(fh)
564 597
598
565 compengines.register(_truncatedbz2engine()) 599 compengines.register(_truncatedbz2engine())
600
566 601
567 class _noopengine(compressionengine): 602 class _noopengine(compressionengine):
568 def name(self): 603 def name(self):
569 return 'none' 604 return 'none'
570 605
595 return None 630 return None
596 631
597 def revlogcompressor(self, opts=None): 632 def revlogcompressor(self, opts=None):
598 return self.nooprevlogcompressor() 633 return self.nooprevlogcompressor()
599 634
635
600 compengines.register(_noopengine()) 636 compengines.register(_noopengine())
637
601 638
602 class _zstdengine(compressionengine): 639 class _zstdengine(compressionengine):
603 def name(self): 640 def name(self):
604 return 'zstd' 641 return 'zstd'
605 642
607 def _module(self): 644 def _module(self):
608 # Not all installs have the zstd module available. So defer importing 645 # Not all installs have the zstd module available. So defer importing
609 # until first access. 646 # until first access.
610 try: 647 try:
611 from .. import zstd 648 from .. import zstd
649
612 # Force delayed import. 650 # Force delayed import.
613 zstd.__version__ 651 zstd.__version__
614 return zstd 652 return zstd
615 except ImportError: 653 except ImportError:
616 return None 654 return None
714 pos = pos2 752 pos = pos2
715 # Frame should be exhausted, so no finish() API. 753 # Frame should be exhausted, so no finish() API.
716 754
717 return ''.join(chunks) 755 return ''.join(chunks)
718 except Exception as e: 756 except Exception as e:
719 raise error.StorageError(_('revlog decompress error: %s') % 757 raise error.StorageError(
720 stringutil.forcebytestr(e)) 758 _('revlog decompress error: %s')
759 % stringutil.forcebytestr(e)
760 )
721 761
722 def revlogcompressor(self, opts=None): 762 def revlogcompressor(self, opts=None):
723 opts = opts or {} 763 opts = opts or {}
724 level = opts.get('zstd.level') 764 level = opts.get('zstd.level')
725 if level is None: 765 if level is None:
726 level = opts.get('level') 766 level = opts.get('level')
727 if level is None: 767 if level is None:
728 level = 3 768 level = 3
729 return self.zstdrevlogcompressor(self._module, level=level) 769 return self.zstdrevlogcompressor(self._module, level=level)
730 770
771
731 compengines.register(_zstdengine()) 772 compengines.register(_zstdengine())
773
732 774
733 def bundlecompressiontopics(): 775 def bundlecompressiontopics():
734 """Obtains a list of available bundle compressions for use in help.""" 776 """Obtains a list of available bundle compressions for use in help."""
735 # help.makeitemsdocs() expects a dict of names to items with a .__doc__. 777 # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
736 items = {} 778 items = {}
759 801
760 items[bt[0]] = value 802 items[bt[0]] = value
761 803
762 return items 804 return items
763 805
806
764 i18nfunctions = bundlecompressiontopics().values() 807 i18nfunctions = bundlecompressiontopics().values()