mercurial/util.py
changeset 30798 f50c0db50025
parent 30794 31e1f0d4ab44
child 30821 7005c03f7387
--- a/mercurial/util.py	Sun Jan 08 10:08:29 2017 +0800
+++ b/mercurial/util.py	Mon Jan 02 13:27:20 2017 -0800
@@ -3000,6 +3000,8 @@
         self._bundlenames = {}
         # Internal bundle identifier to engine name.
         self._bundletypes = {}
+        # Revlog header to engine name.
+        self._revlogheaders = {}
         # Wire proto identifier to engine name.
         self._wiretypes = {}
 
@@ -3053,6 +3055,14 @@
 
             self._wiretypes[wiretype] = name
 
+        revlogheader = engine.revlogheader()
+        if revlogheader and revlogheader in self._revlogheaders:
+            raise error.Abort(_('revlog header %s already registered by %s') %
+                              (revlogheader, self._revlogheaders[revlogheader]))
+
+        if revlogheader:
+            self._revlogheaders[revlogheader] = name
+
         self._engines[name] = engine
 
     @property
@@ -3121,6 +3131,13 @@
                               engine.name())
         return engine
 
+    def forrevlogheader(self, header):
+        """Obtain a compression engine registered to a revlog header.
+
+        Will raise KeyError if the revlog header value isn't registered.
+        """
+        return self._engines[self._revlogheaders[header]]
+
 compengines = compressormanager()
 
 class compressionengine(object):
@@ -3186,6 +3203,16 @@
         """
         return None
 
+    def revlogheader(self):
+        """Header added to revlog chunks that identifies this engine.
+
+        If this engine can be used to compress revlogs, this method should
+        return the bytes used to identify chunks compressed with this engine.
+        Else, the method should return ``None`` to indicate it does not
+        participate in revlog compression.
+        """
+        return None
+
     def compressstream(self, it, opts=None):
         """Compress an iterator of chunks.
 
@@ -3215,6 +3242,13 @@
         the data could not be compressed (too small, not compressible, etc).
         The returned data should have a header uniquely identifying this
         compression format so decompression can be routed to this engine.
+        This header should be identified by the ``revlogheader()`` return
+        value.
+
+        The object has a ``decompress(data)`` method that decompresses
+        data. The method will only be called if ``data`` begins with
+        ``revlogheader()``. The method should return the raw, uncompressed
+        data or raise a ``RevlogError``.
 
         The object is reusable but is not thread safe.
         """
@@ -3230,6 +3264,9 @@
     def wireprotosupport(self):
         return compewireprotosupport('zlib', 20, 20)
 
+    def revlogheader(self):
+        return 'x'
+
     def compressstream(self, it, opts=None):
         opts = opts or {}
 
@@ -3286,6 +3323,13 @@
                     return ''.join(parts)
                 return None
 
+        def decompress(self, data):
+            try:
+                return zlib.decompress(data)
+            except zlib.error as e:
+                raise error.RevlogError(_('revlog decompress error: %s') %
+                                        str(e))
+
     def revlogcompressor(self, opts=None):
         return self.zlibrevlogcompressor()
 
@@ -3357,6 +3401,9 @@
     def wireprotosupport(self):
         return compewireprotosupport('none', 0, 10)
 
+    # We don't implement revlogheader because it is handled specially
+    # in the revlog class.
+
     def compressstream(self, it, opts=None):
         return it
 
@@ -3397,6 +3444,9 @@
     def wireprotosupport(self):
         return compewireprotosupport('zstd', 50, 50)
 
+    def revlogheader(self):
+        return '\x28'
+
     def compressstream(self, it, opts=None):
         opts = opts or {}
         # zstd level 3 is almost always significantly faster than zlib
@@ -3425,7 +3475,9 @@
             # pre-allocate a buffer to hold the result.
             self._cctx = zstd.ZstdCompressor(level=level,
                                              write_content_size=True)
+            self._dctx = zstd.ZstdDecompressor()
             self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
 
         def compress(self, data):
             insize = len(data)
@@ -3456,6 +3508,28 @@
                     return ''.join(chunks)
                 return None
 
+        def decompress(self, data):
+            insize = len(data)
+
+            try:
+                # This was measured to be faster than other streaming
+                # decompressors.
+                dobj = self._dctx.decompressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._decompinsize
+                    chunk = dobj.decompress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                # Frame should be exhausted, so no finish() API.
+
+                return ''.join(chunks)
+            except Exception as e:
+                raise error.RevlogError(_('revlog decompress error: %s') %
+                                        str(e))
+
     def revlogcompressor(self, opts=None):
         opts = opts or {}
         return self.zstdrevlogcompressor(self._module,