mercurial/util.py
changeset 14076 924c82157d46
parent 14064 e4bfb9c337f3
child 14077 c285bdb0572a
--- a/mercurial/util.py	Sat Apr 30 16:33:47 2011 +0200
+++ b/mercurial/util.py	Sat Apr 30 09:43:20 2011 -0700
@@ -17,7 +17,7 @@
 import error, osutil, encoding
 import errno, re, shutil, sys, tempfile, traceback
 import os, time, calendar, textwrap, unicodedata, signal
-import imp, socket
+import imp, socket, urllib
 
 # Python compatibility
 
@@ -1283,3 +1283,265 @@
     If s is not a valid boolean, returns None.
     """
     return _booleans.get(s.lower(), None)
+
+class url(object):
+    """Reliable URL parser.
+
+    This parses URLs and provides attributes for the following
+    components:
+
+    <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
+
+    Missing components are set to None. The only exception is
+    fragment, which is set to '' if present but empty.
+
+    If parsefragment is False, fragment is included in query. If
+    parsequery is False, query is included in path. If both are
+    False, both fragment and query are included in path.
+
+    See http://www.ietf.org/rfc/rfc2396.txt for more information.
+
+    Note that for backward compatibility reasons, bundle URLs do not
+    take host names. That means 'bundle://../' has a path of '../'.
+
+    Examples:
+
+    >>> url('http://www.ietf.org/rfc/rfc2396.txt')
+    <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
+    >>> url('ssh://[::1]:2200//home/joe/repo')
+    <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
+    >>> url('file:///home/joe/repo')
+    <url scheme: 'file', path: '/home/joe/repo'>
+    >>> url('bundle:foo')
+    <url scheme: 'bundle', path: 'foo'>
+    >>> url('bundle://../foo')
+    <url scheme: 'bundle', path: '../foo'>
+    >>> url('c:\\\\foo\\\\bar')
+    <url path: 'c:\\\\foo\\\\bar'>
+
+    Authentication credentials:
+
+    >>> url('ssh://joe:xyz@x/repo')
+    <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
+    >>> url('ssh://joe@x/repo')
+    <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
+
+    Query strings and fragments:
+
+    >>> url('http://host/a?b#c')
+    <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
+    >>> url('http://host/a?b#c', parsequery=False, parsefragment=False)
+    <url scheme: 'http', host: 'host', path: 'a?b#c'>
+    """
+
+    _safechars = "!~*'()+"
+    _safepchars = "/!~*'()+"
+    _matchscheme = re.compile(r'^[a-zA-Z0-9+.\-]+:').match
+
+    def __init__(self, path, parsequery=True, parsefragment=True):
+        # We slowly chomp away at path until we have only the path left
+        self.scheme = self.user = self.passwd = self.host = None
+        self.port = self.path = self.query = self.fragment = None
+        self._localpath = True
+        self._hostport = ''
+        self._origpath = path
+
+        # special case for Windows drive letters
+        if hasdriveletter(path):
+            self.path = path
+            return
+
+        # For compatibility reasons, we can't handle bundle paths as
+        # normal URLS
+        if path.startswith('bundle:'):
+            self.scheme = 'bundle'
+            path = path[7:]
+            if path.startswith('//'):
+                path = path[2:]
+            self.path = path
+            return
+
+        if self._matchscheme(path):
+            parts = path.split(':', 1)
+            if parts[0]:
+                self.scheme, path = parts
+                self._localpath = False
+
+        if not path:
+            path = None
+            if self._localpath:
+                self.path = ''
+                return
+        else:
+            if parsefragment and '#' in path:
+                path, self.fragment = path.split('#', 1)
+                if not path:
+                    path = None
+            if self._localpath:
+                self.path = path
+                return
+
+            if parsequery and '?' in path:
+                path, self.query = path.split('?', 1)
+                if not path:
+                    path = None
+                if not self.query:
+                    self.query = None
+
+            # // is required to specify a host/authority
+            if path and path.startswith('//'):
+                parts = path[2:].split('/', 1)
+                if len(parts) > 1:
+                    self.host, path = parts
+                    path = path
+                else:
+                    self.host = parts[0]
+                    path = None
+                if not self.host:
+                    self.host = None
+                    if path:
+                        path = '/' + path
+
+            if self.host and '@' in self.host:
+                self.user, self.host = self.host.rsplit('@', 1)
+                if ':' in self.user:
+                    self.user, self.passwd = self.user.split(':', 1)
+                if not self.host:
+                    self.host = None
+
+            # Don't split on colons in IPv6 addresses without ports
+            if (self.host and ':' in self.host and
+                not (self.host.startswith('[') and self.host.endswith(']'))):
+                self._hostport = self.host
+                self.host, self.port = self.host.rsplit(':', 1)
+                if not self.host:
+                    self.host = None
+
+            if (self.host and self.scheme == 'file' and
+                self.host not in ('localhost', '127.0.0.1', '[::1]')):
+                raise Abort(_('file:// URLs can only refer to localhost'))
+
+        self.path = path
+
+        for a in ('user', 'passwd', 'host', 'port',
+                  'path', 'query', 'fragment'):
+            v = getattr(self, a)
+            if v is not None:
+                setattr(self, a, urllib.unquote(v))
+
+    def __repr__(self):
+        attrs = []
+        for a in ('scheme', 'user', 'passwd', 'host', 'port', 'path',
+                  'query', 'fragment'):
+            v = getattr(self, a)
+            if v is not None:
+                attrs.append('%s: %r' % (a, v))
+        return '<url %s>' % ', '.join(attrs)
+
+    def __str__(self):
+        """Join the URL's components back into a URL string.
+
+        Examples:
+
+        >>> str(url('http://user:pw@host:80/?foo#bar'))
+        'http://user:pw@host:80/?foo#bar'
+        >>> str(url('ssh://user:pw@[::1]:2200//home/joe#'))
+        'ssh://user:pw@[::1]:2200//home/joe#'
+        >>> str(url('http://localhost:80//'))
+        'http://localhost:80//'
+        >>> str(url('http://localhost:80/'))
+        'http://localhost:80/'
+        >>> str(url('http://localhost:80'))
+        'http://localhost:80/'
+        >>> str(url('bundle:foo'))
+        'bundle:foo'
+        >>> str(url('bundle://../foo'))
+        'bundle:../foo'
+        >>> str(url('path'))
+        'path'
+        """
+        if self._localpath:
+            s = self.path
+            if self.scheme == 'bundle':
+                s = 'bundle:' + s
+            if self.fragment:
+                s += '#' + self.fragment
+            return s
+
+        s = self.scheme + ':'
+        if (self.user or self.passwd or self.host or
+            self.scheme and not self.path):
+            s += '//'
+        if self.user:
+            s += urllib.quote(self.user, safe=self._safechars)
+        if self.passwd:
+            s += ':' + urllib.quote(self.passwd, safe=self._safechars)
+        if self.user or self.passwd:
+            s += '@'
+        if self.host:
+            if not (self.host.startswith('[') and self.host.endswith(']')):
+                s += urllib.quote(self.host)
+            else:
+                s += self.host
+        if self.port:
+            s += ':' + urllib.quote(self.port)
+        if self.host:
+            s += '/'
+        if self.path:
+            s += urllib.quote(self.path, safe=self._safepchars)
+        if self.query:
+            s += '?' + urllib.quote(self.query, safe=self._safepchars)
+        if self.fragment is not None:
+            s += '#' + urllib.quote(self.fragment, safe=self._safepchars)
+        return s
+
+    def authinfo(self):
+        user, passwd = self.user, self.passwd
+        try:
+            self.user, self.passwd = None, None
+            s = str(self)
+        finally:
+            self.user, self.passwd = user, passwd
+        if not self.user:
+            return (s, None)
+        return (s, (None, (str(self), self.host),
+                    self.user, self.passwd or ''))
+
+    def localpath(self):
+        if self.scheme == 'file' or self.scheme == 'bundle':
+            path = self.path or '/'
+            # For Windows, we need to promote hosts containing drive
+            # letters to paths with drive letters.
+            if hasdriveletter(self._hostport):
+                path = self._hostport + '/' + self.path
+            elif self.host is not None and self.path:
+                path = '/' + path
+            # We also need to handle the case of file:///C:/, which
+            # should return C:/, not /C:/.
+            elif hasdriveletter(path):
+                # Strip leading slash from paths with drive names
+                return path[1:]
+            return path
+        return self._origpath
+
+def hasscheme(path):
+    return bool(url(path).scheme)
+
+def hasdriveletter(path):
+    return path[1:2] == ':' and path[0:1].isalpha()
+
+def localpath(path):
+    return url(path, parsequery=False, parsefragment=False).localpath()
+
+def hidepassword(u):
+    '''hide user credential in a url string'''
+    u = url(u)
+    if u.passwd:
+        u.passwd = '***'
+    return str(u)
+
+def removeauth(u):
+    '''remove all authentication information from a url string'''
+    u = url(u)
+    u.user = u.passwd = None
+    return str(u)