--- a/mercurial/cmdutil.py Wed Mar 27 17:29:48 2024 +0000
+++ b/mercurial/cmdutil.py Wed Mar 27 17:46:23 2024 +0000
@@ -35,11 +35,13 @@
from . import (
bookmarks,
+ bundle2,
changelog,
copies,
crecord as crecordmod,
encoding,
error,
+ exchange,
formatter,
logcmdutil,
match as matchmod,
@@ -56,6 +58,7 @@
rewriteutil,
scmutil,
state as statemod,
+ streamclone,
subrepoutil,
templatekw,
templater,
@@ -66,6 +69,7 @@
from .utils import (
dateutil,
stringutil,
+ urlutil,
)
from .revlogutils import (
@@ -4178,3 +4182,47 @@
elif not ui.configbool(b'commands', b'update.requiredest'):
ui.status(_(b"(run 'hg update' to get a working copy)\n"))
return False
+
+
+def unbundle_files(ui, repo, fnames, unbundle_source=b'unbundle'):
+ """utility for `hg unbundle` and `hg debug::unbundle`"""
+ assert fnames
+ # avoid circular import
+ from . import hg
+
+ with repo.lock():
+ for fname in fnames:
+ f = hg.openpath(ui, fname)
+ gen = exchange.readbundle(ui, f, fname)
+ if isinstance(gen, streamclone.streamcloneapplier):
+ raise error.InputError(
+ _(
+ b'packed bundles cannot be applied with '
+ b'"hg unbundle"'
+ ),
+ hint=_(b'use "hg debugapplystreamclonebundle"'),
+ )
+ url = b'bundle:' + fname
+ try:
+ txnname = b'unbundle'
+ if not isinstance(gen, bundle2.unbundle20):
+ txnname = b'unbundle\n%s' % urlutil.hidepassword(url)
+ with repo.transaction(txnname) as tr:
+ op = bundle2.applybundle(
+ repo,
+ gen,
+ tr,
+ source=unbundle_source, # used by debug::unbundle
+ url=url,
+ )
+ except error.BundleUnknownFeatureError as exc:
+ raise error.Abort(
+ _(b'%s: unknown bundle feature, %s') % (fname, exc),
+ hint=_(
+ b"see https://mercurial-scm.org/"
+ b"wiki/BundleFeature for more "
+ b"information"
+ ),
+ )
+ modheads = bundle2.combinechangegroupresults(op)
+ return modheads