comparison pylons_app/lib/celerylib/tasks.py @ 537:2256c78afe53 celery

implemented basic autoupdating statistics fetched from database
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 22 Sep 2010 04:30:36 +0200
parents b12ea84fb906
children fb0c3af6031b
comparison
equal deleted inserted replaced
536:a5a17000e45b 537:2256c78afe53
1 from celery.decorators import task 1 from celery.decorators import task
2 from celery.task.sets import subtask 2 from celery.task.sets import subtask
3 from celeryconfig import PYLONS_CONFIG as config 3 from celeryconfig import PYLONS_CONFIG as config
4 from datetime import datetime, timedelta
5 from pylons.i18n.translation import _ 4 from pylons.i18n.translation import _
6 from pylons_app.lib.celerylib import run_task 5 from pylons_app.lib.celerylib import run_task
7 from pylons_app.lib.helpers import person 6 from pylons_app.lib.helpers import person
8 from pylons_app.lib.smtp_mailer import SmtpMailer 7 from pylons_app.lib.smtp_mailer import SmtpMailer
9 from pylons_app.lib.utils import OrderedDict 8 from pylons_app.lib.utils import OrderedDict
10 from operator import itemgetter 9 from operator import itemgetter
11 from vcs.backends.hg import MercurialRepository 10 from vcs.backends.hg import MercurialRepository
12 from time import mktime 11 from time import mktime
13 import calendar
14 import traceback 12 import traceback
15 import json 13 import json
16 14
17 __all__ = ['whoosh_index', 'get_commits_stats', 15 __all__ = ['whoosh_index', 'get_commits_stats',
18 'reset_user_password', 'send_email'] 16 'reset_user_password', 'send_email']
81 except LockHeld: 79 except LockHeld:
82 log.info('LockHeld') 80 log.info('LockHeld')
83 return 'LockHeld' 81 return 'LockHeld'
84 82
85 @task 83 @task
86 def get_commits_stats(repo): 84 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
85 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
86
87 from pylons_app.model.db import Statistics, Repository
87 log = get_commits_stats.get_logger() 88 log = get_commits_stats.get_logger()
88 aggregate = OrderedDict() 89 commits_by_day_author_aggregate = {}
89 overview_aggregate = OrderedDict() 90 commits_by_day_aggregate = {}
90 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '') 91 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
91 repo = MercurialRepository(repos_path + repo) 92 repo = MercurialRepository(repos_path + repo_name)
92 #graph range 93
93 td = datetime.today() + timedelta(days=1)
94 y, m, d = td.year, td.month, td.day
95
96 ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
97 d, 0, 0, 0, 0, 0, 0,))
98 ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
99 d, 0, 0, 0, 0, 0, 0,))
100
101 ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
102 skip_date_limit = True 94 skip_date_limit = True
103 95 parse_limit = 500 #limit for single task changeset parsing
104 def author_key_cleaner(k): 96 last_rev = 0
105 k = person(k) 97 last_cs = None
106 k = k.replace('"', "") #for js data compatibilty 98 timegetter = itemgetter('time')
107 return k 99
108 100 sa = get_session()
109 for cs in repo[:200]:#added limit 200 until fix #29 is made 101
102 dbrepo = sa.query(Repository)\
103 .filter(Repository.repo_name == repo_name).scalar()
104 cur_stats = sa.query(Statistics)\
105 .filter(Statistics.repository == dbrepo).scalar()
106 if cur_stats:
107 last_rev = cur_stats.stat_on_revision
108
109 if last_rev == repo.revisions[-1]:
110 #pass silently without any work
111 return True
112
113 if cur_stats:
114 commits_by_day_aggregate = OrderedDict(
115 json.loads(
116 cur_stats.commit_activity_combined))
117 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
118
119 for cnt, rev in enumerate(repo.revisions[last_rev:]):
120 last_cs = cs = repo.get_changeset(rev)
110 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1], 121 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
111 cs.date.timetuple()[2]) 122 cs.date.timetuple()[2])
112 timetupple = [int(x) for x in k.split('-')] 123 timetupple = [int(x) for x in k.split('-')]
113 timetupple.extend([0 for _ in xrange(6)]) 124 timetupple.extend([0 for _ in xrange(6)])
114 k = mktime(timetupple) 125 k = mktime(timetupple)
115 if aggregate.has_key(author_key_cleaner(cs.author)): 126 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
116 if aggregate[author_key_cleaner(cs.author)].has_key(k): 127 try:
117 aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1 128 l = [timegetter(x) for x in commits_by_day_author_aggregate\
118 aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added) 129 [author_key_cleaner(cs.author)]['data']]
119 aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed) 130 time_pos = l.index(k)
120 aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed) 131 except ValueError:
132 time_pos = False
133
134 if time_pos >= 0 and time_pos is not False:
135
136 datadict = commits_by_day_author_aggregate\
137 [author_key_cleaner(cs.author)]['data'][time_pos]
138
139 datadict["commits"] += 1
140 datadict["added"] += len(cs.added)
141 datadict["changed"] += len(cs.changed)
142 datadict["removed"] += len(cs.removed)
143 #print datadict
121 144
122 else: 145 else:
123 #aggregate[author_key_cleaner(cs.author)].update(dates_range) 146 #print 'ELSE !!!!'
124 if k >= ts_min_y and k <= ts_max_y or skip_date_limit: 147 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
125 aggregate[author_key_cleaner(cs.author)][k] = {} 148
126 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 149 datadict = {"time":k,
127 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) 150 "commits":1,
128 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) 151 "added":len(cs.added),
129 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 152 "changed":len(cs.changed),
153 "removed":len(cs.removed),
154 }
155 commits_by_day_author_aggregate\
156 [author_key_cleaner(cs.author)]['data'].append(datadict)
130 157
131 else: 158 else:
159 #print k, 'nokey ADDING'
132 if k >= ts_min_y and k <= ts_max_y or skip_date_limit: 160 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
133 aggregate[author_key_cleaner(cs.author)] = OrderedDict() 161 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
134 #aggregate[author_key_cleaner(cs.author)].update(dates_range) 162 "label":author_key_cleaner(cs.author),
135 aggregate[author_key_cleaner(cs.author)][k] = {} 163 "data":[{"time":k,
136 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 164 "commits":1,
137 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) 165 "added":len(cs.added),
138 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) 166 "changed":len(cs.changed),
139 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 167 "removed":len(cs.removed),
140 168 }],
141 169 "schema":["commits"],
142 if overview_aggregate.has_key(k): 170 }
143 overview_aggregate[k] += 1 171
172 # #gather all data by day
173 if commits_by_day_aggregate.has_key(k):
174 commits_by_day_aggregate[k] += 1
144 else: 175 else:
145 overview_aggregate[k] = 1 176 commits_by_day_aggregate[k] = 1
146 177
178 if cnt >= parse_limit:
179 #don't fetch to much data since we can freeze application
180 break
181
147 overview_data = [] 182 overview_data = []
148 for k, v in overview_aggregate.items(): 183 for k, v in commits_by_day_aggregate.items():
149 overview_data.append([k, v]) 184 overview_data.append([k, v])
150 overview_data = sorted(overview_data, key=itemgetter(0)) 185 overview_data = sorted(overview_data, key=itemgetter(0))
151 data = {} 186
152 for author in aggregate: 187 if not commits_by_day_author_aggregate:
153 commit_data = sorted([{"time":x, 188 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
154 "commits":aggregate[author][x]['commits'],
155 "added":aggregate[author][x]['added'],
156 "changed":aggregate[author][x]['changed'],
157 "removed":aggregate[author][x]['removed'],
158 } for x in aggregate[author]],
159 key=itemgetter('time'))
160
161 data[author] = {"label":author,
162 "data":commit_data,
163 "schema":["commits"]
164 }
165
166 if not data:
167 data[author_key_cleaner(repo.contact)] = {
168 "label":author_key_cleaner(repo.contact), 189 "label":author_key_cleaner(repo.contact),
169 "data":[0, 1], 190 "data":[0, 1],
170 "schema":["commits"], 191 "schema":["commits"],
171 } 192 }
172 193
173 return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data)) 194 stats = cur_stats if cur_stats else Statistics()
195 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
196 stats.commit_activity_combined = json.dumps(overview_data)
197 stats.repository = dbrepo
198 stats.stat_on_revision = last_cs.revision
199 stats.languages = json.dumps({'_TOTAL_':0, '':0})
200
201 try:
202 sa.add(stats)
203 sa.commit()
204 except:
205 log.error(traceback.format_exc())
206 sa.rollback()
207 return False
208
209 return True
174 210
175 @task 211 @task
176 def reset_user_password(user_email): 212 def reset_user_password(user_email):
177 log = reset_user_password.get_logger() 213 log = reset_user_password.get_logger()
178 from pylons_app.lib import auth 214 from pylons_app.lib import auth
182 try: 218 try:
183 sa = get_session() 219 sa = get_session()
184 user = sa.query(User).filter(User.email == user_email).scalar() 220 user = sa.query(User).filter(User.email == user_email).scalar()
185 new_passwd = auth.PasswordGenerator().gen_password(8, 221 new_passwd = auth.PasswordGenerator().gen_password(8,
186 auth.PasswordGenerator.ALPHABETS_BIG_SMALL) 222 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
187 user.password = auth.get_crypt_password(new_passwd) 223 if user:
188 sa.add(user) 224 user.password = auth.get_crypt_password(new_passwd)
189 sa.commit() 225 sa.add(user)
190 log.info('change password for %s', user_email) 226 sa.commit()
227 log.info('change password for %s', user_email)
191 if new_passwd is None: 228 if new_passwd is None:
192 raise Exception('unable to generate new password') 229 raise Exception('unable to generate new password')
193 230
194 except: 231 except:
195 log.error(traceback.format_exc()) 232 log.error(traceback.format_exc())