mercurial/sslutil.py
changeset 14204 5fa21960b2f4
child 14616 64dfbe576455
equal deleted inserted replaced
14203:b230922eb0c3 14204:5fa21960b2f4
       
     1 # sslutil.py - SSL handling for mercurial
       
     2 #
       
     3 # Copyright 2005, 2006, 2007, 2008 Matt Mackall <mpm@selenic.com>
       
     4 # Copyright 2006, 2007 Alexis S. L. Carvalho <alexis@cecm.usp.br>
       
     5 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
       
     6 #
       
     7 # This software may be used and distributed according to the terms of the
       
     8 # GNU General Public License version 2 or any later version.
       
     9 import os
       
    10 
       
    11 from mercurial import util
       
    12 from mercurial.i18n import _
       
    13 try:
       
    14     # avoid using deprecated/broken FakeSocket in python 2.6
       
    15     import ssl
       
    16     ssl_wrap_socket = ssl.wrap_socket
       
    17     CERT_REQUIRED = ssl.CERT_REQUIRED
       
    18 except ImportError:
       
    19     CERT_REQUIRED = 2
       
    20 
       
    21     def ssl_wrap_socket(sock, key_file, cert_file,
       
    22                         cert_reqs=CERT_REQUIRED, ca_certs=None):
       
    23         if ca_certs:
       
    24             raise util.Abort(_(
       
    25                 'certificate checking requires Python 2.6'))
       
    26 
       
    27         ssl = socket.ssl(sock, key_file, cert_file)
       
    28         return httplib.FakeSocket(sock, ssl)
       
    29 
       
    30 def _verifycert(cert, hostname):
       
    31     '''Verify that cert (in socket.getpeercert() format) matches hostname.
       
    32     CRLs is not handled.
       
    33 
       
    34     Returns error message if any problems are found and None on success.
       
    35     '''
       
    36     if not cert:
       
    37         return _('no certificate received')
       
    38     dnsname = hostname.lower()
       
    39     def matchdnsname(certname):
       
    40         return (certname == dnsname or
       
    41                 '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1])
       
    42 
       
    43     san = cert.get('subjectAltName', [])
       
    44     if san:
       
    45         certnames = [value.lower() for key, value in san if key == 'DNS']
       
    46         for name in certnames:
       
    47             if matchdnsname(name):
       
    48                 return None
       
    49         return _('certificate is for %s') % ', '.join(certnames)
       
    50 
       
    51     # subject is only checked when subjectAltName is empty
       
    52     for s in cert.get('subject', []):
       
    53         key, value = s[0]
       
    54         if key == 'commonName':
       
    55             try:
       
    56                 # 'subject' entries are unicode
       
    57                 certname = value.lower().encode('ascii')
       
    58             except UnicodeEncodeError:
       
    59                 return _('IDN in certificate not supported')
       
    60             if matchdnsname(certname):
       
    61                 return None
       
    62             return _('certificate is for %s') % certname
       
    63     return _('no commonName or subjectAltName found in certificate')
       
    64 
       
    65 
       
    66 # CERT_REQUIRED means fetch the cert from the server all the time AND
       
    67 # validate it against the CA store provided in web.cacerts.
       
    68 #
       
    69 # We COMPLETELY ignore CERT_REQUIRED on Python <= 2.5, as it's totally
       
    70 # busted on those versions.
       
    71 
       
    72 def sslkwargs(ui, host):
       
    73     cacerts = ui.config('web', 'cacerts')
       
    74     hostfingerprint = ui.config('hostfingerprints', host)
       
    75     if cacerts and not hostfingerprint:
       
    76         cacerts = util.expandpath(cacerts)
       
    77         if not os.path.exists(cacerts):
       
    78             raise util.Abort(_('could not find web.cacerts: %s') % cacerts)
       
    79         return {'ca_certs': cacerts,
       
    80                 'cert_reqs': CERT_REQUIRED,
       
    81                 }
       
    82     return {}
       
    83 
       
    84 class validator(object):
       
    85     def __init__(self, ui, host):
       
    86         self.ui = ui
       
    87         self.host = host
       
    88 
       
    89     def __call__(self, sock):
       
    90         host = self.host
       
    91         cacerts = self.ui.config('web', 'cacerts')
       
    92         hostfingerprint = self.ui.config('hostfingerprints', host)
       
    93         if cacerts and not hostfingerprint:
       
    94             msg = _verifycert(sock.getpeercert(), host)
       
    95             if msg:
       
    96                 raise util.Abort(_('%s certificate error: %s '
       
    97                                    '(use --insecure to connect '
       
    98                                    'insecurely)') % (host, msg))
       
    99             self.ui.debug('%s certificate successfully verified\n' % host)
       
   100         else:
       
   101             if getattr(sock, 'getpeercert', False):
       
   102                 peercert = sock.getpeercert(True)
       
   103                 peerfingerprint = util.sha1(peercert).hexdigest()
       
   104                 nicefingerprint = ":".join([peerfingerprint[x:x + 2]
       
   105                     for x in xrange(0, len(peerfingerprint), 2)])
       
   106                 if hostfingerprint:
       
   107                     if peerfingerprint.lower() != \
       
   108                             hostfingerprint.replace(':', '').lower():
       
   109                         raise util.Abort(_('invalid certificate for %s '
       
   110                                            'with fingerprint %s') %
       
   111                                          (host, nicefingerprint))
       
   112                     self.ui.debug('%s certificate matched fingerprint %s\n' %
       
   113                                   (host, nicefingerprint))
       
   114                 else:
       
   115                     self.ui.warn(_('warning: %s certificate '
       
   116                                    'with fingerprint %s not verified '
       
   117                                    '(check hostfingerprints or web.cacerts '
       
   118                                    'config setting)\n') %
       
   119                                  (host, nicefingerprint))
       
   120             else: # python 2.5 ?
       
   121                 if hostfingerprint:
       
   122                     raise util.Abort(_('no certificate for %s with '
       
   123                                        'configured hostfingerprint') % host)
       
   124                 self.ui.warn(_('warning: %s certificate not verified '
       
   125                                '(check web.cacerts config setting)\n') %
       
   126                              host)