comparison mercurial/copies.py @ 43806:acbb55b8e9dc

upgraderepo: add a config option for parallel computation The option is put to use to compute new copy tracing side data in parallel. It uses the multiprocessing module as it has the appropriate primitives for what we needed. Gregory Szorc had concerns about Windows, so we disabled it there. See inline comment for details on the parallel implementation.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Sun, 29 Sep 2019 16:00:32 +0200
parents 421ea5772039
children 2f0a44c69e07
comparison
equal deleted inserted replaced
43805:ad84fc97d120 43806:acbb55b8e9dc
6 # GNU General Public License version 2 or any later version. 6 # GNU General Public License version 2 or any later version.
7 7
8 from __future__ import absolute_import 8 from __future__ import absolute_import
9 9
10 import collections 10 import collections
11 import multiprocessing
11 import os 12 import os
12 13
13 from .i18n import _ 14 from .i18n import _
14 15
15 16
1005 sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved 1006 sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
1006 return sidedata 1007 return sidedata
1007 1008
1008 1009
def getsidedataadder(srcrepo, destrepo):
    """Return the sidedata-computation strategy for a repository upgrade.

    Uses the parallel implementation when the
    `experimental.worker.repository-upgrade` knob is enabled and we are not
    on Windows; otherwise falls back to the single-threaded version.
    """
    parallel_enabled = srcrepo.ui.configbool(
        b'experimental', b'worker.repository-upgrade'
    )
    if parallel_enabled and not pycompat.iswindows:
        return _get_worker_sidedata_adder(srcrepo, destrepo)
    return _get_simple_sidedata_adder(srcrepo, destrepo)
1016
1017
1018 def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
1019 """The function used by worker precomputing sidedata
1020
1021 It read an input queue containing revision numbers
1022 It write in an output queue containing (rev, <sidedata-map>)
1023
1024 The `None` input value is used as a stop signal.
1025
1026 The `tokens` semaphore is user to avoid having too many unprocessed
1027 entries. The workers needs to acquire one token before fetching a task.
1028 They will be released by the consumer of the produced data.
1029 """
1030 tokens.acquire()
1031 rev = revs_queue.get()
1032 while rev is not None:
1033 data = _getsidedata(srcrepo, rev)
1034 sidedata_queue.put((rev, data))
1035 tokens.acquire()
1036 rev = revs_queue.get()
1037 # processing of `None` is completed, release the token.
1038 tokens.release()
1039
1040
# number of precomputed sidedata entries each worker may have in flight
BUFF_PER_WORKER = 50


def _get_worker_sidedata_adder(srcrepo, destrepo):
    """The parallel version of the sidedata computation

    This code spawns a pool of workers that precompute a buffer of sidedata
    before we actually need them"""
    # avoid circular import copies -> scmutil -> worker -> copies
    from . import worker

    nbworkers = worker._numworkers(srcrepo.ui)

    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    assert srcrepo.filtername is None
    # queue all tasks beforehand, revision numbers are small and it makes
    # synchronisation simpler
    #
    # Since the computation for each node can be quite expensive, the overhead
    # of using a single queue is not relevant. In practice, most computations
    # are fast but some are very expensive and dominate all the other smaller
    # cost.
    for r in srcrepo.changelog.revs():
        revsq.put(r)
    # queue the "no more tasks" markers
    for i in range(nbworkers):
        revsq.put(None)

    allworkers = []
    for i in range(nbworkers):
        args = (srcrepo, revsq, sidedataq, tokens)
        w = multiprocessing.Process(target=_sidedata_worker, args=args)
        allworkers.append(w)
        w.start()

    # dictionary to store results for revisions higher than the one we are
    # looking for. For example, if we need the sidedatamap for 42, and 43 is
    # received, we shelve 43 for later use.
    staging = {}

    def sidedata_companion(revlog, rev):
        sidedata = {}
        # The attribute name must be a native `str`: `hasattr`/`getattr`
        # reject bytes attribute names on Python 3, so passing
        # b'filteredrevs' would never match a changelog there.  This also
        # matches the simple (non-worker) sidedata adder.
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            # Is the data previously shelved ?
            sidedata = staging.pop(rev, None)
            if sidedata is None:
                # look at the queued result until we find the one we are
                # looking for (shelve the other ones)
                r, sidedata = sidedataq.get()
                while r != rev:
                    staging[r] = sidedata
                    r, sidedata = sidedataq.get()
            tokens.release()
        return False, (), sidedata

    return sidedata_companion
1100
1101
1102 def _get_simple_sidedata_adder(srcrepo, destrepo):
1103 """The simple version of the sidedata computation
1104
1105 It just compute it in the same thread on request"""
1106
1010 def sidedatacompanion(revlog, rev): 1107 def sidedatacompanion(revlog, rev):
1011 sidedata = {} 1108 sidedata = {}
1012 if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog 1109 if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
1013 sidedata = _getsidedata(srcrepo, rev) 1110 sidedata = _getsidedata(srcrepo, rev)
1014 return False, (), sidedata 1111 return False, (), sidedata