mercurial/shelve.py
changeset 51812 a1a94d488e14
parent 51809 54b1a3738530
child 51813 adbfbbf9963f
--- a/mercurial/shelve.py	Tue Aug 20 18:30:47 2024 -0400
+++ b/mercurial/shelve.py	Tue Aug 20 22:34:51 2024 -0400
@@ -26,6 +26,16 @@
 import itertools
 import stat
 
+from typing import (
+    Any,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Sequence,
+    Tuple,
+)
+
 from .i18n import _
 from .node import (
     bin,
@@ -37,6 +47,7 @@
     bundle2,
     changegroup,
     cmdutil,
+    context as contextmod,
     discovery,
     error,
     exchange,
@@ -69,16 +80,16 @@
 
 
 class ShelfDir:
-    def __init__(self, repo, for_backups=False):
+    def __init__(self, repo, for_backups: bool = False) -> None:
         if for_backups:
             self.vfs = vfsmod.vfs(repo.vfs.join(backupdir))
         else:
             self.vfs = vfsmod.vfs(repo.vfs.join(shelvedir))
 
-    def get(self, name):
+    def get(self, name: bytes) -> "Shelf":
         return Shelf(self.vfs, name)
 
-    def listshelves(self):
+    def listshelves(self) -> List[Tuple[float, bytes]]:
         """return all shelves in repo as list of (time, name)"""
         try:
             names = self.vfs.listdir()
@@ -99,14 +110,14 @@
         return sorted(info, reverse=True)
 
 
-def _use_internal_phase(repo):
+def _use_internal_phase(repo) -> bool:
     return (
         phases.supportinternal(repo)
         and repo.ui.config(b'shelve', b'store') == b'internal'
     )
 
 
-def _target_phase(repo):
+def _target_phase(repo) -> int:
     return phases.internal if _use_internal_phase(repo) else phases.secret
 
 
@@ -118,29 +129,29 @@
     differences and lets you work with the shelf as a whole.
     """
 
-    def __init__(self, vfs, name):
+    def __init__(self, vfs: vfsmod.vfs, name: bytes) -> None:
         self.vfs = vfs
         self.name = name
 
-    def exists(self):
+    def exists(self) -> bool:
         return self._exists(b'.shelve') or self._exists(b'.patch', b'.hg')
 
-    def _exists(self, *exts):
+    def _exists(self, *exts: bytes) -> bool:
         return all(self.vfs.exists(self.name + ext) for ext in exts)
 
-    def mtime(self):
+    def mtime(self) -> float:
         try:
             return self._stat(b'.shelve')[stat.ST_MTIME]
         except FileNotFoundError:
             return self._stat(b'.patch')[stat.ST_MTIME]
 
-    def _stat(self, ext):
+    def _stat(self, ext: bytes):
         return self.vfs.stat(self.name + ext)
 
-    def writeinfo(self, info):
+    def writeinfo(self, info) -> None:
         scmutil.simplekeyvaluefile(self.vfs, self.name + b'.shelve').write(info)
 
-    def hasinfo(self):
+    def hasinfo(self) -> bool:
         return self.vfs.exists(self.name + b'.shelve')
 
     def readinfo(self):
@@ -148,7 +159,7 @@
             self.vfs, self.name + b'.shelve'
         ).read()
 
-    def writebundle(self, repo, bases, node):
+    def writebundle(self, repo, bases, node) -> None:
         cgversion = changegroup.safeversion(repo)
         if cgversion == b'01':
             btype = b'HG10BZ'
@@ -174,7 +185,7 @@
             compression=compression,
         )
 
-    def applybundle(self, repo, tr):
+    def applybundle(self, repo, tr) -> contextmod.changectx:
         filename = self.name + b'.hg'
         fp = self.vfs(filename)
         try:
@@ -197,10 +208,10 @@
         finally:
             fp.close()
 
-    def open_patch(self, mode=b'rb'):
+    def open_patch(self, mode: bytes = b'rb'):
         return self.vfs(self.name + b'.patch', mode)
 
-    def patch_from_node(self, repo, node):
+    def patch_from_node(self, repo, node) -> io.BytesIO:
         repo = repo.unfiltered()
         match = _optimized_match(repo, node)
         fp = io.BytesIO()
@@ -221,8 +232,8 @@
         except (FileNotFoundError, error.RepoLookupError):
             return self.open_patch()
 
-    def _backupfilename(self, backupvfs, filename):
-        def gennames(base):
+    def _backupfilename(self, backupvfs: vfsmod.vfs, filename: bytes) -> bytes:
+        def gennames(base: bytes):
             yield base
             base, ext = base.rsplit(b'.', 1)
             for i in itertools.count(1):
@@ -232,7 +243,10 @@
             if not backupvfs.exists(n):
                 return backupvfs.join(n)
 
-    def movetobackup(self, backupvfs):
+        # Help pytype- gennames() yields infinitely
+        raise error.ProgrammingError("unreachable")
+
+    def movetobackup(self, backupvfs: vfsmod.vfs) -> None:
         if not backupvfs.isdir():
             backupvfs.makedir()
         for suffix in shelvefileextensions:
@@ -243,7 +257,7 @@
                     self._backupfilename(backupvfs, filename),
                 )
 
-    def delete(self):
+    def delete(self) -> None:
         for ext in shelvefileextensions:
             self.vfs.tryunlink(self.name + b'.' + ext)
 
@@ -256,7 +270,7 @@
             return patch.changedfiles(ui, repo, filename)
 
 
-def _optimized_match(repo, node):
+def _optimized_match(repo, node: bytes):
     """
     Create a matcher so that prefetch doesn't attempt to fetch
     the entire repository pointlessly, and as an optimisation
@@ -272,6 +286,7 @@
     versions of a shelved state are possible and handles them appropriately.
     """
 
+    # Class-wide constants
     _version = 2
     _filename = b'shelvedstate'
     _keep = b'keep'
@@ -280,8 +295,19 @@
     _noactivebook = b':no-active-bookmark'
     _interactive = b'interactive'
 
+    # Per instance attrs
+    name: bytes
+    wctx: contextmod.workingctx
+    pendingctx: contextmod.changectx
+    parents: List[bytes]
+    nodestoremove: List[bytes]
+    branchtorestore: bytes
+    keep: bool
+    activebookmark: bytes
+    interactive: bool
+
     @classmethod
-    def _verifyandtransform(cls, d):
+    def _verifyandtransform(cls, d: Dict[bytes, Any]) -> None:
         """Some basic shelvestate syntactic verification and transformation"""
         try:
             d[b'originalwctx'] = bin(d[b'originalwctx'])
@@ -294,7 +320,7 @@
             raise error.CorruptedState(stringutil.forcebytestr(err))
 
     @classmethod
-    def _getversion(cls, repo):
+    def _getversion(cls, repo) -> int:
         """Read version information from shelvestate file"""
         fp = repo.vfs(cls._filename)
         try:
@@ -306,7 +332,7 @@
         return version
 
     @classmethod
-    def _readold(cls, repo):
+    def _readold(cls, repo) -> Dict[bytes, Any]:
         """Read the old position-based version of a shelvestate file"""
         # Order is important, because old shelvestate file uses it
         # to detemine values of fields (i.g. name is on the second line,
@@ -373,15 +399,15 @@
     def save(
         cls,
         repo,
-        name,
-        originalwctx,
-        pendingctx,
-        nodestoremove,
-        branchtorestore,
-        keep=False,
-        activebook=b'',
-        interactive=False,
-    ):
+        name: bytes,
+        originalwctx: contextmod.workingctx,
+        pendingctx: contextmod.changectx,
+        nodestoremove: List[bytes],
+        branchtorestore: bytes,
+        keep: bool = False,
+        activebook: bytes = b'',
+        interactive: bool = False,
+    ) -> None:
         info = {
             b"name": name,
             b"originalwctx": hex(originalwctx.node()),
@@ -399,11 +425,11 @@
         )
 
     @classmethod
-    def clear(cls, repo):
+    def clear(cls, repo) -> None:
         repo.vfs.unlinkpath(cls._filename, ignoremissing=True)
 
 
-def cleanupoldbackups(repo):
+def cleanupoldbackups(repo) -> None:
     maxbackups = repo.ui.configint(b'shelve', b'maxbackups')
     backup_dir = ShelfDir(repo, for_backups=True)
     hgfiles = backup_dir.listshelves()
@@ -418,19 +444,19 @@
         backup_dir.get(name).delete()
 
 
-def _backupactivebookmark(repo):
+def _backupactivebookmark(repo) -> bytes:
     activebookmark = repo._activebookmark
     if activebookmark:
         bookmarks.deactivate(repo)
     return activebookmark
 
 
-def _restoreactivebookmark(repo, mark):
+def _restoreactivebookmark(repo, mark) -> None:
     if mark:
         bookmarks.activate(repo, mark)
 
 
-def _aborttransaction(repo, tr):
+def _aborttransaction(repo, tr) -> None:
     """Abort current transaction for shelve/unshelve, but keep dirstate"""
     # disable the transaction invalidation of the dirstate, to preserve the
     # current change in memory.
@@ -456,7 +482,7 @@
     ds.setbranch(current_branch, None)
 
 
-def getshelvename(repo, parent, opts):
+def getshelvename(repo, parent, opts) -> bytes:
     """Decide on the name this shelve is going to have"""
 
     def gennames():
@@ -496,7 +522,7 @@
     return name
 
 
-def mutableancestors(ctx):
+def mutableancestors(ctx) -> Iterator[bytes]:
     """return all mutable ancestors for ctx (included)
 
     Much faster than the revset ancestors(ctx) & draft()"""
@@ -514,7 +540,7 @@
                     visit.append(parent)
 
 
-def getcommitfunc(extra, interactive, editor=False):
+def getcommitfunc(extra, interactive: bool, editor: bool = False):
     def commitfunc(ui, repo, message, match, opts):
         hasmq = hasattr(repo, 'mq')
         if hasmq:
@@ -550,7 +576,7 @@
     return interactivecommitfunc if interactive else commitfunc
 
 
-def _nothingtoshelvemessaging(ui, repo, pats, opts):
+def _nothingtoshelvemessaging(ui, repo, pats, opts) -> None:
     stat = repo.status(match=scmutil.match(repo[None], pats, opts))
     if stat.deleted:
         ui.status(
@@ -561,7 +587,7 @@
         ui.status(_(b"nothing changed\n"))
 
 
-def _shelvecreatedcommit(repo, node, name, match):
+def _shelvecreatedcommit(repo, node: bytes, name: bytes, match) -> None:
     info = {b'node': hex(node)}
     shelf = ShelfDir(repo).get(name)
     shelf.writeinfo(info)
@@ -573,14 +599,14 @@
         )
 
 
-def _includeunknownfiles(repo, pats, opts, extra):
+def _includeunknownfiles(repo, pats, opts, extra) -> None:
     s = repo.status(match=scmutil.match(repo[None], pats, opts), unknown=True)
     if s.unknown:
         extra[b'shelve_unknown'] = b'\0'.join(s.unknown)
         repo[None].add(s.unknown)
 
 
-def _finishshelve(repo, tr):
+def _finishshelve(repo, tr) -> None:
     if _use_internal_phase(repo):
         tr.close()
     else:
@@ -675,7 +701,7 @@
         lockmod.release(tr, lock)
 
 
-def _isbareshelve(pats, opts):
+def _isbareshelve(pats, opts) -> bool:
     return (
         not pats
         and not opts.get(b'interactive', False)
@@ -684,11 +710,11 @@
     )
 
 
-def _iswctxonnewbranch(repo):
+def _iswctxonnewbranch(repo) -> bool:
     return repo[None].branch() != repo[b'.'].branch()
 
 
-def cleanupcmd(ui, repo):
+def cleanupcmd(ui, repo) -> None:
     """subcommand that deletes all shelves"""
 
     with repo.wlock():
@@ -699,7 +725,7 @@
             cleanupoldbackups(repo)
 
 
-def deletecmd(ui, repo, pats):
+def deletecmd(ui, repo, pats) -> None:
     """subcommand that deletes a specific shelve"""
     if not pats:
         raise error.InputError(_(b'no shelved changes specified!'))
@@ -715,7 +741,7 @@
             cleanupoldbackups(repo)
 
 
-def listcmd(ui, repo, pats, opts):
+def listcmd(ui, repo, pats: Iterable[bytes], opts) -> None:
     """subcommand that displays the list of shelves"""
     pats = set(pats)
     width = 80
@@ -762,7 +788,7 @@
                     ui.write(chunk, label=label)
 
 
-def patchcmds(ui, repo, pats, opts):
+def patchcmds(ui, repo, pats: Sequence[bytes], opts) -> None:
     """subcommand that displays shelves"""
     shelf_dir = ShelfDir(repo)
     if len(pats) == 0:
@@ -779,7 +805,7 @@
     listcmd(ui, repo, pats, opts)
 
 
-def checkparents(repo, state):
+def checkparents(repo, state: shelvedstate) -> None:
     """check parent while resuming an unshelve"""
     if state.parents != repo.dirstate.parents():
         raise error.Abort(
@@ -787,7 +813,7 @@
         )
 
 
-def _loadshelvedstate(ui, repo, opts):
+def _loadshelvedstate(ui, repo, opts) -> shelvedstate:
     try:
         state = shelvedstate.load(repo)
         if opts.get(b'keep') is None:
@@ -819,7 +845,7 @@
             )
 
 
-def unshelveabort(ui, repo, state):
+def unshelveabort(ui, repo, state: shelvedstate) -> None:
     """subcommand that abort an in-progress unshelve"""
     with repo.lock():
         try:
@@ -838,14 +864,14 @@
             ui.warn(_(b"unshelve of '%s' aborted\n") % state.name)
 
 
-def hgabortunshelve(ui, repo):
+def hgabortunshelve(ui, repo) -> None:
     """logic to  abort unshelve using 'hg abort"""
     with repo.wlock():
         state = _loadshelvedstate(ui, repo, {b'abort': True})
         return unshelveabort(ui, repo, state)
 
 
-def mergefiles(ui, repo, wctx, shelvectx):
+def mergefiles(ui, repo, wctx, shelvectx) -> None:
     """updates to wctx and merges the changes from shelvectx into the
     dirstate."""
     with ui.configoverride({(b'ui', b'quiet'): True}):
@@ -853,7 +879,7 @@
         cmdutil.revert(ui, repo, shelvectx)
 
 
-def restorebranch(ui, repo, branchtorestore):
+def restorebranch(ui, repo, branchtorestore: bytes) -> None:
     if branchtorestore and branchtorestore != repo.dirstate.branch():
         repo.dirstate.setbranch(branchtorestore, repo.currenttransaction())
         ui.status(
@@ -861,7 +887,7 @@
         )
 
 
-def unshelvecleanup(ui, repo, name, opts):
+def unshelvecleanup(ui, repo, name: bytes, opts) -> None:
     """remove related files after an unshelve"""
     if not opts.get(b'keep'):
         backupvfs = vfsmod.vfs(repo.vfs.join(backupdir))
@@ -869,7 +895,7 @@
         cleanupoldbackups(repo)
 
 
-def unshelvecontinue(ui, repo, state, opts):
+def unshelvecontinue(ui, repo, state: shelvedstate, opts) -> None:
     """subcommand to continue an in-progress unshelve"""
     # We're finishing off a merge. First parent is our original
     # parent, second is the temporary "fake" commit we're unshelving.
@@ -927,7 +953,7 @@
         ui.status(_(b"unshelve of '%s' complete\n") % state.name)
 
 
-def hgcontinueunshelve(ui, repo):
+def hgcontinueunshelve(ui, repo) -> None:
     """logic to resume unshelve using 'hg continue'"""
     with repo.wlock():
         state = _loadshelvedstate(ui, repo, {b'continue': True})
@@ -959,7 +985,7 @@
     return tmpwctx, addedbefore
 
 
-def _unshelverestorecommit(ui, repo, tr, basename):
+def _unshelverestorecommit(ui, repo, tr, basename: bytes):
     """Recreate commit in the repository during the unshelve"""
     repo = repo.unfiltered()
     node = None
@@ -980,7 +1006,9 @@
     return repo, shelvectx
 
 
-def _createunshelvectx(ui, repo, shelvectx, basename, interactive, opts):
+def _createunshelvectx(
+    ui, repo, shelvectx, basename: bytes, interactive: bool, opts
+) -> Tuple[bytes, bool]:
     """Handles the creation of unshelve commit and updates the shelve if it
     was partially unshelved.
 
@@ -1042,7 +1070,7 @@
     opts,
     tr,
     oldtiprev,
-    basename,
+    basename: bytes,
     pctx,
     tmpwctx,
     shelvectx,
@@ -1113,7 +1141,7 @@
     return shelvectx, ispartialunshelve
 
 
-def _forgetunknownfiles(repo, shelvectx, addedbefore):
+def _forgetunknownfiles(repo, shelvectx, addedbefore) -> None:
     # Forget any files that were unknown before the shelve, unknown before
     # unshelve started, but are now added.
     shelveunknown = shelvectx.extra().get(b'shelve_unknown')
@@ -1125,7 +1153,7 @@
     repo[None].forget(toforget)
 
 
-def _finishunshelve(repo, oldtiprev, tr, activebookmark):
+def _finishunshelve(repo, oldtiprev, tr, activebookmark) -> None:
     _restoreactivebookmark(repo, activebookmark)
     # We used to manually strip the commit to update inmemory structure and
     # prevent some issue around hooks. This no longer seems to be the case, so
@@ -1133,7 +1161,7 @@
     _aborttransaction(repo, tr)
 
 
-def _checkunshelveuntrackedproblems(ui, repo, shelvectx):
+def _checkunshelveuntrackedproblems(ui, repo, shelvectx) -> None:
     """Check potential problems which may result from working
     copy having untracked changes."""
     wcdeleted = set(repo.status().deleted)
@@ -1145,7 +1173,7 @@
         raise error.Abort(m, hint=hint)
 
 
-def unshelvecmd(ui, repo, *shelved, **opts):
+def unshelvecmd(ui, repo, *shelved, **opts) -> None:
     opts = pycompat.byteskwargs(opts)
     abortf = opts.get(b'abort')
     continuef = opts.get(b'continue')
@@ -1182,6 +1210,11 @@
             )
         elif continuef:
             return unshelvecontinue(ui, repo, state, opts)
+        else:
+            # Unreachable code, but help type checkers not think that
+            # 'basename' may be used before initialization when checking
+            # ShelfDir below.
+            raise error.ProgrammingError("neither abort nor continue specified")
     elif len(shelved) > 1:
         raise error.InputError(_(b'can only unshelve one change at a time'))
     elif not shelved:
@@ -1199,7 +1232,7 @@
     return _dounshelve(ui, repo, basename, opts)
 
 
-def _dounshelve(ui, repo, basename, opts):
+def _dounshelve(ui, repo, basename: bytes, opts) -> None:
     repo = repo.unfiltered()
     lock = tr = None
     try: