Mercurial > public > mercurial-scm > hg
comparison mercurial/windows.py @ 49812:58dff81ffba1
typing: add type hints to the common posix/windows platform functions
These are done in sync because some platforms have empty implementations, and it
isn't obvious what the types should be without examining the other platform's
implementation. We want the
types aligned, so @overload definitions that differ aren't generated. The only
differences here are that the few methods that unconditionally raise an error are
marked as `NoReturn`, which doesn't seem to bother pytype.
A couple of the posix module functions needed to be updated with a modern
ternary operator, because pytype seems to want to use the type of the second
object in the old `return x and y` style.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Fri, 16 Dec 2022 00:54:39 -0500 |
parents | c43b283a19c3 |
children | 464fe8b8f474 |
comparison
equal
deleted
inserted
replaced
49811:0a91aba258e0 | 49812:58dff81ffba1 |
---|---|
16 import sys | 16 import sys |
17 import winreg # pytype: disable=import-error | 17 import winreg # pytype: disable=import-error |
18 | 18 |
19 from typing import ( | 19 from typing import ( |
20 BinaryIO, | 20 BinaryIO, |
21 Iterable, | |
22 Iterator, | |
23 List, | |
24 NoReturn, | |
25 Optional, | |
26 Sequence, | |
27 Union, | |
21 ) | 28 ) |
22 | 29 |
23 from .i18n import _ | 30 from .i18n import _ |
24 from .pycompat import getattr | 31 from .pycompat import getattr |
25 from . import ( | 32 from . import ( |
181 | 188 |
182 # may be wrapped by win32mbcs extension | 189 # may be wrapped by win32mbcs extension |
183 listdir = osutil.listdir | 190 listdir = osutil.listdir |
184 | 191 |
185 | 192 |
186 def get_password(): | 193 def get_password() -> bytes: |
187 """Prompt for password with echo off, using Windows getch(). | 194 """Prompt for password with echo off, using Windows getch(). |
188 | 195 |
189 This shouldn't be called directly- use ``ui.getpass()`` instead, which | 196 This shouldn't be called directly- use ``ui.getpass()`` instead, which |
190 checks if the session is interactive first. | 197 checks if the session is interactive first. |
191 """ | 198 """ |
242 if not win32.lasterrorwaspipeerror(inst): | 249 if not win32.lasterrorwaspipeerror(inst): |
243 raise | 250 raise |
244 raise IOError(errno.EPIPE, 'Broken pipe') | 251 raise IOError(errno.EPIPE, 'Broken pipe') |
245 | 252 |
246 | 253 |
247 def openhardlinks(): | 254 def openhardlinks() -> bool: |
248 return True | 255 return True |
249 | 256 |
250 | 257 |
251 def parsepatchoutput(output_line): | 258 def parsepatchoutput(output_line: bytes) -> bytes: |
252 """parses the output produced by patch and returns the filename""" | 259 """parses the output produced by patch and returns the filename""" |
253 pf = output_line[14:] | 260 pf = output_line[14:] |
254 if pf[0] == b'`': | 261 if pf[0] == b'`': |
255 pf = pf[1:-1] # Remove the quotes | 262 pf = pf[1:-1] # Remove the quotes |
256 return pf | 263 return pf |
257 | 264 |
258 | 265 |
259 def sshargs(sshcmd, host, user, port): | 266 def sshargs( |
267 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes] | |
268 ) -> bytes: | |
260 '''Build argument list for ssh or Plink''' | 269 '''Build argument list for ssh or Plink''' |
261 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p' | 270 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p' |
262 args = user and (b"%s@%s" % (user, host)) or host | 271 args = user and (b"%s@%s" % (user, host)) or host |
263 if args.startswith(b'-') or args.startswith(b'/'): | 272 if args.startswith(b'-') or args.startswith(b'/'): |
264 raise error.Abort( | 273 raise error.Abort( |
269 if port: | 278 if port: |
270 args = b'%s %s %s' % (pflag, shellquote(port), args) | 279 args = b'%s %s %s' % (pflag, shellquote(port), args) |
271 return args | 280 return args |
272 | 281 |
273 | 282 |
274 def setflags(f, l, x): | 283 def setflags(f: bytes, l: bool, x: bool) -> None: |
275 pass | 284 pass |
276 | 285 |
277 | 286 |
278 def copymode(src, dst, mode=None, enforcewritable=False): | 287 def copymode( |
288 src: bytes, | |
289 dst: bytes, | |
290 mode: Optional[bytes] = None, | |
291 enforcewritable: bool = False, | |
292 ) -> None: | |
279 pass | 293 pass |
280 | 294 |
281 | 295 |
282 def checkexec(path): | 296 def checkexec(path: bytes) -> bool: |
283 return False | 297 return False |
284 | 298 |
285 | 299 |
286 def checklink(path): | 300 def checklink(path: bytes) -> bool: |
287 return False | 301 return False |
288 | 302 |
289 | 303 |
290 def setbinary(fd): | 304 def setbinary(fd) -> None: |
291 # When run without console, pipes may expose invalid | 305 # When run without console, pipes may expose invalid |
292 # fileno(), usually set to -1. | 306 # fileno(), usually set to -1. |
293 fno = getattr(fd, 'fileno', None) | 307 fno = getattr(fd, 'fileno', None) |
294 if fno is not None and fno() >= 0: | 308 if fno is not None and fno() >= 0: |
295 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr | 309 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr |
296 | 310 |
297 | 311 |
298 def pconvert(path): | 312 def pconvert(path: bytes) -> bytes: |
299 return path.replace(pycompat.ossep, b'/') | 313 return path.replace(pycompat.ossep, b'/') |
300 | 314 |
301 | 315 |
302 def localpath(path): | 316 def localpath(path: bytes) -> bytes: |
303 return path.replace(b'/', b'\\') | 317 return path.replace(b'/', b'\\') |
304 | 318 |
305 | 319 |
306 def normpath(path): | 320 def normpath(path): |
307 return pconvert(os.path.normpath(path)) | 321 return pconvert(os.path.normpath(path)) |
308 | 322 |
309 | 323 |
310 def normcase(path): | 324 def normcase(path: bytes) -> bytes: |
311 return encoding.upper(path) # NTFS compares via upper() | 325 return encoding.upper(path) # NTFS compares via upper() |
312 | 326 |
313 | 327 |
314 DRIVE_RE_B = re.compile(b'^[a-z]:') | 328 DRIVE_RE_B = re.compile(b'^[a-z]:') |
315 DRIVE_RE_S = re.compile('^[a-z]:') | 329 DRIVE_RE_S = re.compile('^[a-z]:') |
466 # quote we've appended to the end) | 480 # quote we've appended to the end) |
467 _quotere = None | 481 _quotere = None |
468 _needsshellquote = None | 482 _needsshellquote = None |
469 | 483 |
470 | 484 |
471 def shellquote(s): | 485 def shellquote(s: bytes) -> bytes: |
472 r""" | 486 r""" |
473 >>> shellquote(br'C:\Users\xyz') | 487 >>> shellquote(br'C:\Users\xyz') |
474 '"C:\\Users\\xyz"' | 488 '"C:\\Users\\xyz"' |
475 >>> shellquote(br'C:\Users\xyz/mixed') | 489 >>> shellquote(br'C:\Users\xyz/mixed') |
476 '"C:\\Users\\xyz/mixed"' | 490 '"C:\\Users\\xyz/mixed"' |
502 if s.startswith(b'"') and s.endswith(b'"'): | 516 if s.startswith(b'"') and s.endswith(b'"'): |
503 return s[1:-1] | 517 return s[1:-1] |
504 return s | 518 return s |
505 | 519 |
506 | 520 |
507 def shellsplit(s): | 521 def shellsplit(s: bytes) -> List[bytes]: |
508 """Parse a command string in cmd.exe way (best-effort)""" | 522 """Parse a command string in cmd.exe way (best-effort)""" |
509 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False)) | 523 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False)) |
510 | 524 |
511 | 525 |
512 # if you change this stub into a real check, please try to implement the | 526 # if you change this stub into a real check, please try to implement the |
513 # username and groupname functions above, too. | 527 # username and groupname functions above, too. |
514 def isowner(st): | 528 def isowner(st: os.stat_result) -> bool: |
515 return True | 529 return True |
516 | 530 |
517 | 531 |
518 def findexe(command): | 532 def findexe(command: bytes) -> Optional[bytes]: |
519 """Find executable for command searching like cmd.exe does. | 533 """Find executable for command searching like cmd.exe does. |
520 If command is a basename then PATH is searched for command. | 534 If command is a basename then PATH is searched for command. |
521 PATH isn't searched if command is an absolute or relative path. | 535 PATH isn't searched if command is an absolute or relative path. |
522 An extension from PATHEXT is found and added if not present. | 536 An extension from PATHEXT is found and added if not present. |
523 If command isn't found None is returned.""" | 537 If command isn't found None is returned.""" |
524 pathext = encoding.environ.get(b'PATHEXT', b'.COM;.EXE;.BAT;.CMD') | 538 pathext = encoding.environ.get(b'PATHEXT', b'.COM;.EXE;.BAT;.CMD') |
525 pathexts = [ext for ext in pathext.lower().split(pycompat.ospathsep)] | 539 pathexts = [ext for ext in pathext.lower().split(pycompat.ospathsep)] |
526 if os.path.splitext(command)[1].lower() in pathexts: | 540 if os.path.splitext(command)[1].lower() in pathexts: |
527 pathexts = [b''] | 541 pathexts = [b''] |
528 | 542 |
529 def findexisting(pathcommand): | 543 def findexisting(pathcommand: bytes) -> Optional[bytes]: |
530 """Will append extension (if needed) and return existing file""" | 544 """Will append extension (if needed) and return existing file""" |
531 for ext in pathexts: | 545 for ext in pathexts: |
532 executable = pathcommand + ext | 546 executable = pathcommand + ext |
533 if os.path.exists(executable): | 547 if os.path.exists(executable): |
534 return executable | 548 return executable |
545 | 559 |
546 | 560 |
547 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} | 561 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} |
548 | 562 |
549 | 563 |
550 def statfiles(files): | 564 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]: |
551 """Stat each file in files. Yield each stat, or None if a file | 565 """Stat each file in files. Yield each stat, or None if a file |
552 does not exist or has a type we don't care about. | 566 does not exist or has a type we don't care about. |
553 | 567 |
554 Cluster and cache stat per directory to minimize number of OS stat calls.""" | 568 Cluster and cache stat per directory to minimize number of OS stat calls.""" |
555 dircache = {} # dirname -> filename -> status | None if file does not exist | 569 dircache = {} # dirname -> filename -> status | None if file does not exist |
571 dmap = {} | 585 dmap = {} |
572 cache = dircache.setdefault(dir, dmap) | 586 cache = dircache.setdefault(dir, dmap) |
573 yield cache.get(base, None) | 587 yield cache.get(base, None) |
574 | 588 |
575 | 589 |
576 def username(uid=None): | 590 def username(uid: Optional[int] = None) -> Optional[bytes]: |
577 """Return the name of the user with the given uid. | 591 """Return the name of the user with the given uid. |
578 | 592 |
579 If uid is None, return the name of the current user.""" | 593 If uid is None, return the name of the current user.""" |
580 if not uid: | 594 if not uid: |
581 try: | 595 try: |
586 # which exists on Windows. | 600 # which exists on Windows. |
587 pass | 601 pass |
588 return None | 602 return None |
589 | 603 |
590 | 604 |
591 def groupname(gid=None): | 605 def groupname(gid: Optional[int] = None) -> Optional[bytes]: |
592 """Return the name of the group with the given gid. | 606 """Return the name of the group with the given gid. |
593 | 607 |
594 If gid is None, return the name of the current group.""" | 608 If gid is None, return the name of the current group.""" |
595 return None | 609 return None |
596 | 610 |
638 | 652 |
639 def gethgcmd(): | 653 def gethgcmd(): |
640 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]] | 654 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]] |
641 | 655 |
642 | 656 |
643 def groupmembers(name): | 657 def groupmembers(name: bytes) -> List[bytes]: |
644 # Don't support groups on Windows for now | 658 # Don't support groups on Windows for now |
645 raise KeyError | 659 raise KeyError |
646 | 660 |
647 | 661 |
648 def isexec(f): | 662 def isexec(f: bytes) -> bool: |
649 return False | 663 return False |
650 | 664 |
651 | 665 |
652 class cachestat: | 666 class cachestat: |
653 def __init__(self, path): | 667 def __init__(self, path): |
655 | 669 |
656 def cacheable(self): | 670 def cacheable(self): |
657 return False | 671 return False |
658 | 672 |
659 | 673 |
660 def lookupreg(key, valname=None, scope=None): | 674 def lookupreg( |
675 key: bytes, | |
676 valname: Optional[bytes] = None, | |
677 scope: Optional[Union[int, Iterable[int]]] = None, | |
678 ) -> Optional[bytes]: | |
661 """Look up a key/value name in the Windows registry. | 679 """Look up a key/value name in the Windows registry. |
662 | 680 |
663 valname: value name. If unspecified, the default value for the key | 681 valname: value name. If unspecified, the default value for the key |
664 is used. | 682 is used. |
665 scope: optionally specify scope for registry lookup, this can be | 683 scope: optionally specify scope for registry lookup, this can be |
691 | 709 |
692 | 710 |
693 expandglobs = True | 711 expandglobs = True |
694 | 712 |
695 | 713 |
696 def statislink(st): | 714 def statislink(st: Optional[os.stat_result]) -> bool: |
697 '''check whether a stat result is a symlink''' | 715 '''check whether a stat result is a symlink''' |
698 return False | 716 return False |
699 | 717 |
700 | 718 |
701 def statisexec(st): | 719 def statisexec(st: Optional[os.stat_result]) -> bool: |
702 '''check whether a stat result is an executable file''' | 720 '''check whether a stat result is an executable file''' |
703 return False | 721 return False |
704 | 722 |
705 | 723 |
706 def poll(fds): | 724 def poll(fds): |
707 # see posix.py for description | 725 # see posix.py for description |
708 raise NotImplementedError() | 726 raise NotImplementedError() |
709 | 727 |
710 | 728 |
711 def readpipe(pipe): | 729 def readpipe(pipe) -> bytes: |
712 """Read all available data from a pipe.""" | 730 """Read all available data from a pipe.""" |
713 chunks = [] | 731 chunks = [] |
714 while True: | 732 while True: |
715 size = win32.peekpipe(pipe) | 733 size = win32.peekpipe(pipe) |
716 if not size: | 734 if not size: |
722 chunks.append(s) | 740 chunks.append(s) |
723 | 741 |
724 return b''.join(chunks) | 742 return b''.join(chunks) |
725 | 743 |
726 | 744 |
727 def bindunixsocket(sock, path): | 745 def bindunixsocket(sock, path: bytes) -> NoReturn: |
728 raise NotImplementedError('unsupported platform') | 746 raise NotImplementedError('unsupported platform') |