diff -r 460e80488cf0 -r a1a94d488e14 mercurial/shelve.py --- a/mercurial/shelve.py Tue Aug 20 18:30:47 2024 -0400 +++ b/mercurial/shelve.py Tue Aug 20 22:34:51 2024 -0400 @@ -26,6 +26,16 @@ import itertools import stat +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Sequence, + Tuple, +) + from .i18n import _ from .node import ( bin, @@ -37,6 +47,7 @@ bundle2, changegroup, cmdutil, + context as contextmod, discovery, error, exchange, @@ -69,16 +80,16 @@ class ShelfDir: - def __init__(self, repo, for_backups=False): + def __init__(self, repo, for_backups: bool = False) -> None: if for_backups: self.vfs = vfsmod.vfs(repo.vfs.join(backupdir)) else: self.vfs = vfsmod.vfs(repo.vfs.join(shelvedir)) - def get(self, name): + def get(self, name: bytes) -> "Shelf": return Shelf(self.vfs, name) - def listshelves(self): + def listshelves(self) -> List[Tuple[float, bytes]]: """return all shelves in repo as list of (time, name)""" try: names = self.vfs.listdir() @@ -99,14 +110,14 @@ return sorted(info, reverse=True) -def _use_internal_phase(repo): +def _use_internal_phase(repo) -> bool: return ( phases.supportinternal(repo) and repo.ui.config(b'shelve', b'store') == b'internal' ) -def _target_phase(repo): +def _target_phase(repo) -> int: return phases.internal if _use_internal_phase(repo) else phases.secret @@ -118,29 +129,29 @@ differences and lets you work with the shelf as a whole. """ - def __init__(self, vfs, name): + def __init__(self, vfs: vfsmod.vfs, name: bytes) -> None: self.vfs = vfs self.name = name - def exists(self): + def exists(self) -> bool: return self._exists(b'.shelve') or self._exists(b'.patch', b'.hg') - def _exists(self, *exts): + def _exists(self, *exts: bytes) -> bool: return all(self.vfs.exists(self.name + ext) for ext in exts) - def mtime(self): + def mtime(self) -> float: try: return self._stat(b'.shelve')[stat.ST_MTIME] except FileNotFoundError: return self._stat(b'.patch')[stat.ST_MTIME] - def _stat(self, ext): + def _stat(self, ext: bytes): return self.vfs.stat(self.name + ext) - def writeinfo(self, info): + def writeinfo(self, info) -> None: scmutil.simplekeyvaluefile(self.vfs, self.name + b'.shelve').write(info) - def hasinfo(self): + def hasinfo(self) -> bool: return self.vfs.exists(self.name + b'.shelve') def readinfo(self): @@ -148,7 +159,7 @@ self.vfs, self.name + b'.shelve' ).read() - def writebundle(self, repo, bases, node): + def writebundle(self, repo, bases, node) -> None: cgversion = changegroup.safeversion(repo) if cgversion == b'01': btype = b'HG10BZ' @@ -174,7 +185,7 @@ compression=compression, ) - def applybundle(self, repo, tr): + def applybundle(self, repo, tr) -> contextmod.changectx: filename = self.name + b'.hg' fp = self.vfs(filename) try: @@ -197,10 +208,10 @@ finally: fp.close() - def open_patch(self, mode=b'rb'): + def open_patch(self, mode: bytes = b'rb'): return self.vfs(self.name + b'.patch', mode) - def patch_from_node(self, repo, node): + def patch_from_node(self, repo, node) -> io.BytesIO: repo = repo.unfiltered() match = _optimized_match(repo, node) fp = io.BytesIO() @@ -221,8 +232,8 @@ except (FileNotFoundError, error.RepoLookupError): return self.open_patch() - def _backupfilename(self, backupvfs, filename): - def gennames(base): + def _backupfilename(self, backupvfs: vfsmod.vfs, filename: bytes) -> bytes: + def gennames(base: bytes): yield base base, ext = base.rsplit(b'.', 1) for i in itertools.count(1): @@ -232,7 +243,10 @@ if not backupvfs.exists(n): return backupvfs.join(n) - def movetobackup(self, backupvfs): + # Help pytype- gennames() yields infinitely + raise error.ProgrammingError("unreachable") + + def movetobackup(self, backupvfs: vfsmod.vfs) -> None: if not backupvfs.isdir(): backupvfs.makedir() for suffix in shelvefileextensions: @@ -243,7 +257,7 @@ self._backupfilename(backupvfs, filename), ) - def delete(self): + def delete(self) -> None: for ext in shelvefileextensions: self.vfs.tryunlink(self.name + b'.' + ext) @@ -256,7 +270,7 @@ return patch.changedfiles(ui, repo, filename) -def _optimized_match(repo, node): +def _optimized_match(repo, node: bytes): """ Create a matcher so that prefetch doesn't attempt to fetch the entire repository pointlessly, and as an optimisation @@ -272,6 +286,7 @@ versions of a shelved state are possible and handles them appropriately. """ + # Class-wide constants _version = 2 _filename = b'shelvedstate' _keep = b'keep' @@ -280,8 +295,19 @@ _noactivebook = b':no-active-bookmark' _interactive = b'interactive' + # Per instance attrs + name: bytes + wctx: contextmod.workingctx + pendingctx: contextmod.changectx + parents: List[bytes] + nodestoremove: List[bytes] + branchtorestore: bytes + keep: bool + activebookmark: bytes + interactive: bool + @classmethod - def _verifyandtransform(cls, d): + def _verifyandtransform(cls, d: Dict[bytes, Any]) -> None: """Some basic shelvestate syntactic verification and transformation""" try: d[b'originalwctx'] = bin(d[b'originalwctx']) @@ -294,7 +320,7 @@ raise error.CorruptedState(stringutil.forcebytestr(err)) @classmethod - def _getversion(cls, repo): + def _getversion(cls, repo) -> int: """Read version information from shelvestate file""" fp = repo.vfs(cls._filename) try: @@ -306,7 +332,7 @@ return version @classmethod - def _readold(cls, repo): + def _readold(cls, repo) -> Dict[bytes, Any]: """Read the old position-based version of a shelvestate file""" # Order is important, because old shelvestate file uses it # to detemine values of fields (i.g. name is on the second line, @@ -373,15 +399,15 @@ def save( cls, repo, - name, - originalwctx, - pendingctx, - nodestoremove, - branchtorestore, - keep=False, - activebook=b'', - interactive=False, - ): + name: bytes, + originalwctx: contextmod.workingctx, + pendingctx: contextmod.changectx, + nodestoremove: List[bytes], + branchtorestore: bytes, + keep: bool = False, + activebook: bytes = b'', + interactive: bool = False, + ) -> None: info = { b"name": name, b"originalwctx": hex(originalwctx.node()), @@ -399,11 +425,11 @@ ) @classmethod - def clear(cls, repo): + def clear(cls, repo) -> None: repo.vfs.unlinkpath(cls._filename, ignoremissing=True) -def cleanupoldbackups(repo): +def cleanupoldbackups(repo) -> None: maxbackups = repo.ui.configint(b'shelve', b'maxbackups') backup_dir = ShelfDir(repo, for_backups=True) hgfiles = backup_dir.listshelves() @@ -418,19 +444,19 @@ backup_dir.get(name).delete() -def _backupactivebookmark(repo): +def _backupactivebookmark(repo) -> bytes: activebookmark = repo._activebookmark if activebookmark: bookmarks.deactivate(repo) return activebookmark -def _restoreactivebookmark(repo, mark): +def _restoreactivebookmark(repo, mark) -> None: if mark: bookmarks.activate(repo, mark) -def _aborttransaction(repo, tr): +def _aborttransaction(repo, tr) -> None: """Abort current transaction for shelve/unshelve, but keep dirstate""" # disable the transaction invalidation of the dirstate, to preserve the # current change in memory. @@ -456,7 +482,7 @@ ds.setbranch(current_branch, None) -def getshelvename(repo, parent, opts): +def getshelvename(repo, parent, opts) -> bytes: """Decide on the name this shelve is going to have""" def gennames(): @@ -496,7 +522,7 @@ return name -def mutableancestors(ctx): +def mutableancestors(ctx) -> Iterator[bytes]: """return all mutable ancestors for ctx (included) Much faster than the revset ancestors(ctx) & draft()""" @@ -514,7 +540,7 @@ visit.append(parent) -def getcommitfunc(extra, interactive, editor=False): +def getcommitfunc(extra, interactive: bool, editor: bool = False): def commitfunc(ui, repo, message, match, opts): hasmq = hasattr(repo, 'mq') if hasmq: @@ -550,7 +576,7 @@ return interactivecommitfunc if interactive else commitfunc -def _nothingtoshelvemessaging(ui, repo, pats, opts): +def _nothingtoshelvemessaging(ui, repo, pats, opts) -> None: stat = repo.status(match=scmutil.match(repo[None], pats, opts)) if stat.deleted: ui.status( @@ -561,7 +587,7 @@ ui.status(_(b"nothing changed\n")) -def _shelvecreatedcommit(repo, node, name, match): +def _shelvecreatedcommit(repo, node: bytes, name: bytes, match) -> None: info = {b'node': hex(node)} shelf = ShelfDir(repo).get(name) shelf.writeinfo(info) @@ -573,14 +599,14 @@ ) -def _includeunknownfiles(repo, pats, opts, extra): +def _includeunknownfiles(repo, pats, opts, extra) -> None: s = repo.status(match=scmutil.match(repo[None], pats, opts), unknown=True) if s.unknown: extra[b'shelve_unknown'] = b'\0'.join(s.unknown) repo[None].add(s.unknown) -def _finishshelve(repo, tr): +def _finishshelve(repo, tr) -> None: if _use_internal_phase(repo): tr.close() else: @@ -675,7 +701,7 @@ lockmod.release(tr, lock) -def _isbareshelve(pats, opts): +def _isbareshelve(pats, opts) -> bool: return ( not pats and not opts.get(b'interactive', False) @@ -684,11 +710,11 @@ ) -def _iswctxonnewbranch(repo): +def _iswctxonnewbranch(repo) -> bool: return repo[None].branch() != repo[b'.'].branch() -def cleanupcmd(ui, repo): +def cleanupcmd(ui, repo) -> None: """subcommand that deletes all shelves""" with repo.wlock(): @@ -699,7 +725,7 @@ cleanupoldbackups(repo) -def deletecmd(ui, repo, pats): +def deletecmd(ui, repo, pats) -> None: """subcommand that deletes a specific shelve""" if not pats: raise error.InputError(_(b'no shelved changes specified!')) @@ -715,7 +741,7 @@ cleanupoldbackups(repo) -def listcmd(ui, repo, pats, opts): +def listcmd(ui, repo, pats: Iterable[bytes], opts) -> None: """subcommand that displays the list of shelves""" pats = set(pats) width = 80 @@ -762,7 +788,7 @@ ui.write(chunk, label=label) -def patchcmds(ui, repo, pats, opts): +def patchcmds(ui, repo, pats: Sequence[bytes], opts) -> None: """subcommand that displays shelves""" shelf_dir = ShelfDir(repo) if len(pats) == 0: @@ -779,7 +805,7 @@ listcmd(ui, repo, pats, opts) -def checkparents(repo, state): +def checkparents(repo, state: shelvedstate) -> None: """check parent while resuming an unshelve""" if state.parents != repo.dirstate.parents(): raise error.Abort( @@ -787,7 +813,7 @@ ) -def _loadshelvedstate(ui, repo, opts): +def _loadshelvedstate(ui, repo, opts) -> shelvedstate: try: state = shelvedstate.load(repo) if opts.get(b'keep') is None: @@ -819,7 +845,7 @@ ) -def unshelveabort(ui, repo, state): +def unshelveabort(ui, repo, state: shelvedstate) -> None: """subcommand that abort an in-progress unshelve""" with repo.lock(): try: @@ -838,14 +864,14 @@ ui.warn(_(b"unshelve of '%s' aborted\n") % state.name) -def hgabortunshelve(ui, repo): +def hgabortunshelve(ui, repo) -> None: """logic to abort unshelve using 'hg abort""" with repo.wlock(): state = _loadshelvedstate(ui, repo, {b'abort': True}) return unshelveabort(ui, repo, state) -def mergefiles(ui, repo, wctx, shelvectx): +def mergefiles(ui, repo, wctx, shelvectx) -> None: """updates to wctx and merges the changes from shelvectx into the dirstate.""" with ui.configoverride({(b'ui', b'quiet'): True}): @@ -853,7 +879,7 @@ cmdutil.revert(ui, repo, shelvectx) -def restorebranch(ui, repo, branchtorestore): +def restorebranch(ui, repo, branchtorestore: bytes) -> None: if branchtorestore and branchtorestore != repo.dirstate.branch(): repo.dirstate.setbranch(branchtorestore, repo.currenttransaction()) ui.status( @@ -861,7 +887,7 @@ ) -def unshelvecleanup(ui, repo, name, opts): +def unshelvecleanup(ui, repo, name: bytes, opts) -> None: """remove related files after an unshelve""" if not opts.get(b'keep'): backupvfs = vfsmod.vfs(repo.vfs.join(backupdir)) @@ -869,7 +895,7 @@ cleanupoldbackups(repo) -def unshelvecontinue(ui, repo, state, opts): +def unshelvecontinue(ui, repo, state: shelvedstate, opts) -> None: """subcommand to continue an in-progress unshelve""" # We're finishing off a merge. First parent is our original # parent, second is the temporary "fake" commit we're unshelving. @@ -927,7 +953,7 @@ ui.status(_(b"unshelve of '%s' complete\n") % state.name) -def hgcontinueunshelve(ui, repo): +def hgcontinueunshelve(ui, repo) -> None: """logic to resume unshelve using 'hg continue'""" with repo.wlock(): state = _loadshelvedstate(ui, repo, {b'continue': True}) @@ -959,7 +985,7 @@ return tmpwctx, addedbefore -def _unshelverestorecommit(ui, repo, tr, basename): +def _unshelverestorecommit(ui, repo, tr, basename: bytes): """Recreate commit in the repository during the unshelve""" repo = repo.unfiltered() node = None @@ -980,7 +1006,9 @@ return repo, shelvectx -def _createunshelvectx(ui, repo, shelvectx, basename, interactive, opts): +def _createunshelvectx( + ui, repo, shelvectx, basename: bytes, interactive: bool, opts +) -> Tuple[bytes, bool]: """Handles the creation of unshelve commit and updates the shelve if it was partially unshelved. @@ -1042,7 +1070,7 @@ opts, tr, oldtiprev, - basename, + basename: bytes, pctx, tmpwctx, shelvectx, @@ -1113,7 +1141,7 @@ return shelvectx, ispartialunshelve -def _forgetunknownfiles(repo, shelvectx, addedbefore): +def _forgetunknownfiles(repo, shelvectx, addedbefore) -> None: # Forget any files that were unknown before the shelve, unknown before # unshelve started, but are now added. shelveunknown = shelvectx.extra().get(b'shelve_unknown') @@ -1125,7 +1153,7 @@ repo[None].forget(toforget) -def _finishunshelve(repo, oldtiprev, tr, activebookmark): +def _finishunshelve(repo, oldtiprev, tr, activebookmark) -> None: _restoreactivebookmark(repo, activebookmark) # We used to manually strip the commit to update inmemory structure and # prevent some issue around hooks. This no longer seems to be the case, so @@ -1133,7 +1161,7 @@ _aborttransaction(repo, tr) -def _checkunshelveuntrackedproblems(ui, repo, shelvectx): +def _checkunshelveuntrackedproblems(ui, repo, shelvectx) -> None: """Check potential problems which may result from working copy having untracked changes.""" wcdeleted = set(repo.status().deleted) @@ -1145,7 +1173,7 @@ raise error.Abort(m, hint=hint) -def unshelvecmd(ui, repo, *shelved, **opts): +def unshelvecmd(ui, repo, *shelved, **opts) -> None: opts = pycompat.byteskwargs(opts) abortf = opts.get(b'abort') continuef = opts.get(b'continue') @@ -1182,6 +1210,11 @@ ) elif continuef: return unshelvecontinue(ui, repo, state, opts) + else: + # Unreachable code, but help type checkers not think that + # 'basename' may be used before initialization when checking + # ShelfDir below. + raise error.ProgrammingError("neither abort nor continue specified") elif len(shelved) > 1: raise error.InputError(_(b'can only unshelve one change at a time')) elif not shelved: @@ -1199,7 +1232,7 @@ return _dounshelve(ui, repo, basename, opts) -def _dounshelve(ui, repo, basename, opts): +def _dounshelve(ui, repo, basename: bytes, opts) -> None: repo = repo.unfiltered() lock = tr = None try: