mercurial/admin/verify.py
changeset 50986 752c5a5b73c6
child 51610 dcb00d5c397a
equal deleted inserted replaced
50985:cf47b83d8ad0 50986:752c5a5b73c6
       
     1 # admin/verify.py - better repository integrity checking for Mercurial
       
     2 #
       
     3 # Copyright 2023 Octobus <contact@octobus.net>
       
     4 #
       
     5 # This software may be used and distributed according to the terms of the
       
     6 # GNU General Public License version 2 or any later version.
       
     7 
       
     8 import collections
       
     9 import copy
       
    10 import functools
       
    11 
       
    12 from ..i18n import _
       
    13 from .. import error, pycompat, registrar, requirements
       
    14 from ..utils import stringutil
       
    15 
       
    16 
       
    17 verify_table = {}
       
    18 verify_alias_table = {}
       
    19 check = registrar.verify_check(verify_table, verify_alias_table)
       
    20 
       
    21 
       
    22 # Use this to declare options/aliases in the middle of the hierarchy.
       
    23 # Checks like these are not run themselves and cannot have a body.
       
    24 # For an example, see the `revlogs` check.
       
    25 def noop_func(*args, **kwargs):
       
    26     return
       
    27 
       
    28 
       
    29 @check(b"working-copy.dirstate", alias=b"dirstate")
       
    30 def check_dirstate(ui, repo, **options):
       
    31     ui.status(_(b"checking dirstate\n"))
       
    32 
       
    33     parent1, parent2 = repo.dirstate.parents()
       
    34     m1 = repo[parent1].manifest()
       
    35     m2 = repo[parent2].manifest()
       
    36     errors = 0
       
    37 
       
    38     is_narrow = requirements.NARROW_REQUIREMENT in repo.requirements
       
    39     narrow_matcher = repo.narrowmatch() if is_narrow else None
       
    40 
       
    41     for err in repo.dirstate.verify(m1, m2, narrow_matcher):
       
    42         ui.warn(err[0] % err[1:])
       
    43         errors += 1
       
    44 
       
    45     return errors
       
    46 
       
    47 
       
    48 # Tree of all checks and their associated function
       
    49 pyramid = {}
       
    50 
       
    51 
       
    52 def build_pyramid(table, full_pyramid):
       
    53     """Create a pyramid of checks of the registered checks.
       
    54     It is a name-based hierarchy that can be arbitrarily nested."""
       
    55     for entry, func in sorted(table.items(), key=lambda x: x[0], reverse=True):
       
    56         cursor = full_pyramid
       
    57         levels = entry.split(b".")
       
    58         for level in levels[:-1]:
       
    59             current_node = cursor.setdefault(level, {})
       
    60             cursor = current_node
       
    61         if cursor.get(levels[-1]) is None:
       
    62             cursor[levels[-1]] = (entry, func)
       
    63         elif func is not noop_func:
       
    64             m = b"intermediate checks need to use `verify.noop_func`"
       
    65             raise error.ProgrammingError(m)
       
    66 
       
    67 
       
    68 def find_checks(name, table=None, alias_table=None, full_pyramid=None):
       
    69     """Find all checks for a given name and returns a dict of
       
    70     (qualified_check_name, check_function)
       
    71 
       
    72     # Examples
       
    73 
       
    74     Using a full qualified name:
       
    75     "working-copy.dirstate" -> {
       
    76         "working-copy.dirstate": CF,
       
    77     }
       
    78 
       
    79     Using a *prefix* of a qualified name:
       
    80     "store.revlogs" -> {
       
    81         "store.revlogs.changelog": CF,
       
    82         "store.revlogs.manifestlog": CF,
       
    83         "store.revlogs.filelog": CF,
       
    84     }
       
    85 
       
    86     Using a defined alias:
       
    87     "revlogs" -> {
       
    88         "store.revlogs.changelog": CF,
       
    89         "store.revlogs.manifestlog": CF,
       
    90         "store.revlogs.filelog": CF,
       
    91     }
       
    92 
       
    93     Using something that is none of the above will be an error.
       
    94     """
       
    95     if table is None:
       
    96         table = verify_table
       
    97     if alias_table is None:
       
    98         alias_table = verify_alias_table
       
    99 
       
   100     if name == b"full":
       
   101         return table
       
   102     checks = {}
       
   103 
       
   104     # is it a full name?
       
   105     check = table.get(name)
       
   106 
       
   107     if check is None:
       
   108         # is it an alias?
       
   109         qualified_name = alias_table.get(name)
       
   110         if qualified_name is not None:
       
   111             name = qualified_name
       
   112             check = table.get(name)
       
   113         else:
       
   114             split = name.split(b".", 1)
       
   115             if len(split) == 2:
       
   116                 # split[0] can be an alias
       
   117                 qualified_name = alias_table.get(split[0])
       
   118                 if qualified_name is not None:
       
   119                     name = b"%s.%s" % (qualified_name, split[1])
       
   120                     check = table.get(name)
       
   121     else:
       
   122         qualified_name = name
       
   123 
       
   124     # Maybe it's a subtree in the check hierarchy that does not
       
   125     # have an explicit alias.
       
   126     levels = name.split(b".")
       
   127     if full_pyramid is not None:
       
   128         if not full_pyramid:
       
   129             build_pyramid(table, full_pyramid)
       
   130 
       
   131         pyramid.clear()
       
   132         pyramid.update(full_pyramid.items())
       
   133     else:
       
   134         build_pyramid(table, pyramid)
       
   135 
       
   136     subtree = pyramid
       
   137     # Find subtree
       
   138     for level in levels:
       
   139         subtree = subtree.get(level)
       
   140         if subtree is None:
       
   141             hint = error.getsimilar(list(alias_table) + list(table), name)
       
   142             hint = error.similarity_hint(hint)
       
   143 
       
   144             raise error.InputError(_(b"unknown check %s" % name), hint=hint)
       
   145 
       
   146     # Get all checks in that subtree
       
   147     if isinstance(subtree, dict):
       
   148         stack = list(subtree.items())
       
   149         while stack:
       
   150             current_name, entry = stack.pop()
       
   151             if isinstance(entry, dict):
       
   152                 stack.extend(entry.items())
       
   153             else:
       
   154                 # (qualified_name, func)
       
   155                 checks[entry[0]] = entry[1]
       
   156     else:
       
   157         checks[name] = check
       
   158 
       
   159     return checks
       
   160 
       
   161 
       
   162 def pass_options(
       
   163     ui,
       
   164     checks,
       
   165     options,
       
   166     table=None,
       
   167     alias_table=None,
       
   168     full_pyramid=None,
       
   169 ):
       
   170     """Given a dict of checks (fully qualified name to function), and a list
       
   171     of options as given by the user, pass each option down to the right check
       
   172     function."""
       
   173     ui.debug(b"passing options to check functions\n")
       
   174     to_modify = collections.defaultdict(dict)
       
   175 
       
   176     if not checks:
       
   177         raise error.Error(_(b"`checks` required"))
       
   178 
       
   179     for option in sorted(options):
       
   180         split = option.split(b":")
       
   181         hint = _(
       
   182             b"syntax is 'check:option=value', "
       
   183             b"eg. revlogs.changelog:copies=yes"
       
   184         )
       
   185         option_error = error.InputError(
       
   186             _(b"invalid option '%s'") % option, hint=hint
       
   187         )
       
   188         if len(split) != 2:
       
   189             raise option_error
       
   190 
       
   191         check_name, option_value = split
       
   192         if not option_value:
       
   193             raise option_error
       
   194 
       
   195         split = option_value.split(b"=")
       
   196         if len(split) != 2:
       
   197             raise option_error
       
   198 
       
   199         option_name, value = split
       
   200         if not value:
       
   201             raise option_error
       
   202 
       
   203         path = b"%s:%s" % (check_name, option_name)
       
   204 
       
   205         matching_checks = find_checks(
       
   206             check_name,
       
   207             table=table,
       
   208             alias_table=alias_table,
       
   209             full_pyramid=full_pyramid,
       
   210         )
       
   211         for name in matching_checks:
       
   212             check = checks.get(name)
       
   213             if check is None:
       
   214                 msg = _(b"specified option '%s' for unselected check '%s'\n")
       
   215                 raise error.InputError(msg % (name, option_name))
       
   216 
       
   217             assert hasattr(check, "func")  # help Pytype
       
   218 
       
   219             if not hasattr(check.func, "options"):
       
   220                 raise error.InputError(
       
   221                     _(b"check '%s' has no option '%s'") % (name, option_name)
       
   222                 )
       
   223 
       
   224             try:
       
   225                 matching_option = next(
       
   226                     (o for o in check.func.options if o[0] == option_name)
       
   227                 )
       
   228             except StopIteration:
       
   229                 raise error.InputError(
       
   230                     _(b"check '%s' has no option '%s'") % (name, option_name)
       
   231                 )
       
   232 
       
   233             # transform the argument from cli string to the expected Python type
       
   234             _name, typ, _docstring = matching_option
       
   235 
       
   236             as_typed = None
       
   237             if isinstance(typ, bool):
       
   238                 as_bool = stringutil.parsebool(value)
       
   239                 if as_bool is None:
       
   240                     raise error.InputError(
       
   241                         _(b"'%s' is not a boolean ('%s')") % (path, value)
       
   242                     )
       
   243                 as_typed = as_bool
       
   244             elif isinstance(typ, list):
       
   245                 as_list = stringutil.parselist(value)
       
   246                 if as_list is None:
       
   247                     raise error.InputError(
       
   248                         _(b"'%s' is not a list ('%s')") % (path, value)
       
   249                     )
       
   250                 as_typed = as_list
       
   251             else:
       
   252                 raise error.ProgrammingError(b"unsupported type %s", type(typ))
       
   253 
       
   254             if option_name in to_modify[name]:
       
   255                 raise error.InputError(
       
   256                     _(b"duplicated option '%s' for '%s'") % (option_name, name)
       
   257                 )
       
   258             else:
       
   259                 assert as_typed is not None
       
   260                 to_modify[name][option_name] = as_typed
       
   261 
       
   262     # Manage case where a check is set but without command line options
       
   263     # it will later be set with default check options values
       
   264     for name, f in checks.items():
       
   265         if name not in to_modify:
       
   266             to_modify[name] = {}
       
   267 
       
   268     # Merge default options with command line options
       
   269     for check_name, cmd_options in to_modify.items():
       
   270         check = checks.get(check_name)
       
   271         func = checks[check_name]
       
   272         merged_options = {}
       
   273         # help Pytype
       
   274         assert check is not None
       
   275         assert check.func is not None
       
   276         assert hasattr(check.func, "options")
       
   277 
       
   278         if check.func.options:
       
   279             # copy the default value in case it's mutable (list, etc.)
       
   280             merged_options = {
       
   281                 o[0]: copy.deepcopy(o[1]) for o in check.func.options
       
   282             }
       
   283             if cmd_options:
       
   284                 for k, v in cmd_options.items():
       
   285                     merged_options[k] = v
       
   286         options = pycompat.strkwargs(merged_options)
       
   287         checks[check_name] = functools.partial(func, **options)
       
   288         ui.debug(b"merged options for '%s': '%r'\n" % (check_name, options))
       
   289 
       
   290     return checks
       
   291 
       
   292 
       
   293 def get_checks(
       
   294     repo,
       
   295     ui,
       
   296     names=None,
       
   297     options=None,
       
   298     table=None,
       
   299     alias_table=None,
       
   300     full_pyramid=None,
       
   301 ):
       
   302     """Given a list of function names and optionally a list of
       
   303     options, return matched checks with merged options (command line options
       
   304     values take precedence on default ones)
       
   305 
       
   306     It runs find checks, then resolve options and returns a dict of matched
       
   307     functions with resolved options.
       
   308     """
       
   309     funcs = {}
       
   310 
       
   311     if names is None:
       
   312         names = []
       
   313 
       
   314     if options is None:
       
   315         options = []
       
   316 
       
   317     # find checks
       
   318     for name in names:
       
   319         matched = find_checks(
       
   320             name,
       
   321             table=table,
       
   322             alias_table=alias_table,
       
   323             full_pyramid=full_pyramid,
       
   324         )
       
   325         matched_names = b", ".join(matched)
       
   326         ui.debug(b"found checks '%s' for name '%s'\n" % (matched_names, name))
       
   327         funcs.update(matched)
       
   328 
       
   329     funcs = {n: functools.partial(f, ui, repo) for n, f in funcs.items()}
       
   330 
       
   331     # resolve options
       
   332     checks = pass_options(
       
   333         ui,
       
   334         funcs,
       
   335         options,
       
   336         table=table,
       
   337         alias_table=alias_table,
       
   338         full_pyramid=full_pyramid,
       
   339     )
       
   340 
       
   341     return checks