comparison mercurial/utils/compression.py @ 43077:687b865b95ad

formatting: byteify all mercurial/ and hgext/ string literals

Done with

python3.7 contrib/byteify-strings.py -i $(hg files 'set:mercurial/**.py - mercurial/thirdparty/** + hgext/**.py - hgext/fsmonitor/pywatchman/** - mercurial/__init__.py')
black -l 80 -t py33 -S $(hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**" - hgext/fsmonitor/pywatchman/**')

# skip-blame mass-reformatting only

Differential Revision: https://phab.mercurial-scm.org/D6972
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:48:39 -0400
parents 2372284d9457
children c59eb1560c44
comparison
equal deleted inserted replaced
43076:2372284d9457 43077:687b865b95ad
22 22
23 _ = i18n._ 23 _ = i18n._
24 24
25 # compression code 25 # compression code
26 26
27 SERVERROLE = 'server' 27 SERVERROLE = b'server'
28 CLIENTROLE = 'client' 28 CLIENTROLE = b'client'
29 29
30 compewireprotosupport = collections.namedtuple( 30 compewireprotosupport = collections.namedtuple(
31 r'compenginewireprotosupport', 31 r'compenginewireprotosupport',
32 (r'name', r'serverpriority', r'clientpriority'), 32 (r'name', r'serverpriority', r'clientpriority'),
33 ) 33 )
83 """Register a compression engine with the manager. 83 """Register a compression engine with the manager.
84 84
85 The argument must be a ``compressionengine`` instance. 85 The argument must be a ``compressionengine`` instance.
86 """ 86 """
87 if not isinstance(engine, compressionengine): 87 if not isinstance(engine, compressionengine):
88 raise ValueError(_('argument must be a compressionengine')) 88 raise ValueError(_(b'argument must be a compressionengine'))
89 89
90 name = engine.name() 90 name = engine.name()
91 91
92 if name in self._engines: 92 if name in self._engines:
93 raise error.Abort( 93 raise error.Abort(
94 _('compression engine %s already registered') % name 94 _(b'compression engine %s already registered') % name
95 ) 95 )
96 96
97 bundleinfo = engine.bundletype() 97 bundleinfo = engine.bundletype()
98 if bundleinfo: 98 if bundleinfo:
99 bundlename, bundletype = bundleinfo 99 bundlename, bundletype = bundleinfo
100 100
101 if bundlename in self._bundlenames: 101 if bundlename in self._bundlenames:
102 raise error.Abort( 102 raise error.Abort(
103 _('bundle name %s already registered') % bundlename 103 _(b'bundle name %s already registered') % bundlename
104 ) 104 )
105 if bundletype in self._bundletypes: 105 if bundletype in self._bundletypes:
106 raise error.Abort( 106 raise error.Abort(
107 _('bundle type %s already registered by %s') 107 _(b'bundle type %s already registered by %s')
108 % (bundletype, self._bundletypes[bundletype]) 108 % (bundletype, self._bundletypes[bundletype])
109 ) 109 )
110 110
111 # No external facing name declared. 111 # No external facing name declared.
112 if bundlename: 112 if bundlename:
118 if wiresupport: 118 if wiresupport:
119 wiretype = wiresupport.name 119 wiretype = wiresupport.name
120 if wiretype in self._wiretypes: 120 if wiretype in self._wiretypes:
121 raise error.Abort( 121 raise error.Abort(
122 _( 122 _(
123 'wire protocol compression %s already ' 123 b'wire protocol compression %s already '
124 'registered by %s' 124 b'registered by %s'
125 ) 125 )
126 % (wiretype, self._wiretypes[wiretype]) 126 % (wiretype, self._wiretypes[wiretype])
127 ) 127 )
128 128
129 self._wiretypes[wiretype] = name 129 self._wiretypes[wiretype] = name
130 130
131 revlogheader = engine.revlogheader() 131 revlogheader = engine.revlogheader()
132 if revlogheader and revlogheader in self._revlogheaders: 132 if revlogheader and revlogheader in self._revlogheaders:
133 raise error.Abort( 133 raise error.Abort(
134 _('revlog header %s already registered by %s') 134 _(b'revlog header %s already registered by %s')
135 % (revlogheader, self._revlogheaders[revlogheader]) 135 % (revlogheader, self._revlogheaders[revlogheader])
136 ) 136 )
137 137
138 if revlogheader: 138 if revlogheader:
139 self._revlogheaders[revlogheader] = name 139 self._revlogheaders[revlogheader] = name
156 Will abort if the engine is known but not available. 156 Will abort if the engine is known but not available.
157 """ 157 """
158 engine = self._engines[self._bundlenames[bundlename]] 158 engine = self._engines[self._bundlenames[bundlename]]
159 if not engine.available(): 159 if not engine.available():
160 raise error.Abort( 160 raise error.Abort(
161 _('compression engine %s could not be loaded') % engine.name() 161 _(b'compression engine %s could not be loaded') % engine.name()
162 ) 162 )
163 return engine 163 return engine
164 164
165 def forbundletype(self, bundletype): 165 def forbundletype(self, bundletype):
166 """Obtain a compression engine registered to a bundle type. 166 """Obtain a compression engine registered to a bundle type.
170 Will abort if the engine is known but not available. 170 Will abort if the engine is known but not available.
171 """ 171 """
172 engine = self._engines[self._bundletypes[bundletype]] 172 engine = self._engines[self._bundletypes[bundletype]]
173 if not engine.available(): 173 if not engine.available():
174 raise error.Abort( 174 raise error.Abort(
175 _('compression engine %s could not be loaded') % engine.name() 175 _(b'compression engine %s could not be loaded') % engine.name()
176 ) 176 )
177 return engine 177 return engine
178 178
179 def supportedwireengines(self, role, onlyavailable=True): 179 def supportedwireengines(self, role, onlyavailable=True):
180 """Obtain compression engines that support the wire protocol. 180 """Obtain compression engines that support the wire protocol.
184 If ``onlyavailable`` is set, filter out engines that can't be 184 If ``onlyavailable`` is set, filter out engines that can't be
185 loaded. 185 loaded.
186 """ 186 """
187 assert role in (SERVERROLE, CLIENTROLE) 187 assert role in (SERVERROLE, CLIENTROLE)
188 188
189 attr = 'serverpriority' if role == SERVERROLE else 'clientpriority' 189 attr = b'serverpriority' if role == SERVERROLE else b'clientpriority'
190 190
191 engines = [self._engines[e] for e in self._wiretypes.values()] 191 engines = [self._engines[e] for e in self._wiretypes.values()]
192 if onlyavailable: 192 if onlyavailable:
193 engines = [e for e in engines if e.available()] 193 engines = [e for e in engines if e.available()]
194 194
203 203
204 def forwiretype(self, wiretype): 204 def forwiretype(self, wiretype):
205 engine = self._engines[self._wiretypes[wiretype]] 205 engine = self._engines[self._wiretypes[wiretype]]
206 if not engine.available(): 206 if not engine.available():
207 raise error.Abort( 207 raise error.Abort(
208 _('compression engine %s could not be loaded') % engine.name() 208 _(b'compression engine %s could not be loaded') % engine.name()
209 ) 209 )
210 return engine 210 return engine
211 211
212 def forrevlogheader(self, header): 212 def forrevlogheader(self, header):
213 """Obtain a compression engine registered to a revlog header. 213 """Obtain a compression engine registered to a revlog header.
358 while self._pending: 358 while self._pending:
359 if len(self._pending[0]) > l + self._pos: 359 if len(self._pending[0]) > l + self._pos:
360 newbuf = self._pending[0] 360 newbuf = self._pending[0]
361 buf.append(newbuf[self._pos : self._pos + l]) 361 buf.append(newbuf[self._pos : self._pos + l])
362 self._pos += l 362 self._pos += l
363 return ''.join(buf) 363 return b''.join(buf)
364 364
365 newbuf = self._pending.pop(0) 365 newbuf = self._pending.pop(0)
366 if self._pos: 366 if self._pos:
367 buf.append(newbuf[self._pos :]) 367 buf.append(newbuf[self._pos :])
368 l -= len(newbuf) - self._pos 368 l -= len(newbuf) - self._pos
370 buf.append(newbuf) 370 buf.append(newbuf)
371 l -= len(newbuf) 371 l -= len(newbuf)
372 self._pos = 0 372 self._pos = 0
373 373
374 if self._eof: 374 if self._eof:
375 return ''.join(buf) 375 return b''.join(buf)
376 chunk = self._reader(65536) 376 chunk = self._reader(65536)
377 self._decompress(chunk) 377 self._decompress(chunk)
378 if not chunk and not self._pending and not self._eof: 378 if not chunk and not self._pending and not self._eof:
379 # No progress and no new data, bail out 379 # No progress and no new data, bail out
380 return ''.join(buf) 380 return b''.join(buf)
381 381
382 382
383 class _GzipCompressedStreamReader(_CompressedStreamReader): 383 class _GzipCompressedStreamReader(_CompressedStreamReader):
384 def __init__(self, fh): 384 def __init__(self, fh):
385 super(_GzipCompressedStreamReader, self).__init__(fh) 385 super(_GzipCompressedStreamReader, self).__init__(fh)
389 newbuf = self._decompobj.decompress(chunk) 389 newbuf = self._decompobj.decompress(chunk)
390 if newbuf: 390 if newbuf:
391 self._pending.append(newbuf) 391 self._pending.append(newbuf)
392 d = self._decompobj.copy() 392 d = self._decompobj.copy()
393 try: 393 try:
394 d.decompress('x') 394 d.decompress(b'x')
395 d.flush() 395 d.flush()
396 if d.unused_data == 'x': 396 if d.unused_data == b'x':
397 self._eof = True 397 self._eof = True
398 except zlib.error: 398 except zlib.error:
399 pass 399 pass
400 400
401 401
408 newbuf = self._decompobj.decompress(chunk) 408 newbuf = self._decompobj.decompress(chunk)
409 if newbuf: 409 if newbuf:
410 self._pending.append(newbuf) 410 self._pending.append(newbuf)
411 try: 411 try:
412 while True: 412 while True:
413 newbuf = self._decompobj.decompress('') 413 newbuf = self._decompobj.decompress(b'')
414 if newbuf: 414 if newbuf:
415 self._pending.append(newbuf) 415 self._pending.append(newbuf)
416 else: 416 else:
417 break 417 break
418 except EOFError: 418 except EOFError:
420 420
421 421
422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader): 422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
423 def __init__(self, fh): 423 def __init__(self, fh):
424 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh) 424 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
425 newbuf = self._decompobj.decompress('BZ') 425 newbuf = self._decompobj.decompress(b'BZ')
426 if newbuf: 426 if newbuf:
427 self._pending.append(newbuf) 427 self._pending.append(newbuf)
428 428
429 429
430 class _ZstdCompressedStreamReader(_CompressedStreamReader): 430 class _ZstdCompressedStreamReader(_CompressedStreamReader):
437 newbuf = self._decompobj.decompress(chunk) 437 newbuf = self._decompobj.decompress(chunk)
438 if newbuf: 438 if newbuf:
439 self._pending.append(newbuf) 439 self._pending.append(newbuf)
440 try: 440 try:
441 while True: 441 while True:
442 newbuf = self._decompobj.decompress('') 442 newbuf = self._decompobj.decompress(b'')
443 if newbuf: 443 if newbuf:
444 self._pending.append(newbuf) 444 self._pending.append(newbuf)
445 else: 445 else:
446 break 446 break
447 except self._zstd.ZstdError: 447 except self._zstd.ZstdError:
448 self._eof = True 448 self._eof = True
449 449
450 450
451 class _zlibengine(compressionengine): 451 class _zlibengine(compressionengine):
452 def name(self): 452 def name(self):
453 return 'zlib' 453 return b'zlib'
454 454
455 def bundletype(self): 455 def bundletype(self):
456 """zlib compression using the DEFLATE algorithm. 456 """zlib compression using the DEFLATE algorithm.
457 457
458 All Mercurial clients should support this format. The compression 458 All Mercurial clients should support this format. The compression
459 algorithm strikes a reasonable balance between compression ratio 459 algorithm strikes a reasonable balance between compression ratio
460 and size. 460 and size.
461 """ 461 """
462 return 'gzip', 'GZ' 462 return b'gzip', b'GZ'
463 463
464 def wireprotosupport(self): 464 def wireprotosupport(self):
465 return compewireprotosupport('zlib', 20, 20) 465 return compewireprotosupport(b'zlib', 20, 20)
466 466
467 def revlogheader(self): 467 def revlogheader(self):
468 return 'x' 468 return b'x'
469 469
470 def compressstream(self, it, opts=None): 470 def compressstream(self, it, opts=None):
471 opts = opts or {} 471 opts = opts or {}
472 472
473 z = zlib.compressobj(opts.get('level', -1)) 473 z = zlib.compressobj(opts.get(b'level', -1))
474 for chunk in it: 474 for chunk in it:
475 data = z.compress(chunk) 475 data = z.compress(chunk)
476 # Not all calls to compress emit data. It is cheaper to inspect 476 # Not all calls to compress emit data. It is cheaper to inspect
477 # here than to feed empty chunks through generator. 477 # here than to feed empty chunks through generator.
478 if data: 478 if data:
519 parts.append(z.compress(data[pos:pos2])) 519 parts.append(z.compress(data[pos:pos2]))
520 pos = pos2 520 pos = pos2
521 parts.append(z.flush()) 521 parts.append(z.flush())
522 522
523 if sum(map(len, parts)) < insize: 523 if sum(map(len, parts)) < insize:
524 return ''.join(parts) 524 return b''.join(parts)
525 return None 525 return None
526 526
527 def decompress(self, data): 527 def decompress(self, data):
528 try: 528 try:
529 return zlib.decompress(data) 529 return zlib.decompress(data)
530 except zlib.error as e: 530 except zlib.error as e:
531 raise error.StorageError( 531 raise error.StorageError(
532 _('revlog decompress error: %s') 532 _(b'revlog decompress error: %s')
533 % stringutil.forcebytestr(e) 533 % stringutil.forcebytestr(e)
534 ) 534 )
535 535
536 def revlogcompressor(self, opts=None): 536 def revlogcompressor(self, opts=None):
537 level = None 537 level = None
538 if opts is not None: 538 if opts is not None:
539 level = opts.get('zlib.level') 539 level = opts.get(b'zlib.level')
540 return self.zlibrevlogcompressor(level) 540 return self.zlibrevlogcompressor(level)
541 541
542 542
543 compengines.register(_zlibengine()) 543 compengines.register(_zlibengine())
544 544
545 545
546 class _bz2engine(compressionengine): 546 class _bz2engine(compressionengine):
547 def name(self): 547 def name(self):
548 return 'bz2' 548 return b'bz2'
549 549
550 def bundletype(self): 550 def bundletype(self):
551 """An algorithm that produces smaller bundles than ``gzip``. 551 """An algorithm that produces smaller bundles than ``gzip``.
552 552
553 All Mercurial clients should support this format. 553 All Mercurial clients should support this format.
557 decompression. 557 decompression.
558 558
559 If available, the ``zstd`` engine can yield similar or better 559 If available, the ``zstd`` engine can yield similar or better
560 compression at much higher speeds. 560 compression at much higher speeds.
561 """ 561 """
562 return 'bzip2', 'BZ' 562 return b'bzip2', b'BZ'
563 563
564 # We declare a protocol name but don't advertise by default because 564 # We declare a protocol name but don't advertise by default because
565 # it is slow. 565 # it is slow.
566 def wireprotosupport(self): 566 def wireprotosupport(self):
567 return compewireprotosupport('bzip2', 0, 0) 567 return compewireprotosupport(b'bzip2', 0, 0)
568 568
569 def compressstream(self, it, opts=None): 569 def compressstream(self, it, opts=None):
570 opts = opts or {} 570 opts = opts or {}
571 z = bz2.BZ2Compressor(opts.get('level', 9)) 571 z = bz2.BZ2Compressor(opts.get(b'level', 9))
572 for chunk in it: 572 for chunk in it:
573 data = z.compress(chunk) 573 data = z.compress(chunk)
574 if data: 574 if data:
575 yield data 575 yield data
576 576
583 compengines.register(_bz2engine()) 583 compengines.register(_bz2engine())
584 584
585 585
586 class _truncatedbz2engine(compressionengine): 586 class _truncatedbz2engine(compressionengine):
587 def name(self): 587 def name(self):
588 return 'bz2truncated' 588 return b'bz2truncated'
589 589
590 def bundletype(self): 590 def bundletype(self):
591 return None, '_truncatedBZ' 591 return None, b'_truncatedBZ'
592 592
593 # We don't implement compressstream because it is hackily handled elsewhere. 593 # We don't implement compressstream because it is hackily handled elsewhere.
594 594
595 def decompressorreader(self, fh): 595 def decompressorreader(self, fh):
596 return _TruncatedBZ2CompressedStreamReader(fh) 596 return _TruncatedBZ2CompressedStreamReader(fh)
599 compengines.register(_truncatedbz2engine()) 599 compengines.register(_truncatedbz2engine())
600 600
601 601
602 class _noopengine(compressionengine): 602 class _noopengine(compressionengine):
603 def name(self): 603 def name(self):
604 return 'none' 604 return b'none'
605 605
606 def bundletype(self): 606 def bundletype(self):
607 """No compression is performed. 607 """No compression is performed.
608 608
609 Use this compression engine to explicitly disable compression. 609 Use this compression engine to explicitly disable compression.
610 """ 610 """
611 return 'none', 'UN' 611 return b'none', b'UN'
612 612
613 # Clients always support uncompressed payloads. Servers don't because 613 # Clients always support uncompressed payloads. Servers don't because
614 # unless you are on a fast network, uncompressed payloads can easily 614 # unless you are on a fast network, uncompressed payloads can easily
615 # saturate your network pipe. 615 # saturate your network pipe.
616 def wireprotosupport(self): 616 def wireprotosupport(self):
617 return compewireprotosupport('none', 0, 10) 617 return compewireprotosupport(b'none', 0, 10)
618 618
619 # We don't implement revlogheader because it is handled specially 619 # We don't implement revlogheader because it is handled specially
620 # in the revlog class. 620 # in the revlog class.
621 621
622 def compressstream(self, it, opts=None): 622 def compressstream(self, it, opts=None):
636 compengines.register(_noopengine()) 636 compengines.register(_noopengine())
637 637
638 638
639 class _zstdengine(compressionengine): 639 class _zstdengine(compressionengine):
640 def name(self): 640 def name(self):
641 return 'zstd' 641 return b'zstd'
642 642
643 @propertycache 643 @propertycache
644 def _module(self): 644 def _module(self):
645 # Not all installs have the zstd module available. So defer importing 645 # Not all installs have the zstd module available. So defer importing
646 # until first access. 646 # until first access.
666 compression than ``bzip2`` while operating at much higher speeds. 666 compression than ``bzip2`` while operating at much higher speeds.
667 667
668 If this engine is available and backwards compatibility is not a 668 If this engine is available and backwards compatibility is not a
669 concern, it is likely the best available engine. 669 concern, it is likely the best available engine.
670 """ 670 """
671 return 'zstd', 'ZS' 671 return b'zstd', b'ZS'
672 672
673 def wireprotosupport(self): 673 def wireprotosupport(self):
674 return compewireprotosupport('zstd', 50, 50) 674 return compewireprotosupport(b'zstd', 50, 50)
675 675
676 def revlogheader(self): 676 def revlogheader(self):
677 return '\x28' 677 return b'\x28'
678 678
679 def compressstream(self, it, opts=None): 679 def compressstream(self, it, opts=None):
680 opts = opts or {} 680 opts = opts or {}
681 # zstd level 3 is almost always significantly faster than zlib 681 # zstd level 3 is almost always significantly faster than zlib
682 # while providing no worse compression. It strikes a good balance 682 # while providing no worse compression. It strikes a good balance
683 # between speed and compression. 683 # between speed and compression.
684 level = opts.get('level', 3) 684 level = opts.get(b'level', 3)
685 685
686 zstd = self._module 686 zstd = self._module
687 z = zstd.ZstdCompressor(level=level).compressobj() 687 z = zstd.ZstdCompressor(level=level).compressobj()
688 for chunk in it: 688 for chunk in it:
689 data = z.compress(chunk) 689 data = z.compress(chunk)
730 chunks.append(chunk) 730 chunks.append(chunk)
731 pos = pos2 731 pos = pos2
732 chunks.append(z.flush()) 732 chunks.append(z.flush())
733 733
734 if sum(map(len, chunks)) < insize: 734 if sum(map(len, chunks)) < insize:
735 return ''.join(chunks) 735 return b''.join(chunks)
736 return None 736 return None
737 737
738 def decompress(self, data): 738 def decompress(self, data):
739 insize = len(data) 739 insize = len(data)
740 740
750 if chunk: 750 if chunk:
751 chunks.append(chunk) 751 chunks.append(chunk)
752 pos = pos2 752 pos = pos2
753 # Frame should be exhausted, so no finish() API. 753 # Frame should be exhausted, so no finish() API.
754 754
755 return ''.join(chunks) 755 return b''.join(chunks)
756 except Exception as e: 756 except Exception as e:
757 raise error.StorageError( 757 raise error.StorageError(
758 _('revlog decompress error: %s') 758 _(b'revlog decompress error: %s')
759 % stringutil.forcebytestr(e) 759 % stringutil.forcebytestr(e)
760 ) 760 )
761 761
762 def revlogcompressor(self, opts=None): 762 def revlogcompressor(self, opts=None):
763 opts = opts or {} 763 opts = opts or {}
764 level = opts.get('zstd.level') 764 level = opts.get(b'zstd.level')
765 if level is None: 765 if level is None:
766 level = opts.get('level') 766 level = opts.get(b'level')
767 if level is None: 767 if level is None:
768 level = 3 768 level = 3
769 return self.zstdrevlogcompressor(self._module, level=level) 769 return self.zstdrevlogcompressor(self._module, level=level)
770 770
771 771