comparison mercurial/wireproto.py @ 37414:2d965bfeb8f6

wireproto: allow direct stream processing for unbundle. Introduce a new option, server.streamunbundle, which starts a transaction immediately to apply a bundle instead of writing it to a temporary file first. This sidesteps the need for a large tmp directory at the cost of preventing concurrent pushes. This is a reasonable trade-off for many setups, as concurrent pushes for the main branch at least are disallowed anyway. The option defaults to off to preserve existing behavior. Change the wireproto interface to provide a generator for reading the payload and make callers responsible for consuming all data. Differential Revision: https://phab.mercurial-scm.org/D2470
author Joerg Sonnenberger <joerg@bec.de>
date Tue, 27 Feb 2018 02:37:31 +0100
parents 33af46d639b4
children 0b7475ea38cf
comparison
equal deleted inserted replaced
37413:33af46d639b4 37414:2d965bfeb8f6
1080 their_heads = decodelist(heads) 1080 their_heads = decodelist(heads)
1081 1081
1082 with proto.mayberedirectstdio() as output: 1082 with proto.mayberedirectstdio() as output:
1083 try: 1083 try:
1084 exchange.check_heads(repo, their_heads, 'preparing changes') 1084 exchange.check_heads(repo, their_heads, 'preparing changes')
1085 1085 cleanup = lambda: None
1086 # write bundle data to temporary file because it can be big
1087 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1088 fp = os.fdopen(fd, r'wb+')
1089 r = 0
1090 try: 1086 try:
1091 proto.forwardpayload(fp) 1087 payload = proto.getpayload()
1092 fp.seek(0) 1088 if repo.ui.configbool('server', 'streamunbundle'):
1089 def cleanup():
1090 # Ensure that the full payload is consumed, so
1091 # that the connection doesn't contain trailing garbage.
1092 for p in payload:
1093 pass
1094 fp = util.chunkbuffer(payload)
1095 else:
1096 # write bundle data to temporary file as it can be big
1097 fp, tempname = None, None
1098 def cleanup():
1099 if fp:
1100 fp.close()
1101 if tempname:
1102 os.unlink(tempname)
1103 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1104 repo.ui.debug('redirecting incoming bundle to %s\n' %
1105 tempname)
1106 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1107 r = 0
1108 for p in payload:
1109 fp.write(p)
1110 fp.seek(0)
1111
1093 gen = exchange.readbundle(repo.ui, fp, None) 1112 gen = exchange.readbundle(repo.ui, fp, None)
1094 if (isinstance(gen, changegroupmod.cg1unpacker) 1113 if (isinstance(gen, changegroupmod.cg1unpacker)
1095 and not bundle1allowed(repo, 'push')): 1114 and not bundle1allowed(repo, 'push')):
1096 if proto.name == 'http-v1': 1115 if proto.name == 'http-v1':
1097 # need to special case http because stderr do not get to 1116 # need to special case http because stderr do not get to
1110 return wireprototypes.streamreslegacy(gen=r.getchunks()) 1129 return wireprototypes.streamreslegacy(gen=r.getchunks())
1111 return wireprototypes.pushres( 1130 return wireprototypes.pushres(
1112 r, output.getvalue() if output else '') 1131 r, output.getvalue() if output else '')
1113 1132
1114 finally: 1133 finally:
1115 fp.close() 1134 cleanup()
1116 os.unlink(tempname)
1117 1135
1118 except (error.BundleValueError, error.Abort, error.PushRaced) as exc: 1136 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1119 # handle non-bundle2 case first 1137 # handle non-bundle2 case first
1120 if not getattr(exc, 'duringunbundle2', False): 1138 if not getattr(exc, 'duringunbundle2', False):
1121 try: 1139 try: