diff mercurial/admin/verify.py @ 50997:752c5a5b73c6

admin-command: add verify command Start using the 'admin' namespace by adding a 'verify' command. Invocation is 'admin::verify'. The idea is to progressively add more focused checks than the existing verify command. To do so we need an advanced way to express what we want to check. The first check for admin::verify is 'working-copy.dirstate' which has no options, because it was an easy first check to implement, which verifies the integrity of the dirstate. This changeset was created with the help of Franck Bret.
author Rapha?l Gom?s <rgomes@octobus.net>
date Wed, 25 Jan 2023 15:34:27 +0100
parents
children dcb00d5c397a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/admin/verify.py	Wed Jan 25 15:34:27 2023 +0100
@@ -0,0 +1,341 @@
+# admin/verify.py - better repository integrity checking for Mercurial
+#
+# Copyright 2023 Octobus <contact@octobus.net>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+import collections
+import copy
+import functools
+
+from ..i18n import _
+from .. import error, pycompat, registrar, requirements
+from ..utils import stringutil
+
+
+verify_table = {}
+verify_alias_table = {}
+check = registrar.verify_check(verify_table, verify_alias_table)
+
+
+# Use this to declare options/aliases in the middle of the hierarchy.
+# Checks like these are not run themselves and cannot have a body.
+# For an example, see the `revlogs` check.
+def noop_func(*args, **kwargs):
+    return
+
+
+@check(b"working-copy.dirstate", alias=b"dirstate")
+def check_dirstate(ui, repo, **options):
+    ui.status(_(b"checking dirstate\n"))
+
+    parent1, parent2 = repo.dirstate.parents()
+    m1 = repo[parent1].manifest()
+    m2 = repo[parent2].manifest()
+    errors = 0
+
+    is_narrow = requirements.NARROW_REQUIREMENT in repo.requirements
+    narrow_matcher = repo.narrowmatch() if is_narrow else None
+
+    for err in repo.dirstate.verify(m1, m2, narrow_matcher):
+        ui.warn(err[0] % err[1:])
+        errors += 1
+
+    return errors
+
+
+# Tree of all checks and their associated function
+pyramid = {}
+
+
+def build_pyramid(table, full_pyramid):
+    """Create a pyramid of checks of the registered checks.
+    It is a name-based hierarchy that can be arbitrarily nested."""
+    for entry, func in sorted(table.items(), key=lambda x: x[0], reverse=True):
+        cursor = full_pyramid
+        levels = entry.split(b".")
+        for level in levels[:-1]:
+            current_node = cursor.setdefault(level, {})
+            cursor = current_node
+        if cursor.get(levels[-1]) is None:
+            cursor[levels[-1]] = (entry, func)
+        elif func is not noop_func:
+            m = b"intermediate checks need to use `verify.noop_func`"
+            raise error.ProgrammingError(m)
+
+
+def find_checks(name, table=None, alias_table=None, full_pyramid=None):
+    """Find all checks for a given name and returns a dict of
+    (qualified_check_name, check_function)
+
+    # Examples
+
+    Using a full qualified name:
+    "working-copy.dirstate" -> {
+        "working-copy.dirstate": CF,
+    }
+
+    Using a *prefix* of a qualified name:
+    "store.revlogs" -> {
+        "store.revlogs.changelog": CF,
+        "store.revlogs.manifestlog": CF,
+        "store.revlogs.filelog": CF,
+    }
+
+    Using a defined alias:
+    "revlogs" -> {
+        "store.revlogs.changelog": CF,
+        "store.revlogs.manifestlog": CF,
+        "store.revlogs.filelog": CF,
+    }
+
+    Using something that is none of the above will be an error.
+    """
+    if table is None:
+        table = verify_table
+    if alias_table is None:
+        alias_table = verify_alias_table
+
+    if name == b"full":
+        return table
+    checks = {}
+
+    # is it a full name?
+    check = table.get(name)
+
+    if check is None:
+        # is it an alias?
+        qualified_name = alias_table.get(name)
+        if qualified_name is not None:
+            name = qualified_name
+            check = table.get(name)
+        else:
+            split = name.split(b".", 1)
+            if len(split) == 2:
+                # split[0] can be an alias
+                qualified_name = alias_table.get(split[0])
+                if qualified_name is not None:
+                    name = b"%s.%s" % (qualified_name, split[1])
+                    check = table.get(name)
+    else:
+        qualified_name = name
+
+    # Maybe it's a subtree in the check hierarchy that does not
+    # have an explicit alias.
+    levels = name.split(b".")
+    if full_pyramid is not None:
+        if not full_pyramid:
+            build_pyramid(table, full_pyramid)
+
+        pyramid.clear()
+        pyramid.update(full_pyramid.items())
+    else:
+        build_pyramid(table, pyramid)
+
+    subtree = pyramid
+    # Find subtree
+    for level in levels:
+        subtree = subtree.get(level)
+        if subtree is None:
+            hint = error.getsimilar(list(alias_table) + list(table), name)
+            hint = error.similarity_hint(hint)
+
+            raise error.InputError(_(b"unknown check %s" % name), hint=hint)
+
+    # Get all checks in that subtree
+    if isinstance(subtree, dict):
+        stack = list(subtree.items())
+        while stack:
+            current_name, entry = stack.pop()
+            if isinstance(entry, dict):
+                stack.extend(entry.items())
+            else:
+                # (qualified_name, func)
+                checks[entry[0]] = entry[1]
+    else:
+        checks[name] = check
+
+    return checks
+
+
+def pass_options(
+    ui,
+    checks,
+    options,
+    table=None,
+    alias_table=None,
+    full_pyramid=None,
+):
+    """Given a dict of checks (fully qualified name to function), and a list
+    of options as given by the user, pass each option down to the right check
+    function."""
+    ui.debug(b"passing options to check functions\n")
+    to_modify = collections.defaultdict(dict)
+
+    if not checks:
+        raise error.Error(_(b"`checks` required"))
+
+    for option in sorted(options):
+        split = option.split(b":")
+        hint = _(
+            b"syntax is 'check:option=value', "
+            b"eg. revlogs.changelog:copies=yes"
+        )
+        option_error = error.InputError(
+            _(b"invalid option '%s'") % option, hint=hint
+        )
+        if len(split) != 2:
+            raise option_error
+
+        check_name, option_value = split
+        if not option_value:
+            raise option_error
+
+        split = option_value.split(b"=")
+        if len(split) != 2:
+            raise option_error
+
+        option_name, value = split
+        if not value:
+            raise option_error
+
+        path = b"%s:%s" % (check_name, option_name)
+
+        matching_checks = find_checks(
+            check_name,
+            table=table,
+            alias_table=alias_table,
+            full_pyramid=full_pyramid,
+        )
+        for name in matching_checks:
+            check = checks.get(name)
+            if check is None:
+                msg = _(b"specified option '%s' for unselected check '%s'\n")
+                raise error.InputError(msg % (name, option_name))
+
+            assert hasattr(check, "func")  # help Pytype
+
+            if not hasattr(check.func, "options"):
+                raise error.InputError(
+                    _(b"check '%s' has no option '%s'") % (name, option_name)
+                )
+
+            try:
+                matching_option = next(
+                    (o for o in check.func.options if o[0] == option_name)
+                )
+            except StopIteration:
+                raise error.InputError(
+                    _(b"check '%s' has no option '%s'") % (name, option_name)
+                )
+
+            # transform the argument from cli string to the expected Python type
+            _name, typ, _docstring = matching_option
+
+            as_typed = None
+            if isinstance(typ, bool):
+                as_bool = stringutil.parsebool(value)
+                if as_bool is None:
+                    raise error.InputError(
+                        _(b"'%s' is not a boolean ('%s')") % (path, value)
+                    )
+                as_typed = as_bool
+            elif isinstance(typ, list):
+                as_list = stringutil.parselist(value)
+                if as_list is None:
+                    raise error.InputError(
+                        _(b"'%s' is not a list ('%s')") % (path, value)
+                    )
+                as_typed = as_list
+            else:
+                raise error.ProgrammingError(b"unsupported type %s", type(typ))
+
+            if option_name in to_modify[name]:
+                raise error.InputError(
+                    _(b"duplicated option '%s' for '%s'") % (option_name, name)
+                )
+            else:
+                assert as_typed is not None
+                to_modify[name][option_name] = as_typed
+
+    # Manage case where a check is set but without command line options
+    # it will later be set with default check options values
+    for name, f in checks.items():
+        if name not in to_modify:
+            to_modify[name] = {}
+
+    # Merge default options with command line options
+    for check_name, cmd_options in to_modify.items():
+        check = checks.get(check_name)
+        func = checks[check_name]
+        merged_options = {}
+        # help Pytype
+        assert check is not None
+        assert check.func is not None
+        assert hasattr(check.func, "options")
+
+        if check.func.options:
+            # copy the default value in case it's mutable (list, etc.)
+            merged_options = {
+                o[0]: copy.deepcopy(o[1]) for o in check.func.options
+            }
+            if cmd_options:
+                for k, v in cmd_options.items():
+                    merged_options[k] = v
+        options = pycompat.strkwargs(merged_options)
+        checks[check_name] = functools.partial(func, **options)
+        ui.debug(b"merged options for '%s': '%r'\n" % (check_name, options))
+
+    return checks
+
+
+def get_checks(
+    repo,
+    ui,
+    names=None,
+    options=None,
+    table=None,
+    alias_table=None,
+    full_pyramid=None,
+):
+    """Given a list of function names and optionally a list of
+    options, return matched checks with merged options (command line options
+    values take precedence on default ones)
+
+    It runs find checks, then resolve options and returns a dict of matched
+    functions with resolved options.
+    """
+    funcs = {}
+
+    if names is None:
+        names = []
+
+    if options is None:
+        options = []
+
+    # find checks
+    for name in names:
+        matched = find_checks(
+            name,
+            table=table,
+            alias_table=alias_table,
+            full_pyramid=full_pyramid,
+        )
+        matched_names = b", ".join(matched)
+        ui.debug(b"found checks '%s' for name '%s'\n" % (matched_names, name))
+        funcs.update(matched)
+
+    funcs = {n: functools.partial(f, ui, repo) for n, f in funcs.items()}
+
+    # resolve options
+    checks = pass_options(
+        ui,
+        funcs,
+        options,
+        table=table,
+        alias_table=alias_table,
+        full_pyramid=full_pyramid,
+    )
+
+    return checks