|
1 # admin/verify.py - better repository integrity checking for Mercurial |
|
2 # |
|
3 # Copyright 2023 Octobus <contact@octobus.net> |
|
4 # |
|
5 # This software may be used and distributed according to the terms of the |
|
6 # GNU General Public License version 2 or any later version. |
|
7 |
|
8 import collections |
|
9 import copy |
|
10 import functools |
|
11 |
|
12 from ..i18n import _ |
|
13 from .. import error, pycompat, registrar, requirements |
|
14 from ..utils import stringutil |
|
15 |
|
16 |
|
# Registry of checks. `find_checks` looks a fully qualified check name
# (e.g. b"working-copy.dirstate") up in this table to get the check.
verify_table = {}
# Registry of aliases. `find_checks` looks an alias (e.g. b"dirstate") up in
# this table to get the fully qualified name it stands for.
verify_alias_table = {}
# Decorator used below to declare checks; it is handed both tables.
# NOTE(review): presumably it fills them on registration -- confirm against
# `registrar.verify_check`.
check = registrar.verify_check(verify_table, verify_alias_table)
|
20 |
|
21 |
|
22 # Use this to declare options/aliases in the middle of the hierarchy. |
|
23 # Checks like these are not run themselves and cannot have a body. |
|
24 # For an example, see the `revlogs` check. |
|
def noop_func(*args, **kwargs):
    # Deliberately empty: accepts any call signature and yields nothing.
    # `build_pyramid` compares against this exact function object to
    # recognise intermediate (non-leaf) checks.
    return None
|
27 |
|
28 |
|
@check(b"working-copy.dirstate", alias=b"dirstate")
def check_dirstate(ui, repo, **options):
    # Run the dirstate's own verification against the manifests of both
    # working copy parents, warn about every problem it yields and return
    # how many problems were reported.
    ui.status(_(b"checking dirstate\n"))

    p1, p2 = repo.dirstate.parents()
    manifest1 = repo[p1].manifest()
    manifest2 = repo[p2].manifest()

    # Only narrow repositories restrict verification with a matcher.
    if requirements.NARROW_REQUIREMENT in repo.requirements:
        matcher = repo.narrowmatch()
    else:
        matcher = None

    problem_count = 0
    for problem in repo.dirstate.verify(manifest1, manifest2, matcher):
        # Each item is used as a format string followed by its arguments.
        template, arguments = problem[0], problem[1:]
        ui.warn(template % arguments)
        problem_count += 1

    return problem_count
|
46 |
|
47 |
|
# Tree of all checks and their associated function.
# It is filled by `build_pyramid` (called from `find_checks`): each
# dot-separated component of a check name is one nesting level, and each leaf
# is a `(qualified_name, func)` tuple.
pyramid = {}
|
50 |
|
51 |
|
def build_pyramid(table, full_pyramid):
    """Fill `full_pyramid` with a nested, name-based tree of the checks
    registered in `table`.

    Every dot-separated component of a check name becomes one level of
    nested dicts; the last component maps to a `(qualified_name, func)`
    tuple. `full_pyramid` is modified in place and nothing is returned.

    A name that is also the prefix of other names (an intermediate check)
    must be registered with `noop_func`, otherwise `error.ProgrammingError`
    is raised.
    """
    # Walk the names in descending order so that "a.b.c" is inserted before
    # its prefix "a.b": when the prefix shows up, its node already exists.
    for qualified_name in sorted(table, reverse=True):
        func = table[qualified_name]
        *parents, leaf = qualified_name.split(b".")

        node = full_pyramid
        for component in parents:
            node = node.setdefault(component, {})

        if node.get(leaf) is None:
            node[leaf] = (qualified_name, func)
            continue

        # The slot is already taken by a subtree: only placeholders may
        # share a name with a subtree.
        if func is not noop_func:
            m = b"intermediate checks need to use `verify.noop_func`"
            raise error.ProgrammingError(m)
|
66 |
|
67 |
|
def find_checks(name, table=None, alias_table=None, full_pyramid=None):
    """Find all checks for a given name and return a dict of
    (qualified_check_name, check_function)

    `table` and `alias_table` default to the module-level `verify_table`
    and `verify_alias_table`.

    If `full_pyramid` is given, it is used as the check hierarchy (it is
    built from `table` first when empty) and copied into the module-level
    `pyramid`. Otherwise the module-level `pyramid` is built from `table`.

    The special name "full" returns `table` itself (not a copy).

    # Examples

    Using a full qualified name:
    "working-copy.dirstate" -> {
        "working-copy.dirstate": CF,
    }

    Using a *prefix* of a qualified name:
    "store.revlogs" -> {
        "store.revlogs.changelog": CF,
        "store.revlogs.manifestlog": CF,
        "store.revlogs.filelog": CF,
    }

    Using a defined alias:
    "revlogs" -> {
        "store.revlogs.changelog": CF,
        "store.revlogs.manifestlog": CF,
        "store.revlogs.filelog": CF,
    }

    Using something that is none of the above raises `error.InputError`,
    with a hint listing similar known names.
    """
    if table is None:
        table = verify_table
    if alias_table is None:
        alias_table = verify_alias_table

    if name == b"full":
        return table
    checks = {}

    # is it a full name?
    check = table.get(name)

    if check is None:
        # is it an alias?
        qualified_name = alias_table.get(name)
        if qualified_name is not None:
            name = qualified_name
            check = table.get(name)
        else:
            split = name.split(b".", 1)
            if len(split) == 2:
                # split[0] can be an alias
                qualified_name = alias_table.get(split[0])
                if qualified_name is not None:
                    name = b"%s.%s" % (qualified_name, split[1])
                    check = table.get(name)

    # Maybe it's a subtree in the check hierarchy that does not
    # have an explicit alias.
    if full_pyramid is not None:
        if not full_pyramid:
            build_pyramid(table, full_pyramid)

        # When the caller hands us the module-level tree itself there is
        # nothing to copy, and clearing it first would wipe it.
        if full_pyramid is not pyramid:
            pyramid.clear()
            pyramid.update(full_pyramid.items())
    else:
        build_pyramid(table, pyramid)

    # Find subtree
    subtree = pyramid
    for level in name.split(b"."):
        # A leaf is a `(qualified_name, func)` tuple and has no children:
        # trying to descend below it means the name is unknown.
        subtree = subtree.get(level) if isinstance(subtree, dict) else None
        if subtree is None:
            hint = error.getsimilar(list(alias_table) + list(table), name)
            hint = error.similarity_hint(hint)

            # Translate the template first, then interpolate, so that the
            # message catalog lookup is done on the constant string.
            raise error.InputError(_(b"unknown check %s") % name, hint=hint)

    # Get all checks in that subtree
    if isinstance(subtree, dict):
        stack = list(subtree.values())
        while stack:
            entry = stack.pop()
            if isinstance(entry, dict):
                stack.extend(entry.values())
            else:
                found_name, found_func = entry
                checks[found_name] = found_func
    else:
        checks[name] = check

    return checks
|
160 |
|
161 |
|
def _parse_check_option(option):
    """Split a `check:option=value` byte string into
    `(check_name, option_name, value)`.

    Raises `error.InputError` unless there is exactly one `:` and exactly
    one `=`, each followed by a non-empty part.
    """
    hint = _(
        b"syntax is 'check:option=value', "
        b"eg. revlogs.changelog:copies=yes"
    )
    option_error = error.InputError(
        _(b"invalid option '%s'") % option, hint=hint
    )

    parts = option.split(b":")
    if len(parts) != 2 or not parts[1]:
        raise option_error
    check_name, assignment = parts

    parts = assignment.split(b"=")
    if len(parts) != 2 or not parts[1]:
        raise option_error
    option_name, value = parts

    return check_name, option_name, value


def _convert_option_value(default, value, path):
    """Convert the command line string `value` to the type of `default`.

    `path` (`check:option`) is only used in error messages. Booleans and
    lists are supported; a value that cannot be parsed raises
    `error.InputError` and any other default type raises
    `error.ProgrammingError`.
    """
    if isinstance(default, bool):
        as_bool = stringutil.parsebool(value)
        if as_bool is None:
            raise error.InputError(
                _(b"'%s' is not a boolean ('%s')") % (path, value)
            )
        return as_bool

    if isinstance(default, list):
        as_list = stringutil.parselist(value)
        if as_list is None:
            raise error.InputError(
                _(b"'%s' is not a list ('%s')") % (path, value)
            )
        return as_list

    # Interpolate the type name into the message: passing it as a second
    # positional argument would leave the "%s" placeholder unexpanded.
    type_name = type(default).__name__.encode("ascii", "replace")
    raise error.ProgrammingError(b"unsupported type %s" % type_name)


def pass_options(
    ui,
    checks,
    options,
    table=None,
    alias_table=None,
    full_pyramid=None,
):
    """Given a dict of checks (fully qualified name to function), and a list
    of options as given by the user, pass each option down to the right check
    function.

    Each option has the form `check:option=value`, where `check` is anything
    `find_checks` accepts (`table`, `alias_table` and `full_pyramid` are
    forwarded to it). Every entry of `checks` is expected to expose `.func`
    (e.g. a `functools.partial`) whose `options` attribute lists
    `(name, default, docstring)` tuples.

    Each entry of `checks` is replaced in place by a `functools.partial`
    that carries the default option values overridden by the command line
    ones. The same `checks` dict is returned.

    Raises `error.Error` if `checks` is empty and `error.InputError` for a
    malformed, unknown, duplicated or unselected-check option.
    """
    ui.debug(b"passing options to check functions\n")
    to_modify = collections.defaultdict(dict)

    if not checks:
        raise error.Error(_(b"`checks` required"))

    for option in sorted(options):
        check_name, option_name, value = _parse_check_option(option)
        path = b"%s:%s" % (check_name, option_name)

        matching_checks = find_checks(
            check_name,
            table=table,
            alias_table=alias_table,
            full_pyramid=full_pyramid,
        )
        for name in matching_checks:
            check = checks.get(name)
            if check is None:
                msg = _(b"specified option '%s' for unselected check '%s'\n")
                # The option comes first in the message, then the check.
                raise error.InputError(msg % (option_name, name))

            assert hasattr(check, "func")  # help Pytype

            # A check without declared options has, by definition, no
            # option of that name.
            declared = getattr(check.func, "options", None) or ()
            matching_option = next(
                (o for o in declared if o[0] == option_name), None
            )
            if matching_option is None:
                raise error.InputError(
                    _(b"check '%s' has no option '%s'") % (name, option_name)
                )

            # transform the argument from cli string to the expected Python
            # type, which is the type of the option's default value
            _name, default, _docstring = matching_option
            as_typed = _convert_option_value(default, value, path)

            if option_name in to_modify[name]:
                raise error.InputError(
                    _(b"duplicated option '%s' for '%s'") % (option_name, name)
                )
            to_modify[name][option_name] = as_typed

    # Manage case where a check is set but without command line options
    # it will later be set with default check options values
    for name in checks:
        to_modify.setdefault(name, {})

    # Merge default options with command line options
    for check_name, cmd_options in to_modify.items():
        check = checks[check_name]
        # help Pytype
        assert check is not None
        assert check.func is not None
        assert hasattr(check.func, "options")

        merged_options = {}
        if check.func.options:
            # copy the default value in case it's mutable (list, etc.)
            merged_options = {
                o[0]: copy.deepcopy(o[1]) for o in check.func.options
            }
        merged_options.update(cmd_options)

        kwargs = pycompat.strkwargs(merged_options)
        checks[check_name] = functools.partial(check, **kwargs)
        ui.debug(b"merged options for '%s': '%r'\n" % (check_name, kwargs))

    return checks
|
291 |
|
292 |
|
def get_checks(
    repo,
    ui,
    names=None,
    options=None,
    table=None,
    alias_table=None,
    full_pyramid=None,
):
    """Resolve check names and command line options into ready-to-run checks.

    Every entry of `names` is expanded with `find_checks`. The functions
    found are bound to `ui` and `repo`, then handed to `pass_options`
    together with `options`, so that command line values take precedence
    over the default ones.

    Returns a dict mapping each qualified check name to its function with
    resolved options.
    """
    names = [] if names is None else names
    options = [] if options is None else options

    # Both helpers take the same lookup configuration.
    lookup = {
        "table": table,
        "alias_table": alias_table,
        "full_pyramid": full_pyramid,
    }

    # find checks
    selected = {}
    for requested in names:
        found = find_checks(requested, **lookup)
        found_names = b", ".join(found)
        ui.debug(
            b"found checks '%s' for name '%s'\n" % (found_names, requested)
        )
        selected.update(found)

    bound = {
        qualified: functools.partial(func, ui, repo)
        for qualified, func in selected.items()
    }

    # resolve options
    return pass_options(ui, bound, options, **lookup)