diff -r 9aeb9e2d28a7 -r 3a333a582d7b hgext/remotefilelog/basepack.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/remotefilelog/basepack.py	Thu Sep 27 13:03:19 2018 -0400
@@ -0,0 +1,543 @@
+from __future__ import absolute_import
+
+import collections
+import errno
+import hashlib
+import mmap
+import os
+import struct
+import time
+
+from mercurial.i18n import _
+from mercurial import (
+    policy,
+    pycompat,
+    util,
+    vfs as vfsmod,
+)
+from . import shallowutil
+
+osutil = policy.importmod(r'osutil')
+
+# The pack version supported by this implementation. This will need to be
+# rev'd whenever the byte format changes. Ex: changing the fanout prefix,
+# changing any of the int sizes, changing the delta algorithm, etc.
+PACKVERSIONSIZE = 1
+INDEXVERSIONSIZE = 2
+
+FANOUTSTART = INDEXVERSIONSIZE
+
+# Constant that indicates a fanout table entry hasn't been filled in. (This does
+# not get serialized)
+EMPTYFANOUT = -1
+
+# The fanout prefix is the number of bytes that can be addressed by the fanout
+# table. Example: a fanout prefix of 1 means we use the first byte of a hash to
+# look in the fanout table (which will be 2^8 entries long).
+SMALLFANOUTPREFIX = 1
+LARGEFANOUTPREFIX = 2
+
+# The number of entries in the index at which point we switch to a large fanout.
+# It is chosen to balance the linear scan through a sparse fanout, with the
+# size of the bisect in actual index.
+# 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
+# bisect) with (8 step fanout scan + 1 step bisect)
+# 5 step bisect = log(2^16 / 8 / 255) # fanout
+# 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
+SMALLFANOUTCUTOFF = 2**16 / 8
+
+# The amount of time to wait between checking for new packs. This prevents an
+# exception when data is moved to a new pack after the process has already
+# loaded the pack list.
+REFRESHRATE = 0.1
+
+if pycompat.isposix:
+    # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
+    # The 'e' flag will be ignored on older versions of glibc.
+    PACKOPENMODE = 'rbe'
+else:
+    PACKOPENMODE = 'rb'
+
+class _cachebackedpacks(object):
+    def __init__(self, packs, cachesize):
+        self._packs = set(packs)
+        self._lrucache = util.lrucachedict(cachesize)
+        self._lastpack = None
+
+        # Avoid cold start of the cache by populating the most recent packs
+        # in the cache.
+        for i in reversed(range(min(cachesize, len(packs)))):
+            self._movetofront(packs[i])
+
+    def _movetofront(self, pack):
+        # This effectively makes pack the first entry in the cache.
+        self._lrucache[pack] = True
+
+    def _registerlastpackusage(self):
+        if self._lastpack is not None:
+            self._movetofront(self._lastpack)
+            self._lastpack = None
+
+    def add(self, pack):
+        self._registerlastpackusage()
+
+        # This method will mostly be called when packs are not in cache.
+        # Therefore, adding pack to the cache.
+        self._movetofront(pack)
+        self._packs.add(pack)
+
+    def __iter__(self):
+        self._registerlastpackusage()
+
+        # Cache iteration is based on LRU.
+        for pack in self._lrucache:
+            self._lastpack = pack
+            yield pack
+
+        cachedpacks = set(pack for pack in self._lrucache)
+        # Yield for paths not in the cache.
+        for pack in self._packs - cachedpacks:
+            self._lastpack = pack
+            yield pack
+
+        # Data not found in any pack.
+        self._lastpack = None
+
+class basepackstore(object):
+    # Default cache size limit for the pack files.
+    DEFAULTCACHESIZE = 100
+
+    def __init__(self, ui, path):
+        self.ui = ui
+        self.path = path
+
+        # lastrefresh is 0 so we'll immediately check for new packs on the first
+        # failure.
+        self.lastrefresh = 0
+
+        packs = []
+        for filepath, __, __ in self._getavailablepackfilessorted():
+            try:
+                pack = self.getpack(filepath)
+            except Exception as ex:
+                # An exception may be thrown if the pack file is corrupted
+                # somehow. Log a warning but keep going in this case, just
+                # skipping this pack file.
+                #
+                # If this is an ENOENT error then don't even bother logging.
+                # Someone could have removed the file since we retrieved the
+                # list of paths.
+                if getattr(ex, 'errno', None) != errno.ENOENT:
+                    ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
+                continue
+            packs.append(pack)
+
+        self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
+
+    def _getavailablepackfiles(self):
+        """For each pack file (an index/data file combo), yields:
+          (full path without extension, mtime, size)
+
+        mtime will be the mtime of the index/data file (whichever is newer)
+        size is the combined size of index/data file
+        """
+        indexsuffixlen = len(self.INDEXSUFFIX)
+        packsuffixlen = len(self.PACKSUFFIX)
+
+        ids = set()
+        sizes = collections.defaultdict(lambda: 0)
+        mtimes = collections.defaultdict(lambda: [])
+        try:
+            for filename, type, stat in osutil.listdir(self.path, stat=True):
+                id = None
+                if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
+                    id = filename[:-indexsuffixlen]
+                elif filename[-packsuffixlen:] == self.PACKSUFFIX:
+                    id = filename[:-packsuffixlen]
+
+                # Since we expect to have two files corresponding to each ID
+                # (the index file and the pack file), we can yield once we see
+                # it twice.
+                if id:
+                    sizes[id] += stat.st_size # Sum both files' sizes together
+                    mtimes[id].append(stat.st_mtime)
+                    if id in ids:
+                        yield (os.path.join(self.path, id), max(mtimes[id]),
+                               sizes[id])
+                    else:
+                        ids.add(id)
+        except OSError as ex:
+            if ex.errno != errno.ENOENT:
+                raise
+
+    def _getavailablepackfilessorted(self):
+        """Like `_getavailablepackfiles`, but also sorts the files by mtime,
+        yielding newest files first.
+
+        This is desirable, since it is more likely newer packfiles have more
+        desirable data.
+        """
+        files = []
+        for path, mtime, size in self._getavailablepackfiles():
+            files.append((mtime, size, path))
+        files = sorted(files, reverse=True)
+        for mtime, size, path in files:
+            yield path, mtime, size
+
+    def gettotalsizeandcount(self):
+        """Returns the total disk size (in bytes) of all the pack files in
+        this store, and the count of pack files.
+
+        (This might be smaller than the total size of the ``self.path``
+        directory, since this only considers fully-written pack files, and not
+        temporary files or other detritus in the directory.)
+        """
+        totalsize = 0
+        count = 0
+        for __, __, size in self._getavailablepackfiles():
+            totalsize += size
+            count += 1
+        return totalsize, count
+
+    def getmetrics(self):
+        """Returns metrics on the state of this store."""
+        size, count = self.gettotalsizeandcount()
+        return {
+            'numpacks': count,
+            'totalpacksize': size,
+        }
+
+    def getpack(self, path):
+        raise NotImplementedError()
+
+    def getmissing(self, keys):
+        missing = keys
+        for pack in self.packs:
+            missing = pack.getmissing(missing)
+
+            # Ensures better performance of the cache by keeping the most
+            # recently accessed pack at the beginning in subsequent iterations.
+            if not missing:
+                return missing
+
+        if missing:
+            for pack in self.refresh():
+                missing = pack.getmissing(missing)
+
+        return missing
+
+    def markledger(self, ledger, options=None):
+        for pack in self.packs:
+            pack.markledger(ledger)
+
+    def markforrefresh(self):
+        """Tells the store that there may be new pack files, so the next time it
+        has a lookup miss it should check for new files."""
+        self.lastrefresh = 0
+
+    def refresh(self):
+        """Checks for any new packs on disk, adds them to the main pack list,
+        and returns a list of just the new packs."""
+        now = time.time()
+
+        # If we experience a lot of misses (like in the case of getmissing() on
+        # new objects), let's only actually check disk for new stuff every once
+        # in a while. Generally this code path should only ever matter when a
+        # repack is going on in the background, and it should be pretty rare
+        # for that to happen twice in quick succession.
+        newpacks = []
+        if now > self.lastrefresh + REFRESHRATE:
+            self.lastrefresh = now
+            previous = set(p.path for p in self.packs)
+            for filepath, __, __ in self._getavailablepackfilessorted():
+                if filepath not in previous:
+                    newpack = self.getpack(filepath)
+                    newpacks.append(newpack)
+                    self.packs.add(newpack)
+
+        return newpacks
+
+class versionmixin(object):
+    # Mix-in for classes with multiple supported versions
+    VERSION = None
+    SUPPORTED_VERSIONS = [0]
+
+    def _checkversion(self, version):
+        if version in self.SUPPORTED_VERSIONS:
+            if self.VERSION is None:
+                # only affect this instance
+                self.VERSION = version
+            elif self.VERSION != version:
+                raise RuntimeError('inconsistent version: %s' % version)
+        else:
+            raise RuntimeError('unsupported version: %s' % version)
+
+class basepack(versionmixin):
+    # The maximum amount we should read via mmap before remapping so the old
+    # pages can be released (100MB)
+    MAXPAGEDIN = 100 * 1024**2
+
+    SUPPORTED_VERSIONS = [0]
+
+    def __init__(self, path):
+        self.path = path
+        self.packpath = path + self.PACKSUFFIX
+        self.indexpath = path + self.INDEXSUFFIX
+
+        self.indexsize = os.stat(self.indexpath).st_size
+        self.datasize = os.stat(self.packpath).st_size
+
+        self._index = None
+        self._data = None
+        self.freememory() # initialize the mmap
+
+        version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
+        self._checkversion(version)
+
+        version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
+        self._checkversion(version)
+
+        if 0b10000000 & config:
+            self.params = indexparams(LARGEFANOUTPREFIX, version)
+        else:
+            self.params = indexparams(SMALLFANOUTPREFIX, version)
+
+    @util.propertycache
+    def _fanouttable(self):
+        params = self.params
+        rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
+        fanouttable = []
+        for i in pycompat.xrange(0, params.fanoutcount):
+            loc = i * 4
+            fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
+            fanouttable.append(fanoutentry)
+        return fanouttable
+
+    @util.propertycache
+    def _indexend(self):
+        if self.VERSION == 0:
+            return self.indexsize
+        else:
+            nodecount = struct.unpack_from('!Q', self._index,
+                                           self.params.indexstart - 8)[0]
+            return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
+
+    def freememory(self):
+        """Unmap and remap the memory to free it up after known expensive
+        operations. Return True if self._data and self._index were reloaded.
+        """
+        if self._index:
+            if self._pagedin < self.MAXPAGEDIN:
+                return False
+
+            self._index.close()
+            self._data.close()
+
+        # TODO: use an opener/vfs to access these paths
+        with open(self.indexpath, PACKOPENMODE) as indexfp:
+            # memory-map the file, size 0 means whole file
+            self._index = mmap.mmap(indexfp.fileno(), 0,
+                                    access=mmap.ACCESS_READ)
+        with open(self.packpath, PACKOPENMODE) as datafp:
+            self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
+
+        self._pagedin = 0
+        return True
+
+    def getmissing(self, keys):
+        raise NotImplementedError()
+
+    def markledger(self, ledger, options=None):
+        raise NotImplementedError()
+
+    def cleanup(self, ledger):
+        raise NotImplementedError()
+
+    def __iter__(self):
+        raise NotImplementedError()
+
+    def iterentries(self):
+        raise NotImplementedError()
+
+class mutablebasepack(versionmixin):
+
+    def __init__(self, ui, packdir, version=0):
+        self._checkversion(version)
+
+        opener = vfsmod.vfs(packdir)
+        opener.createmode = 0o444
+        self.opener = opener
+
+        self.entries = {}
+
+        shallowutil.mkstickygroupdir(ui, packdir)
+        self.packfp, self.packpath = opener.mkstemp(
+            suffix=self.PACKSUFFIX + '-tmp')
+        self.idxfp, self.idxpath = opener.mkstemp(
+            suffix=self.INDEXSUFFIX + '-tmp')
+        self.packfp = os.fdopen(self.packfp, 'w+')
+        self.idxfp = os.fdopen(self.idxfp, 'w+')
+        self.sha = hashlib.sha1()
+        self._closed = False
+
+        # The opener provides no way of doing permission fixup on files created
+        # via mkstemp, so we must fix it ourselves. We can probably fix this
+        # upstream in vfs.mkstemp so we don't need to use the private method.
+        opener._fixfilemode(opener.join(self.packpath))
+        opener._fixfilemode(opener.join(self.idxpath))
+
+        # Write header
+        # TODO: make it extensible (ex: allow specifying compression algorithm,
+        # a flexible key/value header, delta algorithm, fanout size, etc)
+        versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
+        self.writeraw(versionbuf)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.close()
+        else:
+            self.abort()
+
+    def abort(self):
+        # Unclean exit
+        self._cleantemppacks()
+
+    def writeraw(self, data):
+        self.packfp.write(data)
+        self.sha.update(data)
+
+    def close(self, ledger=None):
+        if self._closed:
+            return
+
+        try:
+            sha = self.sha.hexdigest()
+            self.packfp.close()
+            self.writeindex()
+
+            if len(self.entries) == 0:
+                # Empty pack
+                self._cleantemppacks()
+                self._closed = True
+                return None
+
+            self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
+            try:
+                self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
+            except Exception as ex:
+                try:
+                    self.opener.unlink(sha + self.PACKSUFFIX)
+                except Exception:
+                    pass
+                # Throw exception 'ex' explicitly since a normal 'raise' would
+                # potentially throw an exception from the unlink cleanup.
+                raise ex
+        except Exception:
+            # Clean up temp packs in all exception cases
+            self._cleantemppacks()
+            raise
+
+        self._closed = True
+        result = self.opener.join(sha)
+        if ledger:
+            ledger.addcreated(result)
+        return result
+
+    def _cleantemppacks(self):
+        try:
+            self.opener.unlink(self.packpath)
+        except Exception:
+            pass
+        try:
+            self.opener.unlink(self.idxpath)
+        except Exception:
+            pass
+
+    def writeindex(self):
+        rawindex = ''
+
+        largefanout = len(self.entries) > SMALLFANOUTCUTOFF
+        if largefanout:
+            params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
+        else:
+            params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
+
+        fanouttable = [EMPTYFANOUT] * params.fanoutcount
+
+        # Precompute the location of each entry
+        locations = {}
+        count = 0
+        for node in sorted(self.entries.iterkeys()):
+            location = count * self.INDEXENTRYLENGTH
+            locations[node] = location
+            count += 1
+
+            # Must use [0] on the unpack result since it's always a tuple.
+            fanoutkey = struct.unpack(params.fanoutstruct,
+                                      node[:params.fanoutprefix])[0]
+            if fanouttable[fanoutkey] == EMPTYFANOUT:
+                fanouttable[fanoutkey] = location
+
+        rawfanouttable = ''
+        last = 0
+        for offset in fanouttable:
+            offset = offset if offset != EMPTYFANOUT else last
+            last = offset
+            rawfanouttable += struct.pack('!I', offset)
+
+        rawentrieslength = struct.pack('!Q', len(self.entries))
+
+        # The index offset is its location in the file, i.e. after the 2 byte
+        # header and the fanouttable.
+        rawindex = self.createindex(locations, 2 + len(rawfanouttable))
+
+        self._writeheader(params)
+        self.idxfp.write(rawfanouttable)
+        if self.VERSION == 1:
+            self.idxfp.write(rawentrieslength)
+        self.idxfp.write(rawindex)
+        self.idxfp.close()
+
+    def createindex(self, nodelocations, indexoffset):
+        raise NotImplementedError()
+
+    def _writeheader(self, indexparams):
+        # Index header
+        #    <version: 1 byte>
+        #    <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
+        #    <unused: 7 bit> # future use (compression, delta format, etc)
+        config = 0
+        if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
+            config = 0b10000000
+        self.idxfp.write(struct.pack('!BB', self.VERSION, config))
+
+class indexparams(object):
+    __slots__ = ('fanoutprefix', 'fanoutstruct', 'fanoutcount', 'fanoutsize',
+                 'indexstart')
+
+    def __init__(self, prefixsize, version):
+        self.fanoutprefix = prefixsize
+
+        # The struct pack format for fanout table location (i.e. the format that
+        # converts the node prefix into an integer location in the fanout
+        # table).
+        if prefixsize == SMALLFANOUTPREFIX:
+            self.fanoutstruct = '!B'
+        elif prefixsize == LARGEFANOUTPREFIX:
+            self.fanoutstruct = '!H'
+        else:
+            raise ValueError("invalid fanout prefix size: %s" % prefixsize)
+
+        # The number of fanout table entries
+        self.fanoutcount = 2**(prefixsize * 8)
+
+        # The total bytes used by the fanout table
+        self.fanoutsize = self.fanoutcount * 4
+
+        self.indexstart = FANOUTSTART + self.fanoutsize
+        if version == 1:
+            # Skip the index length
+            self.indexstart += 8
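
For reference, below is a minimal standalone sketch (not part of the change above; the helper names fanoutlayout and fanoutkey are illustrative) of the index layout that indexparams and _writeheader describe: a two-byte header (version, config), a fanout table of 4-byte '!I' offsets selected by the first one or two bytes of a node, and, for version 1 indexes, an 8-byte entry count before the sorted index entries.

# Illustrative sketch only; mirrors the arithmetic in indexparams.__init__
# and the fanout-key extraction done in writeindex().
import struct

SMALLFANOUTPREFIX = 1
LARGEFANOUTPREFIX = 2
FANOUTSTART = 2  # header is <version: 1 byte><config: 1 byte>

def fanoutlayout(prefixsize, version):
    # Derive the fanout table shape and where the index entries begin.
    fanoutstruct = '!B' if prefixsize == SMALLFANOUTPREFIX else '!H'
    fanoutcount = 2 ** (prefixsize * 8)   # number of fanout buckets
    fanoutsize = fanoutcount * 4          # 4 bytes ('!I') per bucket
    indexstart = FANOUTSTART + fanoutsize
    if version == 1:
        indexstart += 8                   # skip the '!Q' entry count
    return fanoutstruct, fanoutcount, fanoutsize, indexstart

def fanoutkey(node, prefixsize):
    # The first prefixsize bytes of the node select the fanout bucket.
    fanoutstruct = '!B' if prefixsize == SMALLFANOUTPREFIX else '!H'
    return struct.unpack(fanoutstruct, node[:prefixsize])[0]

if __name__ == '__main__':
    node = b'\xab\xcd' + b'\x00' * 18         # a fake 20-byte sha1
    print(fanoutlayout(SMALLFANOUTPREFIX, 0))  # ('!B', 256, 1024, 1026)
    print(fanoutlayout(LARGEFANOUTPREFIX, 1))  # ('!H', 65536, 262144, 262154)
    print(fanoutkey(node, SMALLFANOUTPREFIX))  # 171 (0xab)
    print(fanoutkey(node, LARGEFANOUTPREFIX))  # 43981 (0xabcd)

A reader finds a node by taking its fanout key, reading the corresponding 4-byte offset (falling back to earlier buckets for EMPTYFANOUT entries, which writeindex fills with the previous offset), and then bisecting the fixed-width index entries from that offset.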