mercurial/wireproto.py
changeset 37785 b4d85bc122bd
parent 37784 ee0d5e9d77b2
child 37786 cc932c15b9ee
equal deleted inserted replaced
37784:ee0d5e9d77b2 37785:b4d85bc122bd
     1 # wireproto.py - generic wire protocol support functions
       
     2 #
       
     3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
       
     4 #
       
     5 # This software may be used and distributed according to the terms of the
       
     6 # GNU General Public License version 2 or any later version.
       
     7 
       
     8 from __future__ import absolute_import
       
     9 
       
    10 import os
       
    11 import tempfile
       
    12 
       
    13 from .i18n import _
       
    14 from .node import (
       
    15     hex,
       
    16     nullid,
       
    17 )
       
    18 
       
    19 from . import (
       
    20     bundle2,
       
    21     changegroup as changegroupmod,
       
    22     discovery,
       
    23     encoding,
       
    24     error,
       
    25     exchange,
       
    26     pushkey as pushkeymod,
       
    27     pycompat,
       
    28     streamclone,
       
    29     util,
       
    30     wireprototypes,
       
    31 )
       
    32 
       
    33 from .utils import (
       
    34     procutil,
       
    35     stringutil,
       
    36 )
       
    37 
       
    38 urlerr = util.urlerr
       
    39 urlreq = util.urlreq
       
    40 
       
    41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
       
    42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
       
    43                         'IncompatibleClient')
       
    44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
       
    45 
       
    46 def clientcompressionsupport(proto):
       
    47     """Returns a list of compression methods supported by the client.
       
    48 
       
    49     Returns a list of the compression methods supported by the client
       
    50     according to the protocol capabilities. If no such capability has
       
    51     been announced, fallback to the default of zlib and uncompressed.
       
    52     """
       
    53     for cap in proto.getprotocaps():
       
    54         if cap.startswith('comp='):
       
    55             return cap[5:].split(',')
       
    56     return ['zlib', 'none']
       
    57 
       
    58 # wire protocol command can either return a string or one of these classes.
       
    59 
       
    60 def getdispatchrepo(repo, proto, command):
       
    61     """Obtain the repo used for processing wire protocol commands.
       
    62 
       
    63     The intent of this function is to serve as a monkeypatch point for
       
    64     extensions that need commands to operate on different repo views under
       
    65     specialized circumstances.
       
    66     """
       
    67     return repo.filtered('served')
       
    68 
       
    69 def dispatch(repo, proto, command):
       
    70     repo = getdispatchrepo(repo, proto, command)
       
    71 
       
    72     func, spec = commands[command]
       
    73     args = proto.getargs(spec)
       
    74 
       
    75     return func(repo, proto, *args)
       
    76 
       
    77 def options(cmd, keys, others):
       
    78     opts = {}
       
    79     for k in keys:
       
    80         if k in others:
       
    81             opts[k] = others[k]
       
    82             del others[k]
       
    83     if others:
       
    84         procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
       
    85                               % (cmd, ",".join(others)))
       
    86     return opts
       
    87 
       
    88 def bundle1allowed(repo, action):
       
    89     """Whether a bundle1 operation is allowed from the server.
       
    90 
       
    91     Priority is:
       
    92 
       
    93     1. server.bundle1gd.<action> (if generaldelta active)
       
    94     2. server.bundle1.<action>
       
    95     3. server.bundle1gd (if generaldelta active)
       
    96     4. server.bundle1
       
    97     """
       
    98     ui = repo.ui
       
    99     gd = 'generaldelta' in repo.requirements
       
   100 
       
   101     if gd:
       
   102         v = ui.configbool('server', 'bundle1gd.%s' % action)
       
   103         if v is not None:
       
   104             return v
       
   105 
       
   106     v = ui.configbool('server', 'bundle1.%s' % action)
       
   107     if v is not None:
       
   108         return v
       
   109 
       
   110     if gd:
       
   111         v = ui.configbool('server', 'bundle1gd')
       
   112         if v is not None:
       
   113             return v
       
   114 
       
   115     return ui.configbool('server', 'bundle1')
       
   116 
       
   117 commands = wireprototypes.commanddict()
       
   118 
       
   119 def wireprotocommand(name, args=None, permission='push'):
       
   120     """Decorator to declare a wire protocol command.
       
   121 
       
   122     ``name`` is the name of the wire protocol command being provided.
       
   123 
       
   124     ``args`` defines the named arguments accepted by the command. It is
       
   125     a space-delimited list of argument names. ``*`` denotes a special value
       
   126     that says to accept all named arguments.
       
   127 
       
   128     ``permission`` defines the permission type needed to run this command.
       
   129     Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
       
   130     respectively. Default is to assume command requires ``push`` permissions
       
   131     because otherwise commands not declaring their permissions could modify
       
   132     a repository that is supposed to be read-only.
       
   133     """
       
   134     transports = {k for k, v in wireprototypes.TRANSPORTS.items()
       
   135                   if v['version'] == 1}
       
   136 
       
   137     # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
       
   138     # SSHv2.
       
   139     # TODO undo this hack when SSH is using the unified frame protocol.
       
   140     if name == b'batch':
       
   141         transports.add(wireprototypes.SSHV2)
       
   142 
       
   143     if permission not in ('push', 'pull'):
       
   144         raise error.ProgrammingError('invalid wire protocol permission; '
       
   145                                      'got %s; expected "push" or "pull"' %
       
   146                                      permission)
       
   147 
       
   148     if args is None:
       
   149         args = ''
       
   150 
       
   151     if not isinstance(args, bytes):
       
   152         raise error.ProgrammingError('arguments for version 1 commands '
       
   153                                      'must be declared as bytes')
       
   154 
       
   155     def register(func):
       
   156         if name in commands:
       
   157             raise error.ProgrammingError('%s command already registered '
       
   158                                          'for version 1' % name)
       
   159         commands[name] = wireprototypes.commandentry(
       
   160             func, args=args, transports=transports, permission=permission)
       
   161 
       
   162         return func
       
   163     return register
       
   164 
       
   165 # TODO define a more appropriate permissions type to use for this.
       
   166 @wireprotocommand('batch', 'cmds *', permission='pull')
       
   167 def batch(repo, proto, cmds, others):
       
   168     unescapearg = wireprototypes.unescapebatcharg
       
   169     repo = repo.filtered("served")
       
   170     res = []
       
   171     for pair in cmds.split(';'):
       
   172         op, args = pair.split(' ', 1)
       
   173         vals = {}
       
   174         for a in args.split(','):
       
   175             if a:
       
   176                 n, v = a.split('=')
       
   177                 vals[unescapearg(n)] = unescapearg(v)
       
   178         func, spec = commands[op]
       
   179 
       
   180         # Validate that client has permissions to perform this command.
       
   181         perm = commands[op].permission
       
   182         assert perm in ('push', 'pull')
       
   183         proto.checkperm(perm)
       
   184 
       
   185         if spec:
       
   186             keys = spec.split()
       
   187             data = {}
       
   188             for k in keys:
       
   189                 if k == '*':
       
   190                     star = {}
       
   191                     for key in vals.keys():
       
   192                         if key not in keys:
       
   193                             star[key] = vals[key]
       
   194                     data['*'] = star
       
   195                 else:
       
   196                     data[k] = vals[k]
       
   197             result = func(repo, proto, *[data[k] for k in keys])
       
   198         else:
       
   199             result = func(repo, proto)
       
   200         if isinstance(result, wireprototypes.ooberror):
       
   201             return result
       
   202 
       
   203         # For now, all batchable commands must return bytesresponse or
       
   204         # raw bytes (for backwards compatibility).
       
   205         assert isinstance(result, (wireprototypes.bytesresponse, bytes))
       
   206         if isinstance(result, wireprototypes.bytesresponse):
       
   207             result = result.data
       
   208         res.append(wireprototypes.escapebatcharg(result))
       
   209 
       
   210     return wireprototypes.bytesresponse(';'.join(res))
       
   211 
       
   212 @wireprotocommand('between', 'pairs', permission='pull')
       
   213 def between(repo, proto, pairs):
       
   214     pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
       
   215     r = []
       
   216     for b in repo.between(pairs):
       
   217         r.append(wireprototypes.encodelist(b) + "\n")
       
   218 
       
   219     return wireprototypes.bytesresponse(''.join(r))
       
   220 
       
   221 @wireprotocommand('branchmap', permission='pull')
       
   222 def branchmap(repo, proto):
       
   223     branchmap = repo.branchmap()
       
   224     heads = []
       
   225     for branch, nodes in branchmap.iteritems():
       
   226         branchname = urlreq.quote(encoding.fromlocal(branch))
       
   227         branchnodes = wireprototypes.encodelist(nodes)
       
   228         heads.append('%s %s' % (branchname, branchnodes))
       
   229 
       
   230     return wireprototypes.bytesresponse('\n'.join(heads))
       
   231 
       
   232 @wireprotocommand('branches', 'nodes', permission='pull')
       
   233 def branches(repo, proto, nodes):
       
   234     nodes = wireprototypes.decodelist(nodes)
       
   235     r = []
       
   236     for b in repo.branches(nodes):
       
   237         r.append(wireprototypes.encodelist(b) + "\n")
       
   238 
       
   239     return wireprototypes.bytesresponse(''.join(r))
       
   240 
       
   241 @wireprotocommand('clonebundles', '', permission='pull')
       
   242 def clonebundles(repo, proto):
       
   243     """Server command for returning info for available bundles to seed clones.
       
   244 
       
   245     Clients will parse this response and determine what bundle to fetch.
       
   246 
       
   247     Extensions may wrap this command to filter or dynamically emit data
       
   248     depending on the request. e.g. you could advertise URLs for the closest
       
   249     data center given the client's IP address.
       
   250     """
       
   251     return wireprototypes.bytesresponse(
       
   252         repo.vfs.tryread('clonebundles.manifest'))
       
   253 
       
   254 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
       
   255                  'known', 'getbundle', 'unbundlehash']
       
   256 
       
   257 def _capabilities(repo, proto):
       
   258     """return a list of capabilities for a repo
       
   259 
       
   260     This function exists to allow extensions to easily wrap capabilities
       
   261     computation
       
   262 
       
   263     - returns a lists: easy to alter
       
   264     - change done here will be propagated to both `capabilities` and `hello`
       
   265       command without any other action needed.
       
   266     """
       
   267     # copy to prevent modification of the global list
       
   268     caps = list(wireprotocaps)
       
   269 
       
   270     # Command of same name as capability isn't exposed to version 1 of
       
   271     # transports. So conditionally add it.
       
   272     if commands.commandavailable('changegroupsubset', proto):
       
   273         caps.append('changegroupsubset')
       
   274 
       
   275     if streamclone.allowservergeneration(repo):
       
   276         if repo.ui.configbool('server', 'preferuncompressed'):
       
   277             caps.append('stream-preferred')
       
   278         requiredformats = repo.requirements & repo.supportedformats
       
   279         # if our local revlogs are just revlogv1, add 'stream' cap
       
   280         if not requiredformats - {'revlogv1'}:
       
   281             caps.append('stream')
       
   282         # otherwise, add 'streamreqs' detailing our local revlog format
       
   283         else:
       
   284             caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
       
   285     if repo.ui.configbool('experimental', 'bundle2-advertise'):
       
   286         capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
       
   287         caps.append('bundle2=' + urlreq.quote(capsblob))
       
   288     caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
       
   289 
       
   290     return proto.addcapabilities(repo, caps)
       
   291 
       
   292 # If you are writing an extension and consider wrapping this function. Wrap
       
   293 # `_capabilities` instead.
       
   294 @wireprotocommand('capabilities', permission='pull')
       
   295 def capabilities(repo, proto):
       
   296     caps = _capabilities(repo, proto)
       
   297     return wireprototypes.bytesresponse(' '.join(sorted(caps)))
       
   298 
       
   299 @wireprotocommand('changegroup', 'roots', permission='pull')
       
   300 def changegroup(repo, proto, roots):
       
   301     nodes = wireprototypes.decodelist(roots)
       
   302     outgoing = discovery.outgoing(repo, missingroots=nodes,
       
   303                                   missingheads=repo.heads())
       
   304     cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
       
   305     gen = iter(lambda: cg.read(32768), '')
       
   306     return wireprototypes.streamres(gen=gen)
       
   307 
       
   308 @wireprotocommand('changegroupsubset', 'bases heads',
       
   309                   permission='pull')
       
   310 def changegroupsubset(repo, proto, bases, heads):
       
   311     bases = wireprototypes.decodelist(bases)
       
   312     heads = wireprototypes.decodelist(heads)
       
   313     outgoing = discovery.outgoing(repo, missingroots=bases,
       
   314                                   missingheads=heads)
       
   315     cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
       
   316     gen = iter(lambda: cg.read(32768), '')
       
   317     return wireprototypes.streamres(gen=gen)
       
   318 
       
   319 @wireprotocommand('debugwireargs', 'one two *',
       
   320                   permission='pull')
       
   321 def debugwireargs(repo, proto, one, two, others):
       
   322     # only accept optional args from the known set
       
   323     opts = options('debugwireargs', ['three', 'four'], others)
       
   324     return wireprototypes.bytesresponse(repo.debugwireargs(
       
   325         one, two, **pycompat.strkwargs(opts)))
       
   326 
       
   327 def find_pullbundle(repo, proto, opts, clheads, heads, common):
       
   328     """Return a file object for the first matching pullbundle.
       
   329 
       
   330     Pullbundles are specified in .hg/pullbundles.manifest similar to
       
   331     clonebundles.
       
   332     For each entry, the bundle specification is checked for compatibility:
       
   333     - Client features vs the BUNDLESPEC.
       
   334     - Revisions shared with the clients vs base revisions of the bundle.
       
   335       A bundle can be applied only if all its base revisions are known by
       
   336       the client.
       
   337     - At least one leaf of the bundle's DAG is missing on the client.
       
   338     - Every leaf of the bundle's DAG is part of node set the client wants.
       
   339       E.g. do not send a bundle of all changes if the client wants only
       
   340       one specific branch of many.
       
   341     """
       
   342     def decodehexstring(s):
       
   343         return set([h.decode('hex') for h in s.split(';')])
       
   344 
       
   345     manifest = repo.vfs.tryread('pullbundles.manifest')
       
   346     if not manifest:
       
   347         return None
       
   348     res = exchange.parseclonebundlesmanifest(repo, manifest)
       
   349     res = exchange.filterclonebundleentries(repo, res)
       
   350     if not res:
       
   351         return None
       
   352     cl = repo.changelog
       
   353     heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
       
   354     common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
       
   355     compformats = clientcompressionsupport(proto)
       
   356     for entry in res:
       
   357         if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
       
   358             continue
       
   359         # No test yet for VERSION, since V2 is supported by any client
       
   360         # that advertises partial pulls
       
   361         if 'heads' in entry:
       
   362             try:
       
   363                 bundle_heads = decodehexstring(entry['heads'])
       
   364             except TypeError:
       
   365                 # Bad heads entry
       
   366                 continue
       
   367             if bundle_heads.issubset(common):
       
   368                 continue # Nothing new
       
   369             if all(cl.rev(rev) in common_anc for rev in bundle_heads):
       
   370                 continue # Still nothing new
       
   371             if any(cl.rev(rev) not in heads_anc and
       
   372                    cl.rev(rev) not in common_anc for rev in bundle_heads):
       
   373                 continue
       
   374         if 'bases' in entry:
       
   375             try:
       
   376                 bundle_bases = decodehexstring(entry['bases'])
       
   377             except TypeError:
       
   378                 # Bad bases entry
       
   379                 continue
       
   380             if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
       
   381                 continue
       
   382         path = entry['URL']
       
   383         repo.ui.debug('sending pullbundle "%s"\n' % path)
       
   384         try:
       
   385             return repo.vfs.open(path)
       
   386         except IOError:
       
   387             repo.ui.debug('pullbundle "%s" not accessible\n' % path)
       
   388             continue
       
   389     return None
       
   390 
       
   391 @wireprotocommand('getbundle', '*', permission='pull')
       
   392 def getbundle(repo, proto, others):
       
   393     opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
       
   394                    others)
       
   395     for k, v in opts.iteritems():
       
   396         keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
       
   397         if keytype == 'nodes':
       
   398             opts[k] = wireprototypes.decodelist(v)
       
   399         elif keytype == 'csv':
       
   400             opts[k] = list(v.split(','))
       
   401         elif keytype == 'scsv':
       
   402             opts[k] = set(v.split(','))
       
   403         elif keytype == 'boolean':
       
   404             # Client should serialize False as '0', which is a non-empty string
       
   405             # so it evaluates as a True bool.
       
   406             if v == '0':
       
   407                 opts[k] = False
       
   408             else:
       
   409                 opts[k] = bool(v)
       
   410         elif keytype != 'plain':
       
   411             raise KeyError('unknown getbundle option type %s'
       
   412                            % keytype)
       
   413 
       
   414     if not bundle1allowed(repo, 'pull'):
       
   415         if not exchange.bundle2requested(opts.get('bundlecaps')):
       
   416             if proto.name == 'http-v1':
       
   417                 return wireprototypes.ooberror(bundle2required)
       
   418             raise error.Abort(bundle2requiredmain,
       
   419                               hint=bundle2requiredhint)
       
   420 
       
   421     prefercompressed = True
       
   422 
       
   423     try:
       
   424         clheads = set(repo.changelog.heads())
       
   425         heads = set(opts.get('heads', set()))
       
   426         common = set(opts.get('common', set()))
       
   427         common.discard(nullid)
       
   428         if (repo.ui.configbool('server', 'pullbundle') and
       
   429             'partial-pull' in proto.getprotocaps()):
       
   430             # Check if a pre-built bundle covers this request.
       
   431             bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
       
   432             if bundle:
       
   433                 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
       
   434                                                 prefer_uncompressed=True)
       
   435 
       
   436         if repo.ui.configbool('server', 'disablefullbundle'):
       
   437             # Check to see if this is a full clone.
       
   438             changegroup = opts.get('cg', True)
       
   439             if changegroup and not common and clheads == heads:
       
   440                 raise error.Abort(
       
   441                     _('server has pull-based clones disabled'),
       
   442                     hint=_('remove --pull if specified or upgrade Mercurial'))
       
   443 
       
   444         info, chunks = exchange.getbundlechunks(repo, 'serve',
       
   445                                                 **pycompat.strkwargs(opts))
       
   446         prefercompressed = info.get('prefercompressed', True)
       
   447     except error.Abort as exc:
       
   448         # cleanly forward Abort error to the client
       
   449         if not exchange.bundle2requested(opts.get('bundlecaps')):
       
   450             if proto.name == 'http-v1':
       
   451                 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
       
   452             raise # cannot do better for bundle1 + ssh
       
   453         # bundle2 request expect a bundle2 reply
       
   454         bundler = bundle2.bundle20(repo.ui)
       
   455         manargs = [('message', pycompat.bytestr(exc))]
       
   456         advargs = []
       
   457         if exc.hint is not None:
       
   458             advargs.append(('hint', exc.hint))
       
   459         bundler.addpart(bundle2.bundlepart('error:abort',
       
   460                                            manargs, advargs))
       
   461         chunks = bundler.getchunks()
       
   462         prefercompressed = False
       
   463 
       
   464     return wireprototypes.streamres(
       
   465         gen=chunks, prefer_uncompressed=not prefercompressed)
       
   466 
       
   467 @wireprotocommand('heads', permission='pull')
       
   468 def heads(repo, proto):
       
   469     h = repo.heads()
       
   470     return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
       
   471 
       
   472 @wireprotocommand('hello', permission='pull')
       
   473 def hello(repo, proto):
       
   474     """Called as part of SSH handshake to obtain server info.
       
   475 
       
   476     Returns a list of lines describing interesting things about the
       
   477     server, in an RFC822-like format.
       
   478 
       
   479     Currently, the only one defined is ``capabilities``, which consists of a
       
   480     line of space separated tokens describing server abilities:
       
   481 
       
   482         capabilities: <token0> <token1> <token2>
       
   483     """
       
   484     caps = capabilities(repo, proto).data
       
   485     return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
       
   486 
       
   487 @wireprotocommand('listkeys', 'namespace', permission='pull')
       
   488 def listkeys(repo, proto, namespace):
       
   489     d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
       
   490     return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
       
   491 
       
   492 @wireprotocommand('lookup', 'key', permission='pull')
       
   493 def lookup(repo, proto, key):
       
   494     try:
       
   495         k = encoding.tolocal(key)
       
   496         n = repo.lookup(k)
       
   497         r = hex(n)
       
   498         success = 1
       
   499     except Exception as inst:
       
   500         r = stringutil.forcebytestr(inst)
       
   501         success = 0
       
   502     return wireprototypes.bytesresponse('%d %s\n' % (success, r))
       
   503 
       
   504 @wireprotocommand('known', 'nodes *', permission='pull')
       
   505 def known(repo, proto, nodes, others):
       
   506     v = ''.join(b and '1' or '0'
       
   507                 for b in repo.known(wireprototypes.decodelist(nodes)))
       
   508     return wireprototypes.bytesresponse(v)
       
   509 
       
   510 @wireprotocommand('protocaps', 'caps', permission='pull')
       
   511 def protocaps(repo, proto, caps):
       
   512     if proto.name == wireprototypes.SSHV1:
       
   513         proto._protocaps = set(caps.split(' '))
       
   514     return wireprototypes.bytesresponse('OK')
       
   515 
       
   516 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
       
   517 def pushkey(repo, proto, namespace, key, old, new):
       
   518     # compatibility with pre-1.8 clients which were accidentally
       
   519     # sending raw binary nodes rather than utf-8-encoded hex
       
   520     if len(new) == 20 and stringutil.escapestr(new) != new:
       
   521         # looks like it could be a binary node
       
   522         try:
       
   523             new.decode('utf-8')
       
   524             new = encoding.tolocal(new) # but cleanly decodes as UTF-8
       
   525         except UnicodeDecodeError:
       
   526             pass # binary, leave unmodified
       
   527     else:
       
   528         new = encoding.tolocal(new) # normal path
       
   529 
       
   530     with proto.mayberedirectstdio() as output:
       
   531         r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
       
   532                          encoding.tolocal(old), new) or False
       
   533 
       
   534     output = output.getvalue() if output else ''
       
   535     return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
       
   536 
       
   537 @wireprotocommand('stream_out', permission='pull')
       
   538 def stream(repo, proto):
       
   539     '''If the server supports streaming clone, it advertises the "stream"
       
   540     capability with a value representing the version and flags of the repo
       
   541     it is serving. Client checks to see if it understands the format.
       
   542     '''
       
   543     return wireprototypes.streamreslegacy(
       
   544         streamclone.generatev1wireproto(repo))
       
   545 
       
   546 @wireprotocommand('unbundle', 'heads', permission='push')
       
   547 def unbundle(repo, proto, heads):
       
   548     their_heads = wireprototypes.decodelist(heads)
       
   549 
       
   550     with proto.mayberedirectstdio() as output:
       
   551         try:
       
   552             exchange.check_heads(repo, their_heads, 'preparing changes')
       
   553             cleanup = lambda: None
       
   554             try:
       
   555                 payload = proto.getpayload()
       
   556                 if repo.ui.configbool('server', 'streamunbundle'):
       
   557                     def cleanup():
       
   558                         # Ensure that the full payload is consumed, so
       
   559                         # that the connection doesn't contain trailing garbage.
       
   560                         for p in payload:
       
   561                             pass
       
   562                     fp = util.chunkbuffer(payload)
       
   563                 else:
       
   564                     # write bundle data to temporary file as it can be big
       
   565                     fp, tempname = None, None
       
   566                     def cleanup():
       
   567                         if fp:
       
   568                             fp.close()
       
   569                         if tempname:
       
   570                             os.unlink(tempname)
       
   571                     fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
       
   572                     repo.ui.debug('redirecting incoming bundle to %s\n' %
       
   573                         tempname)
       
   574                     fp = os.fdopen(fd, pycompat.sysstr('wb+'))
       
   575                     r = 0
       
   576                     for p in payload:
       
   577                         fp.write(p)
       
   578                     fp.seek(0)
       
   579 
       
   580                 gen = exchange.readbundle(repo.ui, fp, None)
       
   581                 if (isinstance(gen, changegroupmod.cg1unpacker)
       
   582                     and not bundle1allowed(repo, 'push')):
       
   583                     if proto.name == 'http-v1':
       
   584                         # need to special case http because stderr do not get to
       
   585                         # the http client on failed push so we need to abuse
       
   586                         # some other error type to make sure the message get to
       
   587                         # the user.
       
   588                         return wireprototypes.ooberror(bundle2required)
       
   589                     raise error.Abort(bundle2requiredmain,
       
   590                                       hint=bundle2requiredhint)
       
   591 
       
   592                 r = exchange.unbundle(repo, gen, their_heads, 'serve',
       
   593                                       proto.client())
       
   594                 if util.safehasattr(r, 'addpart'):
       
   595                     # The return looks streamable, we are in the bundle2 case
       
   596                     # and should return a stream.
       
   597                     return wireprototypes.streamreslegacy(gen=r.getchunks())
       
   598                 return wireprototypes.pushres(
       
   599                     r, output.getvalue() if output else '')
       
   600 
       
   601             finally:
       
   602                 cleanup()
       
   603 
       
   604         except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
       
   605             # handle non-bundle2 case first
       
   606             if not getattr(exc, 'duringunbundle2', False):
       
   607                 try:
       
   608                     raise
       
   609                 except error.Abort:
       
   610                     # The old code we moved used procutil.stderr directly.
       
   611                     # We did not change it to minimise code change.
       
   612                     # This need to be moved to something proper.
       
   613                     # Feel free to do it.
       
   614                     procutil.stderr.write("abort: %s\n" % exc)
       
   615                     if exc.hint is not None:
       
   616                         procutil.stderr.write("(%s)\n" % exc.hint)
       
   617                     procutil.stderr.flush()
       
   618                     return wireprototypes.pushres(
       
   619                         0, output.getvalue() if output else '')
       
   620                 except error.PushRaced:
       
   621                     return wireprototypes.pusherr(
       
   622                         pycompat.bytestr(exc),
       
   623                         output.getvalue() if output else '')
       
   624 
       
   625             bundler = bundle2.bundle20(repo.ui)
       
   626             for out in getattr(exc, '_bundle2salvagedoutput', ()):
       
   627                 bundler.addpart(out)
       
   628             try:
       
   629                 try:
       
   630                     raise
       
   631                 except error.PushkeyFailed as exc:
       
   632                     # check client caps
       
   633                     remotecaps = getattr(exc, '_replycaps', None)
       
   634                     if (remotecaps is not None
       
   635                             and 'pushkey' not in remotecaps.get('error', ())):
       
   636                         # no support remote side, fallback to Abort handler.
       
   637                         raise
       
   638                     part = bundler.newpart('error:pushkey')
       
   639                     part.addparam('in-reply-to', exc.partid)
       
   640                     if exc.namespace is not None:
       
   641                         part.addparam('namespace', exc.namespace,
       
   642                                       mandatory=False)
       
   643                     if exc.key is not None:
       
   644                         part.addparam('key', exc.key, mandatory=False)
       
   645                     if exc.new is not None:
       
   646                         part.addparam('new', exc.new, mandatory=False)
       
   647                     if exc.old is not None:
       
   648                         part.addparam('old', exc.old, mandatory=False)
       
   649                     if exc.ret is not None:
       
   650                         part.addparam('ret', exc.ret, mandatory=False)
       
   651             except error.BundleValueError as exc:
       
   652                 errpart = bundler.newpart('error:unsupportedcontent')
       
   653                 if exc.parttype is not None:
       
   654                     errpart.addparam('parttype', exc.parttype)
       
   655                 if exc.params:
       
   656                     errpart.addparam('params', '\0'.join(exc.params))
       
   657             except error.Abort as exc:
       
   658                 manargs = [('message', stringutil.forcebytestr(exc))]
       
   659                 advargs = []
       
   660                 if exc.hint is not None:
       
   661                     advargs.append(('hint', exc.hint))
       
   662                 bundler.addpart(bundle2.bundlepart('error:abort',
       
   663                                                    manargs, advargs))
       
   664             except error.PushRaced as exc:
       
   665                 bundler.newpart('error:pushraced',
       
   666                                 [('message', stringutil.forcebytestr(exc))])
       
   667             return wireprototypes.streamreslegacy(gen=bundler.getchunks())