view hgext/git/manifest.py @ 52523:db6efd74cf14

git: add missing `repository.imanifestdict` methods to `gittreemanifest` The next logical step was to explicitly subclass `repository.imanifestdict`, but pytype flagged a few missing methods, because they're marked abstract. File "/mnt/c/Users/Matt/hg/hgext/git/manifest.py", line 238, in copy: Can't instantiate gittreemanifest with abstract methods dirs, fastdelta, set [not-instantiable] File "/mnt/c/Users/Matt/hg/hgext/git/manifest.py", line 287, in read: Can't instantiate gittreemanifest with abstract methods dirs, fastdelta, set [not-instantiable] File "/mnt/c/Users/Matt/hg/hgext/git/manifest.py", line 309, in read: Can't instantiate gittreemanifest with abstract methods dirs, fastdelta, set [not-instantiable] I'm not bothering to figure out how to implement them- add them to appease pytype, and put a TODO to fill in a proper implementation later.
author Matt Harbison <matt_harbison@yahoo.com>
date Wed, 11 Dec 2024 02:02:34 -0500
parents 48cdbd4d5443
children 6412dcec52d3
line wrap: on
line source

from __future__ import annotations

import typing

from typing import (
    Any,
    Iterable,
    Iterator,
    Set,
)

from mercurial import (
    match as matchmod,
    pathutil,
    pycompat,
    util,
)
from mercurial.interfaces import (
    repository,
    util as interfaceutil,
)
from . import gitutil

if typing.TYPE_CHECKING:
    from typing import (
        ByteString,  # TODO: change to Buffer for 3.14
    )

# Obtain the pygit2 module through gitutil instead of importing it directly.
# NOTE(review): presumably get_pygit2() guards/defers the real import so this
# module can be loaded when pygit2 is unavailable -- confirm in gitutil.
pygit2 = gitutil.get_pygit2()


@interfaceutil.implementer(repository.imanifestdict)
class gittreemanifest:
    """Expose git trees (and optionally a builder's overlay) as a manifestdict.

    Very similar to mercurial.manifest.treemanifest.

    Lookups consult the pending-changes overlay first and fall back to the
    git trees. Mutating methods only record their edit in the overlay; they
    never touch the git repository themselves.
    """

    def __init__(self, git_repo, root_tree, pending_changes):
        """Initializer.

        Args:
          git_repo: The git_repo we're walking (required to look up child
              trees).
          root_tree: The root Git tree object for this manifest.
          pending_changes: A dict in which pending changes will be
              tracked. The enclosing memgittreemanifestctx will use this to
              construct any required Tree objects in Git during its
              `write()` method. ``None`` means "start with a fresh, empty
              dict".
        """
        self._git_repo = git_repo
        self._tree = root_tree
        if pending_changes is None:
            pending_changes = {}
        # dict of path: Optional[Tuple(node, flags)]; None marks a deletion
        self._pending_changes = pending_changes

    def _resolve_entry(self, path) -> tuple[bytes, bytes]:
        """Given a path, load its node and flags, or raise KeyError if missing.

        This takes into account any pending writes in the builder.

        Raises:
          KeyError: the path is pending deletion, is absent from the tree,
              names a directory, or passes through a non-directory.
          ValueError: the entry's git file mode has no manifest flag
              equivalent.
        """
        if path in self._pending_changes:
            val = self._pending_changes[path]
            if val is None:
                # recorded deletion
                raise KeyError(path)
            return val
        *parents, leaf = pycompat.fsdecode(path).split('/')
        tree = self._tree
        for comp in parents:
            te = tree[comp]
            if te.type != pygit2.GIT_OBJ_TREE:
                # a file sits where the path needs a directory
                raise KeyError(path)
            tree = self._git_repo[te.id]
        ent = tree[leaf]
        if ent.type == pygit2.GIT_OBJ_TREE:
            # directories are not manifest entries; report them as missing
            # so __contains__/get/flags behave instead of blowing up below
            raise KeyError(path)
        if ent.filemode == pygit2.GIT_FILEMODE_BLOB:
            flags = b''
        elif ent.filemode == pygit2.GIT_FILEMODE_BLOB_EXECUTABLE:
            flags = b'x'
        elif ent.filemode == pygit2.GIT_FILEMODE_LINK:
            flags = b'l'
        else:
            raise ValueError('unsupported mode %s' % oct(ent.filemode))
        return ent.id.raw, flags

    def __getitem__(self, path: bytes) -> bytes:
        """Return the node for ``path``; raise KeyError if it is missing."""
        return self._resolve_entry(path)[0]

    def find(self, path: bytes) -> tuple[bytes, bytes]:
        """Return ``(node, flags)`` for ``path``; raise KeyError if missing."""
        return self._resolve_entry(path)

    def __len__(self) -> int:
        """Return the number of files, pending changes included."""
        return sum(1 for _ in self.walk(matchmod.always()))

    def __nonzero__(self) -> bool:
        """Return True when the manifest contains at least one file."""
        for _ in self:
            return True
        return False

    __bool__ = __nonzero__

    def __contains__(self, path: bytes) -> bool:
        """Return True when ``path`` resolves to a file entry."""
        try:
            self._resolve_entry(path)
        except KeyError:
            return False
        return True

    def iterkeys(self) -> Iterator[bytes]:
        """Iterate over every file path in sorted order."""
        return self.walk(matchmod.always())

    def keys(self) -> list[bytes]:
        """Return every file path as a sorted list."""
        return list(self.iterkeys())

    def __iter__(self) -> Iterator[bytes]:
        return self.iterkeys()

    def set(self, path: bytes, node: bytes, flags: bytes) -> None:
        """Record ``node`` and ``flags`` for ``path`` as a pending change."""
        self._pending_changes[path] = node, flags

    def __setitem__(self, path: bytes, node: bytes) -> None:
        """Record a new node for ``path``, keeping its current flags.

        A path that does not exist yet gets empty flags.
        """
        self._pending_changes[path] = node, self.flags(path)

    def __delitem__(self, path: bytes) -> None:
        """Record ``path`` as deleted."""
        # TODO: should probably KeyError for already-deleted files?
        self._pending_changes[path] = None

    def filesnotin(self, other, match=None) -> Set[bytes]:
        """Return the files of this manifest that are absent from ``other``.

        When ``match`` is given, only matching files are considered.
        """
        if match is not None:
            # wrap the matcher with a no-op bad-file callback
            match = matchmod.badmatch(match, lambda path, msg: None)
            sm2 = set(other.walk(match))
            return {f for f in self.walk(match) if f not in sm2}
        return {f for f in self if f not in other}

    @util.propertycache
    def _dirs(self):
        return pathutil.dirs(self)

    def dirs(self) -> pathutil.dirs:
        """Return the (cached) directory multiset of this manifest."""
        return self._dirs  # TODO: why is there a propertycache?

    def hasdir(self, dir: bytes) -> bool:
        """Return True when ``dir`` is a directory of this manifest."""
        return dir in self._dirs

    def diff(
        self,
        other: Any,  # TODO: 'manifestdict' or (better) equivalent interface
        match: Any = lambda x: True,  # TODO: Optional[matchmod.basematcher] = None,
        clean: bool = False,
    ) -> dict[
        bytes,
        tuple[tuple[bytes | None, bytes], tuple[bytes | None, bytes]] | None,
    ]:
        """Finds changes between the current manifest and m2.

        The result is returned as a dict with filename as key and
        values of the form ((n1,fl1),(n2,fl2)), where n1/n2 is the
        nodeid in the current/other manifest and fl1/fl2 is the flag
        in the current/other manifest. Where the file does not exist,
        the nodeid will be None and the flags will be the empty
        string.

        Args:
          other: manifest to compare against; must expose ``_tree`` and
              ``find()``.
          match: callable deciding which file paths are reported.
          clean: when true, unchanged matching files are reported too, with
              a value of None.

        The set of paths examined comes from the two git trees; node and
        flag values are obtained through ``find()``.
        """
        result = {}
        # (path prefix, tree-or-None in self, tree-or-None in other)
        stack = [(b'', self._tree, other._tree)]

        def _iterativediff(t1, t2, subdir):
            """compares two trees and appends new tree nodes to examine to
            the stack"""
            if t1 is None:
                t1 = {}
            if t2 is None:
                t2 = {}

            for e1 in t1:
                realname = subdir + pycompat.fsencode(e1.name)
                try:
                    e2 = t2[e1.name]
                except KeyError:
                    e2 = None
                e2_is_tree = e2 is not None and e2.type == pygit2.GIT_OBJ_TREE

                if e1.type == pygit2.GIT_OBJ_TREE:
                    # descend; a non-directory on the other side is handled
                    # by the second loop below
                    stack.append(
                        (realname + b'/', e1, e2 if e2_is_tree else None)
                    )
                    continue

                if e2_is_tree:
                    # file here, directory there: everything below the
                    # directory is new, and the file itself has no
                    # counterpart on the other side
                    stack.append((realname + b'/', None, e2))
                    e2 = None

                if not match(realname):
                    continue

                n1, fl1 = self.find(realname)
                n2, fl2 = None, b''
                if e2 is not None:
                    try:
                        n2, fl2 = other.find(realname)
                    except KeyError:
                        # other reports the file as missing after all
                        pass

                if n1 != n2 or fl1 != fl2:
                    result[realname] = ((n1, fl1), (n2, fl2))
                elif clean:
                    result[realname] = None

            for e2 in t2:
                e2_is_tree = e2.type == pygit2.GIT_OBJ_TREE
                if e2.name in t1:
                    # Already covered by the loop above, unless a directory
                    # on our side hides a plain file on the other side.
                    if e2_is_tree or t1[e2.name].type != pygit2.GIT_OBJ_TREE:
                        continue

                realname = subdir + pycompat.fsencode(e2.name)

                if e2_is_tree:
                    stack.append((realname + b'/', None, e2))
                elif match(realname):
                    n2, fl2 = other.find(realname)
                    result[realname] = ((None, b''), (n2, fl2))

        while stack:
            subdir, t1, t2 = stack.pop()
            # stack is populated in the function call
            _iterativediff(t1, t2, subdir)

        return result

    def setflag(self, path: bytes, flag: bytes) -> None:
        """Record new flags for ``path``, keeping its current node.

        Raises KeyError when ``path`` does not exist.
        """
        node, unused_flag = self._resolve_entry(path)
        self._pending_changes[path] = node, flag

    def get(self, path: bytes, default=None) -> bytes | None:
        """Return the node for ``path``, or ``default`` when it is missing."""
        try:
            return self._resolve_entry(path)[0]
        except KeyError:
            return default

    def flags(self, path: bytes) -> bytes:
        """Return the flags for ``path``, or ``b''`` when it is missing."""
        try:
            return self._resolve_entry(path)[1]
        except KeyError:
            return b''

    def copy(self) -> 'gittreemanifest':
        """Return a manifest on the same tree with its own pending dict."""
        return gittreemanifest(
            self._git_repo, self._tree, dict(self._pending_changes)
        )

    def items(self) -> Iterator[tuple[bytes, bytes]]:
        """Yield ``(path, node)`` pairs in sorted path order."""
        for f in self:
            # TODO: build a proper iterator version of this
            yield f, self[f]

    def iteritems(self) -> Iterator[tuple[bytes, bytes]]:
        return self.items()

    def iterentries(self) -> Iterator[tuple[bytes, bytes, bytes]]:
        """Yield ``(path, node, flags)`` triples in sorted path order."""
        for f in self:
            # TODO: build a proper iterator version of this
            yield f, *self._resolve_entry(f)

    def text(self) -> ByteString:
        # TODO can this method move out of the manifest iface?
        raise NotImplementedError

    def fastdelta(
        self, base: ByteString, changes: Iterable[tuple[bytes, bool]]
    ) -> tuple[ByteString, ByteString]:
        raise NotImplementedError  # TODO: implement this

    def _walkonetree(self, tree, match, subdir):
        """Yield the matching file paths below ``tree``, depth first.

        ``subdir`` is the bytes prefix (empty or ending in ``/``) that is
        prepended to every entry name.
        """
        for te in tree:
            # TODO: can we prune dir walks with the matcher?
            realname = subdir + pycompat.fsencode(te.name)
            if te.type == pygit2.GIT_OBJ_TREE:
                yield from self._walkonetree(
                    self._git_repo[te.id], match, realname + b'/'
                )
            elif match(realname):
                # realname is already bytes, no re-encoding needed
                yield realname

    def walk(self, match: matchmod.basematcher) -> Iterator[bytes]:
        """Iterate, in sorted order, over the file paths accepted by match.

        Pending additions are included and pending deletions are left out.
        """
        # TODO: this is a very lazy way to merge in the pending
        # changes. There is absolutely room for optimization here by
        # being clever about walking over the sets...
        baseline = set(self._walkonetree(self._tree, match, b''))
        deleted = {p for p, v in self._pending_changes.items() if v is None}
        pend = {p for p in self._pending_changes if match(p)}
        return iter(sorted((baseline | pend) - deleted))


class gittreemanifestctx(repository.imanifestrevisionstored):
    """Manifest context bound to a git tree, with no pending changes."""

    def __init__(self, repo, gittree):
        self._tree = gittree
        self._repo = repo

    def read(self):
        """Build a manifest over the stored tree with an empty overlay."""
        return gittreemanifest(
            git_repo=self._repo, root_tree=self._tree, pending_changes=None
        )

    def readfast(self, shallow: bool = False):
        """Same as read(); the ``shallow`` hint is not used."""
        manifest = self.read()
        return manifest

    def copy(self):
        """Return a writable context for the same repo and tree."""
        # Callers of copy() expect something they can mutate, so this
        # must hand back the mem variant rather than another instance
        # of this class.
        writable = memgittreemanifestctx(self._repo, self._tree)
        return writable

    def find(self, path: bytes) -> tuple[bytes, bytes]:
        """Look ``path`` up in a freshly read manifest."""
        manifest = self.read()
        return manifest.find(path)


class memgittreemanifestctx(repository.imanifestrevisionwritable):
    """Writable manifest context that buffers edits until ``write()``.

    Manifests returned by ``read()`` share this context's pending-changes
    dict, so edits made through them are what ``write()`` turns into git
    trees.
    """

    def __init__(self, repo, tree):
        self._repo = repo
        self._tree = tree
        # dict of path: Optional[Tuple(node, flags)]
        self._pending_changes = {}

    def read(self):
        """Return a manifest backed by this context's pending changes."""
        return gittreemanifest(self._repo, self._tree, self._pending_changes)

    def copy(self):
        """Return a new writable context on the same tree.

        The pending changes of this context are not carried over.
        """
        # TODO: if we have a builder in play, what should happen here?
        # Maybe we can shuffle copy() into the immutable interface.
        return memgittreemanifestctx(self._repo, self._tree)

    def write(self, transaction, link, p1, p2, added, removed, match=None):
        """Write the pending changes as git trees; return the root tree id.

        Only ``match`` is inspected among the arguments (it must be empty
        or an always-matcher); the edits come from the pending-changes
        dict. The return value is the raw id of the new root tree.
        """
        # We're not (for now, anyway) going to audit filenames, so we
        # can ignore added and removed.

        # TODO what does this match argument get used for? hopefully
        # just narrow?
        assert not match or isinstance(match, matchmod.alwaysmatcher)

        touched_dirs = pathutil.dirs(list(self._pending_changes))
        # Both dicts are keyed by b'' for the root and b'/a/b' for nested
        # directories (note the leading slash).
        trees = {
            b'': self._tree,
        }
        # path: treebuilder
        builders = {
            b'': self._repo.TreeBuilder(self._tree),
        }
        # get a TreeBuilder for every tree in the touched_dirs set
        for d in sorted(touched_dirs, key=lambda x: (len(x), x)):
            if d == b'':
                # loaded root tree above
                continue
            full = b''
            for part in d.split(b'/'):
                parent = trees[full]
                full += b'/' + part
                if full in builders:
                    # shared prefix that an earlier directory already
                    # loaded; no need to build it again
                    continue
                try:
                    parent_tree_id = parent[pycompat.fsdecode(part)].id
                    new = self._repo[parent_tree_id]
                except KeyError:
                    # new directory
                    new = None
                if new is not None:
                    # existing directory
                    trees[full] = new
                    builders[full] = self._repo.TreeBuilder(new)
                else:
                    # new directory, use an empty dict to easily
                    # generate KeyError as any nested new dirs get
                    # created.
                    trees[full] = {}
                    builders[full] = self._repo.TreeBuilder()

        modes = {
            b'': pygit2.GIT_FILEMODE_BLOB,
            b'x': pygit2.GIT_FILEMODE_BLOB_EXECUTABLE,
            b'l': pygit2.GIT_FILEMODE_LINK,
        }
        for f, info in self._pending_changes.items():
            dirname, sep, basename = f.rpartition(b'/')
            if sep:
                dirname = b'/' + dirname
            builder = builders[dirname]
            if info is None:
                builder.remove(pycompat.fsdecode(basename))
            else:
                n, fl = info
                builder.insert(
                    pycompat.fsdecode(basename),
                    gitutil.togitnode(n),
                    modes[fl],
                )

        # This visits the buffered TreeBuilders in deepest-first
        # order, bubbling up the edits. (A child's key is always longer
        # than its parent's, so sorting by length is enough.)
        for b in sorted(builders, key=len, reverse=True):
            if b == b'':
                break
            cb = builders[b]
            dn, bn = b.rsplit(b'/', 1)
            name = pycompat.fsdecode(bn)
            parent_builder = builders[dn]
            if len(cb):
                parent_builder.insert(
                    name, cb.write(), pygit2.GIT_FILEMODE_TREE
                )
            elif parent_builder.get(name) is not None:
                # Every entry of this directory was removed: drop the
                # directory itself instead of recording an empty subtree.
                parent_builder.remove(name)
        return builders[b''].write().raw