--- a/mercurial/sslutil.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/sslutil.py Sun Oct 06 09:45:02 2019 -0400
@@ -113,6 +113,7 @@
return ssl.wrap_socket(socket, **args)
+
def _hostsettings(ui, hostname):
"""Obtain security settings for a hostname.
@@ -149,10 +150,11 @@
def validateprotocol(protocol, key):
if protocol not in configprotocols:
raise error.Abort(
- _('unsupported protocol from hostsecurity.%s: %s') %
- (key, protocol),
- hint=_('valid protocols: %s') %
- ' '.join(sorted(configprotocols)))
+ _('unsupported protocol from hostsecurity.%s: %s')
+ % (key, protocol),
+ hint=_('valid protocols: %s')
+ % ' '.join(sorted(configprotocols)),
+ )
# We default to TLS 1.1+ where we can because TLS 1.0 has known
# vulnerabilities (like BEAST and POODLE). We allow users to downgrade to
@@ -165,10 +167,15 @@
# the bold warnings on the web site.
# internal config: hostsecurity.disabletls10warning
if not ui.configbool('hostsecurity', 'disabletls10warning'):
- ui.warn(_('warning: connecting to %s using legacy security '
- 'technology (TLS 1.0); see '
- 'https://mercurial-scm.org/wiki/SecureConnections for '
- 'more info\n') % bhostname)
+ ui.warn(
+ _(
+ 'warning: connecting to %s using legacy security '
+ 'technology (TLS 1.0); see '
+ 'https://mercurial-scm.org/wiki/SecureConnections for '
+ 'more info\n'
+ )
+ % bhostname
+ )
defaultprotocol = 'tls1.0'
key = 'minimumprotocol'
@@ -196,10 +203,10 @@
fingerprints = ui.configlist('hostsecurity', '%s:fingerprints' % bhostname)
for fingerprint in fingerprints:
if not (fingerprint.startswith(('sha1:', 'sha256:', 'sha512:'))):
- raise error.Abort(_('invalid fingerprint for %s: %s') % (
- bhostname, fingerprint),
- hint=_('must begin with "sha1:", "sha256:", '
- 'or "sha512:"'))
+ raise error.Abort(
+ _('invalid fingerprint for %s: %s') % (bhostname, fingerprint),
+ hint=_('must begin with "sha1:", "sha256:", ' 'or "sha512:"'),
+ )
alg, fingerprint = fingerprint.split(':', 1)
fingerprint = fingerprint.replace(':', '').lower()
@@ -231,9 +238,14 @@
# being performed.
cafile = ui.config('hostsecurity', '%s:verifycertsfile' % bhostname)
if s['certfingerprints'] and cafile:
- ui.warn(_('(hostsecurity.%s:verifycertsfile ignored when host '
- 'fingerprints defined; using host fingerprints for '
- 'verification)\n') % bhostname)
+ ui.warn(
+ _(
+ '(hostsecurity.%s:verifycertsfile ignored when host '
+ 'fingerprints defined; using host fingerprints for '
+ 'verification)\n'
+ )
+ % bhostname
+ )
# Try to hook up CA certificate validation unless something above
# makes it not necessary.
@@ -242,9 +254,10 @@
if cafile:
cafile = util.expandpath(cafile)
if not os.path.exists(cafile):
- raise error.Abort(_('path specified by %s does not exist: %s') %
- ('hostsecurity.%s:verifycertsfile' % (
- bhostname,), cafile))
+ raise error.Abort(
+ _('path specified by %s does not exist: %s')
+ % ('hostsecurity.%s:verifycertsfile' % (bhostname,), cafile)
+ )
s['cafile'] = cafile
else:
# Find global certificates file in config.
@@ -253,8 +266,9 @@
if cafile:
cafile = util.expandpath(cafile)
if not os.path.exists(cafile):
- raise error.Abort(_('could not find web.cacerts: %s') %
- cafile)
+ raise error.Abort(
+ _('could not find web.cacerts: %s') % cafile
+ )
elif s['allowloaddefaultcerts']:
# CAs not defined in config. Try to find system bundles.
cafile = _defaultcacerts(ui)
@@ -281,6 +295,7 @@
return s
+
def protocolsettings(protocol):
"""Resolve the protocol for a config value.
@@ -304,10 +319,14 @@
# full/real SSLContext available to us.
if supportedprotocols == {'tls1.0'}:
if protocol != 'tls1.0':
- raise error.Abort(_('current Python does not support protocol '
- 'setting %s') % protocol,
- hint=_('upgrade Python or disable setting since '
- 'only TLS 1.0 is supported'))
+ raise error.Abort(
+ _('current Python does not support protocol ' 'setting %s')
+ % protocol,
+ hint=_(
+ 'upgrade Python or disable setting since '
+ 'only TLS 1.0 is supported'
+ ),
+ )
return ssl.PROTOCOL_TLSv1, 0, 'tls1.0'
@@ -333,6 +352,7 @@
return ssl.PROTOCOL_SSLv23, options, protocol
+
def wrapsocket(sock, keyfile, certfile, ui, serverhostname=None):
"""Add SSL/TLS to a socket.
@@ -352,21 +372,29 @@
if b'SSLKEYLOGFILE' in encoding.environ:
try:
import sslkeylog
- sslkeylog.set_keylog(pycompat.fsdecode(
- encoding.environ[b'SSLKEYLOGFILE']))
+
+ sslkeylog.set_keylog(
+ pycompat.fsdecode(encoding.environ[b'SSLKEYLOGFILE'])
+ )
ui.warn(
- b'sslkeylog enabled by SSLKEYLOGFILE environment variable\n')
+ b'sslkeylog enabled by SSLKEYLOGFILE environment variable\n'
+ )
except ImportError:
- ui.warn(b'sslkeylog module missing, '
- b'but SSLKEYLOGFILE set in environment\n')
+ ui.warn(
+ b'sslkeylog module missing, '
+ b'but SSLKEYLOGFILE set in environment\n'
+ )
for f in (keyfile, certfile):
if f and not os.path.exists(f):
raise error.Abort(
_('certificate file (%s) does not exist; cannot connect to %s')
% (f, pycompat.bytesurl(serverhostname)),
- hint=_('restore missing file or fix references '
- 'in Mercurial config'))
+ hint=_(
+ 'restore missing file or fix references '
+ 'in Mercurial config'
+ ),
+ )
settings = _hostsettings(ui, serverhostname)
@@ -392,26 +420,31 @@
raise error.Abort(
_('could not set ciphers: %s')
% stringutil.forcebytestr(e.args[0]),
- hint=_('change cipher string (%s) in config') %
- settings['ciphers'])
+ hint=_('change cipher string (%s) in config')
+ % settings['ciphers'],
+ )
if certfile is not None:
+
def password():
f = keyfile or certfile
return ui.getpass(_('passphrase for %s: ') % f, '')
+
sslcontext.load_cert_chain(certfile, keyfile, password)
if settings['cafile'] is not None:
try:
sslcontext.load_verify_locations(cafile=settings['cafile'])
except ssl.SSLError as e:
- if len(e.args) == 1: # pypy has different SSLError args
+ if len(e.args) == 1: # pypy has different SSLError args
msg = e.args[0]
else:
msg = e.args[1]
- raise error.Abort(_('error loading CA file %s: %s') % (
- settings['cafile'], stringutil.forcebytestr(msg)),
- hint=_('file is empty or malformed?'))
+ raise error.Abort(
+ _('error loading CA file %s: %s')
+ % (settings['cafile'], stringutil.forcebytestr(msg)),
+ hint=_('file is empty or malformed?'),
+ )
caloaded = True
elif settings['allowloaddefaultcerts']:
# This is a no-op on old Python.
@@ -433,13 +466,21 @@
# When the main 20916 bug occurs, 'sslcontext.get_ca_certs()' is a
# non-empty list, but the following conditional is otherwise True.
try:
- if (caloaded and settings['verifymode'] == ssl.CERT_REQUIRED and
- modernssl and not sslcontext.get_ca_certs()):
- ui.warn(_('(an attempt was made to load CA certificates but '
- 'none were loaded; see '
- 'https://mercurial-scm.org/wiki/SecureConnections '
- 'for how to configure Mercurial to avoid this '
- 'error)\n'))
+ if (
+ caloaded
+ and settings['verifymode'] == ssl.CERT_REQUIRED
+ and modernssl
+ and not sslcontext.get_ca_certs()
+ ):
+ ui.warn(
+ _(
+ '(an attempt was made to load CA certificates but '
+ 'none were loaded; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections '
+ 'for how to configure Mercurial to avoid this '
+ 'error)\n'
+ )
+ )
except ssl.SSLError:
pass
@@ -459,51 +500,76 @@
# client doesn't support modern TLS versions introduced
# several years from when this comment was written).
if supportedprotocols != {'tls1.0'}:
- ui.warn(_(
- '(could not communicate with %s using security '
- 'protocols %s; if you are using a modern Mercurial '
- 'version, consider contacting the operator of this '
- 'server; see '
- 'https://mercurial-scm.org/wiki/SecureConnections '
- 'for more info)\n') % (
+ ui.warn(
+ _(
+ '(could not communicate with %s using security '
+ 'protocols %s; if you are using a modern Mercurial '
+ 'version, consider contacting the operator of this '
+ 'server; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections '
+ 'for more info)\n'
+ )
+ % (
pycompat.bytesurl(serverhostname),
- ', '.join(sorted(supportedprotocols))))
+ ', '.join(sorted(supportedprotocols)),
+ )
+ )
else:
- ui.warn(_(
- '(could not communicate with %s using TLS 1.0; the '
- 'likely cause of this is the server no longer '
- 'supports TLS 1.0 because it has known security '
- 'vulnerabilities; see '
- 'https://mercurial-scm.org/wiki/SecureConnections '
- 'for more info)\n') %
- pycompat.bytesurl(serverhostname))
+ ui.warn(
+ _(
+ '(could not communicate with %s using TLS 1.0; the '
+ 'likely cause of this is the server no longer '
+ 'supports TLS 1.0 because it has known security '
+ 'vulnerabilities; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections '
+ 'for more info)\n'
+ )
+ % pycompat.bytesurl(serverhostname)
+ )
else:
# We attempted TLS 1.1+. We can only get here if the client
# supports the configured protocol. So the likely reason is
# the client wants better security than the server can
# offer.
- ui.warn(_(
- '(could not negotiate a common security protocol (%s+) '
- 'with %s; the likely cause is Mercurial is configured '
- 'to be more secure than the server can support)\n') % (
- settings['protocolui'],
- pycompat.bytesurl(serverhostname)))
- ui.warn(_('(consider contacting the operator of this '
- 'server and ask them to support modern TLS '
- 'protocol versions; or, set '
- 'hostsecurity.%s:minimumprotocol=tls1.0 to allow '
- 'use of legacy, less secure protocols when '
- 'communicating with this server)\n') %
- pycompat.bytesurl(serverhostname))
- ui.warn(_(
- '(see https://mercurial-scm.org/wiki/SecureConnections '
- 'for more info)\n'))
+ ui.warn(
+ _(
+ '(could not negotiate a common security protocol (%s+) '
+ 'with %s; the likely cause is Mercurial is configured '
+ 'to be more secure than the server can support)\n'
+ )
+ % (
+ settings['protocolui'],
+ pycompat.bytesurl(serverhostname),
+ )
+ )
+ ui.warn(
+ _(
+ '(consider contacting the operator of this '
+ 'server and ask them to support modern TLS '
+ 'protocol versions; or, set '
+ 'hostsecurity.%s:minimumprotocol=tls1.0 to allow '
+ 'use of legacy, less secure protocols when '
+ 'communicating with this server)\n'
+ )
+ % pycompat.bytesurl(serverhostname)
+ )
+ ui.warn(
+ _(
+ '(see https://mercurial-scm.org/wiki/SecureConnections '
+ 'for more info)\n'
+ )
+ )
- elif (e.reason == r'CERTIFICATE_VERIFY_FAILED' and
- pycompat.iswindows):
+ elif (
+ e.reason == r'CERTIFICATE_VERIFY_FAILED' and pycompat.iswindows
+ ):
- ui.warn(_('(the full certificate chain may not be available '
- 'locally; see "hg help debugssl")\n'))
+ ui.warn(
+ _(
+ '(the full certificate chain may not be available '
+ 'locally; see "hg help debugssl")\n'
+ )
+ )
raise
# check if wrap_socket failed silently because socket had been
@@ -521,8 +587,10 @@
return sslsocket
-def wrapserversocket(sock, ui, certfile=None, keyfile=None, cafile=None,
- requireclientcert=False):
+
+def wrapserversocket(
+ sock, ui, certfile=None, keyfile=None, cafile=None, requireclientcert=False
+):
"""Wrap a socket for use by servers.
``certfile`` and ``keyfile`` specify the files containing the certificate's
@@ -539,8 +607,9 @@
# doesn't have to be as detailed as for wrapsocket().
for f in (certfile, keyfile, cafile):
if f and not os.path.exists(f):
- raise error.Abort(_('referenced certificate file (%s) does not '
- 'exist') % f)
+ raise error.Abort(
+ _('referenced certificate file (%s) does not ' 'exist') % f
+ )
protocol, options, _protocolui = protocolsettings('tls1.0')
@@ -558,8 +627,9 @@
raise error.Abort(_('TLS 1.2 not supported by this Python'))
protocol = ssl.PROTOCOL_TLSv1_2
elif exactprotocol:
- raise error.Abort(_('invalid value for serverexactprotocol: %s') %
- exactprotocol)
+ raise error.Abort(
+ _('invalid value for serverexactprotocol: %s') % exactprotocol
+ )
if modernssl:
# We /could/ use create_default_context() here since it doesn't load
@@ -592,9 +662,11 @@
return sslcontext.wrap_socket(sock, server_side=True)
+
class wildcarderror(Exception):
"""Represents an error parsing wildcards in DNS name."""
+
def _dnsnamematch(dn, hostname, maxwildcards=1):
"""Match DNS names according RFC 6125 section 6.4.3.
@@ -615,7 +687,8 @@
wildcards = leftmost.count('*')
if wildcards > maxwildcards:
raise wildcarderror(
- _('too many wildcards in certificate DNS name: %s') % dn)
+ _('too many wildcards in certificate DNS name: %s') % dn
+ )
# speed up common case w/o wildcards
if not wildcards:
@@ -645,6 +718,7 @@
pat = re.compile(br'\A' + br'\.'.join(pats) + br'\Z', re.IGNORECASE)
return pat.match(hostname) is not None
+
def _verifycert(cert, hostname):
'''Verify that cert (in socket.getpeercert() format) matches hostname.
CRLs is not handled.
@@ -695,6 +769,7 @@
else:
return _('no commonName or subjectAltName found in certificate')
+
def _plainapplepython():
"""return true if this seems to be a pure Apple Python that
* is unfrozen and presumably has the whole mercurial module in the file
@@ -703,12 +778,17 @@
for using system certificate store CAs in addition to the provided
cacerts file
"""
- if (not pycompat.isdarwin or procutil.mainfrozen() or
- not pycompat.sysexecutable):
+ if (
+ not pycompat.isdarwin
+ or procutil.mainfrozen()
+ or not pycompat.sysexecutable
+ ):
return False
exe = os.path.realpath(pycompat.sysexecutable).lower()
- return (exe.startswith('/usr/bin/python') or
- exe.startswith('/system/library/frameworks/python.framework/'))
+ return exe.startswith('/usr/bin/python') or exe.startswith(
+ '/system/library/frameworks/python.framework/'
+ )
+
_systemcacertpaths = [
# RHEL, CentOS, and Fedora
@@ -717,6 +797,7 @@
'/etc/ssl/certs/ca-certificates.crt',
]
+
def _defaultcacerts(ui):
"""return path to default CA certificates or None.
@@ -731,6 +812,7 @@
# and usable, assume the user intends it to be used and use it.
try:
import certifi
+
certs = certifi.where()
if os.path.exists(certs):
ui.debug('using ca certificates from certifi\n')
@@ -745,9 +827,13 @@
# Assertion: this code is only called if certificates are being verified.
if pycompat.iswindows:
if not _canloaddefaultcerts:
- ui.warn(_('(unable to load Windows CA certificates; see '
- 'https://mercurial-scm.org/wiki/SecureConnections for '
- 'how to configure Mercurial to avoid this message)\n'))
+ ui.warn(
+ _(
+ '(unable to load Windows CA certificates; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections for '
+ 'how to configure Mercurial to avoid this message)\n'
+ )
+ )
return None
@@ -756,7 +842,8 @@
# trick.
if _plainapplepython():
dummycert = os.path.join(
- os.path.dirname(pycompat.fsencode(__file__)), 'dummycert.pem')
+ os.path.dirname(pycompat.fsencode(__file__)), 'dummycert.pem'
+ )
if os.path.exists(dummycert):
return dummycert
@@ -767,9 +854,13 @@
# files. Also consider exporting the keychain certs to a file during
# Mercurial install.
if not _canloaddefaultcerts:
- ui.warn(_('(unable to load CA certificates; see '
- 'https://mercurial-scm.org/wiki/SecureConnections for '
- 'how to configure Mercurial to avoid this message)\n'))
+ ui.warn(
+ _(
+ '(unable to load CA certificates; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections for '
+ 'how to configure Mercurial to avoid this message)\n'
+ )
+ )
return None
# / is writable on Windows. Out of an abundance of caution make sure
@@ -787,20 +878,30 @@
if not _canloaddefaultcerts:
for path in _systemcacertpaths:
if os.path.isfile(path):
- ui.warn(_('(using CA certificates from %s; if you see this '
- 'message, your Mercurial install is not properly '
- 'configured; see '
- 'https://mercurial-scm.org/wiki/SecureConnections '
- 'for how to configure Mercurial to avoid this '
- 'message)\n') % path)
+ ui.warn(
+ _(
+ '(using CA certificates from %s; if you see this '
+ 'message, your Mercurial install is not properly '
+ 'configured; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections '
+ 'for how to configure Mercurial to avoid this '
+ 'message)\n'
+ )
+ % path
+ )
return path
- ui.warn(_('(unable to load CA certificates; see '
- 'https://mercurial-scm.org/wiki/SecureConnections for '
- 'how to configure Mercurial to avoid this message)\n'))
+ ui.warn(
+ _(
+ '(unable to load CA certificates; see '
+ 'https://mercurial-scm.org/wiki/SecureConnections for '
+ 'how to configure Mercurial to avoid this message)\n'
+ )
+ )
return None
+
def validatesocket(sock):
"""Validate a socket meets security requirements.
@@ -818,8 +919,9 @@
raise error.Abort(_('%s ssl connection error') % host)
if not peercert:
- raise error.Abort(_('%s certificate error: '
- 'no certificate received') % host)
+ raise error.Abort(
+ _('%s certificate error: ' 'no certificate received') % host
+ )
if settings['disablecertverification']:
# We don't print the certificate fingerprint because it shouldn't
@@ -828,9 +930,14 @@
# to verify the certificate and this message would have printed the
# fingerprint. So printing the fingerprint here adds little to no
# value.
- ui.warn(_('warning: connection security to %s is disabled per current '
- 'settings; communication is susceptible to eavesdropping '
- 'and tampering\n') % host)
+ ui.warn(
+ _(
+ 'warning: connection security to %s is disabled per current '
+ 'settings; communication is susceptible to eavesdropping '
+ 'and tampering\n'
+ )
+ % host
+ )
return
# If a certificate fingerprint is pinned, use it and only it to
@@ -842,23 +949,29 @@
}
def fmtfingerprint(s):
- return ':'.join([s[x:x + 2] for x in range(0, len(s), 2)])
+ return ':'.join([s[x : x + 2] for x in range(0, len(s), 2)])
nicefingerprint = 'sha256:%s' % fmtfingerprint(peerfingerprints['sha256'])
if settings['certfingerprints']:
for hash, fingerprint in settings['certfingerprints']:
if peerfingerprints[hash].lower() == fingerprint:
- ui.debug('%s certificate matched fingerprint %s:%s\n' %
- (host, hash, fmtfingerprint(fingerprint)))
+ ui.debug(
+ '%s certificate matched fingerprint %s:%s\n'
+ % (host, hash, fmtfingerprint(fingerprint))
+ )
if settings['legacyfingerprint']:
- ui.warn(_('(SHA-1 fingerprint for %s found in legacy '
- '[hostfingerprints] section; '
- 'if you trust this fingerprint, remove the old '
- 'SHA-1 fingerprint from [hostfingerprints] and '
- 'add the following entry to the new '
- '[hostsecurity] section: %s:fingerprints=%s)\n') %
- (host, host, nicefingerprint))
+ ui.warn(
+ _(
+ '(SHA-1 fingerprint for %s found in legacy '
+ '[hostfingerprints] section; '
+ 'if you trust this fingerprint, remove the old '
+ 'SHA-1 fingerprint from [hostfingerprints] and '
+ 'add the following entry to the new '
+ '[hostsecurity] section: %s:fingerprints=%s)\n'
+ )
+ % (host, host, nicefingerprint)
+ )
return
# Pinned fingerprint didn't match. This is a fatal error.
@@ -868,25 +981,37 @@
else:
section = 'hostsecurity'
nice = '%s:%s' % (hash, fmtfingerprint(peerfingerprints[hash]))
- raise error.Abort(_('certificate for %s has unexpected '
- 'fingerprint %s') % (host, nice),
- hint=_('check %s configuration') % section)
+ raise error.Abort(
+ _('certificate for %s has unexpected ' 'fingerprint %s')
+ % (host, nice),
+ hint=_('check %s configuration') % section,
+ )
# Security is enabled but no CAs are loaded. We can't establish trust
# for the cert so abort.
if not sock._hgstate['caloaded']:
raise error.Abort(
- _('unable to verify security of %s (no loaded CA certificates); '
- 'refusing to connect') % host,
- hint=_('see https://mercurial-scm.org/wiki/SecureConnections for '
- 'how to configure Mercurial to avoid this error or set '
- 'hostsecurity.%s:fingerprints=%s to trust this server') %
- (host, nicefingerprint))
+ _(
+ 'unable to verify security of %s (no loaded CA certificates); '
+ 'refusing to connect'
+ )
+ % host,
+ hint=_(
+ 'see https://mercurial-scm.org/wiki/SecureConnections for '
+ 'how to configure Mercurial to avoid this error or set '
+ 'hostsecurity.%s:fingerprints=%s to trust this server'
+ )
+ % (host, nicefingerprint),
+ )
msg = _verifycert(peercert2, shost)
if msg:
- raise error.Abort(_('%s certificate error: %s') % (host, msg),
- hint=_('set hostsecurity.%s:certfingerprints=%s '
- 'config setting or use --insecure to connect '
- 'insecurely') %
- (host, nicefingerprint))
+ raise error.Abort(
+ _('%s certificate error: %s') % (host, msg),
+ hint=_(
+ 'set hostsecurity.%s:certfingerprints=%s '
+ 'config setting or use --insecure to connect '
+ 'insecurely'
+ )
+ % (host, nicefingerprint),
+ )