hgext/remotefilelog/basepack.py
changeset 40495 3a333a582d7b
child 40506 10c10da14c5d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/remotefilelog/basepack.py	Thu Sep 27 13:03:19 2018 -0400
@@ -0,0 +1,543 @@
+from __future__ import absolute_import
+
+import collections
+import errno
+import hashlib
+import mmap
+import os
+import struct
+import time
+
+from mercurial.i18n import _
+from mercurial import (
+    policy,
+    pycompat,
+    util,
+    vfs as vfsmod,
+)
+from . import shallowutil
+
+osutil = policy.importmod(r'osutil')
+
+# The pack version supported by this implementation. This will need to be
+# rev'd whenever the byte format changes. Ex: changing the fanout prefix,
+# changing any of the int sizes, changing the delta algorithm, etc.
+PACKVERSIONSIZE = 1
+INDEXVERSIONSIZE = 2
+
+FANOUTSTART = INDEXVERSIONSIZE
+
+# Constant that indicates a fanout table entry hasn't been filled in. (This does
+# not get serialized)
+EMPTYFANOUT = -1
+
+# The fanout prefix is the number of bytes that can be addressed by the fanout
+# table. Example: a fanout prefix of 1 means we use the first byte of a hash to
+# look in the fanout table (which will be 2^8 entries long).
+SMALLFANOUTPREFIX = 1
+LARGEFANOUTPREFIX = 2
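+
+# Illustrative example (not used by the code itself): with a fanout prefix of
+# 2, a node whose first two bytes are '\xab\xcd' maps to fanout slot
+# 0xabcd == 43981, roughly:
+#
+#   fanoutkey = struct.unpack('!H', node[:2])[0]  # 43981
+#
+# so a lookup can jump straight to fanouttable[fanoutkey] before bisecting.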
+
+# The number of entries in the index at which point we switch to a large
+# fanout. It is chosen to balance the linear scan through a sparse fanout
+# against the size of the bisect in the actual index.
+# 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
+# bisect) with (8 step fanout scan + 1 step bisect).
+# 5 step bisect = log(2^16 / 8 / 255)  # fanout
+# 8 step fanout scan = 2^16 / (2^16 / 8)  # fanout space divided by entries
+SMALLFANOUTCUTOFF = 2**16 // 8
+
+# The amount of time to wait between checking for new packs. This prevents an
+# exception when data is moved to a new pack after the process has already
+# loaded the pack list.
+REFRESHRATE = 0.1
+
+if pycompat.isposix:
+    # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
+    # The 'e' flag will be ignored on older versions of glibc.
+    PACKOPENMODE = 'rbe'
+else:
+    PACKOPENMODE = 'rb'
+
+class _cachebackedpacks(object):
+    def __init__(self, packs, cachesize):
+        self._packs = set(packs)
+        self._lrucache = util.lrucachedict(cachesize)
+        self._lastpack = None
+
+        # Avoid cold start of the cache by populating the most recent packs
+        # in the cache.
+        for i in reversed(range(min(cachesize, len(packs)))):
+            self._movetofront(packs[i])
+
+    def _movetofront(self, pack):
+        # This effectively makes pack the first entry in the cache.
+        self._lrucache[pack] = True
+
+    def _registerlastpackusage(self):
+        if self._lastpack is not None:
+            self._movetofront(self._lastpack)
+            self._lastpack = None
+
+    def add(self, pack):
+        self._registerlastpackusage()
+
+        # This method will mostly be called when packs are not in the cache.
+        # Therefore, add the pack to the cache.
+        self._movetofront(pack)
+        self._packs.add(pack)
+
+    def __iter__(self):
+        self._registerlastpackusage()
+
+        # Cache iteration is based on LRU.
+        for pack in self._lrucache:
+            self._lastpack = pack
+            yield pack
+
+        cachedpacks = set(pack for pack in self._lrucache)
+        # Yield the packs that are not in the cache.
+        for pack in self._packs - cachedpacks:
+            self._lastpack = pack
+            yield pack
+
+        # Data not found in any pack.
+        self._lastpack = None
+
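+# Illustrative usage sketch for _cachebackedpacks (the pack objects and keys
+# below are hypothetical): callers iterate the packs, trying the cached ones
+# before the uncached ones, and stop as soon as one satisfies the request; the
+# pack that satisfied it is then promoted in the cache on the next iteration
+# or add() call.
+#
+#   packs = _cachebackedpacks([pack1, pack2, pack3], cachesize=2)
+#   for pack in packs:
+#       if not pack.getmissing(keys):
+#           break  # everything was found in this pack
+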
+class basepackstore(object):
+    # Default cache size limit for the pack files.
+    DEFAULTCACHESIZE = 100
+
+    def __init__(self, ui, path):
+        self.ui = ui
+        self.path = path
+
+        # lastrefresh is 0 so we'll immediately check for new packs on the
+        # first failure.
+        self.lastrefresh = 0
+
+        packs = []
+        for filepath, __, __ in self._getavailablepackfilessorted():
+            try:
+                pack = self.getpack(filepath)
+            except Exception as ex:
+                # An exception may be thrown if the pack file is corrupted
+                # somehow.  Log a warning but keep going in this case, just
+                # skipping this pack file.
+                #
+                # If this is an ENOENT error then don't even bother logging.
+                # Someone could have removed the file since we retrieved the
+                # list of paths.
+                if getattr(ex, 'errno', None) != errno.ENOENT:
+                    ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
+                continue
+            packs.append(pack)
+
+        self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
+
+    def _getavailablepackfiles(self):
+        """For each pack file (a index/data file combo), yields:
+          (full path without extension, mtime, size)
+
+        mtime will be the mtime of the index/data file (whichever is newer)
+        size is the combined size of index/data file
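+
+        For example, with suffixes such as '.dataidx'/'.datapack' (the
+        suffixes are defined by subclasses), an index file 'ab12.dataidx' of
+        100 bytes and a pack file 'ab12.datapack' of 900 bytes would be
+        yielded roughly as:
+          ('<self.path>/ab12', <newer of the two mtimes>, 1000)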
+        """
+        indexsuffixlen = len(self.INDEXSUFFIX)
+        packsuffixlen = len(self.PACKSUFFIX)
+
+        ids = set()
+        sizes = collections.defaultdict(lambda: 0)
+        mtimes = collections.defaultdict(lambda: [])
+        try:
+            for filename, type, stat in osutil.listdir(self.path, stat=True):
+                id = None
+                if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
+                    id = filename[:-indexsuffixlen]
+                elif filename[-packsuffixlen:] == self.PACKSUFFIX:
+                    id = filename[:-packsuffixlen]
+
+                # Since we expect to have two files corresponding to each ID
+                # (the index file and the pack file), we can yield once we see
+                # it twice.
+                if id:
+                    sizes[id] += stat.st_size # Sum both files' sizes together
+                    mtimes[id].append(stat.st_mtime)
+                    if id in ids:
+                        yield (os.path.join(self.path, id), max(mtimes[id]),
+                            sizes[id])
+                    else:
+                        ids.add(id)
+        except OSError as ex:
+            if ex.errno != errno.ENOENT:
+                raise
+
+    def _getavailablepackfilessorted(self):
+        """Like `_getavailablepackfiles`, but also sorts the files by mtime,
+        yielding newest files first.
+
+        This is desirable, since newer packfiles are more likely to contain the
+        data we are looking for.
+        """
+        files = []
+        for path, mtime, size in self._getavailablepackfiles():
+            files.append((mtime, size, path))
+        files = sorted(files, reverse=True)
+        for mtime, size, path in files:
+            yield path, mtime, size
+
+    def gettotalsizeandcount(self):
+        """Returns the total disk size (in bytes) of all the pack files in
+        this store, and the count of pack files.
+
+        (This might be smaller than the total size of the ``self.path``
+        directory, since this only considers fully-written pack files, and not
+        temporary files or other detritus in the directory.)
+        """
+        totalsize = 0
+        count = 0
+        for __, __, size in self._getavailablepackfiles():
+            totalsize += size
+            count += 1
+        return totalsize, count
+
+    def getmetrics(self):
+        """Returns metrics on the state of this store."""
+        size, count = self.gettotalsizeandcount()
+        return {
+            'numpacks': count,
+            'totalpacksize': size,
+        }
+
+    def getpack(self, path):
+        raise NotImplementedError()
+
+    def getmissing(self, keys):
+        missing = keys
+        for pack in self.packs:
+            missing = pack.getmissing(missing)
+
+            # Returning as soon as nothing is missing keeps the pack that
+            # satisfied the lookup marked as most recently used, so the cache
+            # promotes it for subsequent iterations.
+            if not missing:
+                return missing
+
+        if missing:
+            for pack in self.refresh():
+                missing = pack.getmissing(missing)
+
+        return missing
+
+    def markledger(self, ledger, options=None):
+        for pack in self.packs:
+            pack.markledger(ledger)
+
+    def markforrefresh(self):
+        """Tells the store that there may be new pack files, so the next time it
+        has a lookup miss it should check for new files."""
+        self.lastrefresh = 0
+
+    def refresh(self):
+        """Checks for any new packs on disk, adds them to the main pack list,
+        and returns a list of just the new packs."""
+        now = time.time()
+
+        # If we experience a lot of misses (like in the case of getmissing() on
+        # new objects), let's only actually check disk for new stuff every once
+        # in a while. Generally this code path should only ever matter when a
+        # repack is going on in the background, and it should be rare for that
+        # to happen twice in quick succession.
+        newpacks = []
+        if now > self.lastrefresh + REFRESHRATE:
+            self.lastrefresh = now
+            previous = set(p.path for p in self.packs)
+            for filepath, __, __ in self._getavailablepackfilessorted():
+                if filepath not in previous:
+                    newpack = self.getpack(filepath)
+                    newpacks.append(newpack)
+                    self.packs.add(newpack)
+
+        return newpacks
+
+class versionmixin(object):
+    # Mix-in for classes with multiple supported versions
+    VERSION = None
+    SUPPORTED_VERSIONS = [0]
+
+    def _checkversion(self, version):
+        if version in self.SUPPORTED_VERSIONS:
+            if self.VERSION is None:
+                # only affect this instance
+                self.VERSION = version
+            elif self.VERSION != version:
+                raise RuntimeError('inconsistent version: %s' % version)
+        else:
+            raise RuntimeError('unsupported version: %s' % version)
+
+class basepack(versionmixin):
+    # The maximum amount we should read via mmap before remapping so the old
+    # pages can be released (100MB)
+    MAXPAGEDIN = 100 * 1024**2
+
+    SUPPORTED_VERSIONS = [0]
+
+    def __init__(self, path):
+        self.path = path
+        self.packpath = path + self.PACKSUFFIX
+        self.indexpath = path + self.INDEXSUFFIX
+
+        self.indexsize = os.stat(self.indexpath).st_size
+        self.datasize = os.stat(self.packpath).st_size
+
+        self._index = None
+        self._data = None
+        self.freememory() # initialize the mmap
+
+        version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
+        self._checkversion(version)
+
+        version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
+        self._checkversion(version)
+
+        if 0b10000000 & config:
+            self.params = indexparams(LARGEFANOUTPREFIX, version)
+        else:
+            self.params = indexparams(SMALLFANOUTPREFIX, version)
+
+    @util.propertycache
+    def _fanouttable(self):
+        params = self.params
+        rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
+        fanouttable = []
+        for i in pycompat.xrange(0, params.fanoutcount):
+            loc = i * 4
+            fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
+            fanouttable.append(fanoutentry)
+        return fanouttable
+
+    @util.propertycache
+    def _indexend(self):
+        if self.VERSION == 0:
+            return self.indexsize
+        else:
+            nodecount = struct.unpack_from('!Q', self._index,
+                                           self.params.indexstart - 8)[0]
+            return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
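+
+    # Worked example of the version 1 math above: with a small fanout and 10
+    # entries, params.indexstart is 2 + 256 * 4 + 8 = 1034, so _indexend is
+    # 1034 + 10 * self.INDEXENTRYLENGTH (INDEXENTRYLENGTH is defined by the
+    # concrete pack class).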
+
+    def freememory(self):
+        """Unmap and remap the memory to free it up after known expensive
+        operations. Return True if self._data and self._index were reloaded.
+        """
+        if self._index:
+            if self._pagedin < self.MAXPAGEDIN:
+                return False
+
+            self._index.close()
+            self._data.close()
+
+        # TODO: use an opener/vfs to access these paths
+        with open(self.indexpath, PACKOPENMODE) as indexfp:
+            # memory-map the file, size 0 means whole file
+            self._index = mmap.mmap(indexfp.fileno(), 0,
+                                    access=mmap.ACCESS_READ)
+        with open(self.packpath, PACKOPENMODE) as datafp:
+            self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
+
+        self._pagedin = 0
+        return True
+
+    def getmissing(self, keys):
+        raise NotImplementedError()
+
+    def markledger(self, ledger, options=None):
+        raise NotImplementedError()
+
+    def cleanup(self, ledger):
+        raise NotImplementedError()
+
+    def __iter__(self):
+        raise NotImplementedError()
+
+    def iterentries(self):
+        raise NotImplementedError()
+
+class mutablebasepack(versionmixin):
+
+    def __init__(self, ui, packdir, version=0):
+        self._checkversion(version)
+
+        opener = vfsmod.vfs(packdir)
+        opener.createmode = 0o444
+        self.opener = opener
+
+        self.entries = {}
+
+        shallowutil.mkstickygroupdir(ui, packdir)
+        self.packfp, self.packpath = opener.mkstemp(
+            suffix=self.PACKSUFFIX + '-tmp')
+        self.idxfp, self.idxpath = opener.mkstemp(
+            suffix=self.INDEXSUFFIX + '-tmp')
+        self.packfp = os.fdopen(self.packfp, 'wb+')
+        self.idxfp = os.fdopen(self.idxfp, 'wb+')
+        self.sha = hashlib.sha1()
+        self._closed = False
+
+        # The opener provides no way of doing permission fixup on files created
+        # via mkstemp, so we must fix it ourselves. We can probably fix this
+        # upstream in vfs.mkstemp so we don't need to use the private method.
+        opener._fixfilemode(opener.join(self.packpath))
+        opener._fixfilemode(opener.join(self.idxpath))
+
+        # Write header
+        # TODO: make it extensible (ex: allow specifying compression algorithm,
+        # a flexible key/value header, delta algorithm, fanout size, etc)
+        versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
+        self.writeraw(versionbuf)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.close()
+        else:
+            self.abort()
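+
+    # Intended usage sketch (the subclass name and its add() signature below
+    # are hypothetical; concrete mutable packs define their own write API):
+    #
+    #   with somemutablepack(ui, packdir) as pack:
+    #       pack.add(...)
+    #   # normal exit: close() renames the temp files into place
+    #   # on an exception: abort() unlinks the temp files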
+
+    def abort(self):
+        # Unclean exit
+        self._cleantemppacks()
+
+    def writeraw(self, data):
+        self.packfp.write(data)
+        self.sha.update(data)
+
+    def close(self, ledger=None):
+        if self._closed:
+            return
+
+        try:
+            sha = self.sha.hexdigest()
+            self.packfp.close()
+            self.writeindex()
+
+            if len(self.entries) == 0:
+                # Empty pack
+                self._cleantemppacks()
+                self._closed = True
+                return None
+
+            self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
+            try:
+                self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
+            except Exception as ex:
+                try:
+                    self.opener.unlink(sha + self.PACKSUFFIX)
+                except Exception:
+                    pass
+                # Re-raise 'ex' explicitly since a bare 'raise' here could
+                # re-raise the exception caught from the unlink cleanup instead.
+                raise ex
+        except Exception:
+            # Clean up temp packs in all exception cases
+            self._cleantemppacks()
+            raise
+
+        self._closed = True
+        result = self.opener.join(sha)
+        if ledger:
+            ledger.addcreated(result)
+        return result
+
+    def _cleantemppacks(self):
+        try:
+            self.opener.unlink(self.packpath)
+        except Exception:
+            pass
+        try:
+            self.opener.unlink(self.idxpath)
+        except Exception:
+            pass
+
+    def writeindex(self):
+        rawindex = ''
+
+        largefanout = len(self.entries) > SMALLFANOUTCUTOFF
+        if largefanout:
+            params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
+        else:
+            params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
+
+        fanouttable = [EMPTYFANOUT] * params.fanoutcount
+
+        # Precompute the location of each entry
+        locations = {}
+        count = 0
+        for node in sorted(self.entries):
+            location = count * self.INDEXENTRYLENGTH
+            locations[node] = location
+            count += 1
+
+            # Must use [0] on the unpack result since it's always a tuple.
+            fanoutkey = struct.unpack(params.fanoutstruct,
+                                      node[:params.fanoutprefix])[0]
+            if fanouttable[fanoutkey] == EMPTYFANOUT:
+                fanouttable[fanoutkey] = location
+
+        rawfanouttable = ''
+        last = 0
+        for offset in fanouttable:
+            offset = offset if offset != EMPTYFANOUT else last
+            last = offset
+            rawfanouttable += struct.pack('!I', offset)
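+
+        # For example, a computed fanout of [0, EMPTYFANOUT, 120, EMPTYFANOUT]
+        # is serialized as offsets [0, 0, 120, 120]: empty slots inherit the
+        # previous offset, so every serialized slot points at a valid position
+        # in the index.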
+
+        rawentrieslength = struct.pack('!Q', len(self.entries))
+
+        # The index offset is its location in the file, i.e. after the 2 byte
+        # header and the fanouttable.
+        rawindex = self.createindex(locations, 2 + len(rawfanouttable))
+
+        self._writeheader(params)
+        self.idxfp.write(rawfanouttable)
+        if self.VERSION == 1:
+            self.idxfp.write(rawentrieslength)
+        self.idxfp.write(rawindex)
+        self.idxfp.close()
+
+    def createindex(self, nodelocations, indexoffset):
+        raise NotImplementedError()
+
+    def _writeheader(self, indexparams):
+        # Index header
+        #    <version: 1 byte>
+        #    <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
+        #    <unused: 7 bit> # future use (compression, delta format, etc)
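+        #
+        # For example, version 1 with a large fanout is written as the two
+        # bytes '\x01\x80', while version 0 with a small fanout is '\x00\x00'.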
+        config = 0
+        if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
+            config = 0b10000000
+        self.idxfp.write(struct.pack('!BB', self.VERSION, config))
+
+class indexparams(object):
+    __slots__ = ('fanoutprefix', 'fanoutstruct', 'fanoutcount', 'fanoutsize',
+                 'indexstart')
+
+    def __init__(self, prefixsize, version):
+        self.fanoutprefix = prefixsize
+
+        # The struct pack format for fanout table location (i.e. the format that
+        # converts the node prefix into an integer location in the fanout
+        # table).
+        if prefixsize == SMALLFANOUTPREFIX:
+            self.fanoutstruct = '!B'
+        elif prefixsize == LARGEFANOUTPREFIX:
+            self.fanoutstruct = '!H'
+        else:
+            raise ValueError("invalid fanout prefix size: %s" % prefixsize)
+
+        # The number of fanout table entries
+        self.fanoutcount = 2**(prefixsize * 8)
+
+        # The total bytes used by the fanout table
+        self.fanoutsize = self.fanoutcount * 4
+
+        self.indexstart = FANOUTSTART + self.fanoutsize
+        if version == 1:
+            # Skip the index length
+            self.indexstart += 8
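+
+        # Putting it together, the index file that writeindex() lays out is:
+        #   <version: 1 byte><config: 1 byte>
+        #   <fanout table: fanoutcount * 4 bytes>
+        #   <entry count: 8 bytes>    (version 1 only)
+        #   <index entries>           (entry format defined by subclasses)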