comparison mercurial/windows.py @ 49812:58dff81ffba1

typing: add type hints to the common posix/windows platform functions These are done in sync because some platforms have empty implementations, and it isn't obvious what the types should be without examining the other. We want the types aligned, so @overload definitions that differ aren't generated. The only differences here are the few methods that unconditionally raise an error are marked as `NoReturn`, which doesn't seem to bother pytype. A couple of the posix module functions needed to be updated with a modern ternary operator, because pytype seems to want to use the type of the second object in the old `return x and y` style.
author Matt Harbison <matt_harbison@yahoo.com>
date Fri, 16 Dec 2022 00:54:39 -0500
parents c43b283a19c3
children 464fe8b8f474
comparison
equal deleted inserted replaced
49811:0a91aba258e0 49812:58dff81ffba1
16 import sys 16 import sys
17 import winreg # pytype: disable=import-error 17 import winreg # pytype: disable=import-error
18 18
19 from typing import ( 19 from typing import (
20 BinaryIO, 20 BinaryIO,
21 Iterable,
22 Iterator,
23 List,
24 NoReturn,
25 Optional,
26 Sequence,
27 Union,
21 ) 28 )
22 29
23 from .i18n import _ 30 from .i18n import _
24 from .pycompat import getattr 31 from .pycompat import getattr
25 from . import ( 32 from . import (
181 188
182 # may be wrapped by win32mbcs extension 189 # may be wrapped by win32mbcs extension
183 listdir = osutil.listdir 190 listdir = osutil.listdir
184 191
185 192
186 def get_password(): 193 def get_password() -> bytes:
187 """Prompt for password with echo off, using Windows getch(). 194 """Prompt for password with echo off, using Windows getch().
188 195
189 This shouldn't be called directly- use ``ui.getpass()`` instead, which 196 This shouldn't be called directly- use ``ui.getpass()`` instead, which
190 checks if the session is interactive first. 197 checks if the session is interactive first.
191 """ 198 """
242 if not win32.lasterrorwaspipeerror(inst): 249 if not win32.lasterrorwaspipeerror(inst):
243 raise 250 raise
244 raise IOError(errno.EPIPE, 'Broken pipe') 251 raise IOError(errno.EPIPE, 'Broken pipe')
245 252
246 253
247 def openhardlinks(): 254 def openhardlinks() -> bool:
248 return True 255 return True
249 256
250 257
251 def parsepatchoutput(output_line): 258 def parsepatchoutput(output_line: bytes) -> bytes:
252 """parses the output produced by patch and returns the filename""" 259 """parses the output produced by patch and returns the filename"""
253 pf = output_line[14:] 260 pf = output_line[14:]
254 if pf[0] == b'`': 261 if pf[0] == b'`':
255 pf = pf[1:-1] # Remove the quotes 262 pf = pf[1:-1] # Remove the quotes
256 return pf 263 return pf
257 264
258 265
259 def sshargs(sshcmd, host, user, port): 266 def sshargs(
267 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
268 ) -> bytes:
260 '''Build argument list for ssh or Plink''' 269 '''Build argument list for ssh or Plink'''
261 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p' 270 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p'
262 args = user and (b"%s@%s" % (user, host)) or host 271 args = user and (b"%s@%s" % (user, host)) or host
263 if args.startswith(b'-') or args.startswith(b'/'): 272 if args.startswith(b'-') or args.startswith(b'/'):
264 raise error.Abort( 273 raise error.Abort(
269 if port: 278 if port:
270 args = b'%s %s %s' % (pflag, shellquote(port), args) 279 args = b'%s %s %s' % (pflag, shellquote(port), args)
271 return args 280 return args
272 281
273 282
274 def setflags(f, l, x): 283 def setflags(f: bytes, l: bool, x: bool) -> None:
275 pass 284 pass
276 285
277 286
278 def copymode(src, dst, mode=None, enforcewritable=False): 287 def copymode(
288 src: bytes,
289 dst: bytes,
290 mode: Optional[bytes] = None,
291 enforcewritable: bool = False,
292 ) -> None:
279 pass 293 pass
280 294
281 295
282 def checkexec(path): 296 def checkexec(path: bytes) -> bool:
283 return False 297 return False
284 298
285 299
286 def checklink(path): 300 def checklink(path: bytes) -> bool:
287 return False 301 return False
288 302
289 303
290 def setbinary(fd): 304 def setbinary(fd) -> None:
291 # When run without console, pipes may expose invalid 305 # When run without console, pipes may expose invalid
292 # fileno(), usually set to -1. 306 # fileno(), usually set to -1.
293 fno = getattr(fd, 'fileno', None) 307 fno = getattr(fd, 'fileno', None)
294 if fno is not None and fno() >= 0: 308 if fno is not None and fno() >= 0:
295 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr 309 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr
296 310
297 311
298 def pconvert(path): 312 def pconvert(path: bytes) -> bytes:
299 return path.replace(pycompat.ossep, b'/') 313 return path.replace(pycompat.ossep, b'/')
300 314
301 315
302 def localpath(path): 316 def localpath(path: bytes) -> bytes:
303 return path.replace(b'/', b'\\') 317 return path.replace(b'/', b'\\')
304 318
305 319
306 def normpath(path): 320 def normpath(path):
307 return pconvert(os.path.normpath(path)) 321 return pconvert(os.path.normpath(path))
308 322
309 323
310 def normcase(path): 324 def normcase(path: bytes) -> bytes:
311 return encoding.upper(path) # NTFS compares via upper() 325 return encoding.upper(path) # NTFS compares via upper()
312 326
313 327
314 DRIVE_RE_B = re.compile(b'^[a-z]:') 328 DRIVE_RE_B = re.compile(b'^[a-z]:')
315 DRIVE_RE_S = re.compile('^[a-z]:') 329 DRIVE_RE_S = re.compile('^[a-z]:')
466 # quote we've appended to the end) 480 # quote we've appended to the end)
467 _quotere = None 481 _quotere = None
468 _needsshellquote = None 482 _needsshellquote = None
469 483
470 484
471 def shellquote(s): 485 def shellquote(s: bytes) -> bytes:
472 r""" 486 r"""
473 >>> shellquote(br'C:\Users\xyz') 487 >>> shellquote(br'C:\Users\xyz')
474 '"C:\\Users\\xyz"' 488 '"C:\\Users\\xyz"'
475 >>> shellquote(br'C:\Users\xyz/mixed') 489 >>> shellquote(br'C:\Users\xyz/mixed')
476 '"C:\\Users\\xyz/mixed"' 490 '"C:\\Users\\xyz/mixed"'
502 if s.startswith(b'"') and s.endswith(b'"'): 516 if s.startswith(b'"') and s.endswith(b'"'):
503 return s[1:-1] 517 return s[1:-1]
504 return s 518 return s
505 519
506 520
507 def shellsplit(s): 521 def shellsplit(s: bytes) -> List[bytes]:
508 """Parse a command string in cmd.exe way (best-effort)""" 522 """Parse a command string in cmd.exe way (best-effort)"""
509 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False)) 523 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False))
510 524
511 525
512 # if you change this stub into a real check, please try to implement the 526 # if you change this stub into a real check, please try to implement the
513 # username and groupname functions above, too. 527 # username and groupname functions above, too.
514 def isowner(st): 528 def isowner(st: os.stat_result) -> bool:
515 return True 529 return True
516 530
517 531
518 def findexe(command): 532 def findexe(command: bytes) -> Optional[bytes]:
519 """Find executable for command searching like cmd.exe does. 533 """Find executable for command searching like cmd.exe does.
520 If command is a basename then PATH is searched for command. 534 If command is a basename then PATH is searched for command.
521 PATH isn't searched if command is an absolute or relative path. 535 PATH isn't searched if command is an absolute or relative path.
522 An extension from PATHEXT is found and added if not present. 536 An extension from PATHEXT is found and added if not present.
523 If command isn't found None is returned.""" 537 If command isn't found None is returned."""
524 pathext = encoding.environ.get(b'PATHEXT', b'.COM;.EXE;.BAT;.CMD') 538 pathext = encoding.environ.get(b'PATHEXT', b'.COM;.EXE;.BAT;.CMD')
525 pathexts = [ext for ext in pathext.lower().split(pycompat.ospathsep)] 539 pathexts = [ext for ext in pathext.lower().split(pycompat.ospathsep)]
526 if os.path.splitext(command)[1].lower() in pathexts: 540 if os.path.splitext(command)[1].lower() in pathexts:
527 pathexts = [b''] 541 pathexts = [b'']
528 542
529 def findexisting(pathcommand): 543 def findexisting(pathcommand: bytes) -> Optional[bytes]:
530 """Will append extension (if needed) and return existing file""" 544 """Will append extension (if needed) and return existing file"""
531 for ext in pathexts: 545 for ext in pathexts:
532 executable = pathcommand + ext 546 executable = pathcommand + ext
533 if os.path.exists(executable): 547 if os.path.exists(executable):
534 return executable 548 return executable
545 559
546 560
547 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} 561 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
548 562
549 563
550 def statfiles(files): 564 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
551 """Stat each file in files. Yield each stat, or None if a file 565 """Stat each file in files. Yield each stat, or None if a file
552 does not exist or has a type we don't care about. 566 does not exist or has a type we don't care about.
553 567
554 Cluster and cache stat per directory to minimize number of OS stat calls.""" 568 Cluster and cache stat per directory to minimize number of OS stat calls."""
555 dircache = {} # dirname -> filename -> status | None if file does not exist 569 dircache = {} # dirname -> filename -> status | None if file does not exist
571 dmap = {} 585 dmap = {}
572 cache = dircache.setdefault(dir, dmap) 586 cache = dircache.setdefault(dir, dmap)
573 yield cache.get(base, None) 587 yield cache.get(base, None)
574 588
575 589
576 def username(uid=None): 590 def username(uid: Optional[int] = None) -> Optional[bytes]:
577 """Return the name of the user with the given uid. 591 """Return the name of the user with the given uid.
578 592
579 If uid is None, return the name of the current user.""" 593 If uid is None, return the name of the current user."""
580 if not uid: 594 if not uid:
581 try: 595 try:
586 # which exists on Windows. 600 # which exists on Windows.
587 pass 601 pass
588 return None 602 return None
589 603
590 604
591 def groupname(gid=None): 605 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
592 """Return the name of the group with the given gid. 606 """Return the name of the group with the given gid.
593 607
594 If gid is None, return the name of the current group.""" 608 If gid is None, return the name of the current group."""
595 return None 609 return None
596 610
638 652
639 def gethgcmd(): 653 def gethgcmd():
640 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]] 654 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]]
641 655
642 656
643 def groupmembers(name): 657 def groupmembers(name: bytes) -> List[bytes]:
644 # Don't support groups on Windows for now 658 # Don't support groups on Windows for now
645 raise KeyError 659 raise KeyError
646 660
647 661
648 def isexec(f): 662 def isexec(f: bytes) -> bool:
649 return False 663 return False
650 664
651 665
652 class cachestat: 666 class cachestat:
653 def __init__(self, path): 667 def __init__(self, path):
655 669
656 def cacheable(self): 670 def cacheable(self):
657 return False 671 return False
658 672
659 673
660 def lookupreg(key, valname=None, scope=None): 674 def lookupreg(
675 key: bytes,
676 valname: Optional[bytes] = None,
677 scope: Optional[Union[int, Iterable[int]]] = None,
678 ) -> Optional[bytes]:
661 """Look up a key/value name in the Windows registry. 679 """Look up a key/value name in the Windows registry.
662 680
663 valname: value name. If unspecified, the default value for the key 681 valname: value name. If unspecified, the default value for the key
664 is used. 682 is used.
665 scope: optionally specify scope for registry lookup, this can be 683 scope: optionally specify scope for registry lookup, this can be
691 709
692 710
693 expandglobs = True 711 expandglobs = True
694 712
695 713
696 def statislink(st): 714 def statislink(st: Optional[os.stat_result]) -> bool:
697 '''check whether a stat result is a symlink''' 715 '''check whether a stat result is a symlink'''
698 return False 716 return False
699 717
700 718
701 def statisexec(st): 719 def statisexec(st: Optional[os.stat_result]) -> bool:
702 '''check whether a stat result is an executable file''' 720 '''check whether a stat result is an executable file'''
703 return False 721 return False
704 722
705 723
706 def poll(fds): 724 def poll(fds):
707 # see posix.py for description 725 # see posix.py for description
708 raise NotImplementedError() 726 raise NotImplementedError()
709 727
710 728
711 def readpipe(pipe): 729 def readpipe(pipe) -> bytes:
712 """Read all available data from a pipe.""" 730 """Read all available data from a pipe."""
713 chunks = [] 731 chunks = []
714 while True: 732 while True:
715 size = win32.peekpipe(pipe) 733 size = win32.peekpipe(pipe)
716 if not size: 734 if not size:
722 chunks.append(s) 740 chunks.append(s)
723 741
724 return b''.join(chunks) 742 return b''.join(chunks)
725 743
726 744
727 def bindunixsocket(sock, path): 745 def bindunixsocket(sock, path: bytes) -> NoReturn:
728 raise NotImplementedError('unsupported platform') 746 raise NotImplementedError('unsupported platform')