Mercurial > public > mercurial-scm > hg
comparison mercurial/copies.py @ 43806:acbb55b8e9dc
upgraderepo: add a config option for parallel computation
The option is put to use to compute new copy tracing side data in parallel. It
uses the multiprocessing module, as it has the appropriate primitives for what we
need. Gregory Szorc had concerns about Windows, so we disabled it there.
See inline comment for details on the parallel implementation.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Sun, 29 Sep 2019 16:00:32 +0200 |
parents | 421ea5772039 |
children | 2f0a44c69e07 |
comparison
equal
deleted
inserted
replaced
43805:ad84fc97d120 | 43806:acbb55b8e9dc |
---|---|
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import collections | 10 import collections |
11 import multiprocessing | |
11 import os | 12 import os |
12 | 13 |
13 from .i18n import _ | 14 from .i18n import _ |
14 | 15 |
15 | 16 |
1005 sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved | 1006 sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved |
1006 return sidedata | 1007 return sidedata |
1007 | 1008 |
1008 | 1009 |
def getsidedataadder(srcrepo, destrepo):
    """Return the sidedata-computation callable to use for an upgrade.

    Prefers the parallel (worker based) implementation, falling back to the
    simple in-process one when parallelism is disabled by configuration or
    when running on Windows (where the multiprocessing primitives we rely on
    are not usable for this).
    """
    use_workers = srcrepo.ui.configbool(
        b'experimental', b'worker.repository-upgrade'
    )
    if use_workers and not pycompat.iswindows:
        return _get_worker_sidedata_adder(srcrepo, destrepo)
    return _get_simple_sidedata_adder(srcrepo, destrepo)
1017 | |
def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
    """The function used by workers precomputing sidedata

    It reads an input queue containing revision numbers.
    It writes to an output queue containing (rev, <sidedata-map>) pairs.

    The `None` input value is used as a stop signal.

    The `tokens` semaphore is used to avoid having too many unprocessed
    entries. The workers need to acquire one token before fetching a task.
    The tokens will be released by the consumer of the produced data.
    """
    tokens.acquire()
    rev = revs_queue.get()
    while rev is not None:
        data = _getsidedata(srcrepo, rev)
        sidedata_queue.put((rev, data))
        tokens.acquire()
        rev = revs_queue.get()
    # processing of `None` is completed, release the token.
    tokens.release()
1040 | |
# Number of semaphore tokens contributed per worker: the BoundedSemaphore in
# _get_worker_sidedata_adder is sized nbworkers * BUFF_PER_WORKER, bounding
# how many precomputed-but-unconsumed sidedata entries may be outstanding.
BUFF_PER_WORKER = 50
1043 | |
def _get_worker_sidedata_adder(srcrepo, destrepo):
    """The parallel version of the sidedata computation

    This code spawns a pool of workers that precompute a buffer of sidedata
    before we actually need them.

    Returns a `sidedata_companion(revlog, rev)` callable with the same
    contract as the one produced by `_get_simple_sidedata_adder`.
    """
    # avoid circular import copies -> scmutil -> worker -> copies
    from . import worker

    nbworkers = worker._numworkers(srcrepo.ui)

    # Bounds the number of precomputed-but-unconsumed entries (see
    # _sidedata_worker: workers acquire before producing, we release here
    # as entries are consumed).
    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    assert srcrepo.filtername is None
    # queue all tasks beforehand, revision numbers are small and it makes
    # synchronisation simpler
    #
    # Since the computation for each node can be quite expensive, the
    # overhead of using a single queue is not relevant. In practice, most
    # computations are fast but some are very expensive and dominate all the
    # other smaller ones.
    for r in srcrepo.changelog.revs():
        revsq.put(r)
    # queue the "no more tasks" markers
    for i in range(nbworkers):
        revsq.put(None)

    allworkers = []
    for i in range(nbworkers):
        args = (srcrepo, revsq, sidedataq, tokens)
        w = multiprocessing.Process(target=_sidedata_worker, args=args)
        allworkers.append(w)
        w.start()

    # dictionary to store results for revisions higher than the one we are
    # looking for. For example, if we need the sidedatamap for 42 and 43 is
    # received, we shelve 43 for later use.
    staging = {}

    def sidedata_companion(revlog, rev):
        sidedata = {}
        # Use a str attribute name (not bytes): getattr() requires str on
        # Python 3, and this matches _get_simple_sidedata_adder.
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            # Is the data previously shelved ?
            sidedata = staging.pop(rev, None)
            if sidedata is None:
                # look at the queued results until we find the one we are
                # looking for (shelve the other ones)
                r, sidedata = sidedataq.get()
                while r != rev:
                    staging[r] = sidedata
                    r, sidedata = sidedataq.get()
            tokens.release()
        return False, (), sidedata

    return sidedata_companion
1101 | |
1102 def _get_simple_sidedata_adder(srcrepo, destrepo): | |
1103 """The simple version of the sidedata computation | |
1104 | |
1105 It just compute it in the same thread on request""" | |
1106 | |
1010 def sidedatacompanion(revlog, rev): | 1107 def sidedatacompanion(revlog, rev): |
1011 sidedata = {} | 1108 sidedata = {} |
1012 if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog | 1109 if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog |
1013 sidedata = _getsidedata(srcrepo, rev) | 1110 sidedata = _getsidedata(srcrepo, rev) |
1014 return False, (), sidedata | 1111 return False, (), sidedata |