|
1 # sslutil.py - SSL handling for mercurial |
|
2 # |
|
3 # Copyright 2005, 2006, 2007, 2008 Matt Mackall <mpm@selenic.com> |
|
4 # Copyright 2006, 2007 Alexis S. L. Carvalho <alexis@cecm.usp.br> |
|
5 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com> |
|
6 # |
|
7 # This software may be used and distributed according to the terms of the |
|
8 # GNU General Public License version 2 or any later version. |
|
9 import os |
|
10 |
|
11 from mercurial import util |
|
12 from mercurial.i18n import _ |
|
try:
    # avoid using deprecated/broken FakeSocket in python 2.6
    import ssl
    ssl_wrap_socket = ssl.wrap_socket
    CERT_REQUIRED = ssl.CERT_REQUIRED
except ImportError:
    # The ssl module is unavailable, so fall back to the legacy
    # socket.ssl() API. The modules are imported here because the
    # fallback below is the only code in this file that uses them.
    import httplib
    import socket

    # stand-in for ssl.CERT_REQUIRED when the ssl module is missing
    CERT_REQUIRED = 2

    def ssl_wrap_socket(sock, key_file, cert_file,
                        cert_reqs=CERT_REQUIRED, ca_certs=None):
        '''Wrap sock in an SSL layer using socket.ssl() and FakeSocket.

        cert_reqs is accepted so the signature lines up with the
        ssl.wrap_socket call made when the ssl module is available; it is
        not used here.

        Raises util.Abort if ca_certs is given, since certificate checking
        is not possible through this fallback.
        '''
        if ca_certs:
            raise util.Abort(_(
                'certificate checking requires Python 2.6'))

        sslsock = socket.ssl(sock, key_file, cert_file)
        return httplib.FakeSocket(sock, sslsock)
|
29 |
|
30 def _verifycert(cert, hostname): |
|
31 '''Verify that cert (in socket.getpeercert() format) matches hostname. |
|
32 CRLs is not handled. |
|
33 |
|
34 Returns error message if any problems are found and None on success. |
|
35 ''' |
|
36 if not cert: |
|
37 return _('no certificate received') |
|
38 dnsname = hostname.lower() |
|
39 def matchdnsname(certname): |
|
40 return (certname == dnsname or |
|
41 '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1]) |
|
42 |
|
43 san = cert.get('subjectAltName', []) |
|
44 if san: |
|
45 certnames = [value.lower() for key, value in san if key == 'DNS'] |
|
46 for name in certnames: |
|
47 if matchdnsname(name): |
|
48 return None |
|
49 return _('certificate is for %s') % ', '.join(certnames) |
|
50 |
|
51 # subject is only checked when subjectAltName is empty |
|
52 for s in cert.get('subject', []): |
|
53 key, value = s[0] |
|
54 if key == 'commonName': |
|
55 try: |
|
56 # 'subject' entries are unicode |
|
57 certname = value.lower().encode('ascii') |
|
58 except UnicodeEncodeError: |
|
59 return _('IDN in certificate not supported') |
|
60 if matchdnsname(certname): |
|
61 return None |
|
62 return _('certificate is for %s') % certname |
|
63 return _('no commonName or subjectAltName found in certificate') |
|
64 |
|
65 |
|
66 # CERT_REQUIRED means fetch the cert from the server all the time AND |
|
67 # validate it against the CA store provided in web.cacerts. |
|
68 # |
|
69 # We COMPLETELY ignore CERT_REQUIRED on Python <= 2.5, as it's totally |
|
70 # busted on those versions. |
|
71 |
|
def sslkwargs(ui, host):
    '''Build the certificate-checking keyword arguments for host.

    Returns a dict with 'ca_certs' (the expanded web.cacerts path) and
    'cert_reqs' (CERT_REQUIRED) when web.cacerts is configured and no
    fingerprint is configured for host under [hostfingerprints]. In every
    other case an empty dict is returned.

    Raises util.Abort if the configured web.cacerts file does not exist.
    '''
    capath = ui.config('web', 'cacerts')
    fingerprint = ui.config('hostfingerprints', host)

    # nothing to add without a CA file, or when a fingerprint is configured
    if not capath or fingerprint:
        return {}

    capath = util.expandpath(capath)
    if not os.path.exists(capath):
        raise util.Abort(_('could not find web.cacerts: %s') % capath)
    return dict(ca_certs=capath, cert_reqs=CERT_REQUIRED)
|
83 |
|
class validator(object):
    '''Callable that checks the certificate presented on a socket.

    An instance is created with a ui object and a host name, and is then
    called with the connected socket. Depending on configuration it
    verifies the certificate against the host name, compares the
    certificate fingerprint with the one configured for the host, or
    warns that the certificate was not verified.
    '''

    def __init__(self, ui, host):
        self.ui, self.host = ui, host

    def __call__(self, sock):
        '''Check the peer certificate of sock; raise util.Abort on failure.'''
        ui = self.ui
        host = self.host
        cacerts = ui.config('web', 'cacerts')
        pinned = ui.config('hostfingerprints', host)

        # web.cacerts configured and no fingerprint: check the host name
        if cacerts and not pinned:
            problem = _verifycert(sock.getpeercert(), host)
            if problem:
                raise util.Abort(_('%s certificate error: %s '
                                   '(use --insecure to connect '
                                   'insecurely)') % (host, problem))
            ui.debug('%s certificate successfully verified\n' % host)
            return

        # the socket cannot hand out a certificate at all (python 2.5 ?)
        if not getattr(sock, 'getpeercert', False):
            if pinned:
                raise util.Abort(_('no certificate for %s with '
                                   'configured hostfingerprint') % host)
            ui.warn(_('warning: %s certificate not verified '
                      '(check web.cacerts config setting)\n') %
                    host)
            return

        # fingerprint of the certificate as returned by getpeercert(True)
        digest = util.sha1(sock.getpeercert(True)).hexdigest()
        octets = [digest[pos:pos + 2] for pos in xrange(0, len(digest), 2)]
        pretty = ":".join(octets)

        if not pinned:
            ui.warn(_('warning: %s certificate '
                      'with fingerprint %s not verified '
                      '(check hostfingerprints or web.cacerts '
                      'config setting)\n') %
                    (host, pretty))
            return

        # the configured fingerprint is compared without colons or case
        expected = pinned.replace(':', '').lower()
        if digest.lower() != expected:
            raise util.Abort(_('invalid certificate for %s '
                               'with fingerprint %s') %
                             (host, pretty))
        ui.debug('%s certificate matched fingerprint %s\n' %
                 (host, pretty))