comparison mercurial/revlog.py @ 51103:d83d788590a8

changelog-delay: move the delay/divert logic inside the (inner) revlog. Instead of hacking through the vfs/opener, we implement the delay/divert logic inside the `_InnerRevlog` and `randomaccessfile` objects. This will allow an alternative implementation of the `_InnerRevlog` that does not need to use Python details. As a result, the new implementation can use the transaction less aggressively and avoid some extra output since no data has been written yet. That seems like a good side effect.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Tue, 24 Oct 2023 11:08:49 +0200
parents af96fbb8f739
children c2d2e5b65def
comparison
equal deleted inserted replaced
51102:af96fbb8f739 51103:d83d788590a8
367 self.inline = inline 367 self.inline = inline
368 self.data_config = data_config 368 self.data_config = data_config
369 self.delta_config = delta_config 369 self.delta_config = delta_config
370 self.feature_config = feature_config 370 self.feature_config = feature_config
371 371
372 # used during diverted write.
373 self._orig_index_file = None
374
372 self._default_compression_header = default_compression_header 375 self._default_compression_header = default_compression_header
373 376
374 # index 377 # index
375 378
376 # 3-tuple of file handles being used for active writing. 379 # 3-tuple of file handles being used for active writing.
391 # revlog header -> revlog compressor 394 # revlog header -> revlog compressor
392 self._decompressors = {} 395 self._decompressors = {}
393 # 3-tuple of (node, rev, text) for a raw revision. 396 # 3-tuple of (node, rev, text) for a raw revision.
394 self._revisioncache = None 397 self._revisioncache = None
395 398
399 self._delay_buffer = None
400
396 @property 401 @property
397 def index_file(self): 402 def index_file(self):
398 return self.__index_file 403 return self.__index_file
399 404
400 @index_file.setter 405 @index_file.setter
405 410
406 def __len__(self): 411 def __len__(self):
407 return len(self.index) 412 return len(self.index)
408 413
409 def clear_cache(self): 414 def clear_cache(self):
415 assert not self.is_delaying
410 self._revisioncache = None 416 self._revisioncache = None
411 self._segmentfile.clear_cache() 417 self._segmentfile.clear_cache()
412 self._segmentfile_sidedata.clear_cache() 418 self._segmentfile_sidedata.clear_cache()
413 419
414 @property 420 @property
415 def canonical_index_file(self): 421 def canonical_index_file(self):
422 if self._orig_index_file is not None:
423 return self._orig_index_file
416 return self.index_file 424 return self.index_file
425
426 @property
427 def is_delaying(self):
428 """is the revlog currently delaying the visibility of written data?
429
430 The delaying mechanism can be either in-memory or written on disk in a
431 side-file."""
432 return (self._delay_buffer is not None) or (
433 self._orig_index_file is not None
434 )
417 435
418 # Derived from index values. 436 # Derived from index values.
419 437
420 def start(self, rev): 438 def start(self, rev):
421 """the offset of the data chunk for this revision""" 439 """the offset of the data chunk for this revision"""
698 """internal method to open the index file for writing 716 """internal method to open the index file for writing
699 717
700 You should not use this directly and use `_writing` instead 718 You should not use this directly and use `_writing` instead
701 """ 719 """
702 try: 720 try:
703 f = self.opener( 721 if self._delay_buffer is None:
704 self.index_file, 722 f = self.opener(
705 mode=b"r+", 723 self.index_file,
706 checkambig=self.data_config.check_ambig, 724 mode=b"r+",
707 ) 725 checkambig=self.data_config.check_ambig,
726 )
727 else:
728 # check_ambig affects the way we open a file for writing; however,
729 # here we do not actually open a file for writing, as writes
730 # will be appended to a delay_buffer. So check_ambig is not
731 # meaningful and is unneeded here.
732 f = randomaccessfile.appender(
733 self.opener, self.index_file, b"r+", self._delay_buffer
734 )
708 if index_end is None: 735 if index_end is None:
709 f.seek(0, os.SEEK_END) 736 f.seek(0, os.SEEK_END)
710 else: 737 else:
711 f.seek(index_end, os.SEEK_SET) 738 f.seek(index_end, os.SEEK_SET)
712 return f 739 return f
713 except FileNotFoundError: 740 except FileNotFoundError:
714 return self.opener( 741 if self._delay_buffer is None:
715 self.index_file, 742 return self.opener(
716 mode=b"w+", 743 self.index_file,
717 checkambig=self.data_config.check_ambig, 744 mode=b"w+",
718 ) 745 checkambig=self.data_config.check_ambig,
746 )
747 else:
748 return randomaccessfile.appender(
749 self.opener, self.index_file, b"w+", self._delay_buffer
750 )
719 751
720 def __index_new_fp(self): 752 def __index_new_fp(self):
721 """internal method to create a new index file for writing 753 """internal method to create a new index file for writing
722 754
723 You should not use this unless you are upgrading from inline revlog 755 You should not use this unless you are upgrading from inline revlog
1042 if data[0]: 1074 if data[0]:
1043 dfh.write(data[0]) 1075 dfh.write(data[0])
1044 dfh.write(data[1]) 1076 dfh.write(data[1])
1045 if sidedata: 1077 if sidedata:
1046 sdfh.write(sidedata) 1078 sdfh.write(sidedata)
1047 ifh.write(entry) 1079 if self._delay_buffer is None:
1080 ifh.write(entry)
1081 else:
1082 self._delay_buffer.append(entry)
1048 else: 1083 else:
1049 offset += curr * self.index.entry_size 1084 offset += curr * self.index.entry_size
1050 transaction.add(self.canonical_index_file, offset) 1085 transaction.add(self.canonical_index_file, offset)
1051 ifh.write(entry)
1052 ifh.write(data[0])
1053 ifh.write(data[1])
1054 assert not sidedata 1086 assert not sidedata
1087 if self._delay_buffer is None:
1088 ifh.write(entry)
1089 ifh.write(data[0])
1090 ifh.write(data[1])
1091 else:
1092 self._delay_buffer.append(entry)
1093 self._delay_buffer.append(data[0])
1094 self._delay_buffer.append(data[1])
1055 return ( 1095 return (
1056 ifh.tell(), 1096 ifh.tell(),
1057 dfh.tell() if dfh else None, 1097 dfh.tell() if dfh else None,
1058 sdfh.tell() if sdfh else None, 1098 sdfh.tell() if sdfh else None,
1059 ) 1099 )
1100
1101 def _divert_index(self):
1102 return self.index_file + b'.a'
1103
1104 def delay(self):
1105 assert not self.is_open
1106 if self._delay_buffer is not None or self._orig_index_file is not None:
1107 # delay or divert already in place
1108 return None
1109 elif len(self.index) == 0:
1110 self._orig_index_file = self.index_file
1111 self.index_file = self._divert_index()
1112 self._segmentfile.filename = self.index_file
1113 assert self._orig_index_file is not None
1114 assert self.index_file is not None
1115 if self.opener.exists(self.index_file):
1116 self.opener.unlink(self.index_file)
1117 return self.index_file
1118 else:
1119 self._segmentfile._delay_buffer = self._delay_buffer = []
1120 return None
1121
1122 def write_pending(self):
1123 assert not self.is_open
1124 if self._orig_index_file is not None:
1125 return None, True
1126 any_pending = False
1127 pending_index_file = self._divert_index()
1128 if self.opener.exists(pending_index_file):
1129 self.opener.unlink(pending_index_file)
1130 util.copyfile(
1131 self.opener.join(self.index_file),
1132 self.opener.join(pending_index_file),
1133 )
1134 if self._delay_buffer:
1135 with self.opener(pending_index_file, b'r+') as ifh:
1136 ifh.seek(0, os.SEEK_END)
1137 ifh.write(b"".join(self._delay_buffer))
1138 any_pending = True
1139 self._segmentfile._delay_buffer = self._delay_buffer = None
1140 self._orig_index_file = self.index_file
1141 self.index_file = pending_index_file
1142 self._segmentfile.filename = self.index_file
1143 return self.index_file, any_pending
1144
1145 def finalize_pending(self):
1146 assert not self.is_open
1147
1148 delay = self._delay_buffer is not None
1149 divert = self._orig_index_file is not None
1150
1151 if delay and divert:
1152 assert False, "unreachable"
1153 elif delay:
1154 if self._delay_buffer:
1155 with self.opener(self.index_file, b'r+') as ifh:
1156 ifh.seek(0, os.SEEK_END)
1157 ifh.write(b"".join(self._delay_buffer))
1158 self._segmentfile._delay_buffer = self._delay_buffer = None
1159 elif divert:
1160 if self.opener.exists(self.index_file):
1161 self.opener.rename(
1162 self.index_file,
1163 self._orig_index_file,
1164 checkambig=True,
1165 )
1166 self.index_file = self._orig_index_file
1167 self._orig_index_file = None
1168 self._segmentfile.filename = self.index_file
1169 else:
1170 msg = b"not delay or divert found on this revlog"
1171 raise error.ProgrammingError(msg)
1172 return self.canonical_index_file
1060 1173
1061 1174
1062 class revlog: 1175 class revlog:
1063 """ 1176 """
1064 the underlying revision storage object 1177 the underlying revision storage object
2923 ): 3036 ):
2924 yield 3037 yield
2925 if self._docket is not None: 3038 if self._docket is not None:
2926 self._write_docket(transaction) 3039 self._write_docket(transaction)
2927 3040
3041 @property
3042 def is_delaying(self):
3043 return self._inner.is_delaying
3044
2928 def _write_docket(self, transaction): 3045 def _write_docket(self, transaction):
2929 """write the current docket on disk 3046 """write the current docket on disk
2930 3047
2931 Exist as a method to help changelog to implement transaction logic 3048 Exist as a method to help changelog to implement transaction logic
2932 3049