comparison mercurial/posix.py @ 49812:58dff81ffba1

typing: add type hints to the common posix/windows platform functions These are done in sync because some platforms have empty implementations, and it isn't obvious what the types should be without examining the other. We want the types aligned, so @overload definitions that differ aren't generated. The only differences here are the few methods that unconditionally raise an error are marked as `NoReturn`, which doesn't seem to bother pytype. A couple of the posix module functions needed to be updated with a modern ternary operator, because pytype seems to want to use the type of the second object in the old `return x and y` style.
author Matt Harbison <matt_harbison@yahoo.com>
date Fri, 16 Dec 2022 00:54:39 -0500
parents 0a91aba258e0
children 464fe8b8f474
comparison
equal deleted inserted replaced
49811:0a91aba258e0 49812:58dff81ffba1
18 import sys 18 import sys
19 import tempfile 19 import tempfile
20 import unicodedata 20 import unicodedata
21 21
22 from typing import ( 22 from typing import (
23 Iterable,
24 Iterator,
23 List, 25 List,
24 NoReturn, 26 NoReturn,
25 Optional, 27 Optional,
28 Sequence,
29 Union,
26 ) 30 )
27 31
28 from .i18n import _ 32 from .i18n import _
29 from .pycompat import ( 33 from .pycompat import (
30 getattr, 34 getattr,
89 if nh: 93 if nh:
90 return nh, ht[1] 94 return nh, ht[1]
91 return ht[0] + b'/', ht[1] 95 return ht[0] + b'/', ht[1]
92 96
93 97
94 def openhardlinks(): 98 def openhardlinks() -> bool:
95 '''return true if it is safe to hold open file handles to hardlinks''' 99 '''return true if it is safe to hold open file handles to hardlinks'''
96 return True 100 return True
97 101
98 102
99 def nlinks(name: bytes) -> int: 103 def nlinks(name: bytes) -> int:
100 '''return number of hardlinks for the given file''' 104 '''return number of hardlinks for the given file'''
101 return os.lstat(name).st_nlink 105 return os.lstat(name).st_nlink
102 106
103 107
104 def parsepatchoutput(output_line): 108 def parsepatchoutput(output_line: bytes) -> bytes:
105 """parses the output produced by patch and returns the filename""" 109 """parses the output produced by patch and returns the filename"""
106 pf = output_line[14:] 110 pf = output_line[14:]
107 if pycompat.sysplatform == b'OpenVMS': 111 if pycompat.sysplatform == b'OpenVMS':
108 if pf[0] == b'`': 112 if pf[0] == b'`':
109 pf = pf[1:-1] # Remove the quotes 113 pf = pf[1:-1] # Remove the quotes
111 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf: 115 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf:
112 pf = pf[1:-1] # Remove the quotes 116 pf = pf[1:-1] # Remove the quotes
113 return pf 117 return pf
114 118
115 119
116 def sshargs(sshcmd, host, user, port): 120 def sshargs(
121 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
122 ) -> bytes:
117 '''Build argument list for ssh''' 123 '''Build argument list for ssh'''
118 args = user and (b"%s@%s" % (user, host)) or host 124 args = user and (b"%s@%s" % (user, host)) or host
119 if b'-' in args[:1]: 125 if b'-' in args[:1]:
120 raise error.Abort( 126 raise error.Abort(
121 _(b'illegal ssh hostname or username starting with -: %s') % args 127 _(b'illegal ssh hostname or username starting with -: %s') % args
124 if port: 130 if port:
125 args = b'-p %s %s' % (shellquote(port), args) 131 args = b'-p %s %s' % (shellquote(port), args)
126 return args 132 return args
127 133
128 134
129 def isexec(f): 135 def isexec(f: bytes) -> bool:
130 """check whether a file is executable""" 136 """check whether a file is executable"""
131 return os.lstat(f).st_mode & 0o100 != 0 137 return os.lstat(f).st_mode & 0o100 != 0
132 138
133 139
134 def setflags(f, l, x): 140 def setflags(f: bytes, l: bool, x: bool) -> None:
135 st = os.lstat(f) 141 st = os.lstat(f)
136 s = st.st_mode 142 s = st.st_mode
137 if l: 143 if l:
138 if not stat.S_ISLNK(s): 144 if not stat.S_ISLNK(s):
139 # switch file to link 145 # switch file to link
173 elif not x and sx: 179 elif not x and sx:
174 # Turn off all +x bits 180 # Turn off all +x bits
175 os.chmod(f, s & 0o666) 181 os.chmod(f, s & 0o666)
176 182
177 183
178 def copymode(src, dst, mode=None, enforcewritable=False): 184 def copymode(
185 src: bytes,
186 dst: bytes,
187 mode: Optional[bytes] = None,
188 enforcewritable: bool = False,
189 ) -> None:
179 """Copy the file mode from the file at path src to dst. 190 """Copy the file mode from the file at path src to dst.
180 If src doesn't exist, we're using mode instead. If mode is None, we're 191 If src doesn't exist, we're using mode instead. If mode is None, we're
181 using umask.""" 192 using umask."""
182 try: 193 try:
183 st_mode = os.lstat(src).st_mode & 0o777 194 st_mode = os.lstat(src).st_mode & 0o777
193 new_mode |= stat.S_IWUSR 204 new_mode |= stat.S_IWUSR
194 205
195 os.chmod(dst, new_mode) 206 os.chmod(dst, new_mode)
196 207
197 208
198 def checkexec(path): 209 def checkexec(path: bytes) -> bool:
199 """ 210 """
200 Check whether the given path is on a filesystem with UNIX-like exec flags 211 Check whether the given path is on a filesystem with UNIX-like exec flags
201 212
202 Requires a directory (like /foo/.hg) 213 Requires a directory (like /foo/.hg)
203 """ 214 """
273 except (IOError, OSError): 284 except (IOError, OSError):
274 # we don't care, the user probably won't be able to commit anyway 285 # we don't care, the user probably won't be able to commit anyway
275 return False 286 return False
276 287
277 288
278 def checklink(path): 289 def checklink(path: bytes) -> bool:
279 """check whether the given path is on a symlink-capable filesystem""" 290 """check whether the given path is on a symlink-capable filesystem"""
280 # mktemp is not racy because symlink creation will fail if the 291 # mktemp is not racy because symlink creation will fail if the
281 # file already exists 292 # file already exists
282 while True: 293 while True:
283 cachedir = os.path.join(path, b'.hg', b'wcache') 294 cachedir = os.path.join(path, b'.hg', b'wcache')
360 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. 371 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
361 """ 372 """
362 return getattr(osutil, 'getfstype', lambda x: None)(dirpath) 373 return getattr(osutil, 'getfstype', lambda x: None)(dirpath)
363 374
364 375
365 def get_password(): 376 def get_password() -> bytes:
366 return encoding.strtolocal(getpass.getpass('')) 377 return encoding.strtolocal(getpass.getpass(''))
367 378
368 379
369 def setbinary(fd): 380 def setbinary(fd) -> None:
370 pass 381 pass
371 382
372 383
373 def pconvert(path): 384 def pconvert(path: bytes) -> bytes:
374 return path 385 return path
375 386
376 387
377 def localpath(path): 388 def localpath(path: bytes) -> bytes:
378 return path 389 return path
379 390
380 391
381 def samefile(fpath1: bytes, fpath2: bytes) -> bool: 392 def samefile(fpath1: bytes, fpath2: bytes) -> bool:
382 """Returns whether path1 and path2 refer to the same file. This is only 393 """Returns whether path1 and path2 refer to the same file. This is only
391 st2 = os.lstat(fpath2) 402 st2 = os.lstat(fpath2)
392 return st1.st_dev == st2.st_dev 403 return st1.st_dev == st2.st_dev
393 404
394 405
395 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems 406 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems
396 def normcase(path): 407 def normcase(path: bytes) -> bytes:
397 return path.lower() 408 return path.lower()
398 409
399 410
400 # what normcase does to ASCII strings 411 # what normcase does to ASCII strings
401 normcasespec = encoding.normcasespecs.lower 412 normcasespec = encoding.normcasespecs.lower
402 # fallback normcase function for non-ASCII strings 413 # fallback normcase function for non-ASCII strings
403 normcasefallback = normcase 414 normcasefallback = normcase
404 415
405 if pycompat.isdarwin: 416 if pycompat.isdarwin:
406 417
407 def normcase(path): 418 def normcase(path: bytes) -> bytes:
408 """ 419 """
409 Normalize a filename for OS X-compatible comparison: 420 Normalize a filename for OS X-compatible comparison:
410 - escape-encode invalid characters 421 - escape-encode invalid characters
411 - decompose to NFD 422 - decompose to NFD
412 - lowercase 423 - lowercase
427 except UnicodeDecodeError: 438 except UnicodeDecodeError:
428 return normcasefallback(path) 439 return normcasefallback(path)
429 440
430 normcasespec = encoding.normcasespecs.lower 441 normcasespec = encoding.normcasespecs.lower
431 442
432 def normcasefallback(path): 443 def normcasefallback(path: bytes) -> bytes:
433 try: 444 try:
434 u = path.decode('utf-8') 445 u = path.decode('utf-8')
435 except UnicodeDecodeError: 446 except UnicodeDecodeError:
436 # OS X percent-encodes any bytes that aren't valid utf-8 447 # OS X percent-encodes any bytes that aren't valid utf-8
437 s = b'' 448 s = b''
468 ], 479 ],
469 reverse=True, 480 reverse=True,
470 ) 481 )
471 482
472 # use upper-ing as normcase as same as NTFS workaround 483 # use upper-ing as normcase as same as NTFS workaround
473 def normcase(path): 484 def normcase(path: bytes) -> bytes:
474 pathlen = len(path) 485 pathlen = len(path)
475 if (pathlen == 0) or (path[0] != pycompat.ossep): 486 if (pathlen == 0) or (path[0] != pycompat.ossep):
476 # treat as relative 487 # treat as relative
477 return encoding.upper(path) 488 return encoding.upper(path)
478 489
494 505
495 # Cygwin translates native ACLs to POSIX permissions, 506 # Cygwin translates native ACLs to POSIX permissions,
496 # but these translations are not supported by native 507 # but these translations are not supported by native
497 # tools, so the exec bit tends to be set erroneously. 508 # tools, so the exec bit tends to be set erroneously.
498 # Therefore, disable executable bit access on Cygwin. 509 # Therefore, disable executable bit access on Cygwin.
499 def checkexec(path): 510 def checkexec(path: bytes) -> bool:
500 return False 511 return False
501 512
502 # Similarly, Cygwin's symlink emulation is likely to create 513 # Similarly, Cygwin's symlink emulation is likely to create
503 # problems when Mercurial is used from both Cygwin and native 514 # problems when Mercurial is used from both Cygwin and native
504 # Windows, with other native tools, or on shared volumes 515 # Windows, with other native tools, or on shared volumes
505 def checklink(path): 516 def checklink(path: bytes) -> bool:
506 return False 517 return False
507 518
508 519
509 _needsshellquote = None 520 _needsshellquote = None
510 521
511 522
512 def shellquote(s): 523 def shellquote(s: bytes) -> bytes:
513 if pycompat.sysplatform == b'OpenVMS': 524 if pycompat.sysplatform == b'OpenVMS':
514 return b'"%s"' % s 525 return b'"%s"' % s
515 global _needsshellquote 526 global _needsshellquote
516 if _needsshellquote is None: 527 if _needsshellquote is None:
517 _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search 528 _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search
520 return s 531 return s
521 else: 532 else:
522 return b"'%s'" % s.replace(b"'", b"'\\''") 533 return b"'%s'" % s.replace(b"'", b"'\\''")
523 534
524 535
525 def shellsplit(s): 536 def shellsplit(s: bytes) -> List[bytes]:
526 """Parse a command string in POSIX shell way (best-effort)""" 537 """Parse a command string in POSIX shell way (best-effort)"""
527 return pycompat.shlexsplit(s, posix=True) 538 return pycompat.shlexsplit(s, posix=True)
528 539
529 540
530 def testpid(pid: int) -> bool: 541 def testpid(pid: int) -> bool:
536 return True 547 return True
537 except OSError as inst: 548 except OSError as inst:
538 return inst.errno != errno.ESRCH 549 return inst.errno != errno.ESRCH
539 550
540 551
541 def isowner(st): 552 def isowner(st: os.stat_result) -> bool:
542 """Return True if the stat object st is from the current user.""" 553 """Return True if the stat object st is from the current user."""
543 return st.st_uid == os.getuid() 554 return st.st_uid == os.getuid()
544 555
545 556
546 def findexe(command): 557 def findexe(command: bytes) -> Optional[bytes]:
547 """Find executable for command searching like which does. 558 """Find executable for command searching like which does.
548 If command is a basename then PATH is searched for command. 559 If command is a basename then PATH is searched for command.
549 PATH isn't searched if command is an absolute or relative path. 560 PATH isn't searched if command is an absolute or relative path.
550 If command isn't found None is returned.""" 561 If command isn't found None is returned."""
551 if pycompat.sysplatform == b'OpenVMS': 562 if pycompat.sysplatform == b'OpenVMS':
552 return command 563 return command
553 564
554 def findexisting(executable): 565 def findexisting(executable: bytes) -> Optional[bytes]:
555 b'Will return executable if existing file' 566 b'Will return executable if existing file'
556 if os.path.isfile(executable) and os.access(executable, os.X_OK): 567 if os.path.isfile(executable) and os.access(executable, os.X_OK):
557 return executable 568 return executable
558 return None 569 return None
559 570
575 586
576 587
577 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} 588 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
578 589
579 590
580 def statfiles(files): 591 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
581 """Stat each file in files. Yield each stat, or None if a file does not 592 """Stat each file in files. Yield each stat, or None if a file does not
582 exist or has a type we don't care about.""" 593 exist or has a type we don't care about."""
583 lstat = os.lstat 594 lstat = os.lstat
584 getkind = stat.S_IFMT 595 getkind = stat.S_IFMT
585 for nf in files: 596 for nf in files:
595 def getuser() -> bytes: 606 def getuser() -> bytes:
596 '''return name of current user''' 607 '''return name of current user'''
597 return pycompat.fsencode(getpass.getuser()) 608 return pycompat.fsencode(getpass.getuser())
598 609
599 610
600 def username(uid=None): 611 def username(uid: Optional[int] = None) -> Optional[bytes]:
601 """Return the name of the user with the given uid. 612 """Return the name of the user with the given uid.
602 613
603 If uid is None, return the name of the current user.""" 614 If uid is None, return the name of the current user."""
604 615
605 if uid is None: 616 if uid is None:
608 return pycompat.fsencode(pwd.getpwuid(uid)[0]) 619 return pycompat.fsencode(pwd.getpwuid(uid)[0])
609 except KeyError: 620 except KeyError:
610 return b'%d' % uid 621 return b'%d' % uid
611 622
612 623
613 def groupname(gid=None): 624 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
614 """Return the name of the group with the given gid. 625 """Return the name of the group with the given gid.
615 626
616 If gid is None, return the name of the current group.""" 627 If gid is None, return the name of the current group."""
617 628
618 if gid is None: 629 if gid is None:
621 return pycompat.fsencode(grp.getgrgid(gid)[0]) 632 return pycompat.fsencode(grp.getgrgid(gid)[0])
622 except KeyError: 633 except KeyError:
623 return pycompat.bytestr(gid) 634 return pycompat.bytestr(gid)
624 635
625 636
626 def groupmembers(name): 637 def groupmembers(name: bytes) -> List[bytes]:
627 """Return the list of members of the group with the given 638 """Return the list of members of the group with the given
628 name, KeyError if the group does not exist. 639 name, KeyError if the group does not exist.
629 """ 640 """
630 name = pycompat.fsdecode(name) 641 name = pycompat.fsdecode(name)
631 return pycompat.rapply(pycompat.fsencode, list(grp.getgrnam(name).gr_mem)) 642 return pycompat.rapply(pycompat.fsencode, list(grp.getgrnam(name).gr_mem))
641 652
642 def makedir(path: bytes, notindexed: bool) -> None: 653 def makedir(path: bytes, notindexed: bool) -> None:
643 os.mkdir(path) 654 os.mkdir(path)
644 655
645 656
646 def lookupreg(key, name=None, scope=None): 657 def lookupreg(
658 key: bytes,
659 name: Optional[bytes] = None,
660 scope: Optional[Union[int, Iterable[int]]] = None,
661 ) -> Optional[bytes]:
647 return None 662 return None
648 663
649 664
650 def hidewindow() -> None: 665 def hidewindow() -> None:
651 """Hide current shell window. 666 """Hide current shell window.
688 703
689 def __ne__(self, other): 704 def __ne__(self, other):
690 return not self == other 705 return not self == other
691 706
692 707
693 def statislink(st): 708 def statislink(st: Optional[os.stat_result]) -> bool:
694 '''check whether a stat result is a symlink''' 709 '''check whether a stat result is a symlink'''
695 return st and stat.S_ISLNK(st.st_mode) 710 return stat.S_ISLNK(st.st_mode) if st else False
696 711
697 712
698 def statisexec(st): 713 def statisexec(st: Optional[os.stat_result]) -> bool:
699 '''check whether a stat result is an executable file''' 714 '''check whether a stat result is an executable file'''
700 return st and (st.st_mode & 0o100 != 0) 715 return (st.st_mode & 0o100 != 0) if st else False
701 716
702 717
703 def poll(fds): 718 def poll(fds):
704 """block until something happens on any file descriptor 719 """block until something happens on any file descriptor
705 720
712 except ValueError: # out of range file descriptor 727 except ValueError: # out of range file descriptor
713 raise NotImplementedError() 728 raise NotImplementedError()
714 return sorted(list(set(sum(res, [])))) 729 return sorted(list(set(sum(res, []))))
715 730
716 731
717 def readpipe(pipe): 732 def readpipe(pipe) -> bytes:
718 """Read all available data from a pipe.""" 733 """Read all available data from a pipe."""
719 # We can't fstat() a pipe because Linux will always report 0. 734 # We can't fstat() a pipe because Linux will always report 0.
720 # So, we set the pipe to non-blocking mode and read everything 735 # So, we set the pipe to non-blocking mode and read everything
721 # that's available. 736 # that's available.
722 flags = fcntl.fcntl(pipe, fcntl.F_GETFL) 737 flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
737 return b''.join(chunks) 752 return b''.join(chunks)
738 finally: 753 finally:
739 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags) 754 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags)
740 755
741 756
742 def bindunixsocket(sock, path): 757 def bindunixsocket(sock, path: bytes) -> None:
743 """Bind the UNIX domain socket to the specified path""" 758 """Bind the UNIX domain socket to the specified path"""
744 # use relative path instead of full path at bind() if possible, since 759 # use relative path instead of full path at bind() if possible, since
745 # AF_UNIX path has very small length limit (107 chars) on common 760 # AF_UNIX path has very small length limit (107 chars) on common
746 # platforms (see sys/un.h) 761 # platforms (see sys/un.h)
747 dirname, basename = os.path.split(path) 762 dirname, basename = os.path.split(path)