comparison pylons_app/lib/celerylib/tasks.py @ 537:2256c78afe53 celery
implemented basic auto-updating statistics fetched from the database
author   | Marcin Kuzminski <marcin@python-works.com>
date     | Wed, 22 Sep 2010 04:30:36 +0200
parents  | b12ea84fb906
children | fb0c3af6031b
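This changeset narrows get_commits_stats to take a repo_name plus precomputed ts_min_y/ts_max_y bounds instead of deriving the graph range inside the task body. The caller is not part of this diff; the sketch below only illustrates, under that assumption, how the bounds could be computed the same way the removed in-task code did and then handed to run_task (the repository name is a placeholder, and run_task's calling convention is assumed from the import shown in the diff):

    # Sketch only -- not part of this changeset. Reproduces the year-range
    # calculation that was removed from the task body and passes the bounds in.
    import calendar
    from datetime import datetime, timedelta
    from time import mktime

    from pylons_app.lib.celerylib import run_task
    from pylons_app.lib.celerylib.tasks import get_commits_stats

    td = datetime.today() + timedelta(days=1)
    y, m, d = td.year, td.month, td.day

    # lower bound: roughly one year back; upper bound: tomorrow at midnight
    ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
                       d, 0, 0, 0, 0, 0, 0))
    ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0))

    # 'repo_name_placeholder' is hypothetical; run_task dispatch assumed
    run_task(get_commits_stats, 'repo_name_placeholder', ts_min_y, ts_max_y)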
536:a5a17000e45b | 537:2256c78afe53
-----------------|-----------------
1 from celery.decorators import task | 1 from celery.decorators import task |
2 from celery.task.sets import subtask | 2 from celery.task.sets import subtask |
3 from celeryconfig import PYLONS_CONFIG as config | 3 from celeryconfig import PYLONS_CONFIG as config |
4 from datetime import datetime, timedelta | |
5 from pylons.i18n.translation import _ | 4 from pylons.i18n.translation import _ |
6 from pylons_app.lib.celerylib import run_task | 5 from pylons_app.lib.celerylib import run_task |
7 from pylons_app.lib.helpers import person | 6 from pylons_app.lib.helpers import person |
8 from pylons_app.lib.smtp_mailer import SmtpMailer | 7 from pylons_app.lib.smtp_mailer import SmtpMailer |
9 from pylons_app.lib.utils import OrderedDict | 8 from pylons_app.lib.utils import OrderedDict |
10 from operator import itemgetter | 9 from operator import itemgetter |
11 from vcs.backends.hg import MercurialRepository | 10 from vcs.backends.hg import MercurialRepository |
12 from time import mktime | 11 from time import mktime |
13 import calendar | |
14 import traceback | 12 import traceback |
15 import json | 13 import json |
16 | 14 |
17 __all__ = ['whoosh_index', 'get_commits_stats', | 15 __all__ = ['whoosh_index', 'get_commits_stats', |
18 'reset_user_password', 'send_email'] | 16 'reset_user_password', 'send_email'] |
81 except LockHeld: | 79 except LockHeld: |
82 log.info('LockHeld') | 80 log.info('LockHeld') |
83 return 'LockHeld' | 81 return 'LockHeld' |
84 | 82 |
85 @task | 83 @task |
86 def get_commits_stats(repo): | 84 def get_commits_stats(repo_name, ts_min_y, ts_max_y): |
85 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibility |
86 | |
87 from pylons_app.model.db import Statistics, Repository | |
87 log = get_commits_stats.get_logger() | 88 log = get_commits_stats.get_logger() |
88 aggregate = OrderedDict() | 89 commits_by_day_author_aggregate = {} |
89 overview_aggregate = OrderedDict() | 90 commits_by_day_aggregate = {} |
90 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '') | 91 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '') |
91 repo = MercurialRepository(repos_path + repo) | 92 repo = MercurialRepository(repos_path + repo_name) |
92 #graph range | 93 |
93 td = datetime.today() + timedelta(days=1) | |
94 y, m, d = td.year, td.month, td.day | |
95 | |
96 ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month, | |
97 d, 0, 0, 0, 0, 0, 0,)) | |
98 ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month, | |
99 d, 0, 0, 0, 0, 0, 0,)) | |
100 | |
101 ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,)) | |
102 skip_date_limit = True | 94 skip_date_limit = True |
103 | 95 parse_limit = 500 #limit for single task changeset parsing |
104 def author_key_cleaner(k): | 96 last_rev = 0 |
105 k = person(k) | 97 last_cs = None |
106 k = k.replace('"', "") #for js data compatibility | 98 timegetter = itemgetter('time') |
107 return k | 99 |
108 | 100 sa = get_session() |
109 for cs in repo[:200]:#added limit 200 until fix #29 is made | 101 |
102 dbrepo = sa.query(Repository)\ | |
103 .filter(Repository.repo_name == repo_name).scalar() | |
104 cur_stats = sa.query(Statistics)\ | |
105 .filter(Statistics.repository == dbrepo).scalar() | |
106 if cur_stats: | |
107 last_rev = cur_stats.stat_on_revision | |
108 | |
109 if last_rev == repo.revisions[-1]: | |
110 #pass silently without any work | |
111 return True | |
112 | |
113 if cur_stats: | |
114 commits_by_day_aggregate = OrderedDict( | |
115 json.loads( | |
116 cur_stats.commit_activity_combined)) | |
117 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity) | |
118 | |
119 for cnt, rev in enumerate(repo.revisions[last_rev:]): | |
120 last_cs = cs = repo.get_changeset(rev) | |
110 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1], | 121 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1], |
111 cs.date.timetuple()[2]) | 122 cs.date.timetuple()[2]) |
112 timetupple = [int(x) for x in k.split('-')] | 123 timetupple = [int(x) for x in k.split('-')] |
113 timetupple.extend([0 for _ in xrange(6)]) | 124 timetupple.extend([0 for _ in xrange(6)]) |
114 k = mktime(timetupple) | 125 k = mktime(timetupple) |
115 if aggregate.has_key(author_key_cleaner(cs.author)): | 126 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)): |
116 if aggregate[author_key_cleaner(cs.author)].has_key(k): | 127 try: |
117 aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1 | 128 l = [timegetter(x) for x in commits_by_day_author_aggregate\ |
118 aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added) | 129 [author_key_cleaner(cs.author)]['data']] |
119 aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed) | 130 time_pos = l.index(k) |
120 aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed) | 131 except ValueError: |
132 time_pos = False | |
133 | |
134 if time_pos >= 0 and time_pos is not False: | |
135 | |
136 datadict = commits_by_day_author_aggregate\ | |
137 [author_key_cleaner(cs.author)]['data'][time_pos] | |
138 | |
139 datadict["commits"] += 1 | |
140 datadict["added"] += len(cs.added) | |
141 datadict["changed"] += len(cs.changed) | |
142 datadict["removed"] += len(cs.removed) | |
143 #print datadict | |
121 | 144 |
122 else: | 145 else: |
123 #aggregate[author_key_cleaner(cs.author)].update(dates_range) | 146 #print 'ELSE !!!!' |
124 if k >= ts_min_y and k <= ts_max_y or skip_date_limit: | 147 if k >= ts_min_y and k <= ts_max_y or skip_date_limit: |
125 aggregate[author_key_cleaner(cs.author)][k] = {} | 148 |
126 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 | 149 datadict = {"time":k, |
127 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) | 150 "commits":1, |
128 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) | 151 "added":len(cs.added), |
129 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) | 152 "changed":len(cs.changed), |
153 "removed":len(cs.removed), | |
154 } | |
155 commits_by_day_author_aggregate\ | |
156 [author_key_cleaner(cs.author)]['data'].append(datadict) | |
130 | 157 |
131 else: | 158 else: |
159 #print k, 'nokey ADDING' | |
132 if k >= ts_min_y and k <= ts_max_y or skip_date_limit: | 160 if k >= ts_min_y and k <= ts_max_y or skip_date_limit: |
133 aggregate[author_key_cleaner(cs.author)] = OrderedDict() | 161 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = { |
134 #aggregate[author_key_cleaner(cs.author)].update(dates_range) | 162 "label":author_key_cleaner(cs.author), |
135 aggregate[author_key_cleaner(cs.author)][k] = {} | 163 "data":[{"time":k, |
136 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 | 164 "commits":1, |
137 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) | 165 "added":len(cs.added), |
138 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) | 166 "changed":len(cs.changed), |
139 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) | 167 "removed":len(cs.removed), |
140 | 168 }], |
141 | 169 "schema":["commits"], |
142 if overview_aggregate.has_key(k): | 170 } |
143 overview_aggregate[k] += 1 | 171 |
172 # #gather all data by day | |
173 if commits_by_day_aggregate.has_key(k): | |
174 commits_by_day_aggregate[k] += 1 | |
144 else: | 175 else: |
145 overview_aggregate[k] = 1 | 176 commits_by_day_aggregate[k] = 1 |
146 | 177 |
178 if cnt >= parse_limit: | |
179 #don't fetch too much data since we can freeze the application |
180 break | |
181 | |
147 overview_data = [] | 182 overview_data = [] |
148 for k, v in overview_aggregate.items(): | 183 for k, v in commits_by_day_aggregate.items(): |
149 overview_data.append([k, v]) | 184 overview_data.append([k, v]) |
150 overview_data = sorted(overview_data, key=itemgetter(0)) | 185 overview_data = sorted(overview_data, key=itemgetter(0)) |
151 data = {} | 186 |
152 for author in aggregate: | 187 if not commits_by_day_author_aggregate: |
153 commit_data = sorted([{"time":x, | 188 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = { |
154 "commits":aggregate[author][x]['commits'], | |
155 "added":aggregate[author][x]['added'], | |
156 "changed":aggregate[author][x]['changed'], | |
157 "removed":aggregate[author][x]['removed'], | |
158 } for x in aggregate[author]], | |
159 key=itemgetter('time')) | |
160 | |
161 data[author] = {"label":author, | |
162 "data":commit_data, | |
163 "schema":["commits"] | |
164 } | |
165 | |
166 if not data: | |
167 data[author_key_cleaner(repo.contact)] = { | |
168 "label":author_key_cleaner(repo.contact), | 189 "label":author_key_cleaner(repo.contact), |
169 "data":[0, 1], | 190 "data":[0, 1], |
170 "schema":["commits"], | 191 "schema":["commits"], |
171 } | 192 } |
172 | 193 |
173 return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data)) | 194 stats = cur_stats if cur_stats else Statistics() |
195 stats.commit_activity = json.dumps(commits_by_day_author_aggregate) | |
196 stats.commit_activity_combined = json.dumps(overview_data) | |
197 stats.repository = dbrepo | |
198 stats.stat_on_revision = last_cs.revision | |
199 stats.languages = json.dumps({'_TOTAL_':0, '':0}) | |
200 | |
201 try: | |
202 sa.add(stats) | |
203 sa.commit() | |
204 except: | |
205 log.error(traceback.format_exc()) | |
206 sa.rollback() | |
207 return False | |
208 | |
209 return True | |
174 | 210 |
175 @task | 211 @task |
176 def reset_user_password(user_email): | 212 def reset_user_password(user_email): |
177 log = reset_user_password.get_logger() | 213 log = reset_user_password.get_logger() |
178 from pylons_app.lib import auth | 214 from pylons_app.lib import auth |
182 try: | 218 try: |
183 sa = get_session() | 219 sa = get_session() |
184 user = sa.query(User).filter(User.email == user_email).scalar() | 220 user = sa.query(User).filter(User.email == user_email).scalar() |
185 new_passwd = auth.PasswordGenerator().gen_password(8, | 221 new_passwd = auth.PasswordGenerator().gen_password(8, |
186 auth.PasswordGenerator.ALPHABETS_BIG_SMALL) | 222 auth.PasswordGenerator.ALPHABETS_BIG_SMALL) |
187 user.password = auth.get_crypt_password(new_passwd) | 223 if user: |
188 sa.add(user) | 224 user.password = auth.get_crypt_password(new_passwd) |
189 sa.commit() | 225 sa.add(user) |
190 log.info('change password for %s', user_email) | 226 sa.commit() |
227 log.info('change password for %s', user_email) | |
191 if new_passwd is None: | 228 if new_passwd is None: |
192 raise Exception('unable to generate new password') | 229 raise Exception('unable to generate new password') |
193 | 230 |
194 except: | 231 except: |
195 log.error(traceback.format_exc()) | 232 log.error(traceback.format_exc()) |
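After the parsing loop, the task persists two JSON blobs on the Statistics row (commit_activity and commit_activity_combined). For reference, an illustrative decoding of those blobs; the field names follow the code above, while the sample author and numbers are invented:

    import json

    # Illustrative shapes only -- sample values are invented.
    # Statistics.commit_activity: cleaned author -> label/data/schema, where
    # each "data" entry is one day keyed by a mktime() timestamp.
    commit_activity = json.dumps({
        "Marcin Kuzminski": {
            "label": "Marcin Kuzminski",
            "data": [{"time": 1285113600.0, "commits": 3,
                      "added": 5, "changed": 2, "removed": 0}],
            "schema": ["commits"],
        },
    })

    # Statistics.commit_activity_combined: sorted [timestamp, commits] pairs.
    commit_activity_combined = json.dumps([[1285113600.0, 3]])

    # A consumer (presumably the JS graph, given the "js data compatibility"
    # comment in the task) just decodes both blobs.
    per_author = json.loads(commit_activity)
    per_day = json.loads(commit_activity_combined)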