Mercurial > public > mercurial-scm > hg
comparison mercurial/util.py @ 36525:3158052720ae
util: enable observing of util.bufferedinputpipe
Our file object proxy is useful. But it doesn't capture all I/O.
The "os" module offers low-level interfaces to various system calls.
For example, os.read() exposes read(2) to read from a file
descriptor.
bufferedinputpipe is special in a few ways. First, it acts as a
proxy of sorts around our [potentially proxied] file object. In
addition, it uses os.read() to satisfy all I/O. This means that
our observer doesn't see notifications for reads on this type.
This is preventing us from properly instrumenting reads on ssh
peers.
This commit teaches bufferedinputpipe to be aware of our
observed file objects. We do this by introducing a class variation
that notifies our observer of os.read() events. Since read()
and readline() bypass os.read(), we also teach this instance
to notify the observer for buffered variations of these reads as
well. We don't report them as actual read() and readline() calls
because these methods are never called on the actual file object
but rather a buffered version of it.
We introduce bufferedinputpipe.__new__ to swap in the new class
if the passed file object is a fileobjectproxy. This makes hooking
up the observer automatic. And it is a zero cost abstraction for
I/O operations on non-proxied file objects.
Differential Revision: https://phab.mercurial-scm.org/D2404
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 24 Feb 2018 12:24:03 -0800 |
parents | bfe38f787d5b |
children | 0c2eeaca0577 |
comparison
equal
deleted
inserted
replaced
36524:bfe38f787d5b | 36525:3158052720ae |
---|---|
371 empty from the output (allowing collaboration of the buffer with polling). | 371 empty from the output (allowing collaboration of the buffer with polling). |
372 | 372 |
373 This class lives in the 'util' module because it makes use of the 'os' | 373 This class lives in the 'util' module because it makes use of the 'os' |
374 module from the python stdlib. | 374 module from the python stdlib. |
375 """ | 375 """ |
376 def __new__(cls, fh): | |
377 # If we receive a fileobjectproxy, we need to use a variation of this | |
378 # class that notifies observers about activity. | |
379 if isinstance(fh, fileobjectproxy): | |
380 cls = observedbufferedinputpipe | |
381 | |
382 return super(bufferedinputpipe, cls).__new__(cls) | |
376 | 383 |
377 def __init__(self, input): | 384 def __init__(self, input): |
378 self._input = input | 385 self._input = input |
379 self._buffer = [] | 386 self._buffer = [] |
380 self._eof = False | 387 self._eof = False |
451 self._eof = True | 458 self._eof = True |
452 else: | 459 else: |
453 self._lenbuf += len(data) | 460 self._lenbuf += len(data) |
454 self._buffer.append(data) | 461 self._buffer.append(data) |
455 | 462 |
463 return data | |
464 | |
456 def mmapread(fp): | 465 def mmapread(fp): |
457 try: | 466 try: |
458 fd = getattr(fp, 'fileno', lambda: fp)() | 467 fd = getattr(fp, 'fileno', lambda: fp)() |
459 return mmap.mmap(fd, 0, access=mmap.ACCESS_READ) | 468 return mmap.mmap(fd, 0, access=mmap.ACCESS_READ) |
460 except ValueError: | 469 except ValueError: |
503 object.__setattr__(self, '_orig', fh) | 512 object.__setattr__(self, '_orig', fh) |
504 object.__setattr__(self, '_observer', observer) | 513 object.__setattr__(self, '_observer', observer) |
505 | 514 |
506 def __getattribute__(self, name): | 515 def __getattribute__(self, name): |
507 ours = { | 516 ours = { |
517 r'_observer', | |
518 | |
508 # IOBase | 519 # IOBase |
509 r'close', | 520 r'close', |
510 # closed if a property | 521 # closed if a property |
511 r'fileno', | 522 r'fileno', |
512 r'flush', | 523 r'flush', |
637 | 648 |
638 def read1(self, *args, **kwargs): | 649 def read1(self, *args, **kwargs): |
639 return object.__getattribute__(self, r'_observedcall')( | 650 return object.__getattribute__(self, r'_observedcall')( |
640 r'read1', *args, **kwargs) | 651 r'read1', *args, **kwargs) |
641 | 652 |
653 class observedbufferedinputpipe(bufferedinputpipe): | |
654 """A variation of bufferedinputpipe that is aware of fileobjectproxy. | |
655 | |
656 ``bufferedinputpipe`` makes low-level calls to ``os.read()`` that | |
657 bypass ``fileobjectproxy``. Because of this, we need to make | |
658 ``bufferedinputpipe`` aware of these operations. | |
659 | |
660 This variation of ``bufferedinputpipe`` can notify observers about | |
661 ``os.read()`` events. It also re-publishes other events, such as | |
662 ``read()`` and ``readline()``. | |
663 """ | |
664 def _fillbuffer(self): | |
665 res = super(observedbufferedinputpipe, self)._fillbuffer() | |
666 | |
667 fn = getattr(self._input._observer, r'osread', None) | |
668 if fn: | |
669 fn(res, _chunksize) | |
670 | |
671 return res | |
672 | |
673 # We use different observer methods because the operation isn't | |
674 # performed on the actual file object but on us. | |
675 def read(self, size): | |
676 res = super(observedbufferedinputpipe, self).read(size) | |
677 | |
678 fn = getattr(self._input._observer, r'bufferedread', None) | |
679 if fn: | |
680 fn(res, size) | |
681 | |
682 return res | |
683 | |
684 def readline(self, *args, **kwargs): | |
685 res = super(observedbufferedinputpipe, self).readline(*args, **kwargs) | |
686 | |
687 fn = getattr(self._input._observer, r'bufferedreadline', None) | |
688 if fn: | |
689 fn(res) | |
690 | |
691 return res | |
692 | |
642 DATA_ESCAPE_MAP = {pycompat.bytechr(i): br'\x%02x' % i for i in range(256)} | 693 DATA_ESCAPE_MAP = {pycompat.bytechr(i): br'\x%02x' % i for i in range(256)} |
643 DATA_ESCAPE_MAP.update({ | 694 DATA_ESCAPE_MAP.update({ |
644 b'\\': b'\\\\', | 695 b'\\': b'\\\\', |
645 b'\r': br'\r', | 696 b'\r': br'\r', |
646 b'\n': br'\n', | 697 b'\n': br'\n', |
699 def flush(self, res): | 750 def flush(self, res): |
700 if not self.writes: | 751 if not self.writes: |
701 return | 752 return |
702 | 753 |
703 self.fh.write('%s> flush() -> %r\n' % (self.name, res)) | 754 self.fh.write('%s> flush() -> %r\n' % (self.name, res)) |
755 | |
756 # For observedbufferedinputpipe. | |
757 def bufferedread(self, res, size): | |
758 self.fh.write('%s> bufferedread(%d) -> %d' % ( | |
759 self.name, size, len(res))) | |
760 self._writedata(res) | |
761 | |
762 def bufferedreadline(self, res): | |
763 self.fh.write('%s> bufferedreadline() -> %d' % (self.name, len(res))) | |
764 self._writedata(res) | |
704 | 765 |
705 def makeloggingfileobject(logh, fh, name, reads=True, writes=True, | 766 def makeloggingfileobject(logh, fh, name, reads=True, writes=True, |
706 logdata=False): | 767 logdata=False): |
707 """Turn a file object into a logging file object.""" | 768 """Turn a file object into a logging file object.""" |
708 | 769 |