diff mercurial/copies.py @ 44981:4c1d39215034
metadata: move computation related to files touched in a dedicated module
This was suggested by Yuya Nishihara a while back. Since I am about to add more
metadata-related computation, let's create a dedicated module.
Differential Revision: https://phab.mercurial-scm.org/D8587
author   | Pierre-Yves David <pierre-yves.david@octobus.net>
date     | Wed, 27 May 2020 12:26:08 +0200
parents  | 61719b9658b1
children | cfd06649a1b8
--- a/mercurial/copies.py	Fri Jun 05 11:10:33 2020 -0700
+++ b/mercurial/copies.py	Wed May 27 12:26:08 2020 +0200
@@ -8,7 +8,6 @@
 from __future__ import absolute_import
 
 import collections
-import multiprocessing
 import os
 
 from .i18n import _
@@ -17,7 +16,6 @@
 from .revlogutils.flagutil import REVIDX_SIDEDATA
 
 from . import (
-    error,
     match as matchmod,
     node,
     pathutil,
@@ -25,7 +23,6 @@
     util,
 )
 
-from .revlogutils import sidedata as sidedatamod
 from .utils import stringutil
 
 
@@ -992,250 +989,3 @@
         _filter(wctx.p1(), wctx, new_copies)
     for dst, src in pycompat.iteritems(new_copies):
         wctx[dst].markcopied(src)
-
-
-def computechangesetfilesadded(ctx):
-    """return the list of files added in a changeset
-    """
-    added = []
-    for f in ctx.files():
-        if not any(f in p for p in ctx.parents()):
-            added.append(f)
-    return added
-
-
-def computechangesetfilesremoved(ctx):
-    """return the list of files removed in a changeset
-    """
-    removed = []
-    for f in ctx.files():
-        if f not in ctx:
-            removed.append(f)
-    return removed
-
-
-def computechangesetcopies(ctx):
-    """return the copies data for a changeset
-
-    The copies data are returned as a pair of dictionnary (p1copies, p2copies).
-
-    Each dictionnary are in the form: `{newname: oldname}`
-    """
-    p1copies = {}
-    p2copies = {}
-    p1 = ctx.p1()
-    p2 = ctx.p2()
-    narrowmatch = ctx._repo.narrowmatch()
-    for dst in ctx.files():
-        if not narrowmatch(dst) or dst not in ctx:
-            continue
-        copied = ctx[dst].renamed()
-        if not copied:
-            continue
-        src, srcnode = copied
-        if src in p1 and p1[src].filenode() == srcnode:
-            p1copies[dst] = src
-        elif src in p2 and p2[src].filenode() == srcnode:
-            p2copies[dst] = src
-    return p1copies, p2copies
-
-
-def encodecopies(files, copies):
-    items = []
-    for i, dst in enumerate(files):
-        if dst in copies:
-            items.append(b'%d\0%s' % (i, copies[dst]))
-    if len(items) != len(copies):
-        raise error.ProgrammingError(
-            b'some copy targets missing from file list'
-        )
-    return b"\n".join(items)
-
-
-def decodecopies(files, data):
-    try:
-        copies = {}
-        if not data:
-            return copies
-        for l in data.split(b'\n'):
-            strindex, src = l.split(b'\0')
-            i = int(strindex)
-            dst = files[i]
-            copies[dst] = src
-        return copies
-    except (ValueError, IndexError):
-        # Perhaps someone had chosen the same key name (e.g. "p1copies") and
-        # used different syntax for the value.
-        return None
-
-
-def encodefileindices(files, subset):
-    subset = set(subset)
-    indices = []
-    for i, f in enumerate(files):
-        if f in subset:
-            indices.append(b'%d' % i)
-    return b'\n'.join(indices)
-
-
-def decodefileindices(files, data):
-    try:
-        subset = []
-        if not data:
-            return subset
-        for strindex in data.split(b'\n'):
-            i = int(strindex)
-            if i < 0 or i >= len(files):
-                return None
-            subset.append(files[i])
-        return subset
-    except (ValueError, IndexError):
-        # Perhaps someone had chosen the same key name (e.g. "added") and
-        # used different syntax for the value.
-        return None
-
-
-def _getsidedata(srcrepo, rev):
-    ctx = srcrepo[rev]
-    filescopies = computechangesetcopies(ctx)
-    filesadded = computechangesetfilesadded(ctx)
-    filesremoved = computechangesetfilesremoved(ctx)
-    sidedata = {}
-    if any([filescopies, filesadded, filesremoved]):
-        sortedfiles = sorted(ctx.files())
-        p1copies, p2copies = filescopies
-        p1copies = encodecopies(sortedfiles, p1copies)
-        p2copies = encodecopies(sortedfiles, p2copies)
-        filesadded = encodefileindices(sortedfiles, filesadded)
-        filesremoved = encodefileindices(sortedfiles, filesremoved)
-        if p1copies:
-            sidedata[sidedatamod.SD_P1COPIES] = p1copies
-        if p2copies:
-            sidedata[sidedatamod.SD_P2COPIES] = p2copies
-        if filesadded:
-            sidedata[sidedatamod.SD_FILESADDED] = filesadded
-        if filesremoved:
-            sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
-    return sidedata
-
-
-def getsidedataadder(srcrepo, destrepo):
-    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
-    if pycompat.iswindows or not use_w:
-        return _get_simple_sidedata_adder(srcrepo, destrepo)
-    else:
-        return _get_worker_sidedata_adder(srcrepo, destrepo)
-
-
-def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
-    """The function used by worker precomputing sidedata
-
-    It read an input queue containing revision numbers
-    It write in an output queue containing (rev, <sidedata-map>)
-
-    The `None` input value is used as a stop signal.
-
-    The `tokens` semaphore is user to avoid having too many unprocessed
-    entries. The workers needs to acquire one token before fetching a task.
-    They will be released by the consumer of the produced data.
-    """
-    tokens.acquire()
-    rev = revs_queue.get()
-    while rev is not None:
-        data = _getsidedata(srcrepo, rev)
-        sidedata_queue.put((rev, data))
-        tokens.acquire()
-        rev = revs_queue.get()
-    # processing of `None` is completed, release the token.
-    tokens.release()
-
-
-BUFF_PER_WORKER = 50
-
-
-def _get_worker_sidedata_adder(srcrepo, destrepo):
-    """The parallel version of the sidedata computation
-
-    This code spawn a pool of worker that precompute a buffer of sidedata
-    before we actually need them"""
-    # avoid circular import copies -> scmutil -> worker -> copies
-    from . import worker
-
-    nbworkers = worker._numworkers(srcrepo.ui)
-
-    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
-    revsq = multiprocessing.Queue()
-    sidedataq = multiprocessing.Queue()
-
-    assert srcrepo.filtername is None
-    # queue all tasks beforehand, revision numbers are small and it make
-    # synchronisation simpler
-    #
-    # Since the computation for each node can be quite expensive, the overhead
-    # of using a single queue is not revelant. In practice, most computation
-    # are fast but some are very expensive and dominate all the other smaller
-    # cost.
-    for r in srcrepo.changelog.revs():
-        revsq.put(r)
-    # queue the "no more tasks" markers
-    for i in range(nbworkers):
-        revsq.put(None)
-
-    allworkers = []
-    for i in range(nbworkers):
-        args = (srcrepo, revsq, sidedataq, tokens)
-        w = multiprocessing.Process(target=_sidedata_worker, args=args)
-        allworkers.append(w)
-        w.start()
-
-    # dictionnary to store results for revision higher than we one we are
-    # looking for. For example, if we need the sidedatamap for 42, and 43 is
-    # received, when shelve 43 for later use.
-    staging = {}
-
-    def sidedata_companion(revlog, rev):
-        sidedata = {}
-        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
-            # Is the data previously shelved ?
-            sidedata = staging.pop(rev, None)
-            if sidedata is None:
-                # look at the queued result until we find the one we are
-                # looking for (shelve the other ones)
-                r, sidedata = sidedataq.get()
-                while r != rev:
-                    staging[r] = sidedata
-                    r, sidedata = sidedataq.get()
-            tokens.release()
-        return False, (), sidedata
-
-    return sidedata_companion
-
-
-def _get_simple_sidedata_adder(srcrepo, destrepo):
-    """The simple version of the sidedata computation
-
-    It just compute it in the same thread on request"""
-
-    def sidedatacompanion(revlog, rev):
-        sidedata = {}
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            sidedata = _getsidedata(srcrepo, rev)
-        return False, (), sidedata
-
-    return sidedatacompanion
-
-
-def getsidedataremover(srcrepo, destrepo):
-    def sidedatacompanion(revlog, rev):
-        f = ()
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            if revlog.flags(rev) & REVIDX_SIDEDATA:
-                f = (
-                    sidedatamod.SD_P1COPIES,
-                    sidedatamod.SD_P2COPIES,
-                    sidedatamod.SD_FILESADDED,
-                    sidedatamod.SD_FILESREMOVED,
-                )
-        return False, f, {}
-
-    return sidedatacompanion
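
Note: the copy metadata removed above is stored as "<file-index>\0<source-path>" records joined by newlines, where the index points into the sorted list of files touched by the changeset. The following is a minimal standalone sketch of that round trip; encode_copies/decode_copies are illustrative stand-ins for the removed encodecopies()/decodecopies(), not Mercurial API:

# Illustrative sketch only -- simplified from the removed
# encodecopies()/decodecopies(); the names are not Mercurial API.

def encode_copies(files, copies):
    # files: sorted list of files touched by the changeset
    # copies: {newname: oldname}; each record is b"<index>\0<source>"
    items = []
    for i, dst in enumerate(files):
        if dst in copies:
            items.append(b'%d\0%s' % (i, copies[dst]))
    assert len(items) == len(copies), 'copy target missing from file list'
    return b'\n'.join(items)


def decode_copies(files, data):
    copies = {}
    if not data:
        return copies
    for record in data.split(b'\n'):
        strindex, src = record.split(b'\0')
        copies[files[int(strindex)]] = src
    return copies


files = [b'a.txt', b'b.txt', b'c.txt']
copies = {b'b.txt': b'a.txt'}  # b.txt was copied from a.txt
blob = encode_copies(files, copies)
assert blob == b'1\x00a.txt'
assert decode_copies(files, blob) == copies

Storing an index rather than the destination path presumably keeps the encoded blob small, since the file list is already available to the decoder.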
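
The worker machinery removed above is a bounded producer/consumer pipeline: revision numbers go into one queue, (rev, sidedata) pairs come out of another, and a BoundedSemaphore caps how many results may sit unconsumed. Below is a minimal self-contained model of that pattern; compute(), maxbuffered, and the other names are made up, and the token-release placement is slightly simplified relative to the original:

# Illustrative model only -- names are invented, and one token is
# released per result pulled off the output queue.

import multiprocessing


def compute(rev):
    return rev * 2  # stand-in for the expensive _getsidedata() call


def worker(inq, outq, tokens):
    tokens.acquire()            # one token per result we will buffer
    rev = inq.get()
    while rev is not None:      # None is the stop signal
        outq.put((rev, compute(rev)))
        tokens.acquire()
        rev = inq.get()
    tokens.release()            # give back the token taken for the stop signal


if __name__ == '__main__':
    nbworkers, maxbuffered = 2, 4
    tokens = multiprocessing.BoundedSemaphore(maxbuffered)
    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()
    nbrevs = 10
    for r in range(nbrevs):     # queue all "revisions" beforehand
        inq.put(r)
    for _ in range(nbworkers):  # one stop marker per worker
        inq.put(None)
    procs = [
        multiprocessing.Process(target=worker, args=(inq, outq, tokens))
        for _ in range(nbworkers)
    ]
    for p in procs:
        p.start()
    staging = {}                # results received ahead of the rev we need
    for rev in range(nbrevs):   # consume in strict revision order
        data = staging.pop(rev, None)
        while data is None:
            r, d = outq.get()
            tokens.release()    # free a buffer slot for the workers
            if r == rev:
                data = d
            else:
                staging[r] = d
        print(rev, data)
    for p in procs:
        p.join()

Out-of-order results are shelved in staging, much as sidedata_companion() does above, so the consumer can hand results back in strict revision order even though workers finish in arbitrary order.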