tests/test-fastannotate-revmap.py
changeset 39210 1ddb296e0dee
child 39250 659f010ffa7e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-fastannotate-revmap.py	Mon Jul 30 22:50:00 2018 -0400
@@ -0,0 +1,191 @@
+from __future__ import absolute_import, print_function
+
+import os
+import tempfile
+
+from mercurial import util
+from hgext.fastannotate import error, revmap
+
+def genhsh(i):
+    return chr(i) + b'\0' * 19
+
+def gettemppath():
+    fd, path = tempfile.mkstemp()
+    os.unlink(path)
+    os.close(fd)
+    return path
+
+def ensure(condition):
+    if not condition:
+        raise RuntimeError('Unexpected')
+
+def testbasicreadwrite():
+    path = gettemppath()
+
+    rm = revmap.revmap(path)
+    ensure(rm.maxrev == 0)
+    for i in xrange(5):
+        ensure(rm.rev2hsh(i) is None)
+    ensure(rm.hsh2rev(b'\0' * 20) is None)
+
+    paths = ['', 'a', None, 'b', 'b', 'c', 'c', None, 'a', 'b', 'a', 'a']
+    for i in xrange(1, 5):
+        ensure(rm.append(genhsh(i), sidebranch=(i & 1), path=paths[i]) == i)
+
+    ensure(rm.maxrev == 4)
+    for i in xrange(1, 5):
+        ensure(rm.hsh2rev(genhsh(i)) == i)
+        ensure(rm.rev2hsh(i) == genhsh(i))
+
+    # re-load and verify
+    rm.flush()
+    rm = revmap.revmap(path)
+    ensure(rm.maxrev == 4)
+    for i in xrange(1, 5):
+        ensure(rm.hsh2rev(genhsh(i)) == i)
+        ensure(rm.rev2hsh(i) == genhsh(i))
+        ensure(bool(rm.rev2flag(i) & revmap.sidebranchflag) == bool(i & 1))
+
+    # append without calling save() explicitly
+    for i in xrange(5, 12):
+        ensure(rm.append(genhsh(i), sidebranch=(i & 1), path=paths[i],
+                         flush=True) == i)
+
+    # re-load and verify
+    rm = revmap.revmap(path)
+    ensure(rm.maxrev == 11)
+    for i in xrange(1, 12):
+        ensure(rm.hsh2rev(genhsh(i)) == i)
+        ensure(rm.rev2hsh(i) == genhsh(i))
+        ensure(rm.rev2path(i) == paths[i] or paths[i - 1])
+        ensure(bool(rm.rev2flag(i) & revmap.sidebranchflag) == bool(i & 1))
+
+    os.unlink(path)
+
+    # missing keys
+    ensure(rm.rev2hsh(12) is None)
+    ensure(rm.rev2hsh(0) is None)
+    ensure(rm.rev2hsh(-1) is None)
+    ensure(rm.rev2flag(12) is None)
+    ensure(rm.rev2path(12) is None)
+    ensure(rm.hsh2rev(b'\1' * 20) is None)
+
+    # illformed hash (not 20 bytes)
+    try:
+        rm.append(b'\0')
+        ensure(False)
+    except Exception:
+        pass
+
+def testcorruptformat():
+    path = gettemppath()
+
+    # incorrect header
+    with open(path, 'w') as f:
+        f.write(b'NOT A VALID HEADER')
+    try:
+        revmap.revmap(path)
+        ensure(False)
+    except error.CorruptedFileError:
+        pass
+
+    # rewrite the file
+    os.unlink(path)
+    rm = revmap.revmap(path)
+    rm.append(genhsh(0), flush=True)
+
+    rm = revmap.revmap(path)
+    ensure(rm.maxrev == 1)
+
+    # corrupt the file by appending a byte
+    size = os.stat(path).st_size
+    with open(path, 'a') as f:
+        f.write('\xff')
+    try:
+        revmap.revmap(path)
+        ensure(False)
+    except error.CorruptedFileError:
+        pass
+
+    # corrupt the file by removing the last byte
+    ensure(size > 0)
+    with open(path, 'w') as f:
+        f.truncate(size - 1)
+    try:
+        revmap.revmap(path)
+        ensure(False)
+    except error.CorruptedFileError:
+        pass
+
+    os.unlink(path)
+
+def testcopyfrom():
+    path = gettemppath()
+    rm = revmap.revmap(path)
+    for i in xrange(1, 10):
+        ensure(rm.append(genhsh(i), sidebranch=(i & 1), path=str(i // 3)) == i)
+    rm.flush()
+
+    # copy rm to rm2
+    rm2 = revmap.revmap()
+    rm2.copyfrom(rm)
+    path2 = gettemppath()
+    rm2.path = path2
+    rm2.flush()
+
+    # two files should be the same
+    ensure(len(set(util.readfile(p) for p in [path, path2])) == 1)
+
+    os.unlink(path)
+    os.unlink(path2)
+
+class fakefctx(object):
+    def __init__(self, node, path=None):
+        self._node = node
+        self._path = path
+
+    def node(self):
+        return self._node
+
+    def path(self):
+        return self._path
+
+def testcontains():
+    path = gettemppath()
+
+    rm = revmap.revmap(path)
+    for i in xrange(1, 5):
+        ensure(rm.append(genhsh(i), sidebranch=(i & 1)) == i)
+
+    for i in xrange(1, 5):
+        ensure(((genhsh(i), None) in rm) == ((i & 1) == 0))
+        ensure((fakefctx(genhsh(i)) in rm) == ((i & 1) == 0))
+    for i in xrange(5, 10):
+        ensure(fakefctx(genhsh(i)) not in rm)
+        ensure((genhsh(i), None) not in rm)
+
+    # "contains" checks paths
+    rm = revmap.revmap()
+    for i in xrange(1, 5):
+        ensure(rm.append(genhsh(i), path=str(i // 2)) == i)
+    for i in xrange(1, 5):
+        ensure(fakefctx(genhsh(i), path=str(i // 2)) in rm)
+        ensure(fakefctx(genhsh(i), path='a') not in rm)
+
+def testlastnode():
+    path = gettemppath()
+    ensure(revmap.getlastnode(path) is None)
+    rm = revmap.revmap(path)
+    ensure(revmap.getlastnode(path) is None)
+    for i in xrange(1, 10):
+        hsh = genhsh(i)
+        rm.append(hsh, path=str(i // 2), flush=True)
+        ensure(revmap.getlastnode(path) == hsh)
+        rm2 = revmap.revmap(path)
+        ensure(rm2.rev2hsh(rm2.maxrev) == hsh)
+
+testbasicreadwrite()
+testcorruptformat()
+testcopyfrom()
+testcontains()
+testlastnode()