Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/sshpeer.py @ 52276:7aec18bded6d stable
sshpeer: fix deadlock on short writes
This commit makes the `sshpeer.doublepipe` object retry on short write,
which fixes a deadlock in hg client-server communication, in
particular when the client needs to send a large message.
Apparently Mercurial relies on the `write` method of file objects
never returning short writes, without checking, which leads to deadlocks.
This works fine when the underlying file object is buffered,
since buffered writers never return short writes.
(why buffering has anything to do with this I don't know, but ok)
It breaks horribly with raw IO, which happens to be used in sshpeer.
author | Arseniy Alekseyev <aalekseyev@janestreet.com> |
---|---|
date | Wed, 05 Feb 2025 16:49:43 +0000 |
parents | f4733654f144 |
children | f6b30f4b5e07 |
comparison
equal
deleted
inserted
replaced
52275:e16065bb7f42 | 52276:7aec18bded6d |
---|---|
7 | 7 |
8 from __future__ import annotations | 8 from __future__ import annotations |
9 | 9 |
10 import re | 10 import re |
11 import uuid | 11 import uuid |
12 | |
13 from typing import Callable, Optional | |
12 | 14 |
13 from .i18n import _ | 15 from .i18n import _ |
14 from . import ( | 16 from . import ( |
15 error, | 17 error, |
16 pycompat, | 18 pycompat, |
45 display = ui.warn if warn else ui.status | 47 display = ui.warn if warn else ui.status |
46 for l in s.splitlines(): | 48 for l in s.splitlines(): |
47 display(_(b"remote: "), l, b'\n') | 49 display(_(b"remote: "), l, b'\n') |
48 | 50 |
49 | 51 |
52 def _write_all( | |
53 write_once: Callable[[bytes], Optional[int]], | |
54 data: bytes, | |
55 ) -> Optional[int]: | |
56 """write data with a non blocking function | |
57 | |
58 In case not all data were written, keep writing until everything is | |
59 written. | |
60 """ | |
61 to_write = len(data) | |
62 written = write_once(data) | |
63 if written is None: | |
64 written = 0 | |
65 if written < to_write: | |
66 data = memoryview(data) | |
67 while written < to_write: | |
68 wrote = write_once(data[written:]) | |
69 # XXX if number of written bytes is "None", the destination is | |
70 # full. Some `select` call would be better than the current active | |
71 # polling. | |
72 if wrote is not None: | |
73 written += wrote | |
74 return written | |
75 | |
76 | |
50 class doublepipe: | 77 class doublepipe: |
51 """Operate a side-channel pipe in addition of a main one | 78 """Operate a side-channel pipe in addition of a main one |
52 | 79 |
53 The side-channel pipe contains server output to be forwarded to the user | 80 The side-channel pipe contains server output to be forwarded to the user |
54 input. The double pipe will behave as the "main" pipe, but will ensure the | 81 input. The double pipe will behave as the "main" pipe, but will ensure the |
89 except NotImplementedError: | 116 except NotImplementedError: |
90 # non supported yet case, assume all have data. | 117 # non supported yet case, assume all have data. |
91 act = fds | 118 act = fds |
92 return (self._main.fileno() in act, self._side.fileno() in act) | 119 return (self._main.fileno() in act, self._side.fileno() in act) |
93 | 120 |
94 def write(self, data): | 121 def _write_once(self, data: bytes) -> Optional[int]: |
122 """Write as much data as possible in a non blocking way""" | |
95 return self._call(b'write', data) | 123 return self._call(b'write', data) |
124 | |
125 def write(self, data: bytes) -> Optional[int]: | |
126 """write all data in a blocking way""" | |
127 return _write_all(self._write_once, data) | |
96 | 128 |
97 def read(self, size): | 129 def read(self, size): |
98 r = self._call(b'read', size) | 130 r = self._call(b'read', size) |
99 if size != 0 and not r: | 131 if size != 0 and not r: |
100 # We've observed a condition that indicates the | 132 # We've observed a condition that indicates the |
122 def _call(self, methname, data=None): | 154 def _call(self, methname, data=None): |
123 """call <methname> on "main", forward output of "side" while blocking""" | 155 """call <methname> on "main", forward output of "side" while blocking""" |
124 # data can be '' or 0 | 156 # data can be '' or 0 |
125 if (data is not None and not data) or self._main.closed: | 157 if (data is not None and not data) or self._main.closed: |
126 _forwardoutput(self._ui, self._side) | 158 _forwardoutput(self._ui, self._side) |
159 if methname == b'write': | |
160 return 0 | |
127 return b'' | 161 return b'' |
128 while True: | 162 while True: |
129 mainready, sideready = self._wait() | 163 mainready, sideready = self._wait() |
130 if sideready: | 164 if sideready: |
131 _forwardoutput(self._ui, self._side) | 165 _forwardoutput(self._ui, self._side) |