Mercurial > public > mercurial-scm > hg
comparison tests/test-stdio.py @ 45104:eb26a9cf7821
procutil: avoid use of deprecated tempfile.mktemp()
In the previous version, I used tempfile.mktemp() because it seemed to be the
only way to open a file from two processes (the Python documentation says the
file backing NamedTemporaryFile can?t be opened a second time on Windows).
However, it?s possible when passing the O_TEMPORARY flag to the second open.
Source: https://stackoverflow.com/a/15235559/6366251
author | Manuel Jacob <me@manueljacob.de> |
---|---|
date | Thu, 09 Jul 2020 12:52:04 +0200 |
parents | dff208398ede |
children | c2c862b9b544 |
comparison
equal
deleted
inserted
replaced
45103:a5fa2761a6cd | 45104:eb26a9cf7821 |
---|---|
32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' | 32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' |
33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' | 33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' |
34 | 34 |
35 | 35 |
36 TEST_LARGE_WRITE_CHILD_SCRIPT = r''' | 36 TEST_LARGE_WRITE_CHILD_SCRIPT = r''' |
37 import os | |
37 import signal | 38 import signal |
38 import sys | 39 import sys |
39 | 40 |
40 from mercurial import dispatch | 41 from mercurial import dispatch |
41 from mercurial.utils import procutil | 42 from mercurial.utils import procutil |
42 | 43 |
43 signal.signal(signal.SIGINT, lambda *x: None) | 44 signal.signal(signal.SIGINT, lambda *x: None) |
44 dispatch.initstdio() | 45 dispatch.initstdio() |
45 write_result = procutil.{stream}.write(b'x' * 1048576) | 46 write_result = procutil.{stream}.write(b'x' * 1048576) |
46 with open({write_result_fn}, 'w') as write_result_f: | 47 with os.fdopen( |
48 os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)), | |
49 'w', | |
50 ) as write_result_f: | |
47 write_result_f.write(str(write_result)) | 51 write_result_f.write(str(write_result)) |
48 ''' | 52 ''' |
49 | 53 |
50 | 54 |
51 @contextlib.contextmanager | 55 @contextlib.contextmanager |
199 self.assertEqual( | 203 self.assertEqual( |
200 _readall(stream_receiver, 131072, buf), b'x' * 1048576 | 204 _readall(stream_receiver, 131072, buf), b'x' * 1048576 |
201 ) | 205 ) |
202 | 206 |
203 def post_child_check(): | 207 def post_child_check(): |
204 with open(write_result_fn, 'r') as write_result_f: | 208 write_result_str = write_result_f.read() |
205 write_result_str = write_result_f.read() | |
206 if pycompat.ispy3: | 209 if pycompat.ispy3: |
207 # On Python 3, we test that the correct number of bytes is | 210 # On Python 3, we test that the correct number of bytes is |
208 # claimed to have been written. | 211 # claimed to have been written. |
209 expected_write_result_str = '1048576' | 212 expected_write_result_str = '1048576' |
210 else: | 213 else: |
211 # On Python 2, we only check that the large write does not | 214 # On Python 2, we only check that the large write does not |
212 # crash. | 215 # crash. |
213 expected_write_result_str = 'None' | 216 expected_write_result_str = 'None' |
214 self.assertEqual(write_result_str, expected_write_result_str) | 217 self.assertEqual(write_result_str, expected_write_result_str) |
215 | 218 |
216 try: | 219 with tempfile.NamedTemporaryFile('r') as write_result_f: |
217 # tempfile.mktemp() is unsafe in general, as a malicious process | |
218 # could create the file before we do. But in tests, we're running | |
219 # in a controlled environment. | |
220 write_result_fn = tempfile.mktemp() | |
221 self._test( | 220 self._test( |
222 TEST_LARGE_WRITE_CHILD_SCRIPT.format( | 221 TEST_LARGE_WRITE_CHILD_SCRIPT.format( |
223 stream=stream, write_result_fn=repr(write_result_fn) | 222 stream=stream, write_result_fn=write_result_f.name |
224 ), | 223 ), |
225 stream, | 224 stream, |
226 rwpair_generator, | 225 rwpair_generator, |
227 check_output, | 226 check_output, |
228 python_args, | 227 python_args, |
229 post_child_check=post_child_check, | 228 post_child_check=post_child_check, |
230 ) | 229 ) |
231 finally: | |
232 try: | |
233 os.unlink(write_result_fn) | |
234 except OSError: | |
235 pass | |
236 | 230 |
237 def test_large_write_stdout_devnull(self): | 231 def test_large_write_stdout_devnull(self): |
238 self._test_large_write('stdout', _devnull) | 232 self._test_large_write('stdout', _devnull) |
239 | 233 |
240 def test_large_write_stdout_pipes(self): | 234 def test_large_write_stdout_pipes(self): |