22 |
22 |
23 _ = i18n._ |
23 _ = i18n._ |
24 |
24 |
25 # compression code |
25 # compression code |
26 |
26 |
27 SERVERROLE = 'server' |
27 SERVERROLE = b'server' |
28 CLIENTROLE = 'client' |
28 CLIENTROLE = b'client' |
29 |
29 |
30 compewireprotosupport = collections.namedtuple( |
30 compewireprotosupport = collections.namedtuple( |
31 r'compenginewireprotosupport', |
31 r'compenginewireprotosupport', |
32 (r'name', r'serverpriority', r'clientpriority'), |
32 (r'name', r'serverpriority', r'clientpriority'), |
33 ) |
33 ) |
83 """Register a compression engine with the manager. |
83 """Register a compression engine with the manager. |
84 |
84 |
85 The argument must be a ``compressionengine`` instance. |
85 The argument must be a ``compressionengine`` instance. |
86 """ |
86 """ |
87 if not isinstance(engine, compressionengine): |
87 if not isinstance(engine, compressionengine): |
88 raise ValueError(_('argument must be a compressionengine')) |
88 raise ValueError(_(b'argument must be a compressionengine')) |
89 |
89 |
90 name = engine.name() |
90 name = engine.name() |
91 |
91 |
92 if name in self._engines: |
92 if name in self._engines: |
93 raise error.Abort( |
93 raise error.Abort( |
94 _('compression engine %s already registered') % name |
94 _(b'compression engine %s already registered') % name |
95 ) |
95 ) |
96 |
96 |
97 bundleinfo = engine.bundletype() |
97 bundleinfo = engine.bundletype() |
98 if bundleinfo: |
98 if bundleinfo: |
99 bundlename, bundletype = bundleinfo |
99 bundlename, bundletype = bundleinfo |
100 |
100 |
101 if bundlename in self._bundlenames: |
101 if bundlename in self._bundlenames: |
102 raise error.Abort( |
102 raise error.Abort( |
103 _('bundle name %s already registered') % bundlename |
103 _(b'bundle name %s already registered') % bundlename |
104 ) |
104 ) |
105 if bundletype in self._bundletypes: |
105 if bundletype in self._bundletypes: |
106 raise error.Abort( |
106 raise error.Abort( |
107 _('bundle type %s already registered by %s') |
107 _(b'bundle type %s already registered by %s') |
108 % (bundletype, self._bundletypes[bundletype]) |
108 % (bundletype, self._bundletypes[bundletype]) |
109 ) |
109 ) |
110 |
110 |
111 # No external facing name declared. |
111 # No external facing name declared. |
112 if bundlename: |
112 if bundlename: |
118 if wiresupport: |
118 if wiresupport: |
119 wiretype = wiresupport.name |
119 wiretype = wiresupport.name |
120 if wiretype in self._wiretypes: |
120 if wiretype in self._wiretypes: |
121 raise error.Abort( |
121 raise error.Abort( |
122 _( |
122 _( |
123 'wire protocol compression %s already ' |
123 b'wire protocol compression %s already ' |
124 'registered by %s' |
124 b'registered by %s' |
125 ) |
125 ) |
126 % (wiretype, self._wiretypes[wiretype]) |
126 % (wiretype, self._wiretypes[wiretype]) |
127 ) |
127 ) |
128 |
128 |
129 self._wiretypes[wiretype] = name |
129 self._wiretypes[wiretype] = name |
130 |
130 |
131 revlogheader = engine.revlogheader() |
131 revlogheader = engine.revlogheader() |
132 if revlogheader and revlogheader in self._revlogheaders: |
132 if revlogheader and revlogheader in self._revlogheaders: |
133 raise error.Abort( |
133 raise error.Abort( |
134 _('revlog header %s already registered by %s') |
134 _(b'revlog header %s already registered by %s') |
135 % (revlogheader, self._revlogheaders[revlogheader]) |
135 % (revlogheader, self._revlogheaders[revlogheader]) |
136 ) |
136 ) |
137 |
137 |
138 if revlogheader: |
138 if revlogheader: |
139 self._revlogheaders[revlogheader] = name |
139 self._revlogheaders[revlogheader] = name |
156 Will abort if the engine is known but not available. |
156 Will abort if the engine is known but not available. |
157 """ |
157 """ |
158 engine = self._engines[self._bundlenames[bundlename]] |
158 engine = self._engines[self._bundlenames[bundlename]] |
159 if not engine.available(): |
159 if not engine.available(): |
160 raise error.Abort( |
160 raise error.Abort( |
161 _('compression engine %s could not be loaded') % engine.name() |
161 _(b'compression engine %s could not be loaded') % engine.name() |
162 ) |
162 ) |
163 return engine |
163 return engine |
164 |
164 |
165 def forbundletype(self, bundletype): |
165 def forbundletype(self, bundletype): |
166 """Obtain a compression engine registered to a bundle type. |
166 """Obtain a compression engine registered to a bundle type. |
170 Will abort if the engine is known but not available. |
170 Will abort if the engine is known but not available. |
171 """ |
171 """ |
172 engine = self._engines[self._bundletypes[bundletype]] |
172 engine = self._engines[self._bundletypes[bundletype]] |
173 if not engine.available(): |
173 if not engine.available(): |
174 raise error.Abort( |
174 raise error.Abort( |
175 _('compression engine %s could not be loaded') % engine.name() |
175 _(b'compression engine %s could not be loaded') % engine.name() |
176 ) |
176 ) |
177 return engine |
177 return engine |
178 |
178 |
179 def supportedwireengines(self, role, onlyavailable=True): |
179 def supportedwireengines(self, role, onlyavailable=True): |
180 """Obtain compression engines that support the wire protocol. |
180 """Obtain compression engines that support the wire protocol. |
184 If ``onlyavailable`` is set, filter out engines that can't be |
184 If ``onlyavailable`` is set, filter out engines that can't be |
185 loaded. |
185 loaded. |
186 """ |
186 """ |
187 assert role in (SERVERROLE, CLIENTROLE) |
187 assert role in (SERVERROLE, CLIENTROLE) |
188 |
188 |
189 attr = 'serverpriority' if role == SERVERROLE else 'clientpriority' |
189 attr = b'serverpriority' if role == SERVERROLE else b'clientpriority' |
190 |
190 |
191 engines = [self._engines[e] for e in self._wiretypes.values()] |
191 engines = [self._engines[e] for e in self._wiretypes.values()] |
192 if onlyavailable: |
192 if onlyavailable: |
193 engines = [e for e in engines if e.available()] |
193 engines = [e for e in engines if e.available()] |
194 |
194 |
203 |
203 |
204 def forwiretype(self, wiretype): |
204 def forwiretype(self, wiretype): |
205 engine = self._engines[self._wiretypes[wiretype]] |
205 engine = self._engines[self._wiretypes[wiretype]] |
206 if not engine.available(): |
206 if not engine.available(): |
207 raise error.Abort( |
207 raise error.Abort( |
208 _('compression engine %s could not be loaded') % engine.name() |
208 _(b'compression engine %s could not be loaded') % engine.name() |
209 ) |
209 ) |
210 return engine |
210 return engine |
211 |
211 |
212 def forrevlogheader(self, header): |
212 def forrevlogheader(self, header): |
213 """Obtain a compression engine registered to a revlog header. |
213 """Obtain a compression engine registered to a revlog header. |
358 while self._pending: |
358 while self._pending: |
359 if len(self._pending[0]) > l + self._pos: |
359 if len(self._pending[0]) > l + self._pos: |
360 newbuf = self._pending[0] |
360 newbuf = self._pending[0] |
361 buf.append(newbuf[self._pos : self._pos + l]) |
361 buf.append(newbuf[self._pos : self._pos + l]) |
362 self._pos += l |
362 self._pos += l |
363 return ''.join(buf) |
363 return b''.join(buf) |
364 |
364 |
365 newbuf = self._pending.pop(0) |
365 newbuf = self._pending.pop(0) |
366 if self._pos: |
366 if self._pos: |
367 buf.append(newbuf[self._pos :]) |
367 buf.append(newbuf[self._pos :]) |
368 l -= len(newbuf) - self._pos |
368 l -= len(newbuf) - self._pos |
370 buf.append(newbuf) |
370 buf.append(newbuf) |
371 l -= len(newbuf) |
371 l -= len(newbuf) |
372 self._pos = 0 |
372 self._pos = 0 |
373 |
373 |
374 if self._eof: |
374 if self._eof: |
375 return ''.join(buf) |
375 return b''.join(buf) |
376 chunk = self._reader(65536) |
376 chunk = self._reader(65536) |
377 self._decompress(chunk) |
377 self._decompress(chunk) |
378 if not chunk and not self._pending and not self._eof: |
378 if not chunk and not self._pending and not self._eof: |
379 # No progress and no new data, bail out |
379 # No progress and no new data, bail out |
380 return ''.join(buf) |
380 return b''.join(buf) |
381 |
381 |
382 |
382 |
383 class _GzipCompressedStreamReader(_CompressedStreamReader): |
383 class _GzipCompressedStreamReader(_CompressedStreamReader): |
384 def __init__(self, fh): |
384 def __init__(self, fh): |
385 super(_GzipCompressedStreamReader, self).__init__(fh) |
385 super(_GzipCompressedStreamReader, self).__init__(fh) |
420 |
420 |
421 |
421 |
422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader): |
422 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader): |
423 def __init__(self, fh): |
423 def __init__(self, fh): |
424 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh) |
424 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh) |
425 newbuf = self._decompobj.decompress('BZ') |
425 newbuf = self._decompobj.decompress(b'BZ') |
426 if newbuf: |
426 if newbuf: |
427 self._pending.append(newbuf) |
427 self._pending.append(newbuf) |
428 |
428 |
429 |
429 |
430 class _ZstdCompressedStreamReader(_CompressedStreamReader): |
430 class _ZstdCompressedStreamReader(_CompressedStreamReader): |
437 newbuf = self._decompobj.decompress(chunk) |
437 newbuf = self._decompobj.decompress(chunk) |
438 if newbuf: |
438 if newbuf: |
439 self._pending.append(newbuf) |
439 self._pending.append(newbuf) |
440 try: |
440 try: |
441 while True: |
441 while True: |
442 newbuf = self._decompobj.decompress('') |
442 newbuf = self._decompobj.decompress(b'') |
443 if newbuf: |
443 if newbuf: |
444 self._pending.append(newbuf) |
444 self._pending.append(newbuf) |
445 else: |
445 else: |
446 break |
446 break |
447 except self._zstd.ZstdError: |
447 except self._zstd.ZstdError: |
448 self._eof = True |
448 self._eof = True |
449 |
449 |
450 |
450 |
451 class _zlibengine(compressionengine): |
451 class _zlibengine(compressionengine): |
452 def name(self): |
452 def name(self): |
453 return 'zlib' |
453 return b'zlib' |
454 |
454 |
455 def bundletype(self): |
455 def bundletype(self): |
456 """zlib compression using the DEFLATE algorithm. |
456 """zlib compression using the DEFLATE algorithm. |
457 |
457 |
458 All Mercurial clients should support this format. The compression |
458 All Mercurial clients should support this format. The compression |
459 algorithm strikes a reasonable balance between compression ratio |
459 algorithm strikes a reasonable balance between compression ratio |
460 and size. |
460 and size. |
461 """ |
461 """ |
462 return 'gzip', 'GZ' |
462 return b'gzip', b'GZ' |
463 |
463 |
464 def wireprotosupport(self): |
464 def wireprotosupport(self): |
465 return compewireprotosupport('zlib', 20, 20) |
465 return compewireprotosupport(b'zlib', 20, 20) |
466 |
466 |
467 def revlogheader(self): |
467 def revlogheader(self): |
468 return 'x' |
468 return b'x' |
469 |
469 |
470 def compressstream(self, it, opts=None): |
470 def compressstream(self, it, opts=None): |
471 opts = opts or {} |
471 opts = opts or {} |
472 |
472 |
473 z = zlib.compressobj(opts.get('level', -1)) |
473 z = zlib.compressobj(opts.get(b'level', -1)) |
474 for chunk in it: |
474 for chunk in it: |
475 data = z.compress(chunk) |
475 data = z.compress(chunk) |
476 # Not all calls to compress emit data. It is cheaper to inspect |
476 # Not all calls to compress emit data. It is cheaper to inspect |
477 # here than to feed empty chunks through generator. |
477 # here than to feed empty chunks through generator. |
478 if data: |
478 if data: |
519 parts.append(z.compress(data[pos:pos2])) |
519 parts.append(z.compress(data[pos:pos2])) |
520 pos = pos2 |
520 pos = pos2 |
521 parts.append(z.flush()) |
521 parts.append(z.flush()) |
522 |
522 |
523 if sum(map(len, parts)) < insize: |
523 if sum(map(len, parts)) < insize: |
524 return ''.join(parts) |
524 return b''.join(parts) |
525 return None |
525 return None |
526 |
526 |
527 def decompress(self, data): |
527 def decompress(self, data): |
528 try: |
528 try: |
529 return zlib.decompress(data) |
529 return zlib.decompress(data) |
530 except zlib.error as e: |
530 except zlib.error as e: |
531 raise error.StorageError( |
531 raise error.StorageError( |
532 _('revlog decompress error: %s') |
532 _(b'revlog decompress error: %s') |
533 % stringutil.forcebytestr(e) |
533 % stringutil.forcebytestr(e) |
534 ) |
534 ) |
535 |
535 |
536 def revlogcompressor(self, opts=None): |
536 def revlogcompressor(self, opts=None): |
537 level = None |
537 level = None |
538 if opts is not None: |
538 if opts is not None: |
539 level = opts.get('zlib.level') |
539 level = opts.get(b'zlib.level') |
540 return self.zlibrevlogcompressor(level) |
540 return self.zlibrevlogcompressor(level) |
541 |
541 |
542 |
542 |
543 compengines.register(_zlibengine()) |
543 compengines.register(_zlibengine()) |
544 |
544 |
545 |
545 |
546 class _bz2engine(compressionengine): |
546 class _bz2engine(compressionengine): |
547 def name(self): |
547 def name(self): |
548 return 'bz2' |
548 return b'bz2' |
549 |
549 |
550 def bundletype(self): |
550 def bundletype(self): |
551 """An algorithm that produces smaller bundles than ``gzip``. |
551 """An algorithm that produces smaller bundles than ``gzip``. |
552 |
552 |
553 All Mercurial clients should support this format. |
553 All Mercurial clients should support this format. |
557 decompression. |
557 decompression. |
558 |
558 |
559 If available, the ``zstd`` engine can yield similar or better |
559 If available, the ``zstd`` engine can yield similar or better |
560 compression at much higher speeds. |
560 compression at much higher speeds. |
561 """ |
561 """ |
562 return 'bzip2', 'BZ' |
562 return b'bzip2', b'BZ' |
563 |
563 |
564 # We declare a protocol name but don't advertise by default because |
564 # We declare a protocol name but don't advertise by default because |
565 # it is slow. |
565 # it is slow. |
566 def wireprotosupport(self): |
566 def wireprotosupport(self): |
567 return compewireprotosupport('bzip2', 0, 0) |
567 return compewireprotosupport(b'bzip2', 0, 0) |
568 |
568 |
569 def compressstream(self, it, opts=None): |
569 def compressstream(self, it, opts=None): |
570 opts = opts or {} |
570 opts = opts or {} |
571 z = bz2.BZ2Compressor(opts.get('level', 9)) |
571 z = bz2.BZ2Compressor(opts.get(b'level', 9)) |
572 for chunk in it: |
572 for chunk in it: |
573 data = z.compress(chunk) |
573 data = z.compress(chunk) |
574 if data: |
574 if data: |
575 yield data |
575 yield data |
576 |
576 |
583 compengines.register(_bz2engine()) |
583 compengines.register(_bz2engine()) |
584 |
584 |
585 |
585 |
586 class _truncatedbz2engine(compressionengine): |
586 class _truncatedbz2engine(compressionengine): |
587 def name(self): |
587 def name(self): |
588 return 'bz2truncated' |
588 return b'bz2truncated' |
589 |
589 |
590 def bundletype(self): |
590 def bundletype(self): |
591 return None, '_truncatedBZ' |
591 return None, b'_truncatedBZ' |
592 |
592 |
593 # We don't implement compressstream because it is hackily handled elsewhere. |
593 # We don't implement compressstream because it is hackily handled elsewhere. |
594 |
594 |
595 def decompressorreader(self, fh): |
595 def decompressorreader(self, fh): |
596 return _TruncatedBZ2CompressedStreamReader(fh) |
596 return _TruncatedBZ2CompressedStreamReader(fh) |
599 compengines.register(_truncatedbz2engine()) |
599 compengines.register(_truncatedbz2engine()) |
600 |
600 |
601 |
601 |
602 class _noopengine(compressionengine): |
602 class _noopengine(compressionengine): |
603 def name(self): |
603 def name(self): |
604 return 'none' |
604 return b'none' |
605 |
605 |
606 def bundletype(self): |
606 def bundletype(self): |
607 """No compression is performed. |
607 """No compression is performed. |
608 |
608 |
609 Use this compression engine to explicitly disable compression. |
609 Use this compression engine to explicitly disable compression. |
610 """ |
610 """ |
611 return 'none', 'UN' |
611 return b'none', b'UN' |
612 |
612 |
613 # Clients always support uncompressed payloads. Servers don't because |
613 # Clients always support uncompressed payloads. Servers don't because |
614 # unless you are on a fast network, uncompressed payloads can easily |
614 # unless you are on a fast network, uncompressed payloads can easily |
615 # saturate your network pipe. |
615 # saturate your network pipe. |
616 def wireprotosupport(self): |
616 def wireprotosupport(self): |
617 return compewireprotosupport('none', 0, 10) |
617 return compewireprotosupport(b'none', 0, 10) |
618 |
618 |
619 # We don't implement revlogheader because it is handled specially |
619 # We don't implement revlogheader because it is handled specially |
620 # in the revlog class. |
620 # in the revlog class. |
621 |
621 |
622 def compressstream(self, it, opts=None): |
622 def compressstream(self, it, opts=None): |
666 compression than ``bzip2`` while operating at much higher speeds. |
666 compression than ``bzip2`` while operating at much higher speeds. |
667 |
667 |
668 If this engine is available and backwards compatibility is not a |
668 If this engine is available and backwards compatibility is not a |
669 concern, it is likely the best available engine. |
669 concern, it is likely the best available engine. |
670 """ |
670 """ |
671 return 'zstd', 'ZS' |
671 return b'zstd', b'ZS' |
672 |
672 |
673 def wireprotosupport(self): |
673 def wireprotosupport(self): |
674 return compewireprotosupport('zstd', 50, 50) |
674 return compewireprotosupport(b'zstd', 50, 50) |
675 |
675 |
676 def revlogheader(self): |
676 def revlogheader(self): |
677 return '\x28' |
677 return b'\x28' |
678 |
678 |
679 def compressstream(self, it, opts=None): |
679 def compressstream(self, it, opts=None): |
680 opts = opts or {} |
680 opts = opts or {} |
681 # zstd level 3 is almost always significantly faster than zlib |
681 # zstd level 3 is almost always significantly faster than zlib |
682 # while providing no worse compression. It strikes a good balance |
682 # while providing no worse compression. It strikes a good balance |
683 # between speed and compression. |
683 # between speed and compression. |
684 level = opts.get('level', 3) |
684 level = opts.get(b'level', 3) |
685 |
685 |
686 zstd = self._module |
686 zstd = self._module |
687 z = zstd.ZstdCompressor(level=level).compressobj() |
687 z = zstd.ZstdCompressor(level=level).compressobj() |
688 for chunk in it: |
688 for chunk in it: |
689 data = z.compress(chunk) |
689 data = z.compress(chunk) |
750 if chunk: |
750 if chunk: |
751 chunks.append(chunk) |
751 chunks.append(chunk) |
752 pos = pos2 |
752 pos = pos2 |
753 # Frame should be exhausted, so no finish() API. |
753 # Frame should be exhausted, so no finish() API. |
754 |
754 |
755 return ''.join(chunks) |
755 return b''.join(chunks) |
756 except Exception as e: |
756 except Exception as e: |
757 raise error.StorageError( |
757 raise error.StorageError( |
758 _('revlog decompress error: %s') |
758 _(b'revlog decompress error: %s') |
759 % stringutil.forcebytestr(e) |
759 % stringutil.forcebytestr(e) |
760 ) |
760 ) |
761 |
761 |
762 def revlogcompressor(self, opts=None): |
762 def revlogcompressor(self, opts=None): |
763 opts = opts or {} |
763 opts = opts or {} |
764 level = opts.get('zstd.level') |
764 level = opts.get(b'zstd.level') |
765 if level is None: |
765 if level is None: |
766 level = opts.get('level') |
766 level = opts.get(b'level') |
767 if level is None: |
767 if level is None: |
768 level = 3 |
768 level = 3 |
769 return self.zstdrevlogcompressor(self._module, level=level) |
769 return self.zstdrevlogcompressor(self._module, level=level) |
770 |
770 |
771 |
771 |