Mercurial > public > mercurial-scm > hg
comparison mercurial/util.py @ 30761:7283719e2bfd
util: declare wire protocol support of compression engines
This patch implements a new compression engine API allowing
compression engines to declare support for the wire protocol.
Support is declared by returning two things: a compression format
string identifier, which will be added to payloads to signal the
compression type of the data that follows, and default integer
priorities for the engine.
Accessor methods have been added to the compression engine manager
class to facilitate use.
Note that the "none" and "bz2" engines declare wire protocol support
but aren't enabled by default due to their priorities being 0. It
is essentially free from a coding perspective to support these
compression formats, so we do it in case anyone may derive use from
it.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 24 Dec 2016 13:51:12 -0700 |
parents | c1b7b2285522 |
children | c390b40fe1d7 |
comparison
equal
deleted
inserted
replaced
30760:753b9d43ca81 | 30761:7283719e2bfd |
---|---|
2955 raise exc_val | 2955 raise exc_val |
2956 return received and suppressed | 2956 return received and suppressed |
2957 | 2957 |
2958 # compression code | 2958 # compression code |
2959 | 2959 |
2960 SERVERROLE = 'server' | |
2961 CLIENTROLE = 'client' | |
2962 | |
2963 compewireprotosupport = collections.namedtuple(u'compenginewireprotosupport', | |
2964 (u'name', u'serverpriority', | |
2965 u'clientpriority')) | |
2966 | |
2960 class compressormanager(object): | 2967 class compressormanager(object): |
2961 """Holds registrations of various compression engines. | 2968 """Holds registrations of various compression engines. |
2962 | 2969 |
2963 This class essentially abstracts the differences between compression | 2970 This class essentially abstracts the differences between compression |
2964 engines to allow new compression formats to be added easily, possibly from | 2971 engines to allow new compression formats to be added easily, possibly from |
2971 self._engines = {} | 2978 self._engines = {} |
2972 # Bundle spec human name to engine name. | 2979 # Bundle spec human name to engine name. |
2973 self._bundlenames = {} | 2980 self._bundlenames = {} |
2974 # Internal bundle identifier to engine name. | 2981 # Internal bundle identifier to engine name. |
2975 self._bundletypes = {} | 2982 self._bundletypes = {} |
2983 # Wire proto identifier to engine name. | |
2984 self._wiretypes = {} | |
2976 | 2985 |
2977 def __getitem__(self, key): | 2986 def __getitem__(self, key): |
2978 return self._engines[key] | 2987 return self._engines[key] |
2979 | 2988 |
2980 def __contains__(self, key): | 2989 def __contains__(self, key): |
3011 # No external facing name declared. | 3020 # No external facing name declared. |
3012 if bundlename: | 3021 if bundlename: |
3013 self._bundlenames[bundlename] = name | 3022 self._bundlenames[bundlename] = name |
3014 | 3023 |
3015 self._bundletypes[bundletype] = name | 3024 self._bundletypes[bundletype] = name |
3025 | |
3026 wiresupport = engine.wireprotosupport() | |
3027 if wiresupport: | |
3028 wiretype = wiresupport.name | |
3029 if wiretype in self._wiretypes: | |
3030 raise error.Abort(_('wire protocol compression %s already ' | |
3031 'registered by %s') % | |
3032 (wiretype, self._wiretypes[wiretype])) | |
3033 | |
3034 self._wiretypes[wiretype] = name | |
3016 | 3035 |
3017 self._engines[name] = engine | 3036 self._engines[name] = engine |
3018 | 3037 |
3019 @property | 3038 @property |
3020 def supportedbundlenames(self): | 3039 def supportedbundlenames(self): |
3048 if not engine.available(): | 3067 if not engine.available(): |
3049 raise error.Abort(_('compression engine %s could not be loaded') % | 3068 raise error.Abort(_('compression engine %s could not be loaded') % |
3050 engine.name()) | 3069 engine.name()) |
3051 return engine | 3070 return engine |
3052 | 3071 |
3072 def supportedwireengines(self, role, onlyavailable=True): | |
3073 """Obtain compression engines that support the wire protocol. | |
3074 | |
3075 Returns a list of engines in prioritized order, most desired first. | |
3076 | |
3077 If ``onlyavailable`` is set, filter out engines that can't be | |
3078 loaded. | |
3079 """ | |
3080 assert role in (SERVERROLE, CLIENTROLE) | |
3081 | |
3082 attr = 'serverpriority' if role == SERVERROLE else 'clientpriority' | |
3083 | |
3084 engines = [self._engines[e] for e in self._wiretypes.values()] | |
3085 if onlyavailable: | |
3086 engines = [e for e in engines if e.available()] | |
3087 | |
3088 def getkey(e): | |
3089 # Sort first by priority, highest first. In case of tie, sort | |
3090 # alphabetically. This is arbitrary, but ensures output is | |
3091 # stable. | |
3092 w = e.wireprotosupport() | |
3093 return -1 * getattr(w, attr), w.name | |
3094 | |
3095 return list(sorted(engines, key=getkey)) | |
3096 | |
3097 def forwiretype(self, wiretype): | |
3098 engine = self._engines[self._wiretypes[wiretype]] | |
3099 if not engine.available(): | |
3100 raise error.Abort(_('compression engine %s could not be loaded') % | |
3101 engine.name()) | |
3102 return engine | |
3103 | |
3053 compengines = compressormanager() | 3104 compengines = compressormanager() |
3054 | 3105 |
3055 class compressionengine(object): | 3106 class compressionengine(object): |
3056 """Base class for compression engines. | 3107 """Base class for compression engines. |
3057 | 3108 |
3088 If bundle compression is supported, the class must also implement | 3139 If bundle compression is supported, the class must also implement |
3089 ``compressstream`` and ``decompressorreader``. | 3140 ``compressstream`` and ``decompressorreader``. |
3090 """ | 3141 """ |
3091 return None | 3142 return None |
3092 | 3143 |
3144 def wireprotosupport(self): | |
3145 """Declare support for this compression format on the wire protocol. | |
3146 | |
3147 If this compression engine isn't supported for compressing wire | |
3148 protocol payloads, returns None. | |
3149 | |
3150 Otherwise, returns ``compenginewireprotosupport`` with the following | |
3151 fields: | |
3152 | |
3153 * String format identifier | |
3154 * Integer priority for the server | |
3155 * Integer priority for the client | |
3156 | |
3157 The integer priorities are used to order the advertisement of format | |
3158 support by server and client. The highest integer is advertised | |
3159 first. Integers with non-positive values aren't advertised. | |
3160 | |
3161 The priority values are somewhat arbitrary and only used for default | |
3162 ordering. The relative order can be changed via config options. | |
3163 | |
3164 If wire protocol compression is supported, the class must also implement | |
3165 ``compressstream`` and ``decompressorreader``. | |
3166 """ | |
3167 return None | |
3168 | |
3093 def compressstream(self, it, opts=None): | 3169 def compressstream(self, it, opts=None): |
3094 """Compress an iterator of chunks. | 3170 """Compress an iterator of chunks. |
3095 | 3171 |
3096 The method receives an iterator (ideally a generator) of chunks of | 3172 The method receives an iterator (ideally a generator) of chunks of |
3097 bytes to be compressed. It returns an iterator (ideally a generator) | 3173 bytes to be compressed. It returns an iterator (ideally a generator) |
3115 def name(self): | 3191 def name(self): |
3116 return 'zlib' | 3192 return 'zlib' |
3117 | 3193 |
3118 def bundletype(self): | 3194 def bundletype(self): |
3119 return 'gzip', 'GZ' | 3195 return 'gzip', 'GZ' |
3196 | |
3197 def wireprotosupport(self): | |
3198 return compewireprotosupport('zlib', 20, 20) | |
3120 | 3199 |
3121 def compressstream(self, it, opts=None): | 3200 def compressstream(self, it, opts=None): |
3122 opts = opts or {} | 3201 opts = opts or {} |
3123 | 3202 |
3124 z = zlib.compressobj(opts.get('level', -1)) | 3203 z = zlib.compressobj(opts.get('level', -1)) |
3149 return 'bz2' | 3228 return 'bz2' |
3150 | 3229 |
3151 def bundletype(self): | 3230 def bundletype(self): |
3152 return 'bzip2', 'BZ' | 3231 return 'bzip2', 'BZ' |
3153 | 3232 |
3233 # We declare a protocol name but don't advertise by default because | |
3234 # it is slow. | |
3235 def wireprotosupport(self): | |
3236 return compewireprotosupport('bzip2', 0, 0) | |
3237 | |
3154 def compressstream(self, it, opts=None): | 3238 def compressstream(self, it, opts=None): |
3155 opts = opts or {} | 3239 opts = opts or {} |
3156 z = bz2.BZ2Compressor(opts.get('level', 9)) | 3240 z = bz2.BZ2Compressor(opts.get('level', 9)) |
3157 for chunk in it: | 3241 for chunk in it: |
3158 data = z.compress(chunk) | 3242 data = z.compress(chunk) |
3196 def name(self): | 3280 def name(self): |
3197 return 'none' | 3281 return 'none' |
3198 | 3282 |
3199 def bundletype(self): | 3283 def bundletype(self): |
3200 return 'none', 'UN' | 3284 return 'none', 'UN' |
3285 | |
3286 # Clients always support uncompressed payloads. Servers don't because | |
3287 # unless you are on a fast network, uncompressed payloads can easily | |
3288 # saturate your network pipe. | |
3289 def wireprotosupport(self): | |
3290 return compewireprotosupport('none', 0, 10) | |
3201 | 3291 |
3202 def compressstream(self, it, opts=None): | 3292 def compressstream(self, it, opts=None): |
3203 return it | 3293 return it |
3204 | 3294 |
3205 def decompressorreader(self, fh): | 3295 def decompressorreader(self, fh): |
3227 return bool(self._module) | 3317 return bool(self._module) |
3228 | 3318 |
3229 def bundletype(self): | 3319 def bundletype(self): |
3230 return 'zstd', 'ZS' | 3320 return 'zstd', 'ZS' |
3231 | 3321 |
3322 def wireprotosupport(self): | |
3323 return compewireprotosupport('zstd', 50, 50) | |
3324 | |
3232 def compressstream(self, it, opts=None): | 3325 def compressstream(self, it, opts=None): |
3233 opts = opts or {} | 3326 opts = opts or {} |
3234 # zstd level 3 is almost always significantly faster than zlib | 3327 # zstd level 3 is almost always significantly faster than zlib |
3235 # while providing no worse compression. It strikes a good balance | 3328 # while providing no worse compression. It strikes a good balance |
3236 # between speed and compression. | 3329 # between speed and compression. |