Mercurial > public > mercurial-scm > hg-stable
diff mercurial/sslutil.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | c8d55ff80da1 |
children | 687b865b95ad |
line wrap: on
line diff
--- a/mercurial/sslutil.py Sat Oct 05 10:29:34 2019 -0400 +++ b/mercurial/sslutil.py Sun Oct 06 09:45:02 2019 -0400 @@ -113,6 +113,7 @@ return ssl.wrap_socket(socket, **args) + def _hostsettings(ui, hostname): """Obtain security settings for a hostname. @@ -149,10 +150,11 @@ def validateprotocol(protocol, key): if protocol not in configprotocols: raise error.Abort( - _('unsupported protocol from hostsecurity.%s: %s') % - (key, protocol), - hint=_('valid protocols: %s') % - ' '.join(sorted(configprotocols))) + _('unsupported protocol from hostsecurity.%s: %s') + % (key, protocol), + hint=_('valid protocols: %s') + % ' '.join(sorted(configprotocols)), + ) # We default to TLS 1.1+ where we can because TLS 1.0 has known # vulnerabilities (like BEAST and POODLE). We allow users to downgrade to @@ -165,10 +167,15 @@ # the bold warnings on the web site. # internal config: hostsecurity.disabletls10warning if not ui.configbool('hostsecurity', 'disabletls10warning'): - ui.warn(_('warning: connecting to %s using legacy security ' - 'technology (TLS 1.0); see ' - 'https://mercurial-scm.org/wiki/SecureConnections for ' - 'more info\n') % bhostname) + ui.warn( + _( + 'warning: connecting to %s using legacy security ' + 'technology (TLS 1.0); see ' + 'https://mercurial-scm.org/wiki/SecureConnections for ' + 'more info\n' + ) + % bhostname + ) defaultprotocol = 'tls1.0' key = 'minimumprotocol' @@ -196,10 +203,10 @@ fingerprints = ui.configlist('hostsecurity', '%s:fingerprints' % bhostname) for fingerprint in fingerprints: if not (fingerprint.startswith(('sha1:', 'sha256:', 'sha512:'))): - raise error.Abort(_('invalid fingerprint for %s: %s') % ( - bhostname, fingerprint), - hint=_('must begin with "sha1:", "sha256:", ' - 'or "sha512:"')) + raise error.Abort( + _('invalid fingerprint for %s: %s') % (bhostname, fingerprint), + hint=_('must begin with "sha1:", "sha256:", ' 'or "sha512:"'), + ) alg, fingerprint = fingerprint.split(':', 1) fingerprint = fingerprint.replace(':', '').lower() @@ -231,9 +238,14 @@ # being performed. cafile = ui.config('hostsecurity', '%s:verifycertsfile' % bhostname) if s['certfingerprints'] and cafile: - ui.warn(_('(hostsecurity.%s:verifycertsfile ignored when host ' - 'fingerprints defined; using host fingerprints for ' - 'verification)\n') % bhostname) + ui.warn( + _( + '(hostsecurity.%s:verifycertsfile ignored when host ' + 'fingerprints defined; using host fingerprints for ' + 'verification)\n' + ) + % bhostname + ) # Try to hook up CA certificate validation unless something above # makes it not necessary. @@ -242,9 +254,10 @@ if cafile: cafile = util.expandpath(cafile) if not os.path.exists(cafile): - raise error.Abort(_('path specified by %s does not exist: %s') % - ('hostsecurity.%s:verifycertsfile' % ( - bhostname,), cafile)) + raise error.Abort( + _('path specified by %s does not exist: %s') + % ('hostsecurity.%s:verifycertsfile' % (bhostname,), cafile) + ) s['cafile'] = cafile else: # Find global certificates file in config. @@ -253,8 +266,9 @@ if cafile: cafile = util.expandpath(cafile) if not os.path.exists(cafile): - raise error.Abort(_('could not find web.cacerts: %s') % - cafile) + raise error.Abort( + _('could not find web.cacerts: %s') % cafile + ) elif s['allowloaddefaultcerts']: # CAs not defined in config. Try to find system bundles. cafile = _defaultcacerts(ui) @@ -281,6 +295,7 @@ return s + def protocolsettings(protocol): """Resolve the protocol for a config value. @@ -304,10 +319,14 @@ # full/real SSLContext available to us. if supportedprotocols == {'tls1.0'}: if protocol != 'tls1.0': - raise error.Abort(_('current Python does not support protocol ' - 'setting %s') % protocol, - hint=_('upgrade Python or disable setting since ' - 'only TLS 1.0 is supported')) + raise error.Abort( + _('current Python does not support protocol ' 'setting %s') + % protocol, + hint=_( + 'upgrade Python or disable setting since ' + 'only TLS 1.0 is supported' + ), + ) return ssl.PROTOCOL_TLSv1, 0, 'tls1.0' @@ -333,6 +352,7 @@ return ssl.PROTOCOL_SSLv23, options, protocol + def wrapsocket(sock, keyfile, certfile, ui, serverhostname=None): """Add SSL/TLS to a socket. @@ -352,21 +372,29 @@ if b'SSLKEYLOGFILE' in encoding.environ: try: import sslkeylog - sslkeylog.set_keylog(pycompat.fsdecode( - encoding.environ[b'SSLKEYLOGFILE'])) + + sslkeylog.set_keylog( + pycompat.fsdecode(encoding.environ[b'SSLKEYLOGFILE']) + ) ui.warn( - b'sslkeylog enabled by SSLKEYLOGFILE environment variable\n') + b'sslkeylog enabled by SSLKEYLOGFILE environment variable\n' + ) except ImportError: - ui.warn(b'sslkeylog module missing, ' - b'but SSLKEYLOGFILE set in environment\n') + ui.warn( + b'sslkeylog module missing, ' + b'but SSLKEYLOGFILE set in environment\n' + ) for f in (keyfile, certfile): if f and not os.path.exists(f): raise error.Abort( _('certificate file (%s) does not exist; cannot connect to %s') % (f, pycompat.bytesurl(serverhostname)), - hint=_('restore missing file or fix references ' - 'in Mercurial config')) + hint=_( + 'restore missing file or fix references ' + 'in Mercurial config' + ), + ) settings = _hostsettings(ui, serverhostname) @@ -392,26 +420,31 @@ raise error.Abort( _('could not set ciphers: %s') % stringutil.forcebytestr(e.args[0]), - hint=_('change cipher string (%s) in config') % - settings['ciphers']) + hint=_('change cipher string (%s) in config') + % settings['ciphers'], + ) if certfile is not None: + def password(): f = keyfile or certfile return ui.getpass(_('passphrase for %s: ') % f, '') + sslcontext.load_cert_chain(certfile, keyfile, password) if settings['cafile'] is not None: try: sslcontext.load_verify_locations(cafile=settings['cafile']) except ssl.SSLError as e: - if len(e.args) == 1: # pypy has different SSLError args + if len(e.args) == 1: # pypy has different SSLError args msg = e.args[0] else: msg = e.args[1] - raise error.Abort(_('error loading CA file %s: %s') % ( - settings['cafile'], stringutil.forcebytestr(msg)), - hint=_('file is empty or malformed?')) + raise error.Abort( + _('error loading CA file %s: %s') + % (settings['cafile'], stringutil.forcebytestr(msg)), + hint=_('file is empty or malformed?'), + ) caloaded = True elif settings['allowloaddefaultcerts']: # This is a no-op on old Python. @@ -433,13 +466,21 @@ # When the main 20916 bug occurs, 'sslcontext.get_ca_certs()' is a # non-empty list, but the following conditional is otherwise True. try: - if (caloaded and settings['verifymode'] == ssl.CERT_REQUIRED and - modernssl and not sslcontext.get_ca_certs()): - ui.warn(_('(an attempt was made to load CA certificates but ' - 'none were loaded; see ' - 'https://mercurial-scm.org/wiki/SecureConnections ' - 'for how to configure Mercurial to avoid this ' - 'error)\n')) + if ( + caloaded + and settings['verifymode'] == ssl.CERT_REQUIRED + and modernssl + and not sslcontext.get_ca_certs() + ): + ui.warn( + _( + '(an attempt was made to load CA certificates but ' + 'none were loaded; see ' + 'https://mercurial-scm.org/wiki/SecureConnections ' + 'for how to configure Mercurial to avoid this ' + 'error)\n' + ) + ) except ssl.SSLError: pass @@ -459,51 +500,76 @@ # client doesn't support modern TLS versions introduced # several years from when this comment was written). if supportedprotocols != {'tls1.0'}: - ui.warn(_( - '(could not communicate with %s using security ' - 'protocols %s; if you are using a modern Mercurial ' - 'version, consider contacting the operator of this ' - 'server; see ' - 'https://mercurial-scm.org/wiki/SecureConnections ' - 'for more info)\n') % ( + ui.warn( + _( + '(could not communicate with %s using security ' + 'protocols %s; if you are using a modern Mercurial ' + 'version, consider contacting the operator of this ' + 'server; see ' + 'https://mercurial-scm.org/wiki/SecureConnections ' + 'for more info)\n' + ) + % ( pycompat.bytesurl(serverhostname), - ', '.join(sorted(supportedprotocols)))) + ', '.join(sorted(supportedprotocols)), + ) + ) else: - ui.warn(_( - '(could not communicate with %s using TLS 1.0; the ' - 'likely cause of this is the server no longer ' - 'supports TLS 1.0 because it has known security ' - 'vulnerabilities; see ' - 'https://mercurial-scm.org/wiki/SecureConnections ' - 'for more info)\n') % - pycompat.bytesurl(serverhostname)) + ui.warn( + _( + '(could not communicate with %s using TLS 1.0; the ' + 'likely cause of this is the server no longer ' + 'supports TLS 1.0 because it has known security ' + 'vulnerabilities; see ' + 'https://mercurial-scm.org/wiki/SecureConnections ' + 'for more info)\n' + ) + % pycompat.bytesurl(serverhostname) + ) else: # We attempted TLS 1.1+. We can only get here if the client # supports the configured protocol. So the likely reason is # the client wants better security than the server can # offer. - ui.warn(_( - '(could not negotiate a common security protocol (%s+) ' - 'with %s; the likely cause is Mercurial is configured ' - 'to be more secure than the server can support)\n') % ( - settings['protocolui'], - pycompat.bytesurl(serverhostname))) - ui.warn(_('(consider contacting the operator of this ' - 'server and ask them to support modern TLS ' - 'protocol versions; or, set ' - 'hostsecurity.%s:minimumprotocol=tls1.0 to allow ' - 'use of legacy, less secure protocols when ' - 'communicating with this server)\n') % - pycompat.bytesurl(serverhostname)) - ui.warn(_( - '(see https://mercurial-scm.org/wiki/SecureConnections ' - 'for more info)\n')) + ui.warn( + _( + '(could not negotiate a common security protocol (%s+) ' + 'with %s; the likely cause is Mercurial is configured ' + 'to be more secure than the server can support)\n' + ) + % ( + settings['protocolui'], + pycompat.bytesurl(serverhostname), + ) + ) + ui.warn( + _( + '(consider contacting the operator of this ' + 'server and ask them to support modern TLS ' + 'protocol versions; or, set ' + 'hostsecurity.%s:minimumprotocol=tls1.0 to allow ' + 'use of legacy, less secure protocols when ' + 'communicating with this server)\n' + ) + % pycompat.bytesurl(serverhostname) + ) + ui.warn( + _( + '(see https://mercurial-scm.org/wiki/SecureConnections ' + 'for more info)\n' + ) + ) - elif (e.reason == r'CERTIFICATE_VERIFY_FAILED' and - pycompat.iswindows): + elif ( + e.reason == r'CERTIFICATE_VERIFY_FAILED' and pycompat.iswindows + ): - ui.warn(_('(the full certificate chain may not be available ' - 'locally; see "hg help debugssl")\n')) + ui.warn( + _( + '(the full certificate chain may not be available ' + 'locally; see "hg help debugssl")\n' + ) + ) raise # check if wrap_socket failed silently because socket had been @@ -521,8 +587,10 @@ return sslsocket -def wrapserversocket(sock, ui, certfile=None, keyfile=None, cafile=None, - requireclientcert=False): + +def wrapserversocket( + sock, ui, certfile=None, keyfile=None, cafile=None, requireclientcert=False +): """Wrap a socket for use by servers. ``certfile`` and ``keyfile`` specify the files containing the certificate's @@ -539,8 +607,9 @@ # doesn't have to be as detailed as for wrapsocket(). for f in (certfile, keyfile, cafile): if f and not os.path.exists(f): - raise error.Abort(_('referenced certificate file (%s) does not ' - 'exist') % f) + raise error.Abort( + _('referenced certificate file (%s) does not ' 'exist') % f + ) protocol, options, _protocolui = protocolsettings('tls1.0') @@ -558,8 +627,9 @@ raise error.Abort(_('TLS 1.2 not supported by this Python')) protocol = ssl.PROTOCOL_TLSv1_2 elif exactprotocol: - raise error.Abort(_('invalid value for serverexactprotocol: %s') % - exactprotocol) + raise error.Abort( + _('invalid value for serverexactprotocol: %s') % exactprotocol + ) if modernssl: # We /could/ use create_default_context() here since it doesn't load @@ -592,9 +662,11 @@ return sslcontext.wrap_socket(sock, server_side=True) + class wildcarderror(Exception): """Represents an error parsing wildcards in DNS name.""" + def _dnsnamematch(dn, hostname, maxwildcards=1): """Match DNS names according RFC 6125 section 6.4.3. @@ -615,7 +687,8 @@ wildcards = leftmost.count('*') if wildcards > maxwildcards: raise wildcarderror( - _('too many wildcards in certificate DNS name: %s') % dn) + _('too many wildcards in certificate DNS name: %s') % dn + ) # speed up common case w/o wildcards if not wildcards: @@ -645,6 +718,7 @@ pat = re.compile(br'\A' + br'\.'.join(pats) + br'\Z', re.IGNORECASE) return pat.match(hostname) is not None + def _verifycert(cert, hostname): '''Verify that cert (in socket.getpeercert() format) matches hostname. CRLs is not handled. @@ -695,6 +769,7 @@ else: return _('no commonName or subjectAltName found in certificate') + def _plainapplepython(): """return true if this seems to be a pure Apple Python that * is unfrozen and presumably has the whole mercurial module in the file @@ -703,12 +778,17 @@ for using system certificate store CAs in addition to the provided cacerts file """ - if (not pycompat.isdarwin or procutil.mainfrozen() or - not pycompat.sysexecutable): + if ( + not pycompat.isdarwin + or procutil.mainfrozen() + or not pycompat.sysexecutable + ): return False exe = os.path.realpath(pycompat.sysexecutable).lower() - return (exe.startswith('/usr/bin/python') or - exe.startswith('/system/library/frameworks/python.framework/')) + return exe.startswith('/usr/bin/python') or exe.startswith( + '/system/library/frameworks/python.framework/' + ) + _systemcacertpaths = [ # RHEL, CentOS, and Fedora @@ -717,6 +797,7 @@ '/etc/ssl/certs/ca-certificates.crt', ] + def _defaultcacerts(ui): """return path to default CA certificates or None. @@ -731,6 +812,7 @@ # and usable, assume the user intends it to be used and use it. try: import certifi + certs = certifi.where() if os.path.exists(certs): ui.debug('using ca certificates from certifi\n') @@ -745,9 +827,13 @@ # Assertion: this code is only called if certificates are being verified. if pycompat.iswindows: if not _canloaddefaultcerts: - ui.warn(_('(unable to load Windows CA certificates; see ' - 'https://mercurial-scm.org/wiki/SecureConnections for ' - 'how to configure Mercurial to avoid this message)\n')) + ui.warn( + _( + '(unable to load Windows CA certificates; see ' + 'https://mercurial-scm.org/wiki/SecureConnections for ' + 'how to configure Mercurial to avoid this message)\n' + ) + ) return None @@ -756,7 +842,8 @@ # trick. if _plainapplepython(): dummycert = os.path.join( - os.path.dirname(pycompat.fsencode(__file__)), 'dummycert.pem') + os.path.dirname(pycompat.fsencode(__file__)), 'dummycert.pem' + ) if os.path.exists(dummycert): return dummycert @@ -767,9 +854,13 @@ # files. Also consider exporting the keychain certs to a file during # Mercurial install. if not _canloaddefaultcerts: - ui.warn(_('(unable to load CA certificates; see ' - 'https://mercurial-scm.org/wiki/SecureConnections for ' - 'how to configure Mercurial to avoid this message)\n')) + ui.warn( + _( + '(unable to load CA certificates; see ' + 'https://mercurial-scm.org/wiki/SecureConnections for ' + 'how to configure Mercurial to avoid this message)\n' + ) + ) return None # / is writable on Windows. Out of an abundance of caution make sure @@ -787,20 +878,30 @@ if not _canloaddefaultcerts: for path in _systemcacertpaths: if os.path.isfile(path): - ui.warn(_('(using CA certificates from %s; if you see this ' - 'message, your Mercurial install is not properly ' - 'configured; see ' - 'https://mercurial-scm.org/wiki/SecureConnections ' - 'for how to configure Mercurial to avoid this ' - 'message)\n') % path) + ui.warn( + _( + '(using CA certificates from %s; if you see this ' + 'message, your Mercurial install is not properly ' + 'configured; see ' + 'https://mercurial-scm.org/wiki/SecureConnections ' + 'for how to configure Mercurial to avoid this ' + 'message)\n' + ) + % path + ) return path - ui.warn(_('(unable to load CA certificates; see ' - 'https://mercurial-scm.org/wiki/SecureConnections for ' - 'how to configure Mercurial to avoid this message)\n')) + ui.warn( + _( + '(unable to load CA certificates; see ' + 'https://mercurial-scm.org/wiki/SecureConnections for ' + 'how to configure Mercurial to avoid this message)\n' + ) + ) return None + def validatesocket(sock): """Validate a socket meets security requirements. @@ -818,8 +919,9 @@ raise error.Abort(_('%s ssl connection error') % host) if not peercert: - raise error.Abort(_('%s certificate error: ' - 'no certificate received') % host) + raise error.Abort( + _('%s certificate error: ' 'no certificate received') % host + ) if settings['disablecertverification']: # We don't print the certificate fingerprint because it shouldn't @@ -828,9 +930,14 @@ # to verify the certificate and this message would have printed the # fingerprint. So printing the fingerprint here adds little to no # value. - ui.warn(_('warning: connection security to %s is disabled per current ' - 'settings; communication is susceptible to eavesdropping ' - 'and tampering\n') % host) + ui.warn( + _( + 'warning: connection security to %s is disabled per current ' + 'settings; communication is susceptible to eavesdropping ' + 'and tampering\n' + ) + % host + ) return # If a certificate fingerprint is pinned, use it and only it to @@ -842,23 +949,29 @@ } def fmtfingerprint(s): - return ':'.join([s[x:x + 2] for x in range(0, len(s), 2)]) + return ':'.join([s[x : x + 2] for x in range(0, len(s), 2)]) nicefingerprint = 'sha256:%s' % fmtfingerprint(peerfingerprints['sha256']) if settings['certfingerprints']: for hash, fingerprint in settings['certfingerprints']: if peerfingerprints[hash].lower() == fingerprint: - ui.debug('%s certificate matched fingerprint %s:%s\n' % - (host, hash, fmtfingerprint(fingerprint))) + ui.debug( + '%s certificate matched fingerprint %s:%s\n' + % (host, hash, fmtfingerprint(fingerprint)) + ) if settings['legacyfingerprint']: - ui.warn(_('(SHA-1 fingerprint for %s found in legacy ' - '[hostfingerprints] section; ' - 'if you trust this fingerprint, remove the old ' - 'SHA-1 fingerprint from [hostfingerprints] and ' - 'add the following entry to the new ' - '[hostsecurity] section: %s:fingerprints=%s)\n') % - (host, host, nicefingerprint)) + ui.warn( + _( + '(SHA-1 fingerprint for %s found in legacy ' + '[hostfingerprints] section; ' + 'if you trust this fingerprint, remove the old ' + 'SHA-1 fingerprint from [hostfingerprints] and ' + 'add the following entry to the new ' + '[hostsecurity] section: %s:fingerprints=%s)\n' + ) + % (host, host, nicefingerprint) + ) return # Pinned fingerprint didn't match. This is a fatal error. @@ -868,25 +981,37 @@ else: section = 'hostsecurity' nice = '%s:%s' % (hash, fmtfingerprint(peerfingerprints[hash])) - raise error.Abort(_('certificate for %s has unexpected ' - 'fingerprint %s') % (host, nice), - hint=_('check %s configuration') % section) + raise error.Abort( + _('certificate for %s has unexpected ' 'fingerprint %s') + % (host, nice), + hint=_('check %s configuration') % section, + ) # Security is enabled but no CAs are loaded. We can't establish trust # for the cert so abort. if not sock._hgstate['caloaded']: raise error.Abort( - _('unable to verify security of %s (no loaded CA certificates); ' - 'refusing to connect') % host, - hint=_('see https://mercurial-scm.org/wiki/SecureConnections for ' - 'how to configure Mercurial to avoid this error or set ' - 'hostsecurity.%s:fingerprints=%s to trust this server') % - (host, nicefingerprint)) + _( + 'unable to verify security of %s (no loaded CA certificates); ' + 'refusing to connect' + ) + % host, + hint=_( + 'see https://mercurial-scm.org/wiki/SecureConnections for ' + 'how to configure Mercurial to avoid this error or set ' + 'hostsecurity.%s:fingerprints=%s to trust this server' + ) + % (host, nicefingerprint), + ) msg = _verifycert(peercert2, shost) if msg: - raise error.Abort(_('%s certificate error: %s') % (host, msg), - hint=_('set hostsecurity.%s:certfingerprints=%s ' - 'config setting or use --insecure to connect ' - 'insecurely') % - (host, nicefingerprint)) + raise error.Abort( + _('%s certificate error: %s') % (host, msg), + hint=_( + 'set hostsecurity.%s:certfingerprints=%s ' + 'config setting or use --insecure to connect ' + 'insecurely' + ) + % (host, nicefingerprint), + )