view tests/test-rust-ancestor.py @ 52413:20fe0bf9a9a5

rust-pyo3: dagop submodule implementation This is the first demonstration that the passing of objects through the Python interpreter from `rustext` to `pyo3_rustext` actually works. In the Python tests, we conflate the presence of the `pyo3_rustext` package with that of `rustext`, as we do not plan to support building one and not the other (we hope to convert fully to PyO3 soon). The skipping is actually done by the base test class. The implementation of `rank` is as trivial as it was in `hg-cpython`, yet it required to be able to convert exceptions from `vcsgraph`.
author Georges Racinet <georges.racinet@cloudcrane.io>
date Sat, 30 Nov 2024 20:58:09 +0100
parents cf5b47b885b1
children 3ffcdbf0b432
line wrap: on
line source

import sys

from mercurial.node import wdirrev

from mercurial.testing import revlog as revlogtesting

try:
    from mercurial import pyo3_rustext, rustext

    rustext.__name__  # trigger immediate actual import
    pyo3_rustext.__name__
except ImportError:
    rustext = pyo3_rustext = None
else:
    # this would fail already without appropriate ancestor.__package__
    from mercurial.rustext.ancestor import (
        AncestorsIterator,
        LazyAncestors,
        MissingAncestors,
    )
    from mercurial.rustext import dagop

try:
    from mercurial.cext import parsers as cparsers
except ImportError:
    cparsers = None


class rustancestorstest(revlogtesting.RustRevlogBasedTestBase):
    """Test the correctness of binding to Rust code.

    This test is merely for the binding to Rust itself: extraction of
    Python variable, giving back the results etc.

    It is not meant to test the algorithmic correctness of the operations
    on ancestors it provides. Hence the very simple embedded index data is
    good enough.

    Algorithmic correctness is asserted by the Rust unit tests.
    """

    def testiteratorrevlist(self):
        idx = self.parserustindex()
        # checking test assumption about the index binary data:
        self.assertEqual(
            {i: (r[5], r[6]) for i, r in enumerate(idx)},
            {0: (-1, -1), 1: (0, -1), 2: (1, -1), 3: (2, -1)},
        )
        ait = AncestorsIterator(idx, [3], 0, True)
        self.assertEqual([r for r in ait], [3, 2, 1, 0])

        ait = AncestorsIterator(idx, [3], 0, False)
        self.assertEqual([r for r in ait], [2, 1, 0])

    def testlazyancestors(self):
        idx = self.parserustindex()
        start_count = sys.getrefcount(idx.inner)  # should be 2 (see Python doc)
        self.assertEqual(
            {i: (r[5], r[6]) for i, r in enumerate(idx)},
            {0: (-1, -1), 1: (0, -1), 2: (1, -1), 3: (2, -1)},
        )
        lazy = LazyAncestors(idx, [3], 0, True)
        # the LazyAncestors instance holds just one reference to the
        # inner revlog.
        self.assertEqual(sys.getrefcount(idx.inner), start_count + 1)

        self.assertTrue(2 in lazy)
        self.assertTrue(bool(lazy))
        self.assertEqual(list(lazy), [3, 2, 1, 0])
        # a second time to validate that we spawn new iterators
        self.assertEqual(list(lazy), [3, 2, 1, 0])

        # now let's watch the refcounts closer
        ait = iter(lazy)
        self.assertEqual(sys.getrefcount(idx.inner), start_count + 2)
        del ait
        self.assertEqual(sys.getrefcount(idx.inner), start_count + 1)
        del lazy
        self.assertEqual(sys.getrefcount(idx.inner), start_count)

        # let's check bool for an empty one
        self.assertFalse(LazyAncestors(idx, [0], 0, False))

    def testmissingancestors(self):
        idx = self.parserustindex()
        missanc = MissingAncestors(idx, [1])
        self.assertTrue(missanc.hasbases())
        self.assertEqual(missanc.missingancestors([3]), [2, 3])
        missanc.addbases({2})
        self.assertEqual(missanc.bases(), {1, 2})
        self.assertEqual(missanc.missingancestors([3]), [3])
        self.assertEqual(missanc.basesheads(), {2})

    def testmissingancestorsremove(self):
        idx = self.parserustindex()
        missanc = MissingAncestors(idx, [1])
        revs = {0, 1, 2, 3}
        missanc.removeancestorsfrom(revs)
        self.assertEqual(revs, {2, 3})

    def testrefcount(self):
        idx = self.parserustindex()
        start_count = sys.getrefcount(idx.inner)

        # refcount increases upon iterator init...
        ait = AncestorsIterator(idx, [3], 0, True)
        self.assertEqual(sys.getrefcount(idx.inner), start_count + 1)
        self.assertEqual(next(ait), 3)

        # and decreases once the iterator is removed
        del ait
        self.assertEqual(sys.getrefcount(idx.inner), start_count)

        # and removing ref to the index after iterator init is no issue
        ait = AncestorsIterator(idx, [3], 0, True)
        del idx
        self.assertEqual(list(ait), [3, 2, 1, 0])

        # the index is not tracked by the GC, hence there is nothing more
        # we can assert to check that it is properly deleted once its refcount
        # drops to 0

    def testgrapherror(self):
        data = (
            revlogtesting.data_non_inlined[: 64 + 27]
            + b'\xf2'
            + revlogtesting.data_non_inlined[64 + 28 :]
        )
        idx = self.parserustindex(data=data)
        with self.assertRaises(rustext.GraphError) as arc:
            AncestorsIterator(idx, [1], -1, False)
        exc = arc.exception
        self.assertIsInstance(exc, ValueError)
        # rust-cpython issues appropriate str instances for Python 2 and 3
        self.assertEqual(exc.args, ('ParentOutOfRange', 1))

    def testwdirunsupported(self):
        # trying to access ancestors of the working directory raises
        idx = self.parserustindex()
        with self.assertRaises(rustext.GraphError) as arc:
            list(AncestorsIterator(idx, [wdirrev], -1, False))

        exc = arc.exception
        self.assertIsInstance(exc, ValueError)
        # rust-cpython issues appropriate str instances for Python 2 and 3
        self.assertEqual(exc.args, ('InvalidRevision', wdirrev))

    def testheadrevs(self):
        idx = self.parserustindex()
        self.assertEqual(dagop.headrevs(idx, [1, 2, 3]), {3})

    def testpyo3_headrevs(self):
        idx = self.parserustindex()
        self.assertEqual(pyo3_rustext.dagop.headrevs(idx, [1, 2, 3]), {3})

    def testpyo3_rank(self):
        idx = self.parserustindex()
        try:
            pyo3_rustext.dagop.rank(idx, 1, 2)
        except pyo3_rustext.GraphError as exc:
            self.assertEqual(exc.args, ("InconsistentGraphData",))


if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)