--- a/mercurial/commandserver.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/commandserver.py Sun Oct 06 09:45:02 2019 -0400
@@ -18,6 +18,7 @@
try:
import selectors
+
selectors.BaseSelector
except ImportError:
from .thirdparty import selectors2 as selectors
@@ -37,6 +38,7 @@
procutil,
)
+
class channeledoutput(object):
"""
Write data to out in the following format:
@@ -44,6 +46,7 @@
data length (unsigned int),
data
"""
+
def __init__(self, out, channel):
self.out = out
self.channel = channel
@@ -64,6 +67,7 @@
raise AttributeError(attr)
return getattr(self.out, attr)
+
class channeledmessage(object):
"""
Write encoded message and metadata to out in the following format:
@@ -92,6 +96,7 @@
def __getattr__(self, attr):
return getattr(self._cout, attr)
+
class channeledinput(object):
"""
Read data from in_.
@@ -178,10 +183,12 @@
raise AttributeError(attr)
return getattr(self.in_, attr)
+
_messageencoders = {
b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
}
+
def _selectmessageencoder(ui):
# experimental config: cmdserver.message-encodings
encnames = ui.configlist(b'cmdserver', b'message-encodings')
@@ -189,14 +196,17 @@
f = _messageencoders.get(n)
if f:
return n, f
- raise error.Abort(b'no supported message encodings: %s'
- % b' '.join(encnames))
+ raise error.Abort(
+ b'no supported message encodings: %s' % b' '.join(encnames)
+ )
+
class server(object):
"""
Listens for commands on fin, runs them and writes the output on a channel
based stream to fout.
"""
+
def __init__(self, ui, repo, fin, fout, prereposetups=None):
self.cwd = encoding.getcwd()
@@ -282,7 +292,7 @@
self.repo.baseui = copiedui
# clone ui without using ui.copy because this is protected
repoui = self.repoui.__class__(self.repoui)
- repoui.copy = copiedui.copy # redo copy protection
+ repoui.copy = copiedui.copy # redo copy protection
uis.append(repoui)
self.repo.ui = self.repo.dirstate._ui = repoui
self.repo.invalidateall()
@@ -295,9 +305,16 @@
if not util.safehasattr(self.cin, 'fileno'):
ui.setconfig('ui', 'nontty', 'true', 'commandserver')
- req = dispatch.request(args[:], copiedui, self.repo, self.cin,
- self.cout, self.cerr, self.cmsg,
- prereposetups=self._prereposetups)
+ req = dispatch.request(
+ args[:],
+ copiedui,
+ self.repo,
+ self.cin,
+ self.cout,
+ self.cerr,
+ self.cmsg,
+ prereposetups=self._prereposetups,
+ )
try:
ret = dispatch.dispatch(req) & 255
@@ -324,8 +341,7 @@
return cmd != ''
- capabilities = {'runcommand': runcommand,
- 'getencoding': getencoding}
+ capabilities = {'runcommand': runcommand, 'getencoding': getencoding}
def serve(self):
hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
@@ -352,6 +368,7 @@
return 0
+
def setuplogging(ui, repo=None, fp=None):
"""Set up server logging facility
@@ -377,8 +394,13 @@
# developer config: cmdserver.max-log-size
maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
vfs = vfsmod.vfs(os.path.dirname(logpath))
- logger = loggingutil.filelogger(vfs, os.path.basename(logpath), tracked,
- maxfiles=maxfiles, maxsize=maxsize)
+ logger = loggingutil.filelogger(
+ vfs,
+ os.path.basename(logpath),
+ tracked,
+ maxfiles=maxfiles,
+ maxsize=maxsize,
+ )
targetuis = {ui}
if repo:
@@ -387,6 +409,7 @@
for u in targetuis:
u.setlogger(b'cmdserver', logger)
+
class pipeservice(object):
def __init__(self, ui, repo, opts):
self.ui = ui
@@ -406,6 +429,7 @@
finally:
sv.cleanup()
+
def _initworkerprocess():
# use a different process group from the master process, in order to:
# 1. make the current process group no longer "orphaned" (because the
@@ -423,6 +447,7 @@
# same state inherited from parent.
random.seed()
+
def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
fin = conn.makefile(r'rb')
fout = conn.makefile(r'wb')
@@ -442,7 +467,7 @@
pass
finally:
sv.cleanup()
- except: # re-raises
+ except: # re-raises
# also write traceback to error channel. otherwise client cannot
# see it because it is written to server's stderr by default.
if sv:
@@ -459,6 +484,7 @@
if inst.errno != errno.EPIPE:
raise
+
class unixservicehandler(object):
"""Set of pluggable operations for unix-mode services
@@ -492,6 +518,7 @@
serves for the current connection"""
return server(self.ui, repo, fin, fout, prereposetups)
+
class unixforkingservice(object):
"""
Listens on unix domain socket and forks server per connection
@@ -558,10 +585,12 @@
exiting = False
h = self._servicehandler
selector = selectors.DefaultSelector()
- selector.register(self._sock, selectors.EVENT_READ,
- self._acceptnewconnection)
- selector.register(self._mainipc, selectors.EVENT_READ,
- self._handlemainipc)
+ selector.register(
+ self._sock, selectors.EVENT_READ, self._acceptnewconnection
+ )
+ selector.register(
+ self._mainipc, selectors.EVENT_READ, self._handlemainipc
+ )
while True:
if not exiting and h.shouldexit():
# clients can no longer connect() to the domain socket, so
@@ -605,8 +634,9 @@
pid = os.fork()
if pid:
try:
- self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n',
- pid)
+ self.ui.log(
+ b'cmdserver', b'forked worker process (pid=%d)\n', pid
+ )
self._workerpids.add(pid)
h.newconnection()
finally:
@@ -662,8 +692,13 @@
_initworkerprocess()
h = self._servicehandler
try:
- _serverequest(self.ui, self.repo, conn, h.createcmdserver,
- prereposetups=[self._reposetup])
+ _serverequest(
+ self.ui,
+ self.repo,
+ conn,
+ h.createcmdserver,
+ prereposetups=[self._reposetup],
+ )
finally:
gc.collect() # trigger __del__ since worker process uses os._exit
@@ -677,8 +712,9 @@
try:
self._cmdserveripc.send(self.root)
except socket.error:
- self.ui.log(b'cmdserver',
- b'failed to send repo root to master\n')
+ self.ui.log(
+ b'cmdserver', b'failed to send repo root to master\n'
+ )
repo.__class__ = unixcmdserverrepo
repo._cmdserveripc = self._workeripc