Mercurial > public > mercurial-scm > hg
comparison mercurial/posix.py @ 49812:58dff81ffba1
typing: add type hints to the common posix/windows platform functions
These are done in sync because some platforms have empty implementations, and it
isn't obvious what the types should be without examining the other. We want the
types aligned, so @overload definitions that differ aren't generated. The only
differences here are the few methods that unconditionally raise an error are
marked as `NoReturn`, which doesn't seem to bother pytype.
A couple of the posix module functions needed to be updated with a modern
ternary operator, because pytype seems to want to use the type of the second
object in the old `return x and y` style.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Fri, 16 Dec 2022 00:54:39 -0500 |
parents | 0a91aba258e0 |
children | 464fe8b8f474 |
comparison
equal
deleted
inserted
replaced
49811:0a91aba258e0 | 49812:58dff81ffba1 |
---|---|
18 import sys | 18 import sys |
19 import tempfile | 19 import tempfile |
20 import unicodedata | 20 import unicodedata |
21 | 21 |
22 from typing import ( | 22 from typing import ( |
23 Iterable, | |
24 Iterator, | |
23 List, | 25 List, |
24 NoReturn, | 26 NoReturn, |
25 Optional, | 27 Optional, |
28 Sequence, | |
29 Union, | |
26 ) | 30 ) |
27 | 31 |
28 from .i18n import _ | 32 from .i18n import _ |
29 from .pycompat import ( | 33 from .pycompat import ( |
30 getattr, | 34 getattr, |
89 if nh: | 93 if nh: |
90 return nh, ht[1] | 94 return nh, ht[1] |
91 return ht[0] + b'/', ht[1] | 95 return ht[0] + b'/', ht[1] |
92 | 96 |
93 | 97 |
94 def openhardlinks(): | 98 def openhardlinks() -> bool: |
95 '''return true if it is safe to hold open file handles to hardlinks''' | 99 '''return true if it is safe to hold open file handles to hardlinks''' |
96 return True | 100 return True |
97 | 101 |
98 | 102 |
99 def nlinks(name: bytes) -> int: | 103 def nlinks(name: bytes) -> int: |
100 '''return number of hardlinks for the given file''' | 104 '''return number of hardlinks for the given file''' |
101 return os.lstat(name).st_nlink | 105 return os.lstat(name).st_nlink |
102 | 106 |
103 | 107 |
104 def parsepatchoutput(output_line): | 108 def parsepatchoutput(output_line: bytes) -> bytes: |
105 """parses the output produced by patch and returns the filename""" | 109 """parses the output produced by patch and returns the filename""" |
106 pf = output_line[14:] | 110 pf = output_line[14:] |
107 if pycompat.sysplatform == b'OpenVMS': | 111 if pycompat.sysplatform == b'OpenVMS': |
108 if pf[0] == b'`': | 112 if pf[0] == b'`': |
109 pf = pf[1:-1] # Remove the quotes | 113 pf = pf[1:-1] # Remove the quotes |
111 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf: | 115 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf: |
112 pf = pf[1:-1] # Remove the quotes | 116 pf = pf[1:-1] # Remove the quotes |
113 return pf | 117 return pf |
114 | 118 |
115 | 119 |
116 def sshargs(sshcmd, host, user, port): | 120 def sshargs( |
121 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes] | |
122 ) -> bytes: | |
117 '''Build argument list for ssh''' | 123 '''Build argument list for ssh''' |
118 args = user and (b"%s@%s" % (user, host)) or host | 124 args = user and (b"%s@%s" % (user, host)) or host |
119 if b'-' in args[:1]: | 125 if b'-' in args[:1]: |
120 raise error.Abort( | 126 raise error.Abort( |
121 _(b'illegal ssh hostname or username starting with -: %s') % args | 127 _(b'illegal ssh hostname or username starting with -: %s') % args |
124 if port: | 130 if port: |
125 args = b'-p %s %s' % (shellquote(port), args) | 131 args = b'-p %s %s' % (shellquote(port), args) |
126 return args | 132 return args |
127 | 133 |
128 | 134 |
129 def isexec(f): | 135 def isexec(f: bytes) -> bool: |
130 """check whether a file is executable""" | 136 """check whether a file is executable""" |
131 return os.lstat(f).st_mode & 0o100 != 0 | 137 return os.lstat(f).st_mode & 0o100 != 0 |
132 | 138 |
133 | 139 |
134 def setflags(f, l, x): | 140 def setflags(f: bytes, l: bool, x: bool) -> None: |
135 st = os.lstat(f) | 141 st = os.lstat(f) |
136 s = st.st_mode | 142 s = st.st_mode |
137 if l: | 143 if l: |
138 if not stat.S_ISLNK(s): | 144 if not stat.S_ISLNK(s): |
139 # switch file to link | 145 # switch file to link |
173 elif not x and sx: | 179 elif not x and sx: |
174 # Turn off all +x bits | 180 # Turn off all +x bits |
175 os.chmod(f, s & 0o666) | 181 os.chmod(f, s & 0o666) |
176 | 182 |
177 | 183 |
178 def copymode(src, dst, mode=None, enforcewritable=False): | 184 def copymode( |
185 src: bytes, | |
186 dst: bytes, | |
187 mode: Optional[bytes] = None, | |
188 enforcewritable: bool = False, | |
189 ) -> None: | |
179 """Copy the file mode from the file at path src to dst. | 190 """Copy the file mode from the file at path src to dst. |
180 If src doesn't exist, we're using mode instead. If mode is None, we're | 191 If src doesn't exist, we're using mode instead. If mode is None, we're |
181 using umask.""" | 192 using umask.""" |
182 try: | 193 try: |
183 st_mode = os.lstat(src).st_mode & 0o777 | 194 st_mode = os.lstat(src).st_mode & 0o777 |
193 new_mode |= stat.S_IWUSR | 204 new_mode |= stat.S_IWUSR |
194 | 205 |
195 os.chmod(dst, new_mode) | 206 os.chmod(dst, new_mode) |
196 | 207 |
197 | 208 |
198 def checkexec(path): | 209 def checkexec(path: bytes) -> bool: |
199 """ | 210 """ |
200 Check whether the given path is on a filesystem with UNIX-like exec flags | 211 Check whether the given path is on a filesystem with UNIX-like exec flags |
201 | 212 |
202 Requires a directory (like /foo/.hg) | 213 Requires a directory (like /foo/.hg) |
203 """ | 214 """ |
273 except (IOError, OSError): | 284 except (IOError, OSError): |
274 # we don't care, the user probably won't be able to commit anyway | 285 # we don't care, the user probably won't be able to commit anyway |
275 return False | 286 return False |
276 | 287 |
277 | 288 |
278 def checklink(path): | 289 def checklink(path: bytes) -> bool: |
279 """check whether the given path is on a symlink-capable filesystem""" | 290 """check whether the given path is on a symlink-capable filesystem""" |
280 # mktemp is not racy because symlink creation will fail if the | 291 # mktemp is not racy because symlink creation will fail if the |
281 # file already exists | 292 # file already exists |
282 while True: | 293 while True: |
283 cachedir = os.path.join(path, b'.hg', b'wcache') | 294 cachedir = os.path.join(path, b'.hg', b'wcache') |
360 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. | 371 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. |
361 """ | 372 """ |
362 return getattr(osutil, 'getfstype', lambda x: None)(dirpath) | 373 return getattr(osutil, 'getfstype', lambda x: None)(dirpath) |
363 | 374 |
364 | 375 |
365 def get_password(): | 376 def get_password() -> bytes: |
366 return encoding.strtolocal(getpass.getpass('')) | 377 return encoding.strtolocal(getpass.getpass('')) |
367 | 378 |
368 | 379 |
369 def setbinary(fd): | 380 def setbinary(fd) -> None: |
370 pass | 381 pass |
371 | 382 |
372 | 383 |
373 def pconvert(path): | 384 def pconvert(path: bytes) -> bytes: |
374 return path | 385 return path |
375 | 386 |
376 | 387 |
377 def localpath(path): | 388 def localpath(path: bytes) -> bytes: |
378 return path | 389 return path |
379 | 390 |
380 | 391 |
381 def samefile(fpath1: bytes, fpath2: bytes) -> bool: | 392 def samefile(fpath1: bytes, fpath2: bytes) -> bool: |
382 """Returns whether path1 and path2 refer to the same file. This is only | 393 """Returns whether path1 and path2 refer to the same file. This is only |
391 st2 = os.lstat(fpath2) | 402 st2 = os.lstat(fpath2) |
392 return st1.st_dev == st2.st_dev | 403 return st1.st_dev == st2.st_dev |
393 | 404 |
394 | 405 |
395 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems | 406 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems |
396 def normcase(path): | 407 def normcase(path: bytes) -> bytes: |
397 return path.lower() | 408 return path.lower() |
398 | 409 |
399 | 410 |
400 # what normcase does to ASCII strings | 411 # what normcase does to ASCII strings |
401 normcasespec = encoding.normcasespecs.lower | 412 normcasespec = encoding.normcasespecs.lower |
402 # fallback normcase function for non-ASCII strings | 413 # fallback normcase function for non-ASCII strings |
403 normcasefallback = normcase | 414 normcasefallback = normcase |
404 | 415 |
405 if pycompat.isdarwin: | 416 if pycompat.isdarwin: |
406 | 417 |
407 def normcase(path): | 418 def normcase(path: bytes) -> bytes: |
408 """ | 419 """ |
409 Normalize a filename for OS X-compatible comparison: | 420 Normalize a filename for OS X-compatible comparison: |
410 - escape-encode invalid characters | 421 - escape-encode invalid characters |
411 - decompose to NFD | 422 - decompose to NFD |
412 - lowercase | 423 - lowercase |
427 except UnicodeDecodeError: | 438 except UnicodeDecodeError: |
428 return normcasefallback(path) | 439 return normcasefallback(path) |
429 | 440 |
430 normcasespec = encoding.normcasespecs.lower | 441 normcasespec = encoding.normcasespecs.lower |
431 | 442 |
432 def normcasefallback(path): | 443 def normcasefallback(path: bytes) -> bytes: |
433 try: | 444 try: |
434 u = path.decode('utf-8') | 445 u = path.decode('utf-8') |
435 except UnicodeDecodeError: | 446 except UnicodeDecodeError: |
436 # OS X percent-encodes any bytes that aren't valid utf-8 | 447 # OS X percent-encodes any bytes that aren't valid utf-8 |
437 s = b'' | 448 s = b'' |
468 ], | 479 ], |
469 reverse=True, | 480 reverse=True, |
470 ) | 481 ) |
471 | 482 |
472 # use upper-ing as normcase as same as NTFS workaround | 483 # use upper-ing as normcase as same as NTFS workaround |
473 def normcase(path): | 484 def normcase(path: bytes) -> bytes: |
474 pathlen = len(path) | 485 pathlen = len(path) |
475 if (pathlen == 0) or (path[0] != pycompat.ossep): | 486 if (pathlen == 0) or (path[0] != pycompat.ossep): |
476 # treat as relative | 487 # treat as relative |
477 return encoding.upper(path) | 488 return encoding.upper(path) |
478 | 489 |
494 | 505 |
495 # Cygwin translates native ACLs to POSIX permissions, | 506 # Cygwin translates native ACLs to POSIX permissions, |
496 # but these translations are not supported by native | 507 # but these translations are not supported by native |
497 # tools, so the exec bit tends to be set erroneously. | 508 # tools, so the exec bit tends to be set erroneously. |
498 # Therefore, disable executable bit access on Cygwin. | 509 # Therefore, disable executable bit access on Cygwin. |
499 def checkexec(path): | 510 def checkexec(path: bytes) -> bool: |
500 return False | 511 return False |
501 | 512 |
502 # Similarly, Cygwin's symlink emulation is likely to create | 513 # Similarly, Cygwin's symlink emulation is likely to create |
503 # problems when Mercurial is used from both Cygwin and native | 514 # problems when Mercurial is used from both Cygwin and native |
504 # Windows, with other native tools, or on shared volumes | 515 # Windows, with other native tools, or on shared volumes |
505 def checklink(path): | 516 def checklink(path: bytes) -> bool: |
506 return False | 517 return False |
507 | 518 |
508 | 519 |
509 _needsshellquote = None | 520 _needsshellquote = None |
510 | 521 |
511 | 522 |
512 def shellquote(s): | 523 def shellquote(s: bytes) -> bytes: |
513 if pycompat.sysplatform == b'OpenVMS': | 524 if pycompat.sysplatform == b'OpenVMS': |
514 return b'"%s"' % s | 525 return b'"%s"' % s |
515 global _needsshellquote | 526 global _needsshellquote |
516 if _needsshellquote is None: | 527 if _needsshellquote is None: |
517 _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search | 528 _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search |
520 return s | 531 return s |
521 else: | 532 else: |
522 return b"'%s'" % s.replace(b"'", b"'\\''") | 533 return b"'%s'" % s.replace(b"'", b"'\\''") |
523 | 534 |
524 | 535 |
525 def shellsplit(s): | 536 def shellsplit(s: bytes) -> List[bytes]: |
526 """Parse a command string in POSIX shell way (best-effort)""" | 537 """Parse a command string in POSIX shell way (best-effort)""" |
527 return pycompat.shlexsplit(s, posix=True) | 538 return pycompat.shlexsplit(s, posix=True) |
528 | 539 |
529 | 540 |
530 def testpid(pid: int) -> bool: | 541 def testpid(pid: int) -> bool: |
536 return True | 547 return True |
537 except OSError as inst: | 548 except OSError as inst: |
538 return inst.errno != errno.ESRCH | 549 return inst.errno != errno.ESRCH |
539 | 550 |
540 | 551 |
541 def isowner(st): | 552 def isowner(st: os.stat_result) -> bool: |
542 """Return True if the stat object st is from the current user.""" | 553 """Return True if the stat object st is from the current user.""" |
543 return st.st_uid == os.getuid() | 554 return st.st_uid == os.getuid() |
544 | 555 |
545 | 556 |
546 def findexe(command): | 557 def findexe(command: bytes) -> Optional[bytes]: |
547 """Find executable for command searching like which does. | 558 """Find executable for command searching like which does. |
548 If command is a basename then PATH is searched for command. | 559 If command is a basename then PATH is searched for command. |
549 PATH isn't searched if command is an absolute or relative path. | 560 PATH isn't searched if command is an absolute or relative path. |
550 If command isn't found None is returned.""" | 561 If command isn't found None is returned.""" |
551 if pycompat.sysplatform == b'OpenVMS': | 562 if pycompat.sysplatform == b'OpenVMS': |
552 return command | 563 return command |
553 | 564 |
554 def findexisting(executable): | 565 def findexisting(executable: bytes) -> Optional[bytes]: |
555 b'Will return executable if existing file' | 566 b'Will return executable if existing file' |
556 if os.path.isfile(executable) and os.access(executable, os.X_OK): | 567 if os.path.isfile(executable) and os.access(executable, os.X_OK): |
557 return executable | 568 return executable |
558 return None | 569 return None |
559 | 570 |
575 | 586 |
576 | 587 |
577 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} | 588 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} |
578 | 589 |
579 | 590 |
580 def statfiles(files): | 591 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]: |
581 """Stat each file in files. Yield each stat, or None if a file does not | 592 """Stat each file in files. Yield each stat, or None if a file does not |
582 exist or has a type we don't care about.""" | 593 exist or has a type we don't care about.""" |
583 lstat = os.lstat | 594 lstat = os.lstat |
584 getkind = stat.S_IFMT | 595 getkind = stat.S_IFMT |
585 for nf in files: | 596 for nf in files: |
595 def getuser() -> bytes: | 606 def getuser() -> bytes: |
596 '''return name of current user''' | 607 '''return name of current user''' |
597 return pycompat.fsencode(getpass.getuser()) | 608 return pycompat.fsencode(getpass.getuser()) |
598 | 609 |
599 | 610 |
600 def username(uid=None): | 611 def username(uid: Optional[int] = None) -> Optional[bytes]: |
601 """Return the name of the user with the given uid. | 612 """Return the name of the user with the given uid. |
602 | 613 |
603 If uid is None, return the name of the current user.""" | 614 If uid is None, return the name of the current user.""" |
604 | 615 |
605 if uid is None: | 616 if uid is None: |
608 return pycompat.fsencode(pwd.getpwuid(uid)[0]) | 619 return pycompat.fsencode(pwd.getpwuid(uid)[0]) |
609 except KeyError: | 620 except KeyError: |
610 return b'%d' % uid | 621 return b'%d' % uid |
611 | 622 |
612 | 623 |
613 def groupname(gid=None): | 624 def groupname(gid: Optional[int] = None) -> Optional[bytes]: |
614 """Return the name of the group with the given gid. | 625 """Return the name of the group with the given gid. |
615 | 626 |
616 If gid is None, return the name of the current group.""" | 627 If gid is None, return the name of the current group.""" |
617 | 628 |
618 if gid is None: | 629 if gid is None: |
621 return pycompat.fsencode(grp.getgrgid(gid)[0]) | 632 return pycompat.fsencode(grp.getgrgid(gid)[0]) |
622 except KeyError: | 633 except KeyError: |
623 return pycompat.bytestr(gid) | 634 return pycompat.bytestr(gid) |
624 | 635 |
625 | 636 |
626 def groupmembers(name): | 637 def groupmembers(name: bytes) -> List[bytes]: |
627 """Return the list of members of the group with the given | 638 """Return the list of members of the group with the given |
628 name, KeyError if the group does not exist. | 639 name, KeyError if the group does not exist. |
629 """ | 640 """ |
630 name = pycompat.fsdecode(name) | 641 name = pycompat.fsdecode(name) |
631 return pycompat.rapply(pycompat.fsencode, list(grp.getgrnam(name).gr_mem)) | 642 return pycompat.rapply(pycompat.fsencode, list(grp.getgrnam(name).gr_mem)) |
641 | 652 |
642 def makedir(path: bytes, notindexed: bool) -> None: | 653 def makedir(path: bytes, notindexed: bool) -> None: |
643 os.mkdir(path) | 654 os.mkdir(path) |
644 | 655 |
645 | 656 |
646 def lookupreg(key, name=None, scope=None): | 657 def lookupreg( |
658 key: bytes, | |
659 name: Optional[bytes] = None, | |
660 scope: Optional[Union[int, Iterable[int]]] = None, | |
661 ) -> Optional[bytes]: | |
647 return None | 662 return None |
648 | 663 |
649 | 664 |
650 def hidewindow() -> None: | 665 def hidewindow() -> None: |
651 """Hide current shell window. | 666 """Hide current shell window. |
688 | 703 |
689 def __ne__(self, other): | 704 def __ne__(self, other): |
690 return not self == other | 705 return not self == other |
691 | 706 |
692 | 707 |
693 def statislink(st): | 708 def statislink(st: Optional[os.stat_result]) -> bool: |
694 '''check whether a stat result is a symlink''' | 709 '''check whether a stat result is a symlink''' |
695 return st and stat.S_ISLNK(st.st_mode) | 710 return stat.S_ISLNK(st.st_mode) if st else False |
696 | 711 |
697 | 712 |
698 def statisexec(st): | 713 def statisexec(st: Optional[os.stat_result]) -> bool: |
699 '''check whether a stat result is an executable file''' | 714 '''check whether a stat result is an executable file''' |
700 return st and (st.st_mode & 0o100 != 0) | 715 return (st.st_mode & 0o100 != 0) if st else False |
701 | 716 |
702 | 717 |
703 def poll(fds): | 718 def poll(fds): |
704 """block until something happens on any file descriptor | 719 """block until something happens on any file descriptor |
705 | 720 |
712 except ValueError: # out of range file descriptor | 727 except ValueError: # out of range file descriptor |
713 raise NotImplementedError() | 728 raise NotImplementedError() |
714 return sorted(list(set(sum(res, [])))) | 729 return sorted(list(set(sum(res, [])))) |
715 | 730 |
716 | 731 |
717 def readpipe(pipe): | 732 def readpipe(pipe) -> bytes: |
718 """Read all available data from a pipe.""" | 733 """Read all available data from a pipe.""" |
719 # We can't fstat() a pipe because Linux will always report 0. | 734 # We can't fstat() a pipe because Linux will always report 0. |
720 # So, we set the pipe to non-blocking mode and read everything | 735 # So, we set the pipe to non-blocking mode and read everything |
721 # that's available. | 736 # that's available. |
722 flags = fcntl.fcntl(pipe, fcntl.F_GETFL) | 737 flags = fcntl.fcntl(pipe, fcntl.F_GETFL) |
737 return b''.join(chunks) | 752 return b''.join(chunks) |
738 finally: | 753 finally: |
739 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags) | 754 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags) |
740 | 755 |
741 | 756 |
742 def bindunixsocket(sock, path): | 757 def bindunixsocket(sock, path: bytes) -> None: |
743 """Bind the UNIX domain socket to the specified path""" | 758 """Bind the UNIX domain socket to the specified path""" |
744 # use relative path instead of full path at bind() if possible, since | 759 # use relative path instead of full path at bind() if possible, since |
745 # AF_UNIX path has very small length limit (107 chars) on common | 760 # AF_UNIX path has very small length limit (107 chars) on common |
746 # platforms (see sys/un.h) | 761 # platforms (see sys/un.h) |
747 dirname, basename = os.path.split(path) | 762 dirname, basename = os.path.split(path) |