1 from __future__ import absolute_import |
1 from __future__ import absolute_import |
2 |
2 |
3 import base64 |
3 import base64 |
|
4 import hashlib |
4 |
5 |
5 from mercurial.hgweb import common |
6 from mercurial.hgweb import common |
|
7 from mercurial import ( |
|
8 node, |
|
9 ) |
|
10 |
|
11 def parse_keqv_list(req, l): |
|
12 """Parse list of key=value strings where keys are not duplicated.""" |
|
13 parsed = {} |
|
14 for elt in l: |
|
15 k, v = elt.split(b'=', 1) |
|
16 if v[0:1] == b'"' and v[-1:] == b'"': |
|
17 v = v[1:-1] |
|
18 parsed[k] = v |
|
19 return parsed |
|
20 |
|
21 class digestauthserver(object): |
|
22 def __init__(self): |
|
23 self._user_hashes = {} |
|
24 |
|
25 def gethashers(self): |
|
26 def _md5sum(x): |
|
27 m = hashlib.md5() |
|
28 m.update(x) |
|
29 return node.hex(m.digest()) |
|
30 |
|
31 h = _md5sum |
|
32 |
|
33 kd = lambda s, d, h=h: h(b"%s:%s" % (s, d)) |
|
34 return h, kd |
|
35 |
|
36 def adduser(self, user, password, realm): |
|
37 h, kd = self.gethashers() |
|
38 a1 = h(b'%s:%s:%s' % (user, realm, password)) |
|
39 self._user_hashes[(user, realm)] = a1 |
|
40 |
|
41 def makechallenge(self, realm): |
|
42 # We aren't testing the protocol here, just that the bytes make the |
|
43 # proper round trip. So hardcoded seems fine. |
|
44 nonce = b'064af982c5b571cea6450d8eda91c20d' |
|
45 return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm, |
|
46 nonce) |
|
47 |
|
48 def checkauth(self, req, header): |
|
49 log = req.rawenv[b'wsgi.errors'] |
|
50 |
|
51 h, kd = self.gethashers() |
|
52 resp = parse_keqv_list(req, header.split(b', ')) |
|
53 |
|
54 if resp.get(b'algorithm', b'MD5').upper() != b'MD5': |
|
55 log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm')) |
|
56 raise common.ErrorResponse(common.HTTP_FORBIDDEN, |
|
57 b"unknown algorithm") |
|
58 user = resp[b'username'] |
|
59 realm = resp[b'realm'] |
|
60 nonce = resp[b'nonce'] |
|
61 |
|
62 ha1 = self._user_hashes.get((user, realm)) |
|
63 if not ha1: |
|
64 log.write(b'No hash found for user/realm "%s/%s"' % (user, realm)) |
|
65 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user") |
|
66 |
|
67 qop = resp.get(b'qop', b'auth') |
|
68 if qop != b'auth': |
|
69 log.write(b"Unsupported qop: %s" % qop) |
|
70 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop") |
|
71 |
|
72 cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc') |
|
73 if not cnonce or not ncvalue: |
|
74 log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue)) |
|
75 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce") |
|
76 |
|
77 a2 = b'%s:%s' % (req.method, resp[b'uri']) |
|
78 noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2)) |
|
79 |
|
80 respdig = kd(ha1, noncebit) |
|
81 if respdig != resp[b'response']: |
|
82 log.write(b'User/realm "%s/%s" gave %s, but expected %s' |
|
83 % (user, realm, resp[b'response'], respdig)) |
|
84 return False |
|
85 |
|
86 return True |
6 |
87 |
7 def perform_authentication(hgweb, req, op): |
88 def perform_authentication(hgweb, req, op): |
8 auth = req.headers.get(b'Authorization') |
89 auth = req.headers.get(b'Authorization') |
9 if not auth: |
90 if not auth: |
10 raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who', |
91 raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who', |