Mercurial > public > src > rhodecode
view pylons_app/lib/auth.py @ 52:25e516447a33
implemented authentication
author | marcink |
---|---|
date | Thu, 08 Apr 2010 12:00:06 +0200 |
parents | 8e250e86a670 |
children | 08707974eae4 |
line wrap: on
line source
"""SQLite-backed authentication helpers for the Pylons application.

Users live in the ``users`` table of ``auth.sqlite`` (located at ``ROOT``);
authenticated Mercurial / raw-file requests are recorded in ``user_logs``.
"""
import sqlite3
import os
import logging
from os.path import dirname as dn
from datetime import datetime
import crypt

from pylons import session, url
from pylons.controllers.util import abort, redirect
from decorator import decorator

log = logging.getLogger(__name__)

# Three directory levels above this file.
ROOT = dn(dn(dn(os.path.realpath(__file__))))


def get_sqlite_conn_cur():
    """Open ``ROOT/auth.sqlite`` and return a ``(connection, cursor)`` pair.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(os.path.join(ROOT, 'auth.sqlite'))
    cur = conn.cursor()
    return conn, cur


def _crypt_password(password):
    """Return the stored-format hash of ``password``.

    NOTE(review): ``crypt`` with the fixed salt '6a' is a weak scheme (static
    salt shared by every user). It is kept unchanged because replacing it
    would invalidate every password already stored in the database.
    """
    return crypt.crypt(password, '6a')


def _fetch_user(cur, username):
    """Return the ``users`` row for ``username`` or ``None``.

    The row is ``(user_id, username, password, active, admin)``. An
    ``sqlite3.OperationalError`` (e.g. missing table) is logged and treated
    as "user not found".
    """
    try:
        cur.execute("SELECT user_id, username, password, active, admin "
                    "FROM users WHERE username=?", (username,))
        return cur.fetchone()
    except sqlite3.OperationalError as e:
        log.error(e)
        return None


def _log_user_action(conn, cur, user_id, environ):
    """Record a Mercurial or raw-file request in ``user_logs``.

    Nothing is written for other kinds of requests. Failures to write the
    log row are logged and swallowed so they never block authentication.
    """
    # Any of these keys may be missing from the environ; treat a missing
    # value as empty instead of raising.
    http_accept = environ.get('HTTP_ACCEPT') or ''
    path_info = environ.get('PATH_INFO') or ''
    query_string = environ.get('QUERY_STRING') or ''

    if not (http_accept.startswith('application/mercurial')
            or 'raw-file' in path_info):
        return

    # Logged action is the path followed by every "cmd..." query parameter.
    cmd = '|'.join([path_info] + [qry for qry in query_string.split('&')
                                  if qry.startswith('cmd')])

    # NOTE(review): the indentation of the original source was lost; the row
    # is written once per request here (so raw-file requests without a
    # "cmd" parameter are logged too) -- confirm against the upstream file.
    try:
        cur.execute("""INSERT INTO user_logs VALUES(?,?,?,?)""",
                    (None, user_id, cmd, datetime.now()))
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error(e)


def admin_auth(username, password):
    """Return True if the credentials belong to an active admin user.

    A disabled account is logged as an error; every other failure returns
    False silently.
    """
    conn, cur = get_sqlite_conn_cur()
    try:
        # Hash up front so the work done does not depend on whether the
        # user exists.
        password_crypt = _crypt_password(password)
        user = _fetch_user(cur, username)
        if not user:
            return False

        _user_id, db_username, db_password, active, admin = user
        if not active:
            log.error('user %s is disabled', username)
            return False

        if db_username == username and db_password == password_crypt and admin:
            log.info('user %s authenticated correctly', username)
            return True
        return False
    finally:
        conn.close()


def authfunc(environ, username, password):
    """Return True if the credentials belong to an active user.

    When ``environ`` is truthy, a successful authentication of a Mercurial
    protocol or raw-file request is also recorded in ``user_logs``.
    """
    conn, cur = get_sqlite_conn_cur()
    try:
        password_crypt = _crypt_password(password)
        user = _fetch_user(cur, username)
        if not user:
            return False

        user_id, db_username, db_password, active, _admin = user
        if not active:
            log.error('user %s is disabled', username)
            return False

        if db_username == username and db_password == password_crypt:
            log.info('user %s authenticated correctly', username)
            if environ:
                _log_user_action(conn, cur, user_id, environ)
            return True
        return False
    finally:
        conn.close()


@decorator
def authenticate(fn, *args, **kwargs):
    """Redirect to ``admin_home`` unless the session has ``admin_user`` set."""
    if not session.get('admin_user', False):
        # NOTE(review): 301 is a permanent redirect that browsers may cache;
        # a temporary redirect is probably intended here -- confirm.
        # redirect() is expected to abort the request by raising.
        redirect(url('admin_home'), 301)
    return fn(*args, **kwargs)


def create_user_table():
    """Create an auth database.

    WARNING: drops the existing ``users`` and ``user_logs`` tables (and all
    their rows) before recreating them. Any error rolls back and re-raises.
    """
    conn, cur = get_sqlite_conn_cur()
    try:
        log.info('creating table %s', 'users')
        cur.execute("""DROP TABLE IF EXISTS users """)
        cur.execute("""CREATE TABLE users
                        (user_id INTEGER PRIMARY KEY AUTOINCREMENT,
                         username TEXT,
                         password TEXT,
                         active INTEGER,
                         admin INTEGER)""")
        log.info('creating table %s', 'user_logs')
        cur.execute("""DROP TABLE IF EXISTS user_logs """)
        cur.execute("""CREATE TABLE user_logs
                        (id INTEGER PRIMARY KEY AUTOINCREMENT,
                         user_id INTEGER,
                         last_action TEXT,
                         last_action_date DATETIME)""")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # Closing the connection also releases the cursor.
        conn.close()


def create_user(username, password, admin=False):
    """Insert a new active user; ``admin`` marks the account as an admin.

    Any error rolls back the insert and re-raises.
    """
    conn, cur = get_sqlite_conn_cur()
    try:
        password_crypt = _crypt_password(password)
        log.info('creating user %s', username)
        cur.execute("""INSERT INTO users values (?,?,?,?,?) """,
                    (None, username, password_crypt, 1, admin))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    # Rebuilds the database from scratch and seeds the initial accounts.
    create_user_table()
    create_user('marcink', 'qweqwe', True)
    for name in ('lukaszd', 'adriand', 'radek', 'skrzeka',
                 'bart', 'maho', 'michalg'):
        create_user(name, 'qweqwe')