comparison mercurial/commandserver.py @ 43076:2372284d9457

formatting: blacken the codebase This is using my patch to black (https://github.com/psf/black/pull/826) so we don't un-wrap collection literals. Done with: hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S # skip-blame mass-reformatting only # no-check-commit reformats foo_bar functions Differential Revision: https://phab.mercurial-scm.org/D6971
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:45:02 -0400
parents b0e3f2d7c143
children 687b865b95ad
comparison
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
16 import struct 16 import struct
17 import traceback 17 import traceback
18 18
19 try: 19 try:
20 import selectors 20 import selectors
21
21 selectors.BaseSelector 22 selectors.BaseSelector
22 except ImportError: 23 except ImportError:
23 from .thirdparty import selectors2 as selectors 24 from .thirdparty import selectors2 as selectors
24 25
25 from .i18n import _ 26 from .i18n import _
35 from .utils import ( 36 from .utils import (
36 cborutil, 37 cborutil,
37 procutil, 38 procutil,
38 ) 39 )
39 40
41
40 class channeledoutput(object): 42 class channeledoutput(object):
41 """ 43 """
42 Write data to out in the following format: 44 Write data to out in the following format:
43 45
44 data length (unsigned int), 46 data length (unsigned int),
45 data 47 data
46 """ 48 """
49
47 def __init__(self, out, channel): 50 def __init__(self, out, channel):
48 self.out = out 51 self.out = out
49 self.channel = channel 52 self.channel = channel
50 53
51 @property 54 @property
62 def __getattr__(self, attr): 65 def __getattr__(self, attr):
63 if attr in (r'isatty', r'fileno', r'tell', r'seek'): 66 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
64 raise AttributeError(attr) 67 raise AttributeError(attr)
65 return getattr(self.out, attr) 68 return getattr(self.out, attr)
66 69
70
67 class channeledmessage(object): 71 class channeledmessage(object):
68 """ 72 """
69 Write encoded message and metadata to out in the following format: 73 Write encoded message and metadata to out in the following format:
70 74
71 data length (unsigned int), 75 data length (unsigned int),
89 opts[b'data'] = data 93 opts[b'data'] = data
90 self._cout.write(self._encodefn(opts)) 94 self._cout.write(self._encodefn(opts))
91 95
92 def __getattr__(self, attr): 96 def __getattr__(self, attr):
93 return getattr(self._cout, attr) 97 return getattr(self._cout, attr)
98
94 99
95 class channeledinput(object): 100 class channeledinput(object):
96 """ 101 """
97 Read data from in_. 102 Read data from in_.
98 103
176 def __getattr__(self, attr): 181 def __getattr__(self, attr):
177 if attr in (r'isatty', r'fileno', r'tell', r'seek'): 182 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
178 raise AttributeError(attr) 183 raise AttributeError(attr)
179 return getattr(self.in_, attr) 184 return getattr(self.in_, attr)
180 185
186
181 _messageencoders = { 187 _messageencoders = {
182 b'cbor': lambda v: b''.join(cborutil.streamencode(v)), 188 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
183 } 189 }
190
184 191
185 def _selectmessageencoder(ui): 192 def _selectmessageencoder(ui):
186 # experimental config: cmdserver.message-encodings 193 # experimental config: cmdserver.message-encodings
187 encnames = ui.configlist(b'cmdserver', b'message-encodings') 194 encnames = ui.configlist(b'cmdserver', b'message-encodings')
188 for n in encnames: 195 for n in encnames:
189 f = _messageencoders.get(n) 196 f = _messageencoders.get(n)
190 if f: 197 if f:
191 return n, f 198 return n, f
192 raise error.Abort(b'no supported message encodings: %s' 199 raise error.Abort(
193 % b' '.join(encnames)) 200 b'no supported message encodings: %s' % b' '.join(encnames)
201 )
202
194 203
195 class server(object): 204 class server(object):
196 """ 205 """
197 Listens for commands on fin, runs them and writes the output on a channel 206 Listens for commands on fin, runs them and writes the output on a channel
198 based stream to fout. 207 based stream to fout.
199 """ 208 """
209
200 def __init__(self, ui, repo, fin, fout, prereposetups=None): 210 def __init__(self, ui, repo, fin, fout, prereposetups=None):
201 self.cwd = encoding.getcwd() 211 self.cwd = encoding.getcwd()
202 212
203 if repo: 213 if repo:
204 # the ui here is really the repo ui so take its baseui so we don't 214 # the ui here is really the repo ui so take its baseui so we don't
280 uis = [copiedui] 290 uis = [copiedui]
281 if self.repo: 291 if self.repo:
282 self.repo.baseui = copiedui 292 self.repo.baseui = copiedui
283 # clone ui without using ui.copy because this is protected 293 # clone ui without using ui.copy because this is protected
284 repoui = self.repoui.__class__(self.repoui) 294 repoui = self.repoui.__class__(self.repoui)
285 repoui.copy = copiedui.copy # redo copy protection 295 repoui.copy = copiedui.copy # redo copy protection
286 uis.append(repoui) 296 uis.append(repoui)
287 self.repo.ui = self.repo.dirstate._ui = repoui 297 self.repo.ui = self.repo.dirstate._ui = repoui
288 self.repo.invalidateall() 298 self.repo.invalidateall()
289 299
290 for ui in uis: 300 for ui in uis:
293 # replace channels by fully functional tty files. so nontty is 303 # replace channels by fully functional tty files. so nontty is
294 # enforced only if cin is a channel. 304 # enforced only if cin is a channel.
295 if not util.safehasattr(self.cin, 'fileno'): 305 if not util.safehasattr(self.cin, 'fileno'):
296 ui.setconfig('ui', 'nontty', 'true', 'commandserver') 306 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
297 307
298 req = dispatch.request(args[:], copiedui, self.repo, self.cin, 308 req = dispatch.request(
299 self.cout, self.cerr, self.cmsg, 309 args[:],
300 prereposetups=self._prereposetups) 310 copiedui,
311 self.repo,
312 self.cin,
313 self.cout,
314 self.cerr,
315 self.cmsg,
316 prereposetups=self._prereposetups,
317 )
301 318
302 try: 319 try:
303 ret = dispatch.dispatch(req) & 255 320 ret = dispatch.dispatch(req) & 255
304 self.cresult.write(struct.pack('>i', int(ret))) 321 self.cresult.write(struct.pack('>i', int(ret)))
305 finally: 322 finally:
322 # looking at the servers capabilities 339 # looking at the servers capabilities
323 raise error.Abort(_('unknown command %s') % cmd) 340 raise error.Abort(_('unknown command %s') % cmd)
324 341
325 return cmd != '' 342 return cmd != ''
326 343
327 capabilities = {'runcommand': runcommand, 344 capabilities = {'runcommand': runcommand, 'getencoding': getencoding}
328 'getencoding': getencoding}
329 345
330 def serve(self): 346 def serve(self):
331 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities)) 347 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
332 hellomsg += '\n' 348 hellomsg += '\n'
333 hellomsg += 'encoding: ' + encoding.encoding 349 hellomsg += 'encoding: ' + encoding.encoding
350 # its request 366 # its request
351 return 1 367 return 1
352 368
353 return 0 369 return 0
354 370
371
355 def setuplogging(ui, repo=None, fp=None): 372 def setuplogging(ui, repo=None, fp=None):
356 """Set up server logging facility 373 """Set up server logging facility
357 374
358 If cmdserver.log is '-', log messages will be sent to the given fp. 375 If cmdserver.log is '-', log messages will be sent to the given fp.
359 It should be the 'd' channel while a client is connected, and otherwise 376 It should be the 'd' channel while a client is connected, and otherwise
375 # developer config: cmdserver.max-log-files 392 # developer config: cmdserver.max-log-files
376 maxfiles = ui.configint(b'cmdserver', b'max-log-files') 393 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
377 # developer config: cmdserver.max-log-size 394 # developer config: cmdserver.max-log-size
378 maxsize = ui.configbytes(b'cmdserver', b'max-log-size') 395 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
379 vfs = vfsmod.vfs(os.path.dirname(logpath)) 396 vfs = vfsmod.vfs(os.path.dirname(logpath))
380 logger = loggingutil.filelogger(vfs, os.path.basename(logpath), tracked, 397 logger = loggingutil.filelogger(
381 maxfiles=maxfiles, maxsize=maxsize) 398 vfs,
399 os.path.basename(logpath),
400 tracked,
401 maxfiles=maxfiles,
402 maxsize=maxsize,
403 )
382 404
383 targetuis = {ui} 405 targetuis = {ui}
384 if repo: 406 if repo:
385 targetuis.add(repo.baseui) 407 targetuis.add(repo.baseui)
386 targetuis.add(repo.ui) 408 targetuis.add(repo.ui)
387 for u in targetuis: 409 for u in targetuis:
388 u.setlogger(b'cmdserver', logger) 410 u.setlogger(b'cmdserver', logger)
411
389 412
390 class pipeservice(object): 413 class pipeservice(object):
391 def __init__(self, ui, repo, opts): 414 def __init__(self, ui, repo, opts):
392 self.ui = ui 415 self.ui = ui
393 self.repo = repo 416 self.repo = repo
403 sv = server(ui, self.repo, fin, fout) 426 sv = server(ui, self.repo, fin, fout)
404 try: 427 try:
405 return sv.serve() 428 return sv.serve()
406 finally: 429 finally:
407 sv.cleanup() 430 sv.cleanup()
431
408 432
409 def _initworkerprocess(): 433 def _initworkerprocess():
410 # use a different process group from the master process, in order to: 434 # use a different process group from the master process, in order to:
411 # 1. make the current process group no longer "orphaned" (because the 435 # 1. make the current process group no longer "orphaned" (because the
412 # parent of this process is in a different process group while 436 # parent of this process is in a different process group while
420 # unrelated processes. 444 # unrelated processes.
421 os.setpgid(0, 0) 445 os.setpgid(0, 0)
422 # change random state otherwise forked request handlers would have a 446 # change random state otherwise forked request handlers would have a
423 # same state inherited from parent. 447 # same state inherited from parent.
424 random.seed() 448 random.seed()
449
425 450
426 def _serverequest(ui, repo, conn, createcmdserver, prereposetups): 451 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
427 fin = conn.makefile(r'rb') 452 fin = conn.makefile(r'rb')
428 fout = conn.makefile(r'wb') 453 fout = conn.makefile(r'wb')
429 sv = None 454 sv = None
440 raise 465 raise
441 except KeyboardInterrupt: 466 except KeyboardInterrupt:
442 pass 467 pass
443 finally: 468 finally:
444 sv.cleanup() 469 sv.cleanup()
445 except: # re-raises 470 except: # re-raises
446 # also write traceback to error channel. otherwise client cannot 471 # also write traceback to error channel. otherwise client cannot
447 # see it because it is written to server's stderr by default. 472 # see it because it is written to server's stderr by default.
448 if sv: 473 if sv:
449 cerr = sv.cerr 474 cerr = sv.cerr
450 else: 475 else:
457 fout.close() # implicit flush() may cause another EPIPE 482 fout.close() # implicit flush() may cause another EPIPE
458 except IOError as inst: 483 except IOError as inst:
459 if inst.errno != errno.EPIPE: 484 if inst.errno != errno.EPIPE:
460 raise 485 raise
461 486
487
462 class unixservicehandler(object): 488 class unixservicehandler(object):
463 """Set of pluggable operations for unix-mode services 489 """Set of pluggable operations for unix-mode services
464 490
465 Almost all methods except for createcmdserver() are called in the main 491 Almost all methods except for createcmdserver() are called in the main
466 process. You can't pass mutable resource back from createcmdserver(). 492 process. You can't pass mutable resource back from createcmdserver().
489 515
490 def createcmdserver(self, repo, conn, fin, fout, prereposetups): 516 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
491 """Create new command server instance; called in the process that 517 """Create new command server instance; called in the process that
492 serves for the current connection""" 518 serves for the current connection"""
493 return server(self.ui, repo, fin, fout, prereposetups) 519 return server(self.ui, repo, fin, fout, prereposetups)
520
494 521
495 class unixforkingservice(object): 522 class unixforkingservice(object):
496 """ 523 """
497 Listens on unix domain socket and forks server per connection 524 Listens on unix domain socket and forks server per connection
498 """ 525 """
556 583
557 def _mainloop(self): 584 def _mainloop(self):
558 exiting = False 585 exiting = False
559 h = self._servicehandler 586 h = self._servicehandler
560 selector = selectors.DefaultSelector() 587 selector = selectors.DefaultSelector()
561 selector.register(self._sock, selectors.EVENT_READ, 588 selector.register(
562 self._acceptnewconnection) 589 self._sock, selectors.EVENT_READ, self._acceptnewconnection
563 selector.register(self._mainipc, selectors.EVENT_READ, 590 )
564 self._handlemainipc) 591 selector.register(
592 self._mainipc, selectors.EVENT_READ, self._handlemainipc
593 )
565 while True: 594 while True:
566 if not exiting and h.shouldexit(): 595 if not exiting and h.shouldexit():
567 # clients can no longer connect() to the domain socket, so 596 # clients can no longer connect() to the domain socket, so
568 # we stop queuing new requests. 597 # we stop queuing new requests.
569 # for requests that are queued (connect()-ed, but haven't been 598 # for requests that are queued (connect()-ed, but haven't been
603 # https://instagram-engineering.com/ 632 # https://instagram-engineering.com/
604 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf 633 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
605 pid = os.fork() 634 pid = os.fork()
606 if pid: 635 if pid:
607 try: 636 try:
608 self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n', 637 self.ui.log(
609 pid) 638 b'cmdserver', b'forked worker process (pid=%d)\n', pid
639 )
610 self._workerpids.add(pid) 640 self._workerpids.add(pid)
611 h.newconnection() 641 h.newconnection()
612 finally: 642 finally:
613 conn.close() # release handle in parent process 643 conn.close() # release handle in parent process
614 else: 644 else:
660 def _runworker(self, conn): 690 def _runworker(self, conn):
661 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) 691 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
662 _initworkerprocess() 692 _initworkerprocess()
663 h = self._servicehandler 693 h = self._servicehandler
664 try: 694 try:
665 _serverequest(self.ui, self.repo, conn, h.createcmdserver, 695 _serverequest(
666 prereposetups=[self._reposetup]) 696 self.ui,
697 self.repo,
698 conn,
699 h.createcmdserver,
700 prereposetups=[self._reposetup],
701 )
667 finally: 702 finally:
668 gc.collect() # trigger __del__ since worker process uses os._exit 703 gc.collect() # trigger __del__ since worker process uses os._exit
669 704
670 def _reposetup(self, ui, repo): 705 def _reposetup(self, ui, repo):
671 if not repo.local(): 706 if not repo.local():
675 def close(self): 710 def close(self):
676 super(unixcmdserverrepo, self).close() 711 super(unixcmdserverrepo, self).close()
677 try: 712 try:
678 self._cmdserveripc.send(self.root) 713 self._cmdserveripc.send(self.root)
679 except socket.error: 714 except socket.error:
680 self.ui.log(b'cmdserver', 715 self.ui.log(
681 b'failed to send repo root to master\n') 716 b'cmdserver', b'failed to send repo root to master\n'
717 )
682 718
683 repo.__class__ = unixcmdserverrepo 719 repo.__class__ = unixcmdserverrepo
684 repo._cmdserveripc = self._workeripc 720 repo._cmdserveripc = self._workeripc
685 721
686 cachedrepo = self._repoloader.get(repo.root) 722 cachedrepo = self._repoloader.get(repo.root)