Mercurial > public > mercurial-scm > hg-stable
diff mercurial/revlog.py @ 51110:d83d788590a8
changelog-delay: move the delay/divert logic inside the (inner) revlog
Instead of hacking throught the vfs/opener, we implement the delay/divert logic
inside the `_InnerRevlog` and `randomaccessfile` object. This will allow to an
alternative implementation of the `_InnerRevlog` that does not need to use Python details.
As a result, the new implementation can use the transaction less agressively
and avoid some extra output since no data had been written yet. That seems like
a good side effect.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Tue, 24 Oct 2023 11:08:49 +0200 |
parents | af96fbb8f739 |
children | c2d2e5b65def |
line wrap: on
line diff
--- a/mercurial/revlog.py Thu Oct 26 05:37:37 2023 +0200 +++ b/mercurial/revlog.py Tue Oct 24 11:08:49 2023 +0200 @@ -369,6 +369,9 @@ self.delta_config = delta_config self.feature_config = feature_config + # used during diverted write. + self._orig_index_file = None + self._default_compression_header = default_compression_header # index @@ -393,6 +396,8 @@ # 3-tuple of (node, rev, text) for a raw revision. self._revisioncache = None + self._delay_buffer = None + @property def index_file(self): return self.__index_file @@ -407,14 +412,27 @@ return len(self.index) def clear_cache(self): + assert not self.is_delaying self._revisioncache = None self._segmentfile.clear_cache() self._segmentfile_sidedata.clear_cache() @property def canonical_index_file(self): + if self._orig_index_file is not None: + return self._orig_index_file return self.index_file + @property + def is_delaying(self): + """is the revlog is currently delaying the visibility of written data? + + The delaying mechanism can be either in-memory or written on disk in a + side-file.""" + return (self._delay_buffer is not None) or ( + self._orig_index_file is not None + ) + # Derived from index values. def start(self, rev): @@ -700,22 +718,36 @@ You should not use this directly and use `_writing` instead """ try: - f = self.opener( - self.index_file, - mode=b"r+", - checkambig=self.data_config.check_ambig, - ) + if self._delay_buffer is None: + f = self.opener( + self.index_file, + mode=b"r+", + checkambig=self.data_config.check_ambig, + ) + else: + # check_ambig affect we way we open file for writing, however + # here, we do not actually open a file for writting as write + # will appened to a delay_buffer. So check_ambig is not + # meaningful and unneeded here. + f = randomaccessfile.appender( + self.opener, self.index_file, b"r+", self._delay_buffer + ) if index_end is None: f.seek(0, os.SEEK_END) else: f.seek(index_end, os.SEEK_SET) return f except FileNotFoundError: - return self.opener( - self.index_file, - mode=b"w+", - checkambig=self.data_config.check_ambig, - ) + if self._delay_buffer is None: + return self.opener( + self.index_file, + mode=b"w+", + checkambig=self.data_config.check_ambig, + ) + else: + return randomaccessfile.appender( + self.opener, self.index_file, b"w+", self._delay_buffer + ) def __index_new_fp(self): """internal method to create a new index file for writing @@ -1044,20 +1076,101 @@ dfh.write(data[1]) if sidedata: sdfh.write(sidedata) - ifh.write(entry) + if self._delay_buffer is None: + ifh.write(entry) + else: + self._delay_buffer.append(entry) else: offset += curr * self.index.entry_size transaction.add(self.canonical_index_file, offset) - ifh.write(entry) - ifh.write(data[0]) - ifh.write(data[1]) assert not sidedata + if self._delay_buffer is None: + ifh.write(entry) + ifh.write(data[0]) + ifh.write(data[1]) + else: + self._delay_buffer.append(entry) + self._delay_buffer.append(data[0]) + self._delay_buffer.append(data[1]) return ( ifh.tell(), dfh.tell() if dfh else None, sdfh.tell() if sdfh else None, ) + def _divert_index(self): + return self.index_file + b'.a' + + def delay(self): + assert not self.is_open + if self._delay_buffer is not None or self._orig_index_file is not None: + # delay or divert already in place + return None + elif len(self.index) == 0: + self._orig_index_file = self.index_file + self.index_file = self._divert_index() + self._segmentfile.filename = self.index_file + assert self._orig_index_file is not None + assert self.index_file is not None + if self.opener.exists(self.index_file): + self.opener.unlink(self.index_file) + return self.index_file + else: + self._segmentfile._delay_buffer = self._delay_buffer = [] + return None + + def write_pending(self): + assert not self.is_open + if self._orig_index_file is not None: + return None, True + any_pending = False + pending_index_file = self._divert_index() + if self.opener.exists(pending_index_file): + self.opener.unlink(pending_index_file) + util.copyfile( + self.opener.join(self.index_file), + self.opener.join(pending_index_file), + ) + if self._delay_buffer: + with self.opener(pending_index_file, b'r+') as ifh: + ifh.seek(0, os.SEEK_END) + ifh.write(b"".join(self._delay_buffer)) + any_pending = True + self._segmentfile._delay_buffer = self._delay_buffer = None + self._orig_index_file = self.index_file + self.index_file = pending_index_file + self._segmentfile.filename = self.index_file + return self.index_file, any_pending + + def finalize_pending(self): + assert not self.is_open + + delay = self._delay_buffer is not None + divert = self._orig_index_file is not None + + if delay and divert: + assert False, "unreachable" + elif delay: + if self._delay_buffer: + with self.opener(self.index_file, b'r+') as ifh: + ifh.seek(0, os.SEEK_END) + ifh.write(b"".join(self._delay_buffer)) + self._segmentfile._delay_buffer = self._delay_buffer = None + elif divert: + if self.opener.exists(self.index_file): + self.opener.rename( + self.index_file, + self._orig_index_file, + checkambig=True, + ) + self.index_file = self._orig_index_file + self._orig_index_file = None + self._segmentfile.filename = self.index_file + else: + msg = b"not delay or divert found on this revlog" + raise error.ProgrammingError(msg) + return self.canonical_index_file + class revlog: """ @@ -2925,6 +3038,10 @@ if self._docket is not None: self._write_docket(transaction) + @property + def is_delaying(self): + return self._inner.is_delaying + def _write_docket(self, transaction): """write the current docket on disk