comparison mercurial/sshpeer.py @ 52276:7aec18bded6d stable

sshpeer: fix deadlock on short writes. This commit makes the `sshpeer.doublepipe` object retry on short writes, which fixes a deadlock in hg client-server communication, in particular when the client needs to send a large message. Apparently Mercurial relies on the `write` method of file objects never returning short writes, without checking, which leads to deadlocks. This works fine when the underlying file object is buffered, since buffered writers never return short writes. (Why buffering has anything to do with this I don't know, but ok.) It breaks horribly with raw IO, which happens to be used in sshpeer.
author Arseniy Alekseyev <aalekseyev@janestreet.com>
date Wed, 05 Feb 2025 16:49:43 +0000
parents f4733654f144
children f6b30f4b5e07
comparison
equal deleted inserted replaced
52275:e16065bb7f42 52276:7aec18bded6d
7 7
8 from __future__ import annotations 8 from __future__ import annotations
9 9
10 import re 10 import re
11 import uuid 11 import uuid
12
13 from typing import Callable, Optional
12 14
13 from .i18n import _ 15 from .i18n import _
14 from . import ( 16 from . import (
15 error, 17 error,
16 pycompat, 18 pycompat,
45 display = ui.warn if warn else ui.status 47 display = ui.warn if warn else ui.status
46 for l in s.splitlines(): 48 for l in s.splitlines():
47 display(_(b"remote: "), l, b'\n') 49 display(_(b"remote: "), l, b'\n')
48 50
49 51
52 def _write_all(
53 write_once: Callable[[bytes], Optional[int]],
54 data: bytes,
55 ) -> Optional[int]:
56 """write data with a non blocking function
57
58 In case not all data were written, keep writing until everything is
59 written.
60 """
61 to_write = len(data)
62 written = write_once(data)
63 if written is None:
64 written = 0
65 if written < to_write:
66 data = memoryview(data)
67 while written < to_write:
68 wrote = write_once(data[written:])
69 # XXX if number of written bytes is "None", the destination is
70 # full. Some `select` call would be better than the current active
71 # polling.
72 if wrote is not None:
73 written += wrote
74 return written
75
76
50 class doublepipe: 77 class doublepipe:
51 """Operate a side-channel pipe in addition of a main one 78 """Operate a side-channel pipe in addition of a main one
52 79
53 The side-channel pipe contains server output to be forwarded to the user 80 The side-channel pipe contains server output to be forwarded to the user
54 input. The double pipe will behave as the "main" pipe, but will ensure the 81 input. The double pipe will behave as the "main" pipe, but will ensure the
89 except NotImplementedError: 116 except NotImplementedError:
90 # non supported yet case, assume all have data. 117 # non supported yet case, assume all have data.
91 act = fds 118 act = fds
92 return (self._main.fileno() in act, self._side.fileno() in act) 119 return (self._main.fileno() in act, self._side.fileno() in act)
93 120
94 def write(self, data): 121 def _write_once(self, data: bytes) -> Optional[int]:
122 """Write as much data as possible in a non blocking way"""
95 return self._call(b'write', data) 123 return self._call(b'write', data)
124
125 def write(self, data: bytes) -> Optional[int]:
126 """write all data in a blocking way"""
127 return _write_all(self._write_once, data)
96 128
97 def read(self, size): 129 def read(self, size):
98 r = self._call(b'read', size) 130 r = self._call(b'read', size)
99 if size != 0 and not r: 131 if size != 0 and not r:
100 # We've observed a condition that indicates the 132 # We've observed a condition that indicates the
122 def _call(self, methname, data=None): 154 def _call(self, methname, data=None):
123 """call <methname> on "main", forward output of "side" while blocking""" 155 """call <methname> on "main", forward output of "side" while blocking"""
124 # data can be '' or 0 156 # data can be '' or 0
125 if (data is not None and not data) or self._main.closed: 157 if (data is not None and not data) or self._main.closed:
126 _forwardoutput(self._ui, self._side) 158 _forwardoutput(self._ui, self._side)
159 if methname == b'write':
160 return 0
127 return b'' 161 return b''
128 while True: 162 while True:
129 mainready, sideready = self._wait() 163 mainready, sideready = self._wait()
130 if sideready: 164 if sideready:
131 _forwardoutput(self._ui, self._side) 165 _forwardoutput(self._ui, self._side)