Mercurial > public > mercurial-scm > evolve
view hgext3rd/pullbundle.py @ 4130:a1f6b8211016
pullbundle: add caching logic
We now only generate a bundle once (and store it to disk). If we need it again,
we use it directly from disk.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Sun, 23 Sep 2018 00:08:02 +0200 |
parents | bc4e62a1cb82 |
children | afc933d32085 |
line wrap: on
line source
# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

import errno
import os

from mercurial import (
    changegroup,
    discovery,
    exchange,
    narrowspec,
    node as nodemod,
    util,
)

from mercurial.i18n import _

# generic wrapping

def uisetup(ui):
    """install our changegroup part generator in place of the core one"""
    exchange.getbundle2partsmapping['changegroup'] = _getbundlechangegrouppart

def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle

    Everything outside of the "ALTERED PART" markers is meant to mirror the
    function this replaces in ``exchange.getbundle2partsmapping``; only the
    generation of the changegroup part(s) is delegated to ``makeallcgpart``.
    """
    if not kwargs.get(r'cg', True):
        return

    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    if kwargs.get(r'narrow', False):
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)
    else:
        filematcher = None

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source, bundlecaps,
                  filematcher, cgversions)
    # END OF ALTERED PART

    if kwargs.get(r'narrow', False) and (include or exclude):
        narrowspecpart = bundler.newpart('narrow:spec')
        if include:
            narrowspecpart.addparam(
                'include', '\n'.join(include), mandatory=True)
        if exclude:
            narrowspecpart.addparam(
                'exclude', '\n'.join(exclude), mandatory=True)

def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """add every changegroup part needed to transmit ``outgoing``

    When a ``filematcher`` is in use, or when the repository has no
    ``stablerange`` attribute (the "evolve" extension is not loaded), a single
    uncached part covering all of ``outgoing`` is emitted. Otherwise
    ``outgoing`` is cut into slices and one (cacheable) part is emitted per
    slice.
    """
    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn('pullbundle: required extension "evolve" are missing, '
                     'skipping pullbundle\n')
        pullbundle = False
    # Branch on `pullbundle`, not on `filematcher`: after the warning above we
    # must not reach `sliceoutgoing`, which needs attributes that only exist
    # when "evolve" is loaded.
    if pullbundle:
        for sliceid, sliceout in sliceoutgoing(repo, outgoing):
            makeonecgpart(newpart, repo, sliceid, sliceout, version, source,
                          bundlecaps, filematcher, cgversions)
    else:
        makeonecgpart(newpart, repo, None, outgoing, version, source,
                      bundlecaps, filematcher, cgversions)

# stable range slicing

def sliceoutgoing(repo, outgoing):
    """cut ``outgoing`` into a list of ``(rangeid, outgoing)`` slices

    For each missing head, revisions are collected by walking
    ``repo.stablesort`` from that head until a revision that is not missing
    (or was already sliced) is met; the collected run is then split by
    ``canonicalslices``. The walk restarts from the highest remaining missing
    ancestor until all missing ancestors of the head are covered.
    """
    cl = repo.changelog
    rev = cl.nodemap.get
    node = cl.node
    revsort = repo.stablesort

    missingrevs = set(rev(n) for n in outgoing.missing)
    allslices = []
    missingheads = [rev(n) for n in outgoing.missingheads]
    for head in missingheads:
        localslices = []
        localmissing = set(repo.revs('%ld and ::%d', missingrevs, head))
        while localmissing:
            slicerevs = []
            for r in revsort.walkfrom(repo, head):
                if r not in missingrevs:
                    break
                slicerevs.append(r)
            slicenodes = [node(r) for r in slicerevs]
            localslices.extend(canonicalslices(repo, slicenodes))
            # revisions already sliced must not be emitted again, neither for
            # this head nor for the following ones
            missingrevs.difference_update(slicerevs)
            localmissing.difference_update(slicerevs)
            if localmissing:
                head = max(localmissing)

        allslices.extend(localslices)
    return [(rangeid, outgoingfromnodes(repo, nodes))
            for rangeid, nodes in allslices]

def canonicalslices(repo, nodes):
    """split ``nodes`` into a list of ``(rangeid, nodes)`` pairs

    ``nodes`` is expected to start with the head of the run (its first item is
    used as the range head). The list is reversed in place before being cut
    according to the sizes of the ranges returned by ``canonicalsubranges``.
    """
    depth = repo.depthcache.get
    stablerange = repo.stablerange
    rangelength = lambda x: stablerange.rangelength(repo, x)
    headrev = repo.changelog.rev(nodes[0])
    nbrevs = len(nodes)
    headdepth = depth(headrev)
    skipped = headdepth - nbrevs
    rangeid = (headrev, skipped)

    subranges = canonicalsubranges(repo, stablerange, rangeid)
    idx = 0
    slices = []
    nodes.reverse()
    for rangeid in subranges:
        size = rangelength(rangeid)
        slices.append((rangeid, nodes[idx:idx + size]))
        idx += size
    return slices

def canonicalsubranges(repo, stablerange, rangeid):
    """slice a size of nodes into most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    range.

    It might make sense to move this function as a 'stablerange' method.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []

    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2

    # 1. find first cut
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2

    # 2. optimise, bottom part
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        size = stablerange.rangelength(repo, prerange)
        sub = stablerange.subranges(repo, prerange)[:-1]
        # NOTE(review): poweroftwo(0) is falsy, so if `tailsize` reaches 0
        # while items remain in `sub`, the `for` below breaks immediately on
        # every pass and this `while` would not terminate -- confirm the
        # stable range layout makes that impossible.
        while not poweroftwo(tailsize):
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break

                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    # only the top `tailsize` revisions of that range are
                    # needed, skip the extra ones
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    continue # exit the loop
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)

        tailranges.reverse()
        canonicals.extend(tailranges)

    # 3. take recursive subrange until we get to a power of two size?
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)

    return canonicals

def poweroftwo(num):
    """return a true value if ``num`` is a power of two, a false one otherwise

    0 is not considered a power of two (it is returned as is, hence falsy).
    """
    return num and not num & (num - 1)

def outgoingfromnodes(repo, nodes):
    """build an ``outgoing`` object whose missing roots and heads are ``nodes``
    """
    return discovery.outgoing(repo,
                              missingroots=nodes,
                              missingheads=nodes)

# changegroup part construction

def _changegroupinfo(repo, nodes, source):
    """report the number of changesets found (quieter than what it replaces)

    Only prints in verbose mode or when ``source`` is 'bundle'.
    """
    if repo.ui.verbose or source == 'bundle':
        repo.ui.status(_("%d changesets found\n") % len(nodes))

def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
    """generate a fresh changegroup stream for ``outgoing``

    Returns a ``(cgstream, nbchanges, version-or-None)`` tuple suitable for
    ``_makepartfromstream``. The version is only reported when the client
    advertised changegroup versions.
    """
    # temporarily swap the reporting function used by `changegroup` with our
    # own while the stream is created
    old = changegroup._changegroupinfo
    try:
        changegroup._changegroupinfo = _changegroupinfo
        cgstream = changegroup.makestream(repo, outgoing, version, source,
                                          bundlecaps=bundlecaps,
                                          filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = old

    nbchanges = len(outgoing.missing)
    pversion = None
    if cgversions:
        pversion = version
    return (cgstream, nbchanges, pversion)

def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    """create the 'changegroup' bundle2 part carrying ``cgstream``"""
    # same as upstream code

    part = newpart('changegroup', data=cgstream)
    if version:
        part.addparam('version', version)

    part.addparam('nbchanges', '%d' % nbchanges,
                  mandatory=False)

    if 'treemanifest' in repo.requirements:
        part.addparam('treemanifest', '1')

# cache management

def cachedir(repo):
    """return the path of the directory holding the cached bundles"""
    return repo.cachevfs.join('pullbundles')

def _closingchunkiter(fd):
    """yield the content of ``fd`` by chunks, closing it when iteration ends

    The file is closed when the data is exhausted, when the consumer raises,
    or when the generator is closed or garbage collected.
    """
    try:
        for chunk in util.filechunkiter(fd):
            yield chunk
    finally:
        fd.close()

def getcache(repo, bundlename):
    """return an iterator over the cached bundle data, or None if not cached

    Any error other than a missing file is propagated.
    """
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        fd = open(bundlepath, 'rb')
    except IOError as exc:
        if exc.errno != errno.ENOENT:
            raise
        return None
    # the file is opened right away so a cache miss is detected here, but it
    # is explicitly closed once the data have been consumed
    return _closingchunkiter(fd)

def cachewriter(repo, bundlename, stream):
    """yield the chunks of ``stream`` while also writing them to the cache

    The cache file is written through ``util.atomictempfile`` under the name
    ``bundlename`` in the cache directory, which is created if needed.
    """
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        # an already existing directory is fine, anything else is a real error
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk

# cache file name: changegroup version, hex node of the range head, number of
# skipped revisions and number of revisions in the range
BUNDLEMASK = "%s-%s-%010iskip-%010isize.hg"

def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """add one changegroup part to the bundle, using the cache when possible

    When ``rangeid`` is None the part is generated and sent without any
    caching. Otherwise the bundle for that range is read from the cache if it
    exists; if not, it is generated and written to the cache as it is sent.
    """
    bundlename = cachedata = None
    if rangeid is not None:
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(rangeid[0]))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, rangeid[1], nbchanges)
        cachedata = getcache(repo, bundlename)
    if cachedata is None:
        partdata = _makenewstream(newpart, repo, outgoing, version, source,
                                  bundlecaps, filematcher, cgversions)
        if bundlename is not None:
            # tee the freshly generated stream into the cache
            cgstream = cachewriter(repo, bundlename, partdata[0])
            partdata = (cgstream,) + partdata[1:]
    else:
        if repo.ui.verbose or source == 'bundle':
            repo.ui.status(_("%d changesets found in caches\n") % nbchanges)
        pversion = None
        if cgversions:
            pversion = version
        partdata = (cachedata, nbchanges, pversion)
    return _makepartfromstream(newpart, repo, *partdata)