Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/wireproto.py @ 37414:2d965bfeb8f6
wireproto: allow direct stream processing for unbundle
Introduce a new option server.streamunbundle which starts a transaction
immediately to apply a bundle instead of writing it to a temporary file
first. This side steps the need for a large tmp directory at the cost of
preventing concurrent pushes. This is a reasonable trade-off for many
setups as concurrent pushes for the main branch at least are disallowed
anyway. The option defaults to off to preserve existing behavior.
Change the wireproto interface to provide a generator for reading the
payload and make callers responsible for consuming all data.
Differential Revision: https://phab.mercurial-scm.org/D2470
author | Joerg Sonnenberger <joerg@bec.de> |
---|---|
date | Tue, 27 Feb 2018 02:37:31 +0100 |
parents | 33af46d639b4 |
children | 0b7475ea38cf |
comparison
equal
deleted
inserted
replaced
37413:33af46d639b4 | 37414:2d965bfeb8f6 |
---|---|
1080 their_heads = decodelist(heads) | 1080 their_heads = decodelist(heads) |
1081 | 1081 |
1082 with proto.mayberedirectstdio() as output: | 1082 with proto.mayberedirectstdio() as output: |
1083 try: | 1083 try: |
1084 exchange.check_heads(repo, their_heads, 'preparing changes') | 1084 exchange.check_heads(repo, their_heads, 'preparing changes') |
1085 | 1085 cleanup = lambda: None |
1086 # write bundle data to temporary file because it can be big | |
1087 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') | |
1088 fp = os.fdopen(fd, r'wb+') | |
1089 r = 0 | |
1090 try: | 1086 try: |
1091 proto.forwardpayload(fp) | 1087 payload = proto.getpayload() |
1092 fp.seek(0) | 1088 if repo.ui.configbool('server', 'streamunbundle'): |
1089 def cleanup(): | |
1090 # Ensure that the full payload is consumed, so | |
1091 # that the connection doesn't contain trailing garbage. | |
1092 for p in payload: | |
1093 pass | |
1094 fp = util.chunkbuffer(payload) | |
1095 else: | |
1096 # write bundle data to temporary file as it can be big | |
1097 fp, tempname = None, None | |
1098 def cleanup(): | |
1099 if fp: | |
1100 fp.close() | |
1101 if tempname: | |
1102 os.unlink(tempname) | |
1103 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') | |
1104 repo.ui.debug('redirecting incoming bundle to %s\n' % | |
1105 tempname) | |
1106 fp = os.fdopen(fd, pycompat.sysstr('wb+')) | |
1107 r = 0 | |
1108 for p in payload: | |
1109 fp.write(p) | |
1110 fp.seek(0) | |
1111 | |
1093 gen = exchange.readbundle(repo.ui, fp, None) | 1112 gen = exchange.readbundle(repo.ui, fp, None) |
1094 if (isinstance(gen, changegroupmod.cg1unpacker) | 1113 if (isinstance(gen, changegroupmod.cg1unpacker) |
1095 and not bundle1allowed(repo, 'push')): | 1114 and not bundle1allowed(repo, 'push')): |
1096 if proto.name == 'http-v1': | 1115 if proto.name == 'http-v1': |
1097 # need to special case http because stderr do not get to | 1116 # need to special case http because stderr do not get to |
1110 return wireprototypes.streamreslegacy(gen=r.getchunks()) | 1129 return wireprototypes.streamreslegacy(gen=r.getchunks()) |
1111 return wireprototypes.pushres( | 1130 return wireprototypes.pushres( |
1112 r, output.getvalue() if output else '') | 1131 r, output.getvalue() if output else '') |
1113 | 1132 |
1114 finally: | 1133 finally: |
1115 fp.close() | 1134 cleanup() |
1116 os.unlink(tempname) | |
1117 | 1135 |
1118 except (error.BundleValueError, error.Abort, error.PushRaced) as exc: | 1136 except (error.BundleValueError, error.Abort, error.PushRaced) as exc: |
1119 # handle non-bundle2 case first | 1137 # handle non-bundle2 case first |
1120 if not getattr(exc, 'duringunbundle2', False): | 1138 if not getattr(exc, 'duringunbundle2', False): |
1121 try: | 1139 try: |