Mercurial > public > mercurial-scm > hg-stable
diff mercurial/sshpeer.py @ 52276:7aec18bded6d stable
sshpeer: fix deadlock on short writes
This commit makes the `sshpeer.doublepipe` object retry on short write,
which fixes a deadlock in hg client-server communication, in
particular when client needs to send a large message.
Apparently Mercurial relies on `write` method on file objects
to never return short writes, without checking, which leads to deadlocks.
This work fine when the underlying file object is buffered,
since buffered writers never return short writes.
(why buffering has anything to do with this I don't know, but ok)
It breaks horribly with raw IO, which happens to be used in sshpeer.
author | Arseniy Alekseyev <aalekseyev@janestreet.com> |
---|---|
date | Wed, 05 Feb 2025 16:49:43 +0000 |
parents | f4733654f144 |
children | f6b30f4b5e07 |
line wrap: on
line diff
--- a/mercurial/sshpeer.py Tue Feb 18 21:23:13 2025 +0100 +++ b/mercurial/sshpeer.py Wed Feb 05 16:49:43 2025 +0000 @@ -10,6 +10,8 @@ import re import uuid +from typing import Callable, Optional + from .i18n import _ from . import ( error, @@ -47,6 +49,31 @@ display(_(b"remote: "), l, b'\n') +def _write_all( + write_once: Callable[[bytes], Optional[int]], + data: bytes, +) -> Optional[int]: + """write data with a non blocking function + + In case not all data were written, keep writing until everything is + written. + """ + to_write = len(data) + written = write_once(data) + if written is None: + written = 0 + if written < to_write: + data = memoryview(data) + while written < to_write: + wrote = write_once(data[written:]) + # XXX if number of written bytes is "None", the destination is + # full. Some `select` call would be better than the current active + # polling. + if wrote is not None: + written += wrote + return written + + class doublepipe: """Operate a side-channel pipe in addition of a main one @@ -91,9 +118,14 @@ act = fds return (self._main.fileno() in act, self._side.fileno() in act) - def write(self, data): + def _write_once(self, data: bytes) -> Optional[int]: + """Write as much data as possible in a non blocking way""" return self._call(b'write', data) + def write(self, data: bytes) -> Optional[int]: + """write all data in a blocking way""" + return _write_all(self._write_once, data) + def read(self, size): r = self._call(b'read', size) if size != 0 and not r: @@ -124,6 +156,8 @@ # data can be '' or 0 if (data is not None and not data) or self._main.closed: _forwardoutput(self._ui, self._side) + if methname == b'write': + return 0 return b'' while True: mainready, sideready = self._wait()