Mercurial > public > mercurial-scm > hg
comparison mercurial/sslutil.py @ 14204:5fa21960b2f4
sslutil: extracted ssl methods from httpsconnection in url.py
This makes it easier to share ssl cert validation with other http
implementations.
author | Augie Fackler <durin42@gmail.com> |
---|---|
date | Wed, 04 May 2011 22:08:55 -0500 |
parents | |
children | 64dfbe576455 |
comparison
equal
deleted
inserted
replaced
14203:b230922eb0c3 | 14204:5fa21960b2f4 |
---|---|
1 # sslutil.py - SSL handling for mercurial | |
2 # | |
3 # Copyright 2005, 2006, 2007, 2008 Matt Mackall <mpm@selenic.com> | |
4 # Copyright 2006, 2007 Alexis S. L. Carvalho <alexis@cecm.usp.br> | |
5 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com> | |
6 # | |
7 # This software may be used and distributed according to the terms of the | |
8 # GNU General Public License version 2 or any later version. | |
9 import os | |
10 | |
11 from mercurial import util | |
12 from mercurial.i18n import _ | |
try:
    # avoid using deprecated/broken FakeSocket in python 2.6
    import ssl
    ssl_wrap_socket = ssl.wrap_socket
    CERT_REQUIRED = ssl.CERT_REQUIRED
except ImportError:
    # No ssl module available: fall back to the legacy socket.ssl API.
    # The fallback needs these two modules, which nothing else in this
    # file imports, so bring them into scope here.
    import httplib
    import socket

    # stand-in for ssl.CERT_REQUIRED when the ssl module is missing
    CERT_REQUIRED = 2

    def ssl_wrap_socket(sock, key_file, cert_file,
                        cert_reqs=CERT_REQUIRED, ca_certs=None):
        '''Wrap sock in an SSL layer without the ssl module.

        key_file and cert_file are handed to socket.ssl(). cert_reqs is
        accepted for signature compatibility with ssl.wrap_socket but is
        not used. Certificate checking is not available on this code
        path, so util.Abort is raised if ca_certs is set.

        Returns an httplib.FakeSocket wrapping sock.
        '''
        if ca_certs:
            raise util.Abort(_(
                'certificate checking requires Python 2.6'))

        # named sslobj rather than ssl so it is not confused with the module
        sslobj = socket.ssl(sock, key_file, cert_file)
        return httplib.FakeSocket(sock, sslobj)
29 | |
def _verifycert(cert, hostname):
    '''Verify that cert (in socket.getpeercert() format) matches hostname.
    CRLs are not handled.

    DNS entries in subjectAltName take precedence. The subject commonName
    is only consulted when subjectAltName provides no DNS entries (it is
    absent, empty, or holds only other kinds of names).

    Returns error message if any problems are found and None on success.
    '''
    if not cert:
        return _('no certificate received')
    dnsname = hostname.lower()
    def matchdnsname(certname):
        # exact match, or a wildcard covering only the leftmost label
        return (certname == dnsname or
                '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1])

    san = cert.get('subjectAltName', [])
    if san:
        certnames = [value.lower() for key, value in san if key == 'DNS']
        for name in certnames:
            if matchdnsname(name):
                return None
        # Only reject here when the certificate actually listed DNS names;
        # a subjectAltName made solely of other entry types must not
        # prevent the commonName check below.
        if certnames:
            return _('certificate is for %s') % ', '.join(certnames)

    # subject is only checked when subjectAltName has no DNS entries
    for s in cert.get('subject', []):
        key, value = s[0]
        if key == 'commonName':
            try:
                # 'subject' entries are unicode
                certname = value.lower().encode('ascii')
            except UnicodeEncodeError:
                return _('IDN in certificate not supported')
            if matchdnsname(certname):
                return None
            return _('certificate is for %s') % certname
    return _('no commonName or subjectAltName found in certificate')
64 | |
65 | |
66 # CERT_REQUIRED means fetch the cert from the server all the time AND | |
67 # validate it against the CA store provided in web.cacerts. | |
68 # | |
69 # We COMPLETELY ignore CERT_REQUIRED on Python <= 2.5, as it's totally | |
70 # busted on those versions. | |
71 | |
def sslkwargs(ui, host):
    '''Build the extra keyword arguments for wrapping a socket to host.

    Returns a dict with 'ca_certs' and 'cert_reqs' when web.cacerts is
    configured and no fingerprint is pinned for host in the
    hostfingerprints section; otherwise returns an empty dict.

    Raises util.Abort if the configured web.cacerts file does not exist.
    '''
    capath = ui.config('web', 'cacerts')
    pinned = ui.config('hostfingerprints', host)

    # nothing to request unless CA validation is configured and not
    # overridden by a pinned fingerprint
    if not capath or pinned:
        return {}

    capath = util.expandpath(capath)
    if not os.path.exists(capath):
        raise util.Abort(_('could not find web.cacerts: %s') % capath)

    kwargs = {}
    kwargs['ca_certs'] = capath
    kwargs['cert_reqs'] = CERT_REQUIRED
    return kwargs
83 | |
class validator(object):
    '''Callable that checks the certificate presented on an SSL socket.

    Built from a ui object (used for configuration lookups and for
    debug/warning output) and the host name being connected to. Calling
    the instance with a wrapped socket either returns None or raises
    util.Abort.
    '''

    def __init__(self, ui, host):
        self.ui = ui
        self.host = host

    def __call__(self, sock):
        '''Validate the peer certificate of sock for self.host.

        With web.cacerts set and no pinned fingerprint, the certificate
        is matched against the host name via _verifycert(). Otherwise a
        fingerprint configured under hostfingerprints is compared to the
        SHA-1 of the peer certificate; when none is configured only a
        warning is emitted.
        '''
        ui = self.ui
        host = self.host
        cacerts = ui.config('web', 'cacerts')
        hostfingerprint = ui.config('hostfingerprints', host)

        # CA based validation
        if cacerts and not hostfingerprint:
            problem = _verifycert(sock.getpeercert(), host)
            if problem:
                raise util.Abort(_('%s certificate error: %s '
                                   '(use --insecure to connect '
                                   'insecurely)') % (host, problem))
            ui.debug('%s certificate successfully verified\n' % host)
            return

        # socket cannot expose the peer certificate (python 2.5 ?)
        if not getattr(sock, 'getpeercert', False):
            if hostfingerprint:
                raise util.Abort(_('no certificate for %s with '
                                   'configured hostfingerprint') % host)
            ui.warn(_('warning: %s certificate not verified '
                      '(check web.cacerts config setting)\n') %
                    host)
            return

        # fingerprint based validation
        peercert = sock.getpeercert(True)
        peerfingerprint = util.sha1(peercert).hexdigest()
        # colon separated pairs of hex digits, for display
        octets = [peerfingerprint[pos:pos + 2]
                  for pos in xrange(0, len(peerfingerprint), 2)]
        nicefingerprint = ":".join(octets)

        if not hostfingerprint:
            ui.warn(_('warning: %s certificate '
                      'with fingerprint %s not verified '
                      '(check hostfingerprints or web.cacerts '
                      'config setting)\n') %
                    (host, nicefingerprint))
            return

        expected = hostfingerprint.replace(':', '').lower()
        if peerfingerprint.lower() != expected:
            raise util.Abort(_('invalid certificate for %s '
                               'with fingerprint %s') %
                             (host, nicefingerprint))
        ui.debug('%s certificate matched fingerprint %s\n' %
                 (host, nicefingerprint))