--- a/mercurial/wireprotov1peer.py Fri Apr 13 10:51:23 2018 -0700
+++ b/mercurial/wireprotov1peer.py Fri Apr 13 11:02:34 2018 -0700
@@ -9,6 +9,7 @@
import hashlib
import sys
+import weakref
from .i18n import _
from .node import (
@@ -180,6 +181,26 @@
return ';'.join(cmds)
+class unsentfuture(pycompat.futures.Future):
+ """A Future variation to represent an unsent command.
+
+ Because we buffer commands and don't submit them immediately, calling
+ ``result()`` on an unsent future could deadlock. Futures for buffered
+ commands are represented by this type, which wraps ``result()`` to
+ call ``sendcommands()``.
+ """
+
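+ # For example (assuming the surrounding peer command executor API),
+ # code like
+ #
+ #   with peer.commandexecutor() as e:
+ #       f = e.callcommand('heads', {})
+ #       result = f.result()
+ #
+ # may receive an unsentfuture from callcommand(); result() then
+ # flushes the buffered batch before blocking, instead of deadlocking.
+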
+ def result(self, timeout=None):
+ if self.done():
+ return pycompat.futures.Future.result(self, timeout)
+
+ self._peerexecutor.sendcommands()
+
+ # This looks like it will recurse infinitely. However,
+ # sendcommands() should have restored __class__ to a vanilla
+ # Future, so this call dispatches to the base implementation. It
+ # also serves as a check that the class swap actually happened.
+ return self.result(timeout)
+
@zi.implementer(repository.ipeercommandexecutor)
class peerexecutor(object):
def __init__(self, peer):
@@ -187,6 +208,9 @@
self._sent = False
self._closed = False
self._calls = []
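+ # Futures handed out for buffered commands, tracked weakly so the
+ # executor doesn't keep caller-abandoned futures alive.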
+ self._futures = weakref.WeakSet()
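+ # Thread pool (and the future tracking its work) used to read
+ # batch responses off the wire; created in sendcommands().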
+ self._responseexecutor = None
+ self._responsef = None
def __enter__(self):
return self
@@ -214,20 +238,35 @@
# Commands are either batchable or they aren't. If a command
# isn't batchable, we send it immediately because the executor
# can no longer accept new commands after a non-batchable command.
- # If a command is batchable, we queue it for later.
+ # If a command is batchable, we queue it for later. But we have
+ # to account for the case of a non-batchable command arriving after
+ # a batchable one and refuse to service it.
+
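+ # Record the call and return a future that will receive its result.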
+ def addcall():
+ f = pycompat.futures.Future()
+ self._futures.add(f)
+ self._calls.append((command, args, fn, f))
+ return f
if getattr(fn, 'batchable', False):
- pass
+ f = addcall()
+
+ # Since we don't issue the command immediately, we swap in a
+ # future type whose result() triggers sending, so callers can't
+ # deadlock on an unsent command.
+ f.__class__ = unsentfuture
+ f._peerexecutor = self
else:
if self._calls:
raise error.ProgrammingError(
'%s is not batchable and cannot be called on a command '
'executor along with other commands' % command)
- # We don't support batching yet. So resolve it immediately.
- f = pycompat.futures.Future()
- self._calls.append((command, args, fn, f))
- self.sendcommands()
+ f = addcall()
+
+ # Non-batchable commands can never coexist with another command
+ # in this executor. So send the command immediately.
+ self.sendcommands()
+
return f
def sendcommands(self):
@@ -239,10 +278,18 @@
self._sent = True
+ # Unhack any future types so callers see a clean type, and break
+ # the cycle between us and the futures.
+ for f in self._futures:
+ if isinstance(f, unsentfuture):
+ f.__class__ = pycompat.futures.Future
+ f._peerexecutor = None
+
calls = self._calls
# Mainly to destroy references to futures.
self._calls = None
+ # Simple case of a single command. We call it synchronously.
if len(calls) == 1:
command, args, fn, f = calls[0]
@@ -259,14 +306,99 @@
return
- raise error.ProgrammingError('support for multiple commands not '
- 'yet implemented')
+ # Batch commands are a bit harder. First, we have to deal with the
+ # @batchable coroutine. That's a bit annoying. Furthermore, we also
+ # need to preserve streaming, i.e. it should be possible for the
+ # futures to resolve as data comes in off the wire, without having
+ # to wait for the final byte of the final response. We do this by
+ # spinning up a thread to read the responses.
+
+ requests = []
+ states = []
+
+ for command, args, fn, f in calls:
+ # Future was cancelled. Ignore it.
+ if not f.set_running_or_notify_cancel():
+ continue
+
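+ # fn.batchable is the generator function installed by the
+ # @batchable decorator. Calling it merely instantiates the
+ # generator; no command code runs until we advance it with next().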
+ try:
+ batchable = fn.batchable(fn.__self__,
+ **pycompat.strkwargs(args))
+ except Exception:
+ f.set_exception_info(*sys.exc_info()[1:])
+ return
+
+ # Encoded arguments and future holding remote result.
+ try:
+ encodedargs, fremote = next(batchable)
+ except Exception:
+ f.set_exception_info(*sys.exc_info()[1:])
+ return
+
+ requests.append((command, encodedargs))
+ states.append((command, f, batchable, fremote))
+
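+ # Every future was cancelled before anything was sent, so there is
+ # nothing to put on the wire.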
+ if not requests:
+ return
+
+ # This will emit responses in the order the requests were sent.
+ wireresults = self._peer._submitbatch(requests)
+
+ # The use of a thread pool executor here is a bit weird for something
+ # that only spins up a single thread. However, thread management is
+ # hard and it is easy to encounter race conditions, deadlocks, etc.
+ # concurrent.futures already solves these problems and its thread pool
+ # executor has minimal overhead. So we use it.
+ self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+ self._responsef = self._responseexecutor.submit(self._readbatchresponse,
+ states, wireresults)
def close(self):
self.sendcommands()
+ if self._closed:
+ return
+
self._closed = True
+ if not self._responsef:
+ return
+
+ # We need to wait on our in-flight response and then shut down the
+ # executor once we have a result.
+ try:
+ self._responsef.result()
+ finally:
+ self._responseexecutor.shutdown(wait=True)
+ self._responsef = None
+ self._responseexecutor = None
+
+ # If any of our futures are still in progress, mark them as
+ # errored. Otherwise a result() could wait indefinitely.
+ for f in self._futures:
+ if not f.done():
+ f.set_exception(error.ResponseError(
+ _('unfulfilled batch command response')))
+
+ self._futures = None
+
+ def _readbatchresponse(self, states, wireresults):
+ # Executes in a thread to read data off the wire.
+
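+ # Responses arrive in the same order the requests were sent, so we
+ # can pair each one with its recorded state.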
+ for command, f, batchable, fremote in states:
+ # Grab raw result off the wire and teach the internal future
+ # about it.
+ remoteresult = next(wireresults)
+ fremote.set(remoteresult)
+
+ # And ask the coroutine to decode that value.
+ try:
+ result = next(batchable)
+ except Exception:
+ f.set_exception_info(*sys.exc_info()[1:])
+ else:
+ f.set_result(result)
+
class wirepeer(repository.legacypeer):
"""Client-side interface for communicating with a peer repository.