Mercurial > public > src > rhodecode
view pylons_app/lib/celerylib/tasks.py @ 527:a9e50dce3081 celery
Removed config names from whoosh and celery,
celery is now configured based on the config name it's using
on celeryconfig. And whoosh uses it's own logger configured just for whoosh
Test creates a fresh whoosh index now, for more accurate checks
fixed tests for searching
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Fri, 17 Sep 2010 22:54:30 +0200 |
parents | a3d9d24acbec |
children | 5c376ac2d4c9 |
line wrap: on
line source
from celery.decorators import task from celery.task.sets import subtask from celeryconfig import PYLONS_CONFIG as config from datetime import datetime, timedelta from pylons.i18n.translation import _ from pylons_app.lib.celerylib import run_task from pylons_app.lib.helpers import person from pylons_app.lib.smtp_mailer import SmtpMailer from pylons_app.lib.utils import OrderedDict from time import mktime from vcs.backends.hg import MercurialRepository import calendar import traceback __all__ = ['whoosh_index', 'get_commits_stats', 'reset_user_password', 'send_email'] def get_session(): from sqlalchemy import engine_from_config from sqlalchemy.orm import sessionmaker, scoped_session engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.') sa = scoped_session(sessionmaker(bind=engine)) return sa def get_hg_settings(): from pylons_app.model.db import HgAppSettings try: sa = get_session() ret = sa.query(HgAppSettings).all() finally: sa.remove() if not ret: raise Exception('Could not get application settings !') settings = {} for each in ret: settings['hg_app_' + each.app_settings_name] = each.app_settings_value return settings def get_hg_ui_settings(): from pylons_app.model.db import HgAppUi try: sa = get_session() ret = sa.query(HgAppUi).all() finally: sa.remove() if not ret: raise Exception('Could not get application ui settings !') settings = {} for each in ret: k = each.ui_key v = each.ui_value if k == '/': k = 'root_path' if k.find('.') != -1: k = k.replace('.', '_') if each.ui_section == 'hooks': v = each.ui_active settings[each.ui_section + '_' + k] = v return settings @task def whoosh_index(repo_location, full_index): log = whoosh_index.get_logger() from pylons_app.lib.indexers import DaemonLock from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld try: l = DaemonLock() WhooshIndexingDaemon(repo_location=repo_location)\ .run(full_index=full_index) l.release() return 'Done' except LockHeld: log.info('LockHeld') return 'LockHeld' @task def get_commits_stats(repo): log = get_commits_stats.get_logger() aggregate = OrderedDict() repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '') repo = MercurialRepository(repos_path + repo) #graph range td = datetime.today() + timedelta(days=1) y, m, d = td.year, td.month, td.day ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month, d, 0, 0, 0, 0, 0, 0,)) ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,)) def author_key_cleaner(k): k = person(k) k = k.replace('"', "'") #for js data compatibilty return k for cs in repo[:200]:#added limit 200 until fix #29 is made k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1], cs.date.timetuple()[2]) timetupple = [int(x) for x in k.split('-')] timetupple.extend([0 for _ in xrange(6)]) k = mktime(timetupple) if aggregate.has_key(author_key_cleaner(cs.author)): if aggregate[author_key_cleaner(cs.author)].has_key(k): aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1 aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added) aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed) aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed) else: #aggregate[author_key_cleaner(cs.author)].update(dates_range) if k >= ts_min and k <= ts_max: aggregate[author_key_cleaner(cs.author)][k] = {} aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) else: if k >= ts_min and k <= ts_max: aggregate[author_key_cleaner(cs.author)] = OrderedDict() #aggregate[author_key_cleaner(cs.author)].update(dates_range) aggregate[author_key_cleaner(cs.author)][k] = {} aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) d = '' tmpl0 = u""""%s":%s""" tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},""" for author in aggregate: d += tmpl0 % (author, tmpl1 \ % (author, [{"time":x, "commits":aggregate[author][x]['commits'], "added":aggregate[author][x]['added'], "changed":aggregate[author][x]['changed'], "removed":aggregate[author][x]['removed'], } for x in aggregate[author]])) if d == '': d = '"%s":{label:"%s",data:[[0,1],]}' \ % (author_key_cleaner(repo.contact), author_key_cleaner(repo.contact)) return (ts_min, ts_max, d) @task def reset_user_password(user_email): log = reset_user_password.get_logger() from pylons_app.lib import auth from pylons_app.model.db import User try: try: sa = get_session() user = sa.query(User).filter(User.email == user_email).scalar() new_passwd = auth.PasswordGenerator().gen_password(8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL) user.password = auth.get_crypt_password(new_passwd) sa.add(user) sa.commit() log.info('change password for %s', user_email) if new_passwd is None: raise Exception('unable to generate new password') except: log.error(traceback.format_exc()) sa.rollback() run_task(send_email, user_email, "Your new hg-app password", 'Your new hg-app password:%s' % (new_passwd)) log.info('send new password mail to %s', user_email) except: log.error('Failed to update user password') log.error(traceback.format_exc()) return True @task def send_email(recipients, subject, body): log = send_email.get_logger() email_config = dict(config.items('DEFAULT')) mail_from = email_config.get('app_email_from') user = email_config.get('smtp_username') passwd = email_config.get('smtp_password') mail_server = email_config.get('smtp_server') mail_port = email_config.get('smtp_port') tls = email_config.get('smtp_use_tls') ssl = False try: m = SmtpMailer(mail_from, user, passwd, mail_server, mail_port, ssl, tls) m.send(recipients, subject, body) except: log.error('Mail sending failed') log.error(traceback.format_exc()) return False return True