tests/httpserverauth.py
changeset 41587 ccaa52865fac
parent 41585 549af2fa089f
child 41589 46432c04f010
equal deleted inserted replaced
41586:7855a949b7c2 41587:ccaa52865fac
     1 from __future__ import absolute_import
     1 from __future__ import absolute_import
     2 
     2 
     3 import base64
     3 import base64
       
     4 import hashlib
     4 
     5 
     5 from mercurial.hgweb import common
     6 from mercurial.hgweb import common
       
     7 from mercurial import (
       
     8     node,
       
     9 )
       
    10 
       
    11 def parse_keqv_list(req, l):
       
    12     """Parse list of key=value strings where keys are not duplicated."""
       
    13     parsed = {}
       
    14     for elt in l:
       
    15         k, v = elt.split(b'=', 1)
       
    16         if v[0:1] == b'"' and v[-1:] == b'"':
       
    17             v = v[1:-1]
       
    18         parsed[k] = v
       
    19     return parsed
       
    20 
       
    21 class digestauthserver(object):
       
    22     def __init__(self):
       
    23         self._user_hashes = {}
       
    24 
       
    25     def gethashers(self):
       
    26         def _md5sum(x):
       
    27             m = hashlib.md5()
       
    28             m.update(x)
       
    29             return node.hex(m.digest())
       
    30 
       
    31         h = _md5sum
       
    32 
       
    33         kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
       
    34         return h, kd
       
    35 
       
    36     def adduser(self, user, password, realm):
       
    37         h, kd = self.gethashers()
       
    38         a1 = h(b'%s:%s:%s' % (user, realm, password))
       
    39         self._user_hashes[(user, realm)] = a1
       
    40 
       
    41     def makechallenge(self, realm):
       
    42         # We aren't testing the protocol here, just that the bytes make the
       
    43         # proper round trip.  So hardcoded seems fine.
       
    44         nonce = b'064af982c5b571cea6450d8eda91c20d'
       
    45         return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm,
       
    46                                                                        nonce)
       
    47 
       
    48     def checkauth(self, req, header):
       
    49         log = req.rawenv[b'wsgi.errors']
       
    50 
       
    51         h, kd = self.gethashers()
       
    52         resp = parse_keqv_list(req, header.split(b', '))
       
    53 
       
    54         if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
       
    55             log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
       
    56             raise common.ErrorResponse(common.HTTP_FORBIDDEN,
       
    57                                        b"unknown algorithm")
       
    58         user = resp[b'username']
       
    59         realm = resp[b'realm']
       
    60         nonce = resp[b'nonce']
       
    61 
       
    62         ha1 = self._user_hashes.get((user, realm))
       
    63         if not ha1:
       
    64             log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
       
    65             raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")
       
    66 
       
    67         qop = resp.get(b'qop', b'auth')
       
    68         if qop != b'auth':
       
    69             log.write(b"Unsupported qop: %s" % qop)
       
    70             raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")
       
    71 
       
    72         cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
       
    73         if not cnonce or not ncvalue:
       
    74             log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
       
    75             raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")
       
    76 
       
    77         a2 = b'%s:%s' % (req.method, resp[b'uri'])
       
    78         noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))
       
    79 
       
    80         respdig = kd(ha1, noncebit)
       
    81         if respdig != resp[b'response']:
       
    82             log.write(b'User/realm "%s/%s" gave %s, but expected %s'
       
    83                       % (user, realm, resp[b'response'], respdig))
       
    84             return False
       
    85 
       
    86         return True
     6 
    87 
     7 def perform_authentication(hgweb, req, op):
    88 def perform_authentication(hgweb, req, op):
     8     auth = req.headers.get(b'Authorization')
    89     auth = req.headers.get(b'Authorization')
     9     if not auth:
    90     if not auth:
    10         raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who',
    91         raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who',