Mercurial > public > mercurial-scm > hg-stable
diff mercurial/scmutil.py @ 13972:d1f4e7fd970a
move path_auditor from util to scmutil
author | Adrian Buehlmann <adrian@cadifra.com> |
---|---|
date | Wed, 20 Apr 2011 22:43:31 +0200 |
parents | bfeaa88b875d |
children | 366fa83f9820 |
line wrap: on
line diff
--- a/mercurial/scmutil.py Wed Apr 20 21:41:41 2011 +0200 +++ b/mercurial/scmutil.py Wed Apr 20 22:43:31 2011 +0200 @@ -7,7 +7,7 @@ from i18n import _ import util, error -import os, errno +import os, errno, stat def checkportable(ui, f): '''Check if filename f is portable and warn or abort depending on config''' @@ -26,6 +26,81 @@ raise error.ConfigError( _("ui.portablefilenames value is invalid ('%s')") % val) +class path_auditor(object): + '''ensure that a filesystem path contains no banned components. + the following properties of a path are checked: + + - ends with a directory separator + - under top-level .hg + - starts at the root of a windows drive + - contains ".." + - traverses a symlink (e.g. a/symlink_here/b) + - inside a nested repository (a callback can be used to approve + some nested repositories, e.g., subrepositories) + ''' + + def __init__(self, root, callback=None): + self.audited = set() + self.auditeddir = set() + self.root = root + self.callback = callback + + def __call__(self, path): + '''Check the relative path. + path may contain a pattern (e.g. foodir/**.txt)''' + + if path in self.audited: + return + # AIX ignores "/" at end of path, others raise EISDIR. + if util.endswithsep(path): + raise util.Abort(_("path ends in directory separator: %s") % path) + normpath = os.path.normcase(path) + parts = util.splitpath(normpath) + if (os.path.splitdrive(path)[0] + or parts[0].lower() in ('.hg', '.hg.', '') + or os.pardir in parts): + raise util.Abort(_("path contains illegal component: %s") % path) + if '.hg' in path.lower(): + lparts = [p.lower() for p in parts] + for p in '.hg', '.hg.': + if p in lparts[1:]: + pos = lparts.index(p) + base = os.path.join(*parts[:pos]) + raise util.Abort(_('path %r is inside nested repo %r') + % (path, base)) + + parts.pop() + prefixes = [] + while parts: + prefix = os.sep.join(parts) + if prefix in self.auditeddir: + break + curpath = os.path.join(self.root, prefix) + try: + st = os.lstat(curpath) + except OSError, err: + # EINVAL can be raised as invalid path syntax under win32. + # They must be ignored for patterns can be checked too. + if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): + raise + else: + if stat.S_ISLNK(st.st_mode): + raise util.Abort( + _('path %r traverses symbolic link %r') + % (path, prefix)) + elif (stat.S_ISDIR(st.st_mode) and + os.path.isdir(os.path.join(curpath, '.hg'))): + if not self.callback or not self.callback(curpath): + raise util.Abort(_('path %r is inside nested repo %r') % + (path, prefix)) + prefixes.append(prefix) + parts.pop() + + self.audited.add(path) + # only add prefixes to the cache after checking everything: we don't + # want to add "foo/bar/baz" before checking if there's a "foo/.hg" + self.auditeddir.update(prefixes) + class opener(object): '''Open files relative to a base directory @@ -35,7 +110,7 @@ def __init__(self, base, audit=True): self.base = base if audit: - self.auditor = util.path_auditor(base) + self.auditor = path_auditor(base) else: self.auditor = util.always self.createmode = None @@ -132,7 +207,7 @@ name = os.path.join(root, cwd, name) name = os.path.normpath(name) if auditor is None: - auditor = util.path_auditor(root) + auditor = path_auditor(root) if name != rootsep and name.startswith(rootsep): name = name[len(rootsep):] auditor(name)