Mercurial > public > mercurial-scm > hg
diff mercurial/win32.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | 576474baa209 |
children | 687b865b95ad |
line wrap: on
line diff
--- a/mercurial/win32.py Sat Oct 05 10:29:34 2019 -0400 +++ b/mercurial/win32.py Sun Oct 06 09:45:02 2019 -0400 @@ -55,21 +55,25 @@ _WPARAM = ctypes.c_ulonglong _LPARAM = ctypes.c_longlong + class _FILETIME(ctypes.Structure): - _fields_ = [(r'dwLowDateTime', _DWORD), - (r'dwHighDateTime', _DWORD)] + _fields_ = [(r'dwLowDateTime', _DWORD), (r'dwHighDateTime', _DWORD)] + class _BY_HANDLE_FILE_INFORMATION(ctypes.Structure): - _fields_ = [(r'dwFileAttributes', _DWORD), - (r'ftCreationTime', _FILETIME), - (r'ftLastAccessTime', _FILETIME), - (r'ftLastWriteTime', _FILETIME), - (r'dwVolumeSerialNumber', _DWORD), - (r'nFileSizeHigh', _DWORD), - (r'nFileSizeLow', _DWORD), - (r'nNumberOfLinks', _DWORD), - (r'nFileIndexHigh', _DWORD), - (r'nFileIndexLow', _DWORD)] + _fields_ = [ + (r'dwFileAttributes', _DWORD), + (r'ftCreationTime', _FILETIME), + (r'ftLastAccessTime', _FILETIME), + (r'ftLastWriteTime', _FILETIME), + (r'dwVolumeSerialNumber', _DWORD), + (r'nFileSizeHigh', _DWORD), + (r'nFileSizeLow', _DWORD), + (r'nNumberOfLinks', _DWORD), + (r'nFileIndexHigh', _DWORD), + (r'nFileIndexLow', _DWORD), + ] + # CreateFile _FILE_SHARE_READ = 0x00000001 @@ -90,51 +94,65 @@ # GetExitCodeProcess _STILL_ACTIVE = 259 + class _STARTUPINFO(ctypes.Structure): - _fields_ = [(r'cb', _DWORD), - (r'lpReserved', _LPSTR), - (r'lpDesktop', _LPSTR), - (r'lpTitle', _LPSTR), - (r'dwX', _DWORD), - (r'dwY', _DWORD), - (r'dwXSize', _DWORD), - (r'dwYSize', _DWORD), - (r'dwXCountChars', _DWORD), - (r'dwYCountChars', _DWORD), - (r'dwFillAttribute', _DWORD), - (r'dwFlags', _DWORD), - (r'wShowWindow', _WORD), - (r'cbReserved2', _WORD), - (r'lpReserved2', ctypes.c_char_p), - (r'hStdInput', _HANDLE), - (r'hStdOutput', _HANDLE), - (r'hStdError', _HANDLE)] + _fields_ = [ + (r'cb', _DWORD), + (r'lpReserved', _LPSTR), + (r'lpDesktop', _LPSTR), + (r'lpTitle', _LPSTR), + (r'dwX', _DWORD), + (r'dwY', _DWORD), + (r'dwXSize', _DWORD), + (r'dwYSize', _DWORD), + (r'dwXCountChars', _DWORD), + (r'dwYCountChars', _DWORD), + (r'dwFillAttribute', _DWORD), + (r'dwFlags', _DWORD), + (r'wShowWindow', _WORD), + (r'cbReserved2', _WORD), + (r'lpReserved2', ctypes.c_char_p), + (r'hStdInput', _HANDLE), + (r'hStdOutput', _HANDLE), + (r'hStdError', _HANDLE), + ] + class _PROCESS_INFORMATION(ctypes.Structure): - _fields_ = [(r'hProcess', _HANDLE), - (r'hThread', _HANDLE), - (r'dwProcessId', _DWORD), - (r'dwThreadId', _DWORD)] + _fields_ = [ + (r'hProcess', _HANDLE), + (r'hThread', _HANDLE), + (r'dwProcessId', _DWORD), + (r'dwThreadId', _DWORD), + ] + _CREATE_NO_WINDOW = 0x08000000 _SW_HIDE = 0 + class _COORD(ctypes.Structure): - _fields_ = [(r'X', ctypes.c_short), - (r'Y', ctypes.c_short)] + _fields_ = [(r'X', ctypes.c_short), (r'Y', ctypes.c_short)] + class _SMALL_RECT(ctypes.Structure): - _fields_ = [(r'Left', ctypes.c_short), - (r'Top', ctypes.c_short), - (r'Right', ctypes.c_short), - (r'Bottom', ctypes.c_short)] + _fields_ = [ + (r'Left', ctypes.c_short), + (r'Top', ctypes.c_short), + (r'Right', ctypes.c_short), + (r'Bottom', ctypes.c_short), + ] + class _CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure): - _fields_ = [(r'dwSize', _COORD), - (r'dwCursorPosition', _COORD), - (r'wAttributes', _WORD), - (r'srWindow', _SMALL_RECT), - (r'dwMaximumWindowSize', _COORD)] + _fields_ = [ + (r'dwSize', _COORD), + (r'dwCursorPosition', _COORD), + (r'wAttributes', _WORD), + (r'srWindow', _SMALL_RECT), + (r'dwMaximumWindowSize', _COORD), + ] + _STD_OUTPUT_HANDLE = _DWORD(-11).value _STD_ERROR_HANDLE = _DWORD(-12).value @@ -150,11 +168,9 @@ class CERT_CHAIN_CONTEXT(ctypes.Structure): _fields_ = ( (r"cbSize", _DWORD), - # CERT_TRUST_STATUS struct (r"dwErrorStatus", _DWORD), (r"dwInfoStatus", _DWORD), - (r"cChain", _DWORD), (r"rgpChain", ctypes.c_void_p), (r"cLowerQualityChainContext", _DWORD), @@ -163,15 +179,16 @@ (r"dwRevocationFreshnessTime", _DWORD), ) + class CERT_USAGE_MATCH(ctypes.Structure): _fields_ = ( (r"dwType", _DWORD), - - # CERT_ENHKEY_USAGE struct + # CERT_ENHKEY_USAGE struct (r"cUsageIdentifier", _DWORD), - (r"rgpszUsageIdentifier", ctypes.c_void_p), # LPSTR * + (r"rgpszUsageIdentifier", ctypes.c_void_p), # LPSTR * ) + class CERT_CHAIN_PARA(ctypes.Structure): _fields_ = ( (r"cbSize", _DWORD), @@ -180,35 +197,45 @@ (r"dwUrlRetrievalTimeout", _DWORD), (r"fCheckRevocationFreshnessTime", _BOOL), (r"dwRevocationFreshnessTime", _DWORD), - (r"pftCacheResync", ctypes.c_void_p), # LPFILETIME - (r"pStrongSignPara", ctypes.c_void_p), # PCCERT_STRONG_SIGN_PARA + (r"pftCacheResync", ctypes.c_void_p), # LPFILETIME + (r"pStrongSignPara", ctypes.c_void_p), # PCCERT_STRONG_SIGN_PARA (r"dwStrongSignFlags", _DWORD), ) + # types of parameters of C functions used (required by pypy) -_crypt32.CertCreateCertificateContext.argtypes = [_DWORD, # cert encoding - ctypes.c_char_p, # cert - _DWORD] # cert size +_crypt32.CertCreateCertificateContext.argtypes = [ + _DWORD, # cert encoding + ctypes.c_char_p, # cert + _DWORD, +] # cert size _crypt32.CertCreateCertificateContext.restype = _PCCERT_CONTEXT _crypt32.CertGetCertificateChain.argtypes = [ - ctypes.c_void_p, # HCERTCHAINENGINE - _PCCERT_CONTEXT, - ctypes.c_void_p, # LPFILETIME - ctypes.c_void_p, # HCERTSTORE - ctypes.c_void_p, # PCERT_CHAIN_PARA - _DWORD, - ctypes.c_void_p, # LPVOID - ctypes.c_void_p # PCCERT_CHAIN_CONTEXT * - ] + ctypes.c_void_p, # HCERTCHAINENGINE + _PCCERT_CONTEXT, + ctypes.c_void_p, # LPFILETIME + ctypes.c_void_p, # HCERTSTORE + ctypes.c_void_p, # PCERT_CHAIN_PARA + _DWORD, + ctypes.c_void_p, # LPVOID + ctypes.c_void_p, # PCCERT_CHAIN_CONTEXT * +] _crypt32.CertGetCertificateChain.restype = _BOOL _crypt32.CertFreeCertificateContext.argtypes = [_PCCERT_CONTEXT] _crypt32.CertFreeCertificateContext.restype = _BOOL -_kernel32.CreateFileA.argtypes = [_LPCSTR, _DWORD, _DWORD, ctypes.c_void_p, - _DWORD, _DWORD, _HANDLE] +_kernel32.CreateFileA.argtypes = [ + _LPCSTR, + _DWORD, + _DWORD, + ctypes.c_void_p, + _DWORD, + _DWORD, + _HANDLE, +] _kernel32.CreateFileA.restype = _HANDLE _kernel32.GetFileInformationByHandle.argtypes = [_HANDLE, ctypes.c_void_p] @@ -237,8 +264,16 @@ _kernel32.GetDriveTypeA.argtypes = [_LPCSTR] _kernel32.GetDriveTypeA.restype = _UINT -_kernel32.GetVolumeInformationA.argtypes = [_LPCSTR, ctypes.c_void_p, _DWORD, - ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, _DWORD] +_kernel32.GetVolumeInformationA.argtypes = [ + _LPCSTR, + ctypes.c_void_p, + _DWORD, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + _DWORD, +] _kernel32.GetVolumeInformationA.restype = _BOOL _kernel32.GetVolumePathNameA.argtypes = [_LPCSTR, ctypes.c_void_p, _DWORD] @@ -256,9 +291,18 @@ _kernel32.GetModuleFileNameA.argtypes = [_HANDLE, ctypes.c_void_p, _DWORD] _kernel32.GetModuleFileNameA.restype = _DWORD -_kernel32.CreateProcessA.argtypes = [_LPCSTR, _LPCSTR, ctypes.c_void_p, - ctypes.c_void_p, _BOOL, _DWORD, ctypes.c_void_p, _LPCSTR, ctypes.c_void_p, - ctypes.c_void_p] +_kernel32.CreateProcessA.argtypes = [ + _LPCSTR, + _LPCSTR, + ctypes.c_void_p, + ctypes.c_void_p, + _BOOL, + _DWORD, + ctypes.c_void_p, + _LPCSTR, + ctypes.c_void_p, + ctypes.c_void_p, +] _kernel32.CreateProcessA.restype = _BOOL _kernel32.ExitProcess.argtypes = [_UINT] @@ -296,24 +340,39 @@ _user32.EnumWindows.argtypes = [_WNDENUMPROC, _LPARAM] _user32.EnumWindows.restype = _BOOL -_kernel32.PeekNamedPipe.argtypes = [_HANDLE, ctypes.c_void_p, _DWORD, - ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] +_kernel32.PeekNamedPipe.argtypes = [ + _HANDLE, + ctypes.c_void_p, + _DWORD, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, +] _kernel32.PeekNamedPipe.restype = _BOOL + def _raiseoserror(name): # Force the code to a signed int to avoid an 'int too large' error. # See https://bugs.python.org/issue28474 code = _kernel32.GetLastError() - if code > 0x7fffffff: - code -= 2**32 + if code > 0x7FFFFFFF: + code -= 2 ** 32 err = ctypes.WinError(code=code) - raise OSError(err.errno, r'%s: %s' % (encoding.strfromlocal(name), - err.strerror)) + raise OSError( + err.errno, r'%s: %s' % (encoding.strfromlocal(name), err.strerror) + ) + def _getfileinfo(name): - fh = _kernel32.CreateFileA(name, 0, - _FILE_SHARE_READ | _FILE_SHARE_WRITE | _FILE_SHARE_DELETE, - None, _OPEN_EXISTING, _FILE_FLAG_BACKUP_SEMANTICS, None) + fh = _kernel32.CreateFileA( + name, + 0, + _FILE_SHARE_READ | _FILE_SHARE_WRITE | _FILE_SHARE_DELETE, + None, + _OPEN_EXISTING, + _FILE_FLAG_BACKUP_SEMANTICS, + None, + ) if fh == _INVALID_HANDLE_VALUE: _raiseoserror(name) try: @@ -324,6 +383,7 @@ finally: _kernel32.CloseHandle(fh) + def checkcertificatechain(cert, build=True): '''Tests the given certificate to see if there is a complete chain to a trusted root certificate. As a side effect, missing certificates are @@ -336,11 +396,13 @@ chainctxptr = ctypes.POINTER(CERT_CHAIN_CONTEXT) pchainctx = chainctxptr() - chainpara = CERT_CHAIN_PARA(cbSize=ctypes.sizeof(CERT_CHAIN_PARA), - RequestedUsage=CERT_USAGE_MATCH()) + chainpara = CERT_CHAIN_PARA( + cbSize=ctypes.sizeof(CERT_CHAIN_PARA), RequestedUsage=CERT_USAGE_MATCH() + ) - certctx = _crypt32.CertCreateCertificateContext(X509_ASN_ENCODING, cert, - len(cert)) + certctx = _crypt32.CertCreateCertificateContext( + X509_ASN_ENCODING, cert, len(cert) + ) if certctx is None: _raiseoserror('CertCreateCertificateContext') @@ -351,14 +413,16 @@ try: # Building the certificate chain will update root certs as necessary. - if not _crypt32.CertGetCertificateChain(None, # hChainEngine - certctx, # pCertContext - None, # pTime - None, # hAdditionalStore - ctypes.byref(chainpara), - flags, - None, # pvReserved - ctypes.byref(pchainctx)): + if not _crypt32.CertGetCertificateChain( + None, # hChainEngine + certctx, # pCertContext + None, # pTime + None, # hAdditionalStore + ctypes.byref(chainpara), + flags, + None, # pvReserved + ctypes.byref(pchainctx), + ): _raiseoserror('CertGetCertificateChain') chainctx = pchainctx.contents @@ -369,24 +433,30 @@ _crypt32.CertFreeCertificateChain(pchainctx) _crypt32.CertFreeCertificateContext(certctx) + def oslink(src, dst): try: if not _kernel32.CreateHardLinkA(dst, src, None): _raiseoserror(src) - except AttributeError: # Wine doesn't support this function + except AttributeError: # Wine doesn't support this function _raiseoserror(src) + def nlinks(name): '''return number of hardlinks for the given file''' return _getfileinfo(name).nNumberOfLinks + def samefile(path1, path2): '''Returns whether path1 and path2 refer to the same file or directory.''' res1 = _getfileinfo(path1) res2 = _getfileinfo(path2) - return (res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber + return ( + res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber and res1.nFileIndexHigh == res2.nFileIndexHigh - and res1.nFileIndexLow == res2.nFileIndexLow) + and res1.nFileIndexLow == res2.nFileIndexLow + ) + def samedevice(path1, path2): '''Returns whether path1 and path2 are on the same device.''' @@ -394,12 +464,14 @@ res2 = _getfileinfo(path2) return res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber + def peekpipe(pipe): handle = msvcrt.get_osfhandle(pipe.fileno()) avail = _DWORD() - if not _kernel32.PeekNamedPipe(handle, None, 0, None, ctypes.byref(avail), - None): + if not _kernel32.PeekNamedPipe( + handle, None, 0, None, ctypes.byref(avail), None + ): err = _kernel32.GetLastError() if err == _ERROR_BROKEN_PIPE: return 0 @@ -407,12 +479,14 @@ return avail.value + def lasterrorwaspipeerror(err): if err.errno != errno.EINVAL: return False err = _kernel32.GetLastError() return err == _ERROR_BROKEN_PIPE or err == _ERROR_NO_DATA + def testpid(pid): '''return True if pid is still running or unable to determine, False otherwise''' @@ -426,17 +500,19 @@ _kernel32.CloseHandle(h) return _kernel32.GetLastError() != _ERROR_INVALID_PARAMETER + def executablepath(): '''return full path of hg.exe''' size = 600 buf = ctypes.create_string_buffer(size + 1) len = _kernel32.GetModuleFileNameA(None, ctypes.byref(buf), size) if len == 0: - raise ctypes.WinError() # Note: WinError is a function + raise ctypes.WinError() # Note: WinError is a function elif len == size: raise ctypes.WinError(_ERROR_INSUFFICIENT_BUFFER) return buf.value + def getvolumename(path): """Get the mount point of the filesystem from a directory or file (best-effort) @@ -452,10 +528,11 @@ buf = ctypes.create_string_buffer(size) if not _kernel32.GetVolumePathNameA(realpath, ctypes.byref(buf), size): - raise ctypes.WinError() # Note: WinError is a function + raise ctypes.WinError() # Note: WinError is a function return buf.value + def getfstype(path): """Get the filesystem type name from a directory or file (best-effort) @@ -467,19 +544,25 @@ if t == _DRIVE_REMOTE: return 'cifs' - elif t not in (_DRIVE_REMOVABLE, _DRIVE_FIXED, _DRIVE_CDROM, - _DRIVE_RAMDISK): + elif t not in ( + _DRIVE_REMOVABLE, + _DRIVE_FIXED, + _DRIVE_CDROM, + _DRIVE_RAMDISK, + ): return None size = _MAX_PATH + 1 name = ctypes.create_string_buffer(size) - if not _kernel32.GetVolumeInformationA(volume, None, 0, None, None, None, - ctypes.byref(name), size): - raise ctypes.WinError() # Note: WinError is a function + if not _kernel32.GetVolumeInformationA( + volume, None, 0, None, None, None, ctypes.byref(name), size + ): + raise ctypes.WinError() # Note: WinError is a function return name.value + def getuser(): '''return name of current user''' size = _DWORD(300) @@ -488,36 +571,40 @@ raise ctypes.WinError() return buf.value + _signalhandler = [] + def setsignalhandler(): '''Register a termination handler for console events including CTRL+C. python signal handlers do not work well with socket operations. ''' + def handler(event): _kernel32.ExitProcess(1) if _signalhandler: - return # already registered + return # already registered h = _SIGNAL_HANDLER(handler) - _signalhandler.append(h) # needed to prevent garbage collection + _signalhandler.append(h) # needed to prevent garbage collection if not _kernel32.SetConsoleCtrlHandler(h, True): raise ctypes.WinError() + def hidewindow(): - def callback(hwnd, pid): wpid = _DWORD() _user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid)) if pid == wpid.value: _user32.ShowWindow(hwnd, _SW_HIDE) - return False # stop enumerating windows + return False # stop enumerating windows return True pid = _kernel32.GetCurrentProcessId() _user32.EnumWindows(_WNDENUMPROC(callback), pid) + def termsize(): # cmd.exe does not handle CR like a unix console, the CR is # counted in the line length. On 80 columns consoles, if 80 @@ -527,24 +614,27 @@ height = 25 # Query stderr to avoid problems with redirections screenbuf = _kernel32.GetStdHandle( - _STD_ERROR_HANDLE) # don't close the handle returned + _STD_ERROR_HANDLE + ) # don't close the handle returned if screenbuf is None or screenbuf == _INVALID_HANDLE_VALUE: return width, height csbi = _CONSOLE_SCREEN_BUFFER_INFO() - if not _kernel32.GetConsoleScreenBufferInfo( - screenbuf, ctypes.byref(csbi)): + if not _kernel32.GetConsoleScreenBufferInfo(screenbuf, ctypes.byref(csbi)): return width, height width = csbi.srWindow.Right - csbi.srWindow.Left # don't '+ 1' height = csbi.srWindow.Bottom - csbi.srWindow.Top + 1 return width, height + def enablevtmode(): '''Enable virtual terminal mode for the associated console. Return True if enabled, else False.''' ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x4 - handle = _kernel32.GetStdHandle(_STD_OUTPUT_HANDLE) # don't close the handle + handle = _kernel32.GetStdHandle( + _STD_OUTPUT_HANDLE + ) # don't close the handle if handle == _INVALID_HANDLE_VALUE: return False @@ -561,6 +651,7 @@ return True + def spawndetached(args): # No standard library function really spawns a fully detached # process under win32 because they allocate pipes or other objects @@ -583,8 +674,17 @@ # TODO: CreateProcessW on py3? res = _kernel32.CreateProcessA( - None, encoding.strtolocal(args), None, None, False, _CREATE_NO_WINDOW, - env, encoding.getcwd(), ctypes.byref(si), ctypes.byref(pi)) + None, + encoding.strtolocal(args), + None, + None, + False, + _CREATE_NO_WINDOW, + env, + encoding.getcwd(), + ctypes.byref(si), + ctypes.byref(pi), + ) if not res: raise ctypes.WinError() @@ -593,15 +693,18 @@ return pi.dwProcessId + def unlink(f): '''try to implement POSIX' unlink semantics on Windows''' if os.path.isdir(f): # use EPERM because it is POSIX prescribed value, even though # unlink(2) on directories returns EISDIR on Linux - raise IOError(errno.EPERM, - r"Unlinking directory not permitted: '%s'" - % encoding.strfromlocal(f)) + raise IOError( + errno.EPERM, + r"Unlinking directory not permitted: '%s'" + % encoding.strfromlocal(f), + ) # POSIX allows to unlink and rename open files. Windows has serious # problems with doing that: @@ -621,7 +724,7 @@ # implicit zombie filename blocking on a temporary name. for tries in pycompat.xrange(10): - temp = '%s-%08x' % (f, random.randint(0, 0xffffffff)) + temp = '%s-%08x' % (f, random.randint(0, 0xFFFFFFFF)) try: os.rename(f, temp) # raises OSError EEXIST if temp exists break @@ -646,6 +749,7 @@ # leaving some potentially serious inconsistencies. pass + def makedir(path, notindexed): os.mkdir(path) if notindexed: