equal
deleted
inserted
replaced
30 Iterator, |
30 Iterator, |
31 Optional, |
31 Optional, |
32 ] |
32 ] |
33 |
33 |
34 |
34 |
35 def _lowerclean(s): |
35 def _lowerclean(s: bytes) -> bytes: |
36 # type: (bytes) -> bytes |
|
37 return encoding.hfsignoreclean(s.lower()) |
36 return encoding.hfsignoreclean(s.lower()) |
38 |
37 |
39 |
38 |
40 class pathauditor: |
39 class pathauditor: |
41 """ensure that a filesystem path contains no banned components. |
40 """ensure that a filesystem path contains no banned components. |
70 if os.path.lexists(root) and not util.fscasesensitive(root): |
69 if os.path.lexists(root) and not util.fscasesensitive(root): |
71 self.normcase = util.normcase |
70 self.normcase = util.normcase |
72 else: |
71 else: |
73 self.normcase = lambda x: x |
72 self.normcase = lambda x: x |
74 |
73 |
75 def __call__(self, path, mode=None): |
74 def __call__(self, path: bytes, mode: Optional[Any] = None) -> None: |
76 # type: (bytes, Optional[Any]) -> None |
|
77 """Check the relative path. |
75 """Check the relative path. |
78 path may contain a pattern (e.g. foodir/**.txt)""" |
76 path may contain a pattern (e.g. foodir/**.txt)""" |
79 |
77 |
80 path = util.localpath(path) |
78 path = util.localpath(path) |
81 if path in self.audited: |
79 if path in self.audited: |
168 if not callback or not callback(curpath): |
166 if not callback or not callback(curpath): |
169 msg = _(b"path '%s' is inside nested repo %r") |
167 msg = _(b"path '%s' is inside nested repo %r") |
170 raise error.Abort(msg % (path, pycompat.bytestr(prefix))) |
168 raise error.Abort(msg % (path, pycompat.bytestr(prefix))) |
171 return True |
169 return True |
172 |
170 |
173 def check(self, path): |
171 def check(self, path: bytes) -> bool: |
174 # type: (bytes) -> bool |
|
175 try: |
172 try: |
176 self(path) |
173 self(path) |
177 return True |
174 return True |
178 except (OSError, error.Abort): |
175 except (OSError, error.Abort): |
179 return False |
176 return False |
190 self.audited.clear() |
187 self.audited.clear() |
191 self.auditeddir.clear() |
188 self.auditeddir.clear() |
192 self._cached = False |
189 self._cached = False |
193 |
190 |
194 |
191 |
195 def canonpath(root, cwd, myname, auditor=None): |
192 def canonpath( |
196 # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes |
193 root: bytes, |
|
194 cwd: bytes, |
|
195 myname: bytes, |
|
196 auditor: Optional[pathauditor] = None, |
|
197 ) -> bytes: |
197 """return the canonical path of myname, given cwd and root |
198 """return the canonical path of myname, given cwd and root |
198 |
199 |
199 >>> def check(root, cwd, myname): |
200 >>> def check(root, cwd, myname): |
200 ... a = pathauditor(root, realfs=False) |
201 ... a = pathauditor(root, realfs=False) |
201 ... try: |
202 ... try: |
293 raise error.Abort( |
294 raise error.Abort( |
294 _(b"%s not under root '%s'") % (myname, root), hint=hint |
295 _(b"%s not under root '%s'") % (myname, root), hint=hint |
295 ) |
296 ) |
296 |
297 |
297 |
298 |
298 def normasprefix(path): |
299 def normasprefix(path: bytes) -> bytes: |
299 # type: (bytes) -> bytes |
|
300 """normalize the specified path as path prefix |
300 """normalize the specified path as path prefix |
301 |
301 |
302 Returned value can be used safely for "p.startswith(prefix)", |
302 Returned value can be used safely for "p.startswith(prefix)", |
303 "p[len(prefix):]", and so on. |
303 "p[len(prefix):]", and so on. |
304 |
304 |
317 return path + pycompat.ossep |
317 return path + pycompat.ossep |
318 else: |
318 else: |
319 return path |
319 return path |
320 |
320 |
321 |
321 |
322 def finddirs(path): |
322 def finddirs(path: bytes) -> Iterator[bytes]: |
323 # type: (bytes) -> Iterator[bytes] |
|
324 pos = path.rfind(b'/') |
323 pos = path.rfind(b'/') |
325 while pos != -1: |
324 while pos != -1: |
326 yield path[:pos] |
325 yield path[:pos] |
327 pos = path.rfind(b'/', 0, pos) |
326 pos = path.rfind(b'/', 0, pos) |
328 yield b'' |
327 yield b'' |
353 raise error.ProgrammingError(msg) |
352 raise error.ProgrammingError(msg) |
354 else: |
353 else: |
355 for f in map: |
354 for f in map: |
356 addpath(f) |
355 addpath(f) |
357 |
356 |
358 def addpath(self, path): |
357 def addpath(self, path: bytes) -> None: |
359 # type: (bytes) -> None |
|
360 dirs = self._dirs |
358 dirs = self._dirs |
361 for base in finddirs(path): |
359 for base in finddirs(path): |
362 if base.endswith(b'/'): |
360 if base.endswith(b'/'): |
363 raise ValueError( |
361 raise ValueError( |
364 "found invalid consecutive slashes in path: %r" % base |
362 "found invalid consecutive slashes in path: %r" % base |
366 if base in dirs: |
364 if base in dirs: |
367 dirs[base] += 1 |
365 dirs[base] += 1 |
368 return |
366 return |
369 dirs[base] = 1 |
367 dirs[base] = 1 |
370 |
368 |
371 def delpath(self, path): |
369 def delpath(self, path: bytes) -> None: |
372 # type: (bytes) -> None |
|
373 dirs = self._dirs |
370 dirs = self._dirs |
374 for base in finddirs(path): |
371 for base in finddirs(path): |
375 if dirs[base] > 1: |
372 if dirs[base] > 1: |
376 dirs[base] -= 1 |
373 dirs[base] -= 1 |
377 return |
374 return |
378 del dirs[base] |
375 del dirs[base] |
379 |
376 |
380 def __iter__(self): |
377 def __iter__(self): |
381 return iter(self._dirs) |
378 return iter(self._dirs) |
382 |
379 |
383 def __contains__(self, d): |
380 def __contains__(self, d: bytes) -> bool: |
384 # type: (bytes) -> bool |
|
385 return d in self._dirs |
381 return d in self._dirs |
386 |
382 |
387 |
383 |
388 if hasattr(parsers, 'dirs'): |
384 if hasattr(parsers, 'dirs'): |
389 dirs = parsers.dirs |
385 dirs = parsers.dirs |