Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/revlog.py @ 47225:906a7bcaac86
revlog: introduce a mandatory `_writing` context to update revlog content
Before this change, various revlog methods were managing the opening and
closing of the revlog files manually and passing the file descriptors along the
call path. To simplify the tracking of write operations by a future docket,
we need something more organised. As a result, we introduce a `revlog._writing`
context manager that will wrap each revlog update operation. The file
descriptors are kept in the existing `revlog._writinghandles` attribute that
was already used by the `addgroup` logic.
All of this change is internal to the revlog; the "public" interface is not
affected. The `addrevision` and `addgroup` logic are still responsible for
setting up this context. However, this new context gives us multiple benefits:
* all writers use the same, unified logic,
* this context is programmatically enforced,
* each write "session" has a clearly identified start and end.
The post-pull sidedata update logic is still doing its writing by hand and will be
adjusted in a later changeset.
This change affects the concurrency checker test, because we now register the state of
the file in the transaction sooner in `addrevision` (about as early as what
`addgroup` would do), so the abort is rolling back the other commit. I don't want
to weaken the current main logic.
Differential Revision: https://phab.mercurial-scm.org/D10605
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 03 May 2021 12:27:42 +0200 |
parents | 100f061d88f6 |
children | 042388bba644 |
comparison
equal
deleted
inserted
replaced
47224:100f061d88f6 | 47225:906a7bcaac86 |
---|---|
358 # custom flags. | 358 # custom flags. |
359 self._flagprocessors = dict(flagutil.flagprocessors) | 359 self._flagprocessors = dict(flagutil.flagprocessors) |
360 | 360 |
361 # 2-tuple of file handles being used for active writing. | 361 # 2-tuple of file handles being used for active writing. |
362 self._writinghandles = None | 362 self._writinghandles = None |
363 # prevent nesting of addgroup | |
364 self._adding_group = None | |
363 | 365 |
364 self._loadindex() | 366 self._loadindex() |
365 | 367 |
366 self._concurrencychecker = concurrencychecker | 368 self._concurrencychecker = concurrencychecker |
367 | 369 |
1953 except error.RevlogError: | 1955 except error.RevlogError: |
1954 if self._censorable and storageutil.iscensoredtext(text): | 1956 if self._censorable and storageutil.iscensoredtext(text): |
1955 raise error.CensoredNodeError(self.display_id, node, text) | 1957 raise error.CensoredNodeError(self.display_id, node, text) |
1956 raise | 1958 raise |
1957 | 1959 |
1958 def _enforceinlinesize(self, tr, fp=None): | 1960 def _enforceinlinesize(self, tr): |
1959 """Check if the revlog is too big for inline and convert if so. | 1961 """Check if the revlog is too big for inline and convert if so. |
1960 | 1962 |
1961 This should be called after revisions are added to the revlog. If the | 1963 This should be called after revisions are added to the revlog. If the |
1962 revlog has grown too large to be an inline revlog, it will convert it | 1964 revlog has grown too large to be an inline revlog, it will convert it |
1963 to use multiple index and data files. | 1965 to use multiple index and data files. |
1973 _(b"%s not found in the transaction") % self._indexfile | 1975 _(b"%s not found in the transaction") % self._indexfile |
1974 ) | 1976 ) |
1975 trindex = 0 | 1977 trindex = 0 |
1976 tr.add(self._datafile, 0) | 1978 tr.add(self._datafile, 0) |
1977 | 1979 |
1978 if fp: | 1980 existing_handles = False |
1981 if self._writinghandles is not None: | |
1982 existing_handles = True | |
1983 fp = self._writinghandles[0] | |
1979 fp.flush() | 1984 fp.flush() |
1980 fp.close() | 1985 fp.close() |
1981 # We can't use the cached file handle after close(). So prevent | 1986 # We can't use the cached file handle after close(). So prevent |
1982 # its usage. | 1987 # its usage. |
1983 self._writinghandles = None | 1988 self._writinghandles = None |
1984 | 1989 |
1985 if True: | 1990 new_dfh = self._datafp(b'w+') |
1986 with self._indexfp(b'r') as ifh, self._datafp(b'w') as dfh: | 1991 new_dfh.truncate(0) # drop any potentially existing data |
1992 try: | |
1993 with self._indexfp(b'r') as read_ifh: | |
1987 for r in self: | 1994 for r in self: |
1988 dfh.write(self._getsegmentforrevs(r, r, df=ifh)[1]) | 1995 new_dfh.write(self._getsegmentforrevs(r, r, df=read_ifh)[1]) |
1989 if troffset <= self.start(r): | 1996 if troffset <= self.start(r): |
1990 trindex = r | 1997 trindex = r |
1991 | 1998 new_dfh.flush() |
1992 with self._indexfp(b'w') as fp: | 1999 |
2000 with self.opener(self._indexfile, mode=b'w', atomictemp=True) as fp: | |
1993 self._format_flags &= ~FLAG_INLINE_DATA | 2001 self._format_flags &= ~FLAG_INLINE_DATA |
1994 self._inline = False | 2002 self._inline = False |
1995 for i in self: | 2003 for i in self: |
1996 e = self.index.entry_binary(i) | 2004 e = self.index.entry_binary(i) |
1997 if i == 0: | 2005 if i == 0: |
1998 header = self._format_flags | self._format_version | 2006 header = self._format_flags | self._format_version |
1999 header = self.index.pack_header(header) | 2007 header = self.index.pack_header(header) |
2000 e = header + e | 2008 e = header + e |
2001 fp.write(e) | 2009 fp.write(e) |
2002 | |
2003 # the temp file replace the real index when we exit the context | 2010 # the temp file replace the real index when we exit the context |
2004 # manager | 2011 # manager |
2005 | 2012 |
2006 tr.replace(self._indexfile, trindex * self.index.entry_size) | 2013 tr.replace(self._indexfile, trindex * self.index.entry_size) |
2007 nodemaputil.setup_persistent_nodemap(tr, self) | 2014 nodemaputil.setup_persistent_nodemap(tr, self) |
2008 self._chunkclear() | 2015 self._chunkclear() |
2009 | 2016 |
2017 if existing_handles: | |
2018 # switched from inline to conventional reopen the index | |
2019 ifh = self._indexfp(b"a+") | |
2020 self._writinghandles = (ifh, new_dfh) | |
2021 new_dfh = None | |
2022 finally: | |
2023 if new_dfh is not None: | |
2024 new_dfh.close() | |
2025 | |
2010 def _nodeduplicatecallback(self, transaction, node): | 2026 def _nodeduplicatecallback(self, transaction, node): |
2011 """called when trying to add a node already stored.""" | 2027 """called when trying to add a node already stored.""" |
2028 | |
2029 @contextlib.contextmanager | |
2030 def _writing(self, transaction): | |
2031 if self._writinghandles is not None: | |
2032 yield | |
2033 else: | |
2034 r = len(self) | |
2035 dsize = 0 | |
2036 if r: | |
2037 dsize = self.end(r - 1) | |
2038 dfh = None | |
2039 if not self._inline: | |
2040 dfh = self._datafp(b"a+") | |
2041 transaction.add(self._datafile, dsize) | |
2042 try: | |
2043 isize = r * self.index.entry_size | |
2044 ifh = self._indexfp(b"a+") | |
2045 if self._inline: | |
2046 transaction.add(self._indexfile, dsize + isize) | |
2047 else: | |
2048 transaction.add(self._indexfile, isize) | |
2049 try: | |
2050 self._writinghandles = (ifh, dfh) | |
2051 try: | |
2052 yield | |
2053 finally: | |
2054 self._writinghandles = None | |
2055 finally: | |
2056 ifh.close() | |
2057 finally: | |
2058 if dfh is not None: | |
2059 dfh.close() | |
2012 | 2060 |
2013 def addrevision( | 2061 def addrevision( |
2014 self, | 2062 self, |
2015 text, | 2063 text, |
2016 transaction, | 2064 transaction, |
2103 ): | 2151 ): |
2104 """add a raw revision with known flags, node and parents | 2152 """add a raw revision with known flags, node and parents |
2105 useful when reusing a revision not stored in this revlog (ex: received | 2153 useful when reusing a revision not stored in this revlog (ex: received |
2106 over wire, or read from an external bundle). | 2154 over wire, or read from an external bundle). |
2107 """ | 2155 """ |
2108 dfh = None | 2156 with self._writing(transaction): |
2109 if not self._inline: | |
2110 dfh = self._datafp(b"a+") | |
2111 ifh = self._indexfp(b"a+") | |
2112 try: | |
2113 return self._addrevision( | 2157 return self._addrevision( |
2114 node, | 2158 node, |
2115 rawtext, | 2159 rawtext, |
2116 transaction, | 2160 transaction, |
2117 link, | 2161 link, |
2118 p1, | 2162 p1, |
2119 p2, | 2163 p2, |
2120 flags, | 2164 flags, |
2121 cachedelta, | 2165 cachedelta, |
2122 ifh, | |
2123 dfh, | |
2124 deltacomputer=deltacomputer, | 2166 deltacomputer=deltacomputer, |
2125 sidedata=sidedata, | 2167 sidedata=sidedata, |
2126 ) | 2168 ) |
2127 finally: | |
2128 if dfh: | |
2129 dfh.close() | |
2130 ifh.close() | |
2131 | 2169 |
2132 def compress(self, data): | 2170 def compress(self, data): |
2133 """Generate a possibly-compressed representation of data.""" | 2171 """Generate a possibly-compressed representation of data.""" |
2134 if not data: | 2172 if not data: |
2135 return b'', data | 2173 return b'', data |
2212 link, | 2250 link, |
2213 p1, | 2251 p1, |
2214 p2, | 2252 p2, |
2215 flags, | 2253 flags, |
2216 cachedelta, | 2254 cachedelta, |
2217 ifh, | |
2218 dfh, | |
2219 alwayscache=False, | 2255 alwayscache=False, |
2220 deltacomputer=None, | 2256 deltacomputer=None, |
2221 sidedata=None, | 2257 sidedata=None, |
2222 ): | 2258 ): |
2223 """internal function to add revisions to the log | 2259 """internal function to add revisions to the log |
2242 or node in self.nodeconstants.wdirfilenodeids | 2278 or node in self.nodeconstants.wdirfilenodeids |
2243 ): | 2279 ): |
2244 raise error.RevlogError( | 2280 raise error.RevlogError( |
2245 _(b"%s: attempt to add wdir revision") % self.display_id | 2281 _(b"%s: attempt to add wdir revision") % self.display_id |
2246 ) | 2282 ) |
2283 if self._writinghandles is None: | |
2284 msg = b'adding revision outside `revlog._writing` context' | |
2285 raise error.ProgrammingError(msg) | |
2247 | 2286 |
2248 if self._inline: | 2287 if self._inline: |
2249 fh = ifh | 2288 fh = self._writinghandles[0] |
2250 else: | 2289 else: |
2251 fh = dfh | 2290 fh = self._writinghandles[1] |
2252 | 2291 |
2253 btext = [rawtext] | 2292 btext = [rawtext] |
2254 | 2293 |
2255 curr = len(self) | 2294 curr = len(self) |
2256 prev = curr - 1 | 2295 prev = curr - 1 |
2257 | 2296 |
2258 offset = self._get_data_offset(prev) | 2297 offset = self._get_data_offset(prev) |
2259 | 2298 |
2260 if self._concurrencychecker: | 2299 if self._concurrencychecker: |
2300 ifh, dfh = self._writinghandles | |
2261 if self._inline: | 2301 if self._inline: |
2262 # offset is "as if" it were in the .d file, so we need to add on | 2302 # offset is "as if" it were in the .d file, so we need to add on |
2263 # the size of the entry metadata. | 2303 # the size of the entry metadata. |
2264 self._concurrencychecker( | 2304 self._concurrencychecker( |
2265 ifh, self._indexfile, offset + curr * self.index.entry_size | 2305 ifh, self._indexfile, offset + curr * self.index.entry_size |
2321 header = self._format_flags | self._format_version | 2361 header = self._format_flags | self._format_version |
2322 header = self.index.pack_header(header) | 2362 header = self.index.pack_header(header) |
2323 entry = header + entry | 2363 entry = header + entry |
2324 self._writeentry( | 2364 self._writeentry( |
2325 transaction, | 2365 transaction, |
2326 ifh, | |
2327 dfh, | |
2328 entry, | 2366 entry, |
2329 deltainfo.data, | 2367 deltainfo.data, |
2330 link, | 2368 link, |
2331 offset, | 2369 offset, |
2332 serialized_sidedata, | 2370 serialized_sidedata, |
2360 # Sidedata for a previous rev has potentially been written after | 2398 # Sidedata for a previous rev has potentially been written after |
2361 # this rev's end, so take the max. | 2399 # this rev's end, so take the max. |
2362 offset = max(self.end(rev), offset, sidedata_end) | 2400 offset = max(self.end(rev), offset, sidedata_end) |
2363 return offset | 2401 return offset |
2364 | 2402 |
2365 def _writeentry( | 2403 def _writeentry(self, transaction, entry, data, link, offset, sidedata): |
2366 self, transaction, ifh, dfh, entry, data, link, offset, sidedata | |
2367 ): | |
2368 # Files opened in a+ mode have inconsistent behavior on various | 2404 # Files opened in a+ mode have inconsistent behavior on various |
2369 # platforms. Windows requires that a file positioning call be made | 2405 # platforms. Windows requires that a file positioning call be made |
2370 # when the file handle transitions between reads and writes. See | 2406 # when the file handle transitions between reads and writes. See |
2371 # 3686fa2b8eee and the mixedfilemodewrapper in windows.py. On other | 2407 # 3686fa2b8eee and the mixedfilemodewrapper in windows.py. On other |
2372 # platforms, Python or the platform itself can be buggy. Some versions | 2408 # platforms, Python or the platform itself can be buggy. Some versions |
2375 # | 2411 # |
2376 # We work around this issue by inserting a seek() before writing. | 2412 # We work around this issue by inserting a seek() before writing. |
2377 # Note: This is likely not necessary on Python 3. However, because | 2413 # Note: This is likely not necessary on Python 3. However, because |
2378 # the file handle is reused for reads and may be seeked there, we need | 2414 # the file handle is reused for reads and may be seeked there, we need |
2379 # to be careful before changing this. | 2415 # to be careful before changing this. |
2416 if self._writinghandles is None: | |
2417 msg = b'adding revision outside `revlog._writing` context' | |
2418 raise error.ProgrammingError(msg) | |
2419 ifh, dfh = self._writinghandles | |
2380 ifh.seek(0, os.SEEK_END) | 2420 ifh.seek(0, os.SEEK_END) |
2381 if dfh: | 2421 if dfh: |
2382 dfh.seek(0, os.SEEK_END) | 2422 dfh.seek(0, os.SEEK_END) |
2383 | 2423 |
2384 curr = len(self) - 1 | 2424 curr = len(self) - 1 |
2397 ifh.write(entry) | 2437 ifh.write(entry) |
2398 ifh.write(data[0]) | 2438 ifh.write(data[0]) |
2399 ifh.write(data[1]) | 2439 ifh.write(data[1]) |
2400 if sidedata: | 2440 if sidedata: |
2401 ifh.write(sidedata) | 2441 ifh.write(sidedata) |
2402 self._enforceinlinesize(transaction, ifh) | 2442 self._enforceinlinesize(transaction) |
2403 nodemaputil.setup_persistent_nodemap(transaction, self) | 2443 nodemaputil.setup_persistent_nodemap(transaction, self) |
2404 | 2444 |
2405 def addgroup( | 2445 def addgroup( |
2406 self, | 2446 self, |
2407 deltas, | 2447 deltas, |
2420 | 2460 |
2421 If ``addrevisioncb`` is defined, it will be called with arguments of | 2461 If ``addrevisioncb`` is defined, it will be called with arguments of |
2422 this revlog and the node that was added. | 2462 this revlog and the node that was added. |
2423 """ | 2463 """ |
2424 | 2464 |
2425 if self._writinghandles: | 2465 if self._adding_group: |
2426 raise error.ProgrammingError(b'cannot nest addgroup() calls') | 2466 raise error.ProgrammingError(b'cannot nest addgroup() calls') |
2427 | 2467 |
2428 r = len(self) | 2468 self._adding_group = True |
2429 end = 0 | |
2430 if r: | |
2431 end = self.end(r - 1) | |
2432 ifh = self._indexfp(b"a+") | |
2433 isize = r * self.index.entry_size | |
2434 if self._inline: | |
2435 transaction.add(self._indexfile, end + isize) | |
2436 dfh = None | |
2437 else: | |
2438 transaction.add(self._indexfile, isize) | |
2439 transaction.add(self._datafile, end) | |
2440 dfh = self._datafp(b"a+") | |
2441 | |
2442 self._writinghandles = (ifh, dfh) | |
2443 empty = True | 2469 empty = True |
2444 | |
2445 try: | 2470 try: |
2446 if True: | 2471 with self._writing(transaction): |
2447 deltacomputer = deltautil.deltacomputer(self) | 2472 deltacomputer = deltautil.deltacomputer(self) |
2448 # loop through our set of deltas | 2473 # loop through our set of deltas |
2449 for data in deltas: | 2474 for data in deltas: |
2450 ( | 2475 ( |
2451 node, | 2476 node, |
2512 link, | 2537 link, |
2513 p1, | 2538 p1, |
2514 p2, | 2539 p2, |
2515 flags, | 2540 flags, |
2516 (baserev, delta), | 2541 (baserev, delta), |
2517 ifh, | |
2518 dfh, | |
2519 alwayscache=alwayscache, | 2542 alwayscache=alwayscache, |
2520 deltacomputer=deltacomputer, | 2543 deltacomputer=deltacomputer, |
2521 sidedata=sidedata, | 2544 sidedata=sidedata, |
2522 ) | 2545 ) |
2523 | 2546 |
2524 if addrevisioncb: | 2547 if addrevisioncb: |
2525 addrevisioncb(self, rev) | 2548 addrevisioncb(self, rev) |
2526 empty = False | 2549 empty = False |
2527 | |
2528 if not dfh and not self._inline: | |
2529 # addrevision switched from inline to conventional | |
2530 # reopen the index | |
2531 ifh.close() | |
2532 dfh = self._datafp(b"a+") | |
2533 ifh = self._indexfp(b"a+") | |
2534 self._writinghandles = (ifh, dfh) | |
2535 finally: | 2550 finally: |
2536 self._writinghandles = None | 2551 self._adding_group = False |
2537 | |
2538 if dfh: | |
2539 dfh.close() | |
2540 ifh.close() | |
2541 return not empty | 2552 return not empty |
2542 | 2553 |
2543 def iscensored(self, rev): | 2554 def iscensored(self, rev): |
2544 """Check if a file revision is censored.""" | 2555 """Check if a file revision is censored.""" |
2545 if not self._censorable: | 2556 if not self._censorable: |
2866 (sidedata, new_flags) = sidedatautil.run_sidedata_helpers( | 2877 (sidedata, new_flags) = sidedatautil.run_sidedata_helpers( |
2867 self, sidedata_helpers, sidedata, rev | 2878 self, sidedata_helpers, sidedata, rev |
2868 ) | 2879 ) |
2869 flags = flags | new_flags[0] & ~new_flags[1] | 2880 flags = flags | new_flags[0] & ~new_flags[1] |
2870 | 2881 |
2871 ifh = destrevlog.opener( | 2882 with destrevlog._writing(tr): |
2872 destrevlog._indexfile, b'a+', checkambig=False | |
2873 ) | |
2874 dfh = None | |
2875 if not destrevlog._inline: | |
2876 dfh = destrevlog.opener(destrevlog._datafile, b'a+') | |
2877 try: | |
2878 destrevlog._addrevision( | 2883 destrevlog._addrevision( |
2879 node, | 2884 node, |
2880 rawtext, | 2885 rawtext, |
2881 tr, | 2886 tr, |
2882 linkrev, | 2887 linkrev, |
2883 p1, | 2888 p1, |
2884 p2, | 2889 p2, |
2885 flags, | 2890 flags, |
2886 cachedelta, | 2891 cachedelta, |
2887 ifh, | |
2888 dfh, | |
2889 deltacomputer=deltacomputer, | 2892 deltacomputer=deltacomputer, |
2890 sidedata=sidedata, | 2893 sidedata=sidedata, |
2891 ) | 2894 ) |
2892 finally: | |
2893 if dfh: | |
2894 dfh.close() | |
2895 ifh.close() | |
2896 | 2895 |
2897 if addrevisioncb: | 2896 if addrevisioncb: |
2898 addrevisioncb(self, rev, node) | 2897 addrevisioncb(self, rev, node) |
2899 | 2898 |
2900 def censorrevision(self, tr, censornode, tombstone=b''): | 2899 def censorrevision(self, tr, censornode, tombstone=b''): |