Mercurial > public > mercurial-scm > hg
comparison tests/httpserverauth.py @ 41587:ccaa52865fac
tests: add code to handle HTTP digests on the server side
It's not hooked up yet. Mostly this was cargoculted and simplified from some
python.org code[1]. It's not trying to test the security as much as it is
trying to make sure that clients are sending out the right data when challenged.
(And they aren't on py3.)
[1] http://svn.python.org/projects/sandbox/trunk/digestauth/digestauth.py
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Tue, 05 Feb 2019 13:32:39 -0500 |
parents | 549af2fa089f |
children | 46432c04f010 |
comparison
equal
deleted
inserted
replaced
41586:7855a949b7c2 | 41587:ccaa52865fac |
---|---|
1 from __future__ import absolute_import | 1 from __future__ import absolute_import |
2 | 2 |
3 import base64 | 3 import base64 |
4 import hashlib | |
4 | 5 |
5 from mercurial.hgweb import common | 6 from mercurial.hgweb import common |
7 from mercurial import ( | |
8 node, | |
9 ) | |
10 | |
11 def parse_keqv_list(req, l): | |
12 """Parse list of key=value strings where keys are not duplicated.""" | |
13 parsed = {} | |
14 for elt in l: | |
15 k, v = elt.split(b'=', 1) | |
16 if v[0:1] == b'"' and v[-1:] == b'"': | |
17 v = v[1:-1] | |
18 parsed[k] = v | |
19 return parsed | |
20 | |
21 class digestauthserver(object): | |
22 def __init__(self): | |
23 self._user_hashes = {} | |
24 | |
25 def gethashers(self): | |
26 def _md5sum(x): | |
27 m = hashlib.md5() | |
28 m.update(x) | |
29 return node.hex(m.digest()) | |
30 | |
31 h = _md5sum | |
32 | |
33 kd = lambda s, d, h=h: h(b"%s:%s" % (s, d)) | |
34 return h, kd | |
35 | |
36 def adduser(self, user, password, realm): | |
37 h, kd = self.gethashers() | |
38 a1 = h(b'%s:%s:%s' % (user, realm, password)) | |
39 self._user_hashes[(user, realm)] = a1 | |
40 | |
41 def makechallenge(self, realm): | |
42 # We aren't testing the protocol here, just that the bytes make the | |
43 # proper round trip. So hardcoded seems fine. | |
44 nonce = b'064af982c5b571cea6450d8eda91c20d' | |
45 return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm, | |
46 nonce) | |
47 | |
48 def checkauth(self, req, header): | |
49 log = req.rawenv[b'wsgi.errors'] | |
50 | |
51 h, kd = self.gethashers() | |
52 resp = parse_keqv_list(req, header.split(b', ')) | |
53 | |
54 if resp.get(b'algorithm', b'MD5').upper() != b'MD5': | |
55 log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm')) | |
56 raise common.ErrorResponse(common.HTTP_FORBIDDEN, | |
57 b"unknown algorithm") | |
58 user = resp[b'username'] | |
59 realm = resp[b'realm'] | |
60 nonce = resp[b'nonce'] | |
61 | |
62 ha1 = self._user_hashes.get((user, realm)) | |
63 if not ha1: | |
64 log.write(b'No hash found for user/realm "%s/%s"' % (user, realm)) | |
65 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user") | |
66 | |
67 qop = resp.get(b'qop', b'auth') | |
68 if qop != b'auth': | |
69 log.write(b"Unsupported qop: %s" % qop) | |
70 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop") | |
71 | |
72 cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc') | |
73 if not cnonce or not ncvalue: | |
74 log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue)) | |
75 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce") | |
76 | |
77 a2 = b'%s:%s' % (req.method, resp[b'uri']) | |
78 noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2)) | |
79 | |
80 respdig = kd(ha1, noncebit) | |
81 if respdig != resp[b'response']: | |
82 log.write(b'User/realm "%s/%s" gave %s, but expected %s' | |
83 % (user, realm, resp[b'response'], respdig)) | |
84 return False | |
85 | |
86 return True | |
6 | 87 |
7 def perform_authentication(hgweb, req, op): | 88 def perform_authentication(hgweb, req, op): |
8 auth = req.headers.get(b'Authorization') | 89 auth = req.headers.get(b'Authorization') |
9 if not auth: | 90 if not auth: |
10 raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who', | 91 raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who', |