Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/pathutil.py @ 46685:5d483e3bb60e
typing: add some type annotations to mercurial/pathutil.py
Differential Revision: https://phab.mercurial-scm.org/D10128
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Sat, 06 Mar 2021 23:43:44 -0500 |
parents | 89a2afe31e82 |
children | a9f38b144096 |
comparison
equal
deleted
inserted
replaced
46684:7711853110b9 | 46685:5d483e3bb60e |
---|---|
13 policy, | 13 policy, |
14 pycompat, | 14 pycompat, |
15 util, | 15 util, |
16 ) | 16 ) |
17 | 17 |
18 if pycompat.TYPE_CHECKING: | |
19 from typing import ( | |
20 Any, | |
21 Callable, | |
22 Iterator, | |
23 Optional, | |
24 ) | |
25 | |
26 | |
18 rustdirs = policy.importrust('dirstate', 'Dirs') | 27 rustdirs = policy.importrust('dirstate', 'Dirs') |
19 parsers = policy.importmod('parsers') | 28 parsers = policy.importmod('parsers') |
20 | 29 |
21 | 30 |
22 def _lowerclean(s): | 31 def _lowerclean(s): |
32 # type: (bytes) -> bytes | |
23 return encoding.hfsignoreclean(s.lower()) | 33 return encoding.hfsignoreclean(s.lower()) |
24 | 34 |
25 | 35 |
26 class pathauditor(object): | 36 class pathauditor(object): |
27 """ensure that a filesystem path contains no banned components. | 37 """ensure that a filesystem path contains no banned components. |
57 self.normcase = util.normcase | 67 self.normcase = util.normcase |
58 else: | 68 else: |
59 self.normcase = lambda x: x | 69 self.normcase = lambda x: x |
60 | 70 |
61 def __call__(self, path, mode=None): | 71 def __call__(self, path, mode=None): |
72 # type: (bytes, Optional[Any]) -> None | |
62 """Check the relative path. | 73 """Check the relative path. |
63 path may contain a pattern (e.g. foodir/**.txt)""" | 74 path may contain a pattern (e.g. foodir/**.txt)""" |
64 | 75 |
65 path = util.localpath(path) | 76 path = util.localpath(path) |
66 normpath = self.normcase(path) | 77 normpath = self.normcase(path) |
117 | 128 |
118 if self._cached: | 129 if self._cached: |
119 self.audited.add(normpath) | 130 self.audited.add(normpath) |
120 | 131 |
121 def _checkfs(self, prefix, path): | 132 def _checkfs(self, prefix, path): |
133 # type: (bytes, bytes) -> None | |
122 """raise exception if a file system backed check fails""" | 134 """raise exception if a file system backed check fails""" |
123 curpath = os.path.join(self.root, prefix) | 135 curpath = os.path.join(self.root, prefix) |
124 try: | 136 try: |
125 st = os.lstat(curpath) | 137 st = os.lstat(curpath) |
126 except OSError as err: | 138 except OSError as err: |
141 if not self.callback or not self.callback(curpath): | 153 if not self.callback or not self.callback(curpath): |
142 msg = _(b"path '%s' is inside nested repo %r") | 154 msg = _(b"path '%s' is inside nested repo %r") |
143 raise error.Abort(msg % (path, pycompat.bytestr(prefix))) | 155 raise error.Abort(msg % (path, pycompat.bytestr(prefix))) |
144 | 156 |
145 def check(self, path): | 157 def check(self, path): |
158 # type: (bytes) -> bool | |
146 try: | 159 try: |
147 self(path) | 160 self(path) |
148 return True | 161 return True |
149 except (OSError, error.Abort): | 162 except (OSError, error.Abort): |
150 return False | 163 return False |
162 self.auditeddir.clear() | 175 self.auditeddir.clear() |
163 self._cached = False | 176 self._cached = False |
164 | 177 |
165 | 178 |
166 def canonpath(root, cwd, myname, auditor=None): | 179 def canonpath(root, cwd, myname, auditor=None): |
180 # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes | |
167 """return the canonical path of myname, given cwd and root | 181 """return the canonical path of myname, given cwd and root |
168 | 182 |
169 >>> def check(root, cwd, myname): | 183 >>> def check(root, cwd, myname): |
170 ... a = pathauditor(root, realfs=False) | 184 ... a = pathauditor(root, realfs=False) |
171 ... try: | 185 ... try: |
264 _(b"%s not under root '%s'") % (myname, root), hint=hint | 278 _(b"%s not under root '%s'") % (myname, root), hint=hint |
265 ) | 279 ) |
266 | 280 |
267 | 281 |
268 def normasprefix(path): | 282 def normasprefix(path): |
283 # type: (bytes) -> bytes | |
269 """normalize the specified path as path prefix | 284 """normalize the specified path as path prefix |
270 | 285 |
271 Returned value can be used safely for "p.startswith(prefix)", | 286 Returned value can be used safely for "p.startswith(prefix)", |
272 "p[len(prefix):]", and so on. | 287 "p[len(prefix):]", and so on. |
273 | 288 |
287 else: | 302 else: |
288 return path | 303 return path |
289 | 304 |
290 | 305 |
291 def finddirs(path): | 306 def finddirs(path): |
307 # type: (bytes) -> Iterator[bytes] | |
292 pos = path.rfind(b'/') | 308 pos = path.rfind(b'/') |
293 while pos != -1: | 309 while pos != -1: |
294 yield path[:pos] | 310 yield path[:pos] |
295 pos = path.rfind(b'/', 0, pos) | 311 pos = path.rfind(b'/', 0, pos) |
296 yield b'' | 312 yield b'' |
316 else: | 332 else: |
317 for f in map: | 333 for f in map: |
318 addpath(f) | 334 addpath(f) |
319 | 335 |
320 def addpath(self, path): | 336 def addpath(self, path): |
337 # type: (bytes) -> None | |
321 dirs = self._dirs | 338 dirs = self._dirs |
322 for base in finddirs(path): | 339 for base in finddirs(path): |
323 if base.endswith(b'/'): | 340 if base.endswith(b'/'): |
324 raise ValueError( | 341 raise ValueError( |
325 "found invalid consecutive slashes in path: %r" % base | 342 "found invalid consecutive slashes in path: %r" % base |
328 dirs[base] += 1 | 345 dirs[base] += 1 |
329 return | 346 return |
330 dirs[base] = 1 | 347 dirs[base] = 1 |
331 | 348 |
332 def delpath(self, path): | 349 def delpath(self, path): |
350 # type: (bytes) -> None | |
333 dirs = self._dirs | 351 dirs = self._dirs |
334 for base in finddirs(path): | 352 for base in finddirs(path): |
335 if dirs[base] > 1: | 353 if dirs[base] > 1: |
336 dirs[base] -= 1 | 354 dirs[base] -= 1 |
337 return | 355 return |
339 | 357 |
340 def __iter__(self): | 358 def __iter__(self): |
341 return iter(self._dirs) | 359 return iter(self._dirs) |
342 | 360 |
343 def __contains__(self, d): | 361 def __contains__(self, d): |
362 # type: (bytes) -> bool | |
344 return d in self._dirs | 363 return d in self._dirs |
345 | 364 |
346 | 365 |
347 if util.safehasattr(parsers, 'dirs'): | 366 if util.safehasattr(parsers, 'dirs'): |
348 dirs = parsers.dirs | 367 dirs = parsers.dirs |
353 | 372 |
354 # forward two methods from posixpath that do what we need, but we'd | 373 # forward two methods from posixpath that do what we need, but we'd |
355 # rather not let our internals know that we're thinking in posix terms | 374 # rather not let our internals know that we're thinking in posix terms |
356 # - instead we'll let them be oblivious. | 375 # - instead we'll let them be oblivious. |
357 join = posixpath.join | 376 join = posixpath.join |
358 dirname = posixpath.dirname | 377 dirname = posixpath.dirname # type: Callable[[bytes], bytes] |