comparison mercurial/util.py @ 30761:7283719e2bfd

util: declare wire protocol support of compression engines

This patch implements a new compression engine API allowing compression engines to declare support for the wire protocol. Support is declared by returning a compression format string identifier, which will be added to payloads to signal the compression type of the data that follows, together with default integer priorities for the engine. Accessor methods have been added to the compression engine manager class to facilitate use.

Note that the "none" and "bz2" engines declare wire protocol support but aren't enabled by default due to their priorities being 0. It is essentially free from a coding perspective to support these compression formats, so we do it in case anyone may derive use from it.
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 24 Dec 2016 13:51:12 -0700
parents c1b7b2285522
children c390b40fe1d7
comparison
equal deleted inserted replaced
30760:753b9d43ca81 30761:7283719e2bfd
2955 raise exc_val 2955 raise exc_val
2956 return received and suppressed 2956 return received and suppressed
2957 2957
2958 # compression code 2958 # compression code
2959 2959
2960 SERVERROLE = 'server'
2961 CLIENTROLE = 'client'
2962
2963 compewireprotosupport = collections.namedtuple(u'compenginewireprotosupport',
2964 (u'name', u'serverpriority',
2965 u'clientpriority'))
2966
2960 class compressormanager(object): 2967 class compressormanager(object):
2961 """Holds registrations of various compression engines. 2968 """Holds registrations of various compression engines.
2962 2969
2963 This class essentially abstracts the differences between compression 2970 This class essentially abstracts the differences between compression
2964 engines to allow new compression formats to be added easily, possibly from 2971 engines to allow new compression formats to be added easily, possibly from
2971 self._engines = {} 2978 self._engines = {}
2972 # Bundle spec human name to engine name. 2979 # Bundle spec human name to engine name.
2973 self._bundlenames = {} 2980 self._bundlenames = {}
2974 # Internal bundle identifier to engine name. 2981 # Internal bundle identifier to engine name.
2975 self._bundletypes = {} 2982 self._bundletypes = {}
2983 # Wire proto identifier to engine name.
2984 self._wiretypes = {}
2976 2985
2977 def __getitem__(self, key): 2986 def __getitem__(self, key):
2978 return self._engines[key] 2987 return self._engines[key]
2979 2988
2980 def __contains__(self, key): 2989 def __contains__(self, key):
3011 # No external facing name declared. 3020 # No external facing name declared.
3012 if bundlename: 3021 if bundlename:
3013 self._bundlenames[bundlename] = name 3022 self._bundlenames[bundlename] = name
3014 3023
3015 self._bundletypes[bundletype] = name 3024 self._bundletypes[bundletype] = name
3025
3026 wiresupport = engine.wireprotosupport()
3027 if wiresupport:
3028 wiretype = wiresupport.name
3029 if wiretype in self._wiretypes:
3030 raise error.Abort(_('wire protocol compression %s already '
3031 'registered by %s') %
3032 (wiretype, self._wiretypes[wiretype]))
3033
3034 self._wiretypes[wiretype] = name
3016 3035
3017 self._engines[name] = engine 3036 self._engines[name] = engine
3018 3037
3019 @property 3038 @property
3020 def supportedbundlenames(self): 3039 def supportedbundlenames(self):
3048 if not engine.available(): 3067 if not engine.available():
3049 raise error.Abort(_('compression engine %s could not be loaded') % 3068 raise error.Abort(_('compression engine %s could not be loaded') %
3050 engine.name()) 3069 engine.name())
3051 return engine 3070 return engine
3052 3071
3072 def supportedwireengines(self, role, onlyavailable=True):
3073 """Obtain compression engines that support the wire protocol.
3074
3075 Returns a list of engines in prioritized order, most desired first.
3076
3077 If ``onlyavailable`` is set, filter out engines that can't be
3078 loaded.
3079 """
3080 assert role in (SERVERROLE, CLIENTROLE)
3081
3082 attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
3083
3084 engines = [self._engines[e] for e in self._wiretypes.values()]
3085 if onlyavailable:
3086 engines = [e for e in engines if e.available()]
3087
3088 def getkey(e):
3089 # Sort first by priority, highest first. In case of tie, sort
3090 # alphabetically. This is arbitrary, but ensures output is
3091 # stable.
3092 w = e.wireprotosupport()
3093 return -1 * getattr(w, attr), w.name
3094
3095 return list(sorted(engines, key=getkey))
3096
3097 def forwiretype(self, wiretype):
3098 engine = self._engines[self._wiretypes[wiretype]]
3099 if not engine.available():
3100 raise error.Abort(_('compression engine %s could not be loaded') %
3101 engine.name())
3102 return engine
3103
3053 compengines = compressormanager() 3104 compengines = compressormanager()
3054 3105
3055 class compressionengine(object): 3106 class compressionengine(object):
3056 """Base class for compression engines. 3107 """Base class for compression engines.
3057 3108
3088 If bundle compression is supported, the class must also implement 3139 If bundle compression is supported, the class must also implement
3089 ``compressstream`` and ``decompressorreader``. 3140 ``compressstream`` and ``decompressorreader``.
3090 """ 3141 """
3091 return None 3142 return None
3092 3143
3144 def wireprotosupport(self):
3145 """Declare support for this compression format on the wire protocol.
3146
3147 If this compression engine isn't supported for compressing wire
3148 protocol payloads, returns None.
3149
3150 Otherwise, returns ``compenginewireprotosupport`` with the following
3151 fields:
3152
3153 * String format identifier
3154 * Integer priority for the server
3155 * Integer priority for the client
3156
3157 The integer priorities are used to order the advertisement of format
3158 support by server and client. The highest integer is advertised
3159 first. Integers with non-positive values aren't advertised.
3160
3161 The priority values are somewhat arbitrary and only used for default
3162 ordering. The relative order can be changed via config options.
3163
3164 If wire protocol compression is supported, the class must also implement
3165 ``compressstream`` and ``decompressorreader``.
3166 """
3167 return None
3168
3093 def compressstream(self, it, opts=None): 3169 def compressstream(self, it, opts=None):
3094 """Compress an iterator of chunks. 3170 """Compress an iterator of chunks.
3095 3171
3096 The method receives an iterator (ideally a generator) of chunks of 3172 The method receives an iterator (ideally a generator) of chunks of
3097 bytes to be compressed. It returns an iterator (ideally a generator) 3173 bytes to be compressed. It returns an iterator (ideally a generator)
3115 def name(self): 3191 def name(self):
3116 return 'zlib' 3192 return 'zlib'
3117 3193
3118 def bundletype(self): 3194 def bundletype(self):
3119 return 'gzip', 'GZ' 3195 return 'gzip', 'GZ'
3196
3197 def wireprotosupport(self):
3198 return compewireprotosupport('zlib', 20, 20)
3120 3199
3121 def compressstream(self, it, opts=None): 3200 def compressstream(self, it, opts=None):
3122 opts = opts or {} 3201 opts = opts or {}
3123 3202
3124 z = zlib.compressobj(opts.get('level', -1)) 3203 z = zlib.compressobj(opts.get('level', -1))
3149 return 'bz2' 3228 return 'bz2'
3150 3229
3151 def bundletype(self): 3230 def bundletype(self):
3152 return 'bzip2', 'BZ' 3231 return 'bzip2', 'BZ'
3153 3232
3233 # We declare a protocol name but don't advertise by default because
3234 # it is slow.
3235 def wireprotosupport(self):
3236 return compewireprotosupport('bzip2', 0, 0)
3237
3154 def compressstream(self, it, opts=None): 3238 def compressstream(self, it, opts=None):
3155 opts = opts or {} 3239 opts = opts or {}
3156 z = bz2.BZ2Compressor(opts.get('level', 9)) 3240 z = bz2.BZ2Compressor(opts.get('level', 9))
3157 for chunk in it: 3241 for chunk in it:
3158 data = z.compress(chunk) 3242 data = z.compress(chunk)
3196 def name(self): 3280 def name(self):
3197 return 'none' 3281 return 'none'
3198 3282
3199 def bundletype(self): 3283 def bundletype(self):
3200 return 'none', 'UN' 3284 return 'none', 'UN'
3285
3286 # Clients always support uncompressed payloads. Servers don't because
3287 # unless you are on a fast network, uncompressed payloads can easily
3288 # saturate your network pipe.
3289 def wireprotosupport(self):
3290 return compewireprotosupport('none', 0, 10)
3201 3291
3202 def compressstream(self, it, opts=None): 3292 def compressstream(self, it, opts=None):
3203 return it 3293 return it
3204 3294
3205 def decompressorreader(self, fh): 3295 def decompressorreader(self, fh):
3227 return bool(self._module) 3317 return bool(self._module)
3228 3318
3229 def bundletype(self): 3319 def bundletype(self):
3230 return 'zstd', 'ZS' 3320 return 'zstd', 'ZS'
3231 3321
3322 def wireprotosupport(self):
3323 return compewireprotosupport('zstd', 50, 50)
3324
3232 def compressstream(self, it, opts=None): 3325 def compressstream(self, it, opts=None):
3233 opts = opts or {} 3326 opts = opts or {}
3234 # zstd level 3 is almost always significantly faster than zlib 3327 # zstd level 3 is almost always significantly faster than zlib
3235 # while providing no worse compression. It strikes a good balance 3328 # while providing no worse compression. It strikes a good balance
3236 # between speed and compression. 3329 # between speed and compression.