comparison mercurial/revlog.py @ 47225:906a7bcaac86

revlog: introduce a mandatory `_writing` context to update revlog content. Before this change, various revlog methods were managing the opening and closing of the revlog files manually and passing the file descriptors along the call path. To simplify the tracking of write operations by a future docket, we need something more organised. As a result, we introduce a `revlog._writing` context manager that wraps each revlog update operation. The file descriptors are kept in the existing `revlog._writinghandles` attribute that was already used by the `addgroup` logic. This change is entirely internal to the revlog; the "public" interface is not affected. The `addrevision` and `addgroup` logic are still responsible for setting up this context. However, this new context gives us multiple benefits: * all writers use the same, unified logic, * this context is programmatically enforced, * each write "session" has a clearly identified start and end. The post-pull sidedata update logic still writes by hand and will be adjusted in a later changeset. This change affects the concurrency checker test: because we now register the state of the file in the transaction sooner in `addrevision` (about as early as `addgroup` would), the abort rolls back the other commit. I don't want to weaken the current main logic. Differential Revision: https://phab.mercurial-scm.org/D10605
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 03 May 2021 12:27:42 +0200
parents 100f061d88f6
children 042388bba644
comparison
equal deleted inserted replaced
47224:100f061d88f6 47225:906a7bcaac86
358 # custom flags. 358 # custom flags.
359 self._flagprocessors = dict(flagutil.flagprocessors) 359 self._flagprocessors = dict(flagutil.flagprocessors)
360 360
361 # 2-tuple of file handles being used for active writing. 361 # 2-tuple of file handles being used for active writing.
362 self._writinghandles = None 362 self._writinghandles = None
363 # prevent nesting of addgroup
364 self._adding_group = None
363 365
364 self._loadindex() 366 self._loadindex()
365 367
366 self._concurrencychecker = concurrencychecker 368 self._concurrencychecker = concurrencychecker
367 369
1953 except error.RevlogError: 1955 except error.RevlogError:
1954 if self._censorable and storageutil.iscensoredtext(text): 1956 if self._censorable and storageutil.iscensoredtext(text):
1955 raise error.CensoredNodeError(self.display_id, node, text) 1957 raise error.CensoredNodeError(self.display_id, node, text)
1956 raise 1958 raise
1957 1959
1958 def _enforceinlinesize(self, tr, fp=None): 1960 def _enforceinlinesize(self, tr):
1959 """Check if the revlog is too big for inline and convert if so. 1961 """Check if the revlog is too big for inline and convert if so.
1960 1962
1961 This should be called after revisions are added to the revlog. If the 1963 This should be called after revisions are added to the revlog. If the
1962 revlog has grown too large to be an inline revlog, it will convert it 1964 revlog has grown too large to be an inline revlog, it will convert it
1963 to use multiple index and data files. 1965 to use multiple index and data files.
1973 _(b"%s not found in the transaction") % self._indexfile 1975 _(b"%s not found in the transaction") % self._indexfile
1974 ) 1976 )
1975 trindex = 0 1977 trindex = 0
1976 tr.add(self._datafile, 0) 1978 tr.add(self._datafile, 0)
1977 1979
1978 if fp: 1980 existing_handles = False
1981 if self._writinghandles is not None:
1982 existing_handles = True
1983 fp = self._writinghandles[0]
1979 fp.flush() 1984 fp.flush()
1980 fp.close() 1985 fp.close()
1981 # We can't use the cached file handle after close(). So prevent 1986 # We can't use the cached file handle after close(). So prevent
1982 # its usage. 1987 # its usage.
1983 self._writinghandles = None 1988 self._writinghandles = None
1984 1989
1985 if True: 1990 new_dfh = self._datafp(b'w+')
1986 with self._indexfp(b'r') as ifh, self._datafp(b'w') as dfh: 1991 new_dfh.truncate(0) # drop any potentially existing data
1992 try:
1993 with self._indexfp(b'r') as read_ifh:
1987 for r in self: 1994 for r in self:
1988 dfh.write(self._getsegmentforrevs(r, r, df=ifh)[1]) 1995 new_dfh.write(self._getsegmentforrevs(r, r, df=read_ifh)[1])
1989 if troffset <= self.start(r): 1996 if troffset <= self.start(r):
1990 trindex = r 1997 trindex = r
1991 1998 new_dfh.flush()
1992 with self._indexfp(b'w') as fp: 1999
2000 with self.opener(self._indexfile, mode=b'w', atomictemp=True) as fp:
1993 self._format_flags &= ~FLAG_INLINE_DATA 2001 self._format_flags &= ~FLAG_INLINE_DATA
1994 self._inline = False 2002 self._inline = False
1995 for i in self: 2003 for i in self:
1996 e = self.index.entry_binary(i) 2004 e = self.index.entry_binary(i)
1997 if i == 0: 2005 if i == 0:
1998 header = self._format_flags | self._format_version 2006 header = self._format_flags | self._format_version
1999 header = self.index.pack_header(header) 2007 header = self.index.pack_header(header)
2000 e = header + e 2008 e = header + e
2001 fp.write(e) 2009 fp.write(e)
2002
2003 # the temp file replace the real index when we exit the context 2010 # the temp file replace the real index when we exit the context
2004 # manager 2011 # manager
2005 2012
2006 tr.replace(self._indexfile, trindex * self.index.entry_size) 2013 tr.replace(self._indexfile, trindex * self.index.entry_size)
2007 nodemaputil.setup_persistent_nodemap(tr, self) 2014 nodemaputil.setup_persistent_nodemap(tr, self)
2008 self._chunkclear() 2015 self._chunkclear()
2009 2016
2017 if existing_handles:
2018 # switched from inline to conventional reopen the index
2019 ifh = self._indexfp(b"a+")
2020 self._writinghandles = (ifh, new_dfh)
2021 new_dfh = None
2022 finally:
2023 if new_dfh is not None:
2024 new_dfh.close()
2025
2010 def _nodeduplicatecallback(self, transaction, node): 2026 def _nodeduplicatecallback(self, transaction, node):
2011 """called when trying to add a node already stored.""" 2027 """called when trying to add a node already stored."""
2028
2029 @contextlib.contextmanager
2030 def _writing(self, transaction):
2031 if self._writinghandles is not None:
2032 yield
2033 else:
2034 r = len(self)
2035 dsize = 0
2036 if r:
2037 dsize = self.end(r - 1)
2038 dfh = None
2039 if not self._inline:
2040 dfh = self._datafp(b"a+")
2041 transaction.add(self._datafile, dsize)
2042 try:
2043 isize = r * self.index.entry_size
2044 ifh = self._indexfp(b"a+")
2045 if self._inline:
2046 transaction.add(self._indexfile, dsize + isize)
2047 else:
2048 transaction.add(self._indexfile, isize)
2049 try:
2050 self._writinghandles = (ifh, dfh)
2051 try:
2052 yield
2053 finally:
2054 self._writinghandles = None
2055 finally:
2056 ifh.close()
2057 finally:
2058 if dfh is not None:
2059 dfh.close()
2012 2060
2013 def addrevision( 2061 def addrevision(
2014 self, 2062 self,
2015 text, 2063 text,
2016 transaction, 2064 transaction,
2103 ): 2151 ):
2104 """add a raw revision with known flags, node and parents 2152 """add a raw revision with known flags, node and parents
2105 useful when reusing a revision not stored in this revlog (ex: received 2153 useful when reusing a revision not stored in this revlog (ex: received
2106 over wire, or read from an external bundle). 2154 over wire, or read from an external bundle).
2107 """ 2155 """
2108 dfh = None 2156 with self._writing(transaction):
2109 if not self._inline:
2110 dfh = self._datafp(b"a+")
2111 ifh = self._indexfp(b"a+")
2112 try:
2113 return self._addrevision( 2157 return self._addrevision(
2114 node, 2158 node,
2115 rawtext, 2159 rawtext,
2116 transaction, 2160 transaction,
2117 link, 2161 link,
2118 p1, 2162 p1,
2119 p2, 2163 p2,
2120 flags, 2164 flags,
2121 cachedelta, 2165 cachedelta,
2122 ifh,
2123 dfh,
2124 deltacomputer=deltacomputer, 2166 deltacomputer=deltacomputer,
2125 sidedata=sidedata, 2167 sidedata=sidedata,
2126 ) 2168 )
2127 finally:
2128 if dfh:
2129 dfh.close()
2130 ifh.close()
2131 2169
2132 def compress(self, data): 2170 def compress(self, data):
2133 """Generate a possibly-compressed representation of data.""" 2171 """Generate a possibly-compressed representation of data."""
2134 if not data: 2172 if not data:
2135 return b'', data 2173 return b'', data
2212 link, 2250 link,
2213 p1, 2251 p1,
2214 p2, 2252 p2,
2215 flags, 2253 flags,
2216 cachedelta, 2254 cachedelta,
2217 ifh,
2218 dfh,
2219 alwayscache=False, 2255 alwayscache=False,
2220 deltacomputer=None, 2256 deltacomputer=None,
2221 sidedata=None, 2257 sidedata=None,
2222 ): 2258 ):
2223 """internal function to add revisions to the log 2259 """internal function to add revisions to the log
2242 or node in self.nodeconstants.wdirfilenodeids 2278 or node in self.nodeconstants.wdirfilenodeids
2243 ): 2279 ):
2244 raise error.RevlogError( 2280 raise error.RevlogError(
2245 _(b"%s: attempt to add wdir revision") % self.display_id 2281 _(b"%s: attempt to add wdir revision") % self.display_id
2246 ) 2282 )
2283 if self._writinghandles is None:
2284 msg = b'adding revision outside `revlog._writing` context'
2285 raise error.ProgrammingError(msg)
2247 2286
2248 if self._inline: 2287 if self._inline:
2249 fh = ifh 2288 fh = self._writinghandles[0]
2250 else: 2289 else:
2251 fh = dfh 2290 fh = self._writinghandles[1]
2252 2291
2253 btext = [rawtext] 2292 btext = [rawtext]
2254 2293
2255 curr = len(self) 2294 curr = len(self)
2256 prev = curr - 1 2295 prev = curr - 1
2257 2296
2258 offset = self._get_data_offset(prev) 2297 offset = self._get_data_offset(prev)
2259 2298
2260 if self._concurrencychecker: 2299 if self._concurrencychecker:
2300 ifh, dfh = self._writinghandles
2261 if self._inline: 2301 if self._inline:
2262 # offset is "as if" it were in the .d file, so we need to add on 2302 # offset is "as if" it were in the .d file, so we need to add on
2263 # the size of the entry metadata. 2303 # the size of the entry metadata.
2264 self._concurrencychecker( 2304 self._concurrencychecker(
2265 ifh, self._indexfile, offset + curr * self.index.entry_size 2305 ifh, self._indexfile, offset + curr * self.index.entry_size
2321 header = self._format_flags | self._format_version 2361 header = self._format_flags | self._format_version
2322 header = self.index.pack_header(header) 2362 header = self.index.pack_header(header)
2323 entry = header + entry 2363 entry = header + entry
2324 self._writeentry( 2364 self._writeentry(
2325 transaction, 2365 transaction,
2326 ifh,
2327 dfh,
2328 entry, 2366 entry,
2329 deltainfo.data, 2367 deltainfo.data,
2330 link, 2368 link,
2331 offset, 2369 offset,
2332 serialized_sidedata, 2370 serialized_sidedata,
2360 # Sidedata for a previous rev has potentially been written after 2398 # Sidedata for a previous rev has potentially been written after
2361 # this rev's end, so take the max. 2399 # this rev's end, so take the max.
2362 offset = max(self.end(rev), offset, sidedata_end) 2400 offset = max(self.end(rev), offset, sidedata_end)
2363 return offset 2401 return offset
2364 2402
2365 def _writeentry( 2403 def _writeentry(self, transaction, entry, data, link, offset, sidedata):
2366 self, transaction, ifh, dfh, entry, data, link, offset, sidedata
2367 ):
2368 # Files opened in a+ mode have inconsistent behavior on various 2404 # Files opened in a+ mode have inconsistent behavior on various
2369 # platforms. Windows requires that a file positioning call be made 2405 # platforms. Windows requires that a file positioning call be made
2370 # when the file handle transitions between reads and writes. See 2406 # when the file handle transitions between reads and writes. See
2371 # 3686fa2b8eee and the mixedfilemodewrapper in windows.py. On other 2407 # 3686fa2b8eee and the mixedfilemodewrapper in windows.py. On other
2372 # platforms, Python or the platform itself can be buggy. Some versions 2408 # platforms, Python or the platform itself can be buggy. Some versions
2375 # 2411 #
2376 # We work around this issue by inserting a seek() before writing. 2412 # We work around this issue by inserting a seek() before writing.
2377 # Note: This is likely not necessary on Python 3. However, because 2413 # Note: This is likely not necessary on Python 3. However, because
2378 # the file handle is reused for reads and may be seeked there, we need 2414 # the file handle is reused for reads and may be seeked there, we need
2379 # to be careful before changing this. 2415 # to be careful before changing this.
2416 if self._writinghandles is None:
2417 msg = b'adding revision outside `revlog._writing` context'
2418 raise error.ProgrammingError(msg)
2419 ifh, dfh = self._writinghandles
2380 ifh.seek(0, os.SEEK_END) 2420 ifh.seek(0, os.SEEK_END)
2381 if dfh: 2421 if dfh:
2382 dfh.seek(0, os.SEEK_END) 2422 dfh.seek(0, os.SEEK_END)
2383 2423
2384 curr = len(self) - 1 2424 curr = len(self) - 1
2397 ifh.write(entry) 2437 ifh.write(entry)
2398 ifh.write(data[0]) 2438 ifh.write(data[0])
2399 ifh.write(data[1]) 2439 ifh.write(data[1])
2400 if sidedata: 2440 if sidedata:
2401 ifh.write(sidedata) 2441 ifh.write(sidedata)
2402 self._enforceinlinesize(transaction, ifh) 2442 self._enforceinlinesize(transaction)
2403 nodemaputil.setup_persistent_nodemap(transaction, self) 2443 nodemaputil.setup_persistent_nodemap(transaction, self)
2404 2444
2405 def addgroup( 2445 def addgroup(
2406 self, 2446 self,
2407 deltas, 2447 deltas,
2420 2460
2421 If ``addrevisioncb`` is defined, it will be called with arguments of 2461 If ``addrevisioncb`` is defined, it will be called with arguments of
2422 this revlog and the node that was added. 2462 this revlog and the node that was added.
2423 """ 2463 """
2424 2464
2425 if self._writinghandles: 2465 if self._adding_group:
2426 raise error.ProgrammingError(b'cannot nest addgroup() calls') 2466 raise error.ProgrammingError(b'cannot nest addgroup() calls')
2427 2467
2428 r = len(self) 2468 self._adding_group = True
2429 end = 0
2430 if r:
2431 end = self.end(r - 1)
2432 ifh = self._indexfp(b"a+")
2433 isize = r * self.index.entry_size
2434 if self._inline:
2435 transaction.add(self._indexfile, end + isize)
2436 dfh = None
2437 else:
2438 transaction.add(self._indexfile, isize)
2439 transaction.add(self._datafile, end)
2440 dfh = self._datafp(b"a+")
2441
2442 self._writinghandles = (ifh, dfh)
2443 empty = True 2469 empty = True
2444
2445 try: 2470 try:
2446 if True: 2471 with self._writing(transaction):
2447 deltacomputer = deltautil.deltacomputer(self) 2472 deltacomputer = deltautil.deltacomputer(self)
2448 # loop through our set of deltas 2473 # loop through our set of deltas
2449 for data in deltas: 2474 for data in deltas:
2450 ( 2475 (
2451 node, 2476 node,
2512 link, 2537 link,
2513 p1, 2538 p1,
2514 p2, 2539 p2,
2515 flags, 2540 flags,
2516 (baserev, delta), 2541 (baserev, delta),
2517 ifh,
2518 dfh,
2519 alwayscache=alwayscache, 2542 alwayscache=alwayscache,
2520 deltacomputer=deltacomputer, 2543 deltacomputer=deltacomputer,
2521 sidedata=sidedata, 2544 sidedata=sidedata,
2522 ) 2545 )
2523 2546
2524 if addrevisioncb: 2547 if addrevisioncb:
2525 addrevisioncb(self, rev) 2548 addrevisioncb(self, rev)
2526 empty = False 2549 empty = False
2527
2528 if not dfh and not self._inline:
2529 # addrevision switched from inline to conventional
2530 # reopen the index
2531 ifh.close()
2532 dfh = self._datafp(b"a+")
2533 ifh = self._indexfp(b"a+")
2534 self._writinghandles = (ifh, dfh)
2535 finally: 2550 finally:
2536 self._writinghandles = None 2551 self._adding_group = False
2537
2538 if dfh:
2539 dfh.close()
2540 ifh.close()
2541 return not empty 2552 return not empty
2542 2553
2543 def iscensored(self, rev): 2554 def iscensored(self, rev):
2544 """Check if a file revision is censored.""" 2555 """Check if a file revision is censored."""
2545 if not self._censorable: 2556 if not self._censorable:
2866 (sidedata, new_flags) = sidedatautil.run_sidedata_helpers( 2877 (sidedata, new_flags) = sidedatautil.run_sidedata_helpers(
2867 self, sidedata_helpers, sidedata, rev 2878 self, sidedata_helpers, sidedata, rev
2868 ) 2879 )
2869 flags = flags | new_flags[0] & ~new_flags[1] 2880 flags = flags | new_flags[0] & ~new_flags[1]
2870 2881
2871 ifh = destrevlog.opener( 2882 with destrevlog._writing(tr):
2872 destrevlog._indexfile, b'a+', checkambig=False
2873 )
2874 dfh = None
2875 if not destrevlog._inline:
2876 dfh = destrevlog.opener(destrevlog._datafile, b'a+')
2877 try:
2878 destrevlog._addrevision( 2883 destrevlog._addrevision(
2879 node, 2884 node,
2880 rawtext, 2885 rawtext,
2881 tr, 2886 tr,
2882 linkrev, 2887 linkrev,
2883 p1, 2888 p1,
2884 p2, 2889 p2,
2885 flags, 2890 flags,
2886 cachedelta, 2891 cachedelta,
2887 ifh,
2888 dfh,
2889 deltacomputer=deltacomputer, 2892 deltacomputer=deltacomputer,
2890 sidedata=sidedata, 2893 sidedata=sidedata,
2891 ) 2894 )
2892 finally:
2893 if dfh:
2894 dfh.close()
2895 ifh.close()
2896 2895
2897 if addrevisioncb: 2896 if addrevisioncb:
2898 addrevisioncb(self, rev, node) 2897 addrevisioncb(self, rev, node)
2899 2898
2900 def censorrevision(self, tr, censornode, tombstone=b''): 2899 def censorrevision(self, tr, censornode, tombstone=b''):