Mercurial > public > mercurial-scm > hg
comparison mercurial/util.py @ 30798:f50c0db50025
util: compression APIs to support revlog decompression
Previously, compression engines had APIs for performing revlog
compression but no mechanism to perform revlog decompression. This
patch adds APIs for revlog decompression.
Revlog decompression is slightly more complicated than compression
because in the compression case there is (currently) only a single
engine that can be used at a time. However, for decompression, a
revlog could contain chunks from multiple compression engines. This
means decompression needs to map to multiple engines and
decompressors. Implementing that routing is outside the scope of this
patch, but the requirement drives two decisions made here: each engine
declares a byte header sequence that identifies revlog data as belonging
to it, and the engine manager gains an API for obtaining an engine from a
revlog header.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 02 Jan 2017 13:27:20 -0800 |
parents | 31e1f0d4ab44 |
children | 7005c03f7387 |
comparison
equal
deleted
inserted
replaced
30797:0bde7372e4c0 | 30798:f50c0db50025 |
---|---|
2998 self._engines = {} | 2998 self._engines = {} |
2999 # Bundle spec human name to engine name. | 2999 # Bundle spec human name to engine name. |
3000 self._bundlenames = {} | 3000 self._bundlenames = {} |
3001 # Internal bundle identifier to engine name. | 3001 # Internal bundle identifier to engine name. |
3002 self._bundletypes = {} | 3002 self._bundletypes = {} |
3003 # Revlog header to engine name. | |
3004 self._revlogheaders = {} | |
3003 # Wire proto identifier to engine name. | 3005 # Wire proto identifier to engine name. |
3004 self._wiretypes = {} | 3006 self._wiretypes = {} |
3005 | 3007 |
3006 def __getitem__(self, key): | 3008 def __getitem__(self, key): |
3007 return self._engines[key] | 3009 return self._engines[key] |
3051 'registered by %s') % | 3053 'registered by %s') % |
3052 (wiretype, self._wiretypes[wiretype])) | 3054 (wiretype, self._wiretypes[wiretype])) |
3053 | 3055 |
3054 self._wiretypes[wiretype] = name | 3056 self._wiretypes[wiretype] = name |
3055 | 3057 |
3058 revlogheader = engine.revlogheader() | |
3059 if revlogheader and revlogheader in self._revlogheaders: | |
3060 raise error.Abort(_('revlog header %s already registered by %s') % | |
3061 (revlogheader, self._revlogheaders[revlogheader])) | |
3062 | |
3063 if revlogheader: | |
3064 self._revlogheaders[revlogheader] = name | |
3065 | |
3056 self._engines[name] = engine | 3066 self._engines[name] = engine |
3057 | 3067 |
3058 @property | 3068 @property |
3059 def supportedbundlenames(self): | 3069 def supportedbundlenames(self): |
3060 return set(self._bundlenames.keys()) | 3070 return set(self._bundlenames.keys()) |
3119 if not engine.available(): | 3129 if not engine.available(): |
3120 raise error.Abort(_('compression engine %s could not be loaded') % | 3130 raise error.Abort(_('compression engine %s could not be loaded') % |
3121 engine.name()) | 3131 engine.name()) |
3122 return engine | 3132 return engine |
3123 | 3133 |
3134 def forrevlogheader(self, header): | |
3135 """Obtain a compression engine registered to a revlog header. | |
3136 | |
3137 Will raise KeyError if the revlog header value isn't registered. | |
3138 """ | |
3139 return self._engines[self._revlogheaders[header]] | |
3140 | |
3124 compengines = compressormanager() | 3141 compengines = compressormanager() |
3125 | 3142 |
3126 class compressionengine(object): | 3143 class compressionengine(object): |
3127 """Base class for compression engines. | 3144 """Base class for compression engines. |
3128 | 3145 |
3184 If wire protocol compression is supported, the class must also implement | 3201 If wire protocol compression is supported, the class must also implement |
3185 ``compressstream`` and ``decompressorreader``. | 3202 ``compressstream`` and ``decompressorreader``. |
3186 """ | 3203 """ |
3187 return None | 3204 return None |
3188 | 3205 |
3206 def revlogheader(self): | |
3207 """Header added to revlog chunks that identifies this engine. | |
3208 | |
3209 If this engine can be used to compress revlogs, this method should | |
3210 return the bytes used to identify chunks compressed with this engine. | |
3211 Else, the method should return ``None`` to indicate it does not | |
3212 participate in revlog compression. | |
3213 """ | |
3214 return None | |
3215 | |
3189 def compressstream(self, it, opts=None): | 3216 def compressstream(self, it, opts=None): |
3190 """Compress an iterator of chunks. | 3217 """Compress an iterator of chunks. |
3191 | 3218 |
3192 The method receives an iterator (ideally a generator) of chunks of | 3219 The method receives an iterator (ideally a generator) of chunks of |
3193 bytes to be compressed. It returns an iterator (ideally a generator) | 3220 bytes to be compressed. It returns an iterator (ideally a generator) |
3213 The object has a ``compress(data)`` method that compresses binary | 3240 The object has a ``compress(data)`` method that compresses binary |
3214 data. This method returns compressed binary data or ``None`` if | 3241 data. This method returns compressed binary data or ``None`` if |
3215 the data could not be compressed (too small, not compressible, etc). | 3242 the data could not be compressed (too small, not compressible, etc). |
3216 The returned data should have a header uniquely identifying this | 3243 The returned data should have a header uniquely identifying this |
3217 compression format so decompression can be routed to this engine. | 3244 compression format so decompression can be routed to this engine. |
3245 This header should be identified by the ``revlogheader()`` return | |
3246 value. | |
3247 | |
3248 The object has a ``decompress(data)`` method that decompresses | |
3249 data. The method will only be called if ``data`` begins with | |
3250 ``revlogheader()``. The method should return the raw, uncompressed | |
3251 data or raise a ``RevlogError``. | |
3218 | 3252 |
3219 The object is reusable but is not thread safe. | 3253 The object is reusable but is not thread safe. |
3220 """ | 3254 """ |
3221 raise NotImplementedError() | 3255 raise NotImplementedError() |
3222 | 3256 |
3227 def bundletype(self): | 3261 def bundletype(self): |
3228 return 'gzip', 'GZ' | 3262 return 'gzip', 'GZ' |
3229 | 3263 |
3230 def wireprotosupport(self): | 3264 def wireprotosupport(self): |
3231 return compewireprotosupport('zlib', 20, 20) | 3265 return compewireprotosupport('zlib', 20, 20) |
3266 | |
3267 def revlogheader(self): | |
3268 return 'x' | |
3232 | 3269 |
3233 def compressstream(self, it, opts=None): | 3270 def compressstream(self, it, opts=None): |
3234 opts = opts or {} | 3271 opts = opts or {} |
3235 | 3272 |
3236 z = zlib.compressobj(opts.get('level', -1)) | 3273 z = zlib.compressobj(opts.get('level', -1)) |
3284 | 3321 |
3285 if sum(map(len, parts)) < insize: | 3322 if sum(map(len, parts)) < insize: |
3286 return ''.join(parts) | 3323 return ''.join(parts) |
3287 return None | 3324 return None |
3288 | 3325 |
3326 def decompress(self, data): | |
3327 try: | |
3328 return zlib.decompress(data) | |
3329 except zlib.error as e: | |
3330 raise error.RevlogError(_('revlog decompress error: %s') % | |
3331 str(e)) | |
3332 | |
3289 def revlogcompressor(self, opts=None): | 3333 def revlogcompressor(self, opts=None): |
3290 return self.zlibrevlogcompressor() | 3334 return self.zlibrevlogcompressor() |
3291 | 3335 |
3292 compengines.register(_zlibengine()) | 3336 compengines.register(_zlibengine()) |
3293 | 3337 |
3354 # Clients always support uncompressed payloads. Servers don't because | 3398 # Clients always support uncompressed payloads. Servers don't because |
3355 # unless you are on a fast network, uncompressed payloads can easily | 3399 # unless you are on a fast network, uncompressed payloads can easily |
3356 # saturate your network pipe. | 3400 # saturate your network pipe. |
3357 def wireprotosupport(self): | 3401 def wireprotosupport(self): |
3358 return compewireprotosupport('none', 0, 10) | 3402 return compewireprotosupport('none', 0, 10) |
3403 | |
3404 # We don't implement revlogheader because it is handled specially | |
3405 # in the revlog class. | |
3359 | 3406 |
3360 def compressstream(self, it, opts=None): | 3407 def compressstream(self, it, opts=None): |
3361 return it | 3408 return it |
3362 | 3409 |
3363 def decompressorreader(self, fh): | 3410 def decompressorreader(self, fh): |
3395 return 'zstd', 'ZS' | 3442 return 'zstd', 'ZS' |
3396 | 3443 |
3397 def wireprotosupport(self): | 3444 def wireprotosupport(self): |
3398 return compewireprotosupport('zstd', 50, 50) | 3445 return compewireprotosupport('zstd', 50, 50) |
3399 | 3446 |
3447 def revlogheader(self): | |
3448 return '\x28' | |
3449 | |
3400 def compressstream(self, it, opts=None): | 3450 def compressstream(self, it, opts=None): |
3401 opts = opts or {} | 3451 opts = opts or {} |
3402 # zstd level 3 is almost always significantly faster than zlib | 3452 # zstd level 3 is almost always significantly faster than zlib |
3403 # while providing no worse compression. It strikes a good balance | 3453 # while providing no worse compression. It strikes a good balance |
3404 # between speed and compression. | 3454 # between speed and compression. |
3423 # Writing the content size adds a few bytes to the output. However, | 3473 # Writing the content size adds a few bytes to the output. However, |
3424 # it allows decompression to be more optimal since we can | 3474 # it allows decompression to be more optimal since we can |
3425 # pre-allocate a buffer to hold the result. | 3475 # pre-allocate a buffer to hold the result. |
3426 self._cctx = zstd.ZstdCompressor(level=level, | 3476 self._cctx = zstd.ZstdCompressor(level=level, |
3427 write_content_size=True) | 3477 write_content_size=True) |
3478 self._dctx = zstd.ZstdDecompressor() | |
3428 self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE | 3479 self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE |
3480 self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE | |
3429 | 3481 |
3430 def compress(self, data): | 3482 def compress(self, data): |
3431 insize = len(data) | 3483 insize = len(data) |
3432 # Caller handles empty input case. | 3484 # Caller handles empty input case. |
3433 assert insize > 0 | 3485 assert insize > 0 |
3454 | 3506 |
3455 if sum(map(len, chunks)) < insize: | 3507 if sum(map(len, chunks)) < insize: |
3456 return ''.join(chunks) | 3508 return ''.join(chunks) |
3457 return None | 3509 return None |
3458 | 3510 |
3511 def decompress(self, data): | |
3512 insize = len(data) | |
3513 | |
3514 try: | |
3515 # This was measured to be faster than other streaming | |
3516 # decompressors. | |
3517 dobj = self._dctx.decompressobj() | |
3518 chunks = [] | |
3519 pos = 0 | |
3520 while pos < insize: | |
3521 pos2 = pos + self._decompinsize | |
3522 chunk = dobj.decompress(data[pos:pos2]) | |
3523 if chunk: | |
3524 chunks.append(chunk) | |
3525 pos = pos2 | |
3526 # Frame should be exhausted, so no finish() API. | |
3527 | |
3528 return ''.join(chunks) | |
3529 except Exception as e: | |
3530 raise error.RevlogError(_('revlog decompress error: %s') % | |
3531 str(e)) | |
3532 | |
3459 def revlogcompressor(self, opts=None): | 3533 def revlogcompressor(self, opts=None): |
3460 opts = opts or {} | 3534 opts = opts or {} |
3461 return self.zstdrevlogcompressor(self._module, | 3535 return self.zstdrevlogcompressor(self._module, |
3462 level=opts.get('level', 3)) | 3536 level=opts.get('level', 3)) |
3463 | 3537 |