--- a/hgext/fsmonitor/pywatchman/__init__.py Thu Dec 22 11:07:59 2016 -0800
+++ b/hgext/fsmonitor/pywatchman/__init__.py Thu Dec 22 11:22:32 2016 -0800
@@ -26,9 +26,14 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+# no unicode literals
+
+import inspect
+import math
import os
-import errno
-import math
import socket
import subprocess
import time
@@ -36,11 +41,20 @@
# Sometimes it's really hard to get Python extensions to compile,
# so fall back to a pure Python implementation.
try:
- import bser
+ from . import bser
+ # Demandimport causes modules to be loaded lazily. Force the load now
+ # so that we can fall back on pybser if bser doesn't exist
+ bser.pdu_info
except ImportError:
- import pybser as bser
+ from . import pybser as bser
-import capabilities
+from . import (
+ capabilities,
+ compat,
+ encoding,
+ load,
+)
+
if os.name == 'nt':
import ctypes
@@ -55,18 +69,29 @@
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
+ WAIT_FAILED = 0xFFFFFFFF
WAIT_TIMEOUT = 0x00000102
WAIT_OBJECT_0 = 0x00000000
- ERROR_IO_PENDING = 997
+ WAIT_IO_COMPLETION = 0x000000C0
+ INFINITE = 0xFFFFFFFF
+
+ # Overlapped I/O operation is in progress. (997)
+ ERROR_IO_PENDING = 0x000003E5
+
+ # The pointer size follows the architecture
+ # We use WPARAM since this type is already conditionally defined
+ ULONG_PTR = ctypes.wintypes.WPARAM
class OVERLAPPED(ctypes.Structure):
_fields_ = [
- ("Internal", wintypes.ULONG), ("InternalHigh", wintypes.ULONG),
+ ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
("hEvent", wintypes.HANDLE)
]
def __init__(self):
+ self.Internal = 0
+ self.InternalHigh = 0
self.Offset = 0
self.OffsetHigh = 0
self.hEvent = 0
@@ -97,6 +122,10 @@
GetLastError.argtypes = []
GetLastError.restype = wintypes.DWORD
+ SetLastError = ctypes.windll.kernel32.SetLastError
+ SetLastError.argtypes = [wintypes.DWORD]
+ SetLastError.restype = None
+
FormatMessage = ctypes.windll.kernel32.FormatMessageA
FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
@@ -105,12 +134,30 @@
LocalFree = ctypes.windll.kernel32.LocalFree
- GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
- GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
- ctypes.POINTER(OVERLAPPED), LPDWORD,
- wintypes.DWORD, wintypes.BOOL]
- GetOverlappedResultEx.restype = wintypes.BOOL
+ GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
+ GetOverlappedResult.argtypes = [wintypes.HANDLE,
+ ctypes.POINTER(OVERLAPPED), LPDWORD,
+ wintypes.BOOL]
+ GetOverlappedResult.restype = wintypes.BOOL
+ GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
+ 'GetOverlappedResultEx', None)
+ if GetOverlappedResultEx is not None:
+ GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
+ ctypes.POINTER(OVERLAPPED), LPDWORD,
+ wintypes.DWORD, wintypes.BOOL]
+ GetOverlappedResultEx.restype = wintypes.BOOL
+
+ WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
+ WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
+ WaitForSingleObjectEx.restype = wintypes.DWORD
+
+ CreateEvent = ctypes.windll.kernel32.CreateEventA
+ CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
+ wintypes.LPSTR]
+ CreateEvent.restype = wintypes.HANDLE
+
+ # Windows Vista is the minimum supported client for CancelIoEx.
CancelIoEx = ctypes.windll.kernel32.CancelIoEx
CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
CancelIoEx.restype = wintypes.BOOL
@@ -132,8 +179,47 @@
pass
+def _win32_strerror(err):
+ """ expand a win32 error code into a human readable message """
+
+ # FormatMessage will allocate memory and assign it here
+ buf = ctypes.c_char_p()
+ FormatMessage(
+ FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
+ | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
+ try:
+ return buf.value
+ finally:
+ LocalFree(buf)
+
+
class WatchmanError(Exception):
- pass
+ def __init__(self, msg=None, cmd=None):
+ self.msg = msg
+ self.cmd = cmd
+
+ def setCommand(self, cmd):
+ self.cmd = cmd
+
+ def __str__(self):
+ if self.cmd:
+ return '%s, while executing %s' % (self.msg, self.cmd)
+ return self.msg
+
+
+class WatchmanEnvironmentError(WatchmanError):
+ def __init__(self, msg, errno, errmsg, cmd=None):
+ super(WatchmanEnvironmentError, self).__init__(
+ '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
+ cmd)
+
+
+class SocketConnectError(WatchmanError):
+ def __init__(self, sockpath, exc):
+ super(SocketConnectError, self).__init__(
+ 'unable to connect to %s: %s' % (sockpath, exc))
+ self.sockpath = sockpath
+ self.exc = exc
class SocketTimeout(WatchmanError):
@@ -151,19 +237,11 @@
self.msg is the message returned by watchman.
"""
-
def __init__(self, msg, cmd=None):
- self.msg = msg
- self.cmd = cmd
- super(CommandError, self).__init__('watchman command error: %s' % msg)
-
- def setCommand(self, cmd):
- self.cmd = cmd
-
- def __str__(self):
- if self.cmd:
- return '%s, while executing %s' % (self.msg, self.cmd)
- return self.msg
+ super(CommandError, self).__init__(
+ 'watchman command error: %s' % (msg, ),
+ cmd,
+ )
class Transport(object):
@@ -195,16 +273,16 @@
# Buffer may already have a line if we've received unilateral
# response(s) from the server
- if len(self.buf) == 1 and "\n" in self.buf[0]:
- (line, b) = self.buf[0].split("\n", 1)
+ if len(self.buf) == 1 and b"\n" in self.buf[0]:
+ (line, b) = self.buf[0].split(b"\n", 1)
self.buf = [b]
return line
while True:
b = self.readBytes(4096)
- if "\n" in b:
- result = ''.join(self.buf)
- (line, b) = b.split("\n", 1)
+ if b"\n" in b:
+ result = b''.join(self.buf)
+ (line, b) = b.split(b"\n", 1)
self.buf = [b]
return result + line
self.buf.append(b)
@@ -241,8 +319,8 @@
sock.connect(self.sockpath)
self.sock = sock
except socket.error as e:
- raise WatchmanError('unable to connect to %s: %s' %
- (self.sockpath, e))
+ sock.close()
+ raise SocketConnectError(self.sockpath, e)
def close(self):
self.sock.close()
@@ -268,6 +346,46 @@
raise SocketTimeout('timed out sending query command')
+def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
+ """ Windows 7 and earlier does not support GetOverlappedResultEx. The
+ alternative is to use GetOverlappedResult and wait for read or write
+ operation to complete. This is done be using CreateEvent and
+ WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
+ and GetOverlappedResult are all part of Windows API since WindowsXP.
+ This is the exact same implementation that can be found in the watchman
+ source code (see get_overlapped_result_ex_impl in stream_win.c). This
+ way, maintenance should be simplified.
+ """
+ log('Preparing to wait for maximum %dms', millis )
+ if millis != 0:
+ waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
+ if waitReturnCode == WAIT_OBJECT_0:
+ # Event is signaled, overlapped IO operation result should be available.
+ pass
+ elif waitReturnCode == WAIT_IO_COMPLETION:
+ # WaitForSingleObjectEx returnes because the system added an I/O completion
+ # routine or an asynchronous procedure call (APC) to the thread queue.
+ SetLastError(WAIT_IO_COMPLETION)
+ pass
+ elif waitReturnCode == WAIT_TIMEOUT:
+ # We reached the maximum allowed wait time, the IO operation failed
+ # to complete in timely fashion.
+ SetLastError(WAIT_TIMEOUT)
+ return False
+ elif waitReturnCode == WAIT_FAILED:
+ # something went wrong calling WaitForSingleObjectEx
+ err = GetLastError()
+ log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
+ return False
+ else:
+ # unexpected situation deserving investigation.
+ err = GetLastError()
+ log('Unexpected error: %s', _win32_strerror(err))
+ return False
+
+ return GetOverlappedResult(pipe, olap, nbytes, False)
+
+
class WindowsNamedPipeTransport(Transport):
""" connect to a named pipe """
@@ -284,28 +402,35 @@
self._raise_win_err('failed to open pipe %s' % sockpath,
GetLastError())
- def _win32_strerror(self, err):
- """ expand a win32 error code into a human readable message """
+ # event for the overlapped I/O operations
+ self._waitable = CreateEvent(None, True, False, None)
+ if self._waitable is None:
+ self._raise_win_err('CreateEvent failed', GetLastError())
- # FormatMessage will allocate memory and assign it here
- buf = ctypes.c_char_p()
- FormatMessage(
- FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
- | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
- try:
- return buf.value
- finally:
- LocalFree(buf)
+ self._get_overlapped_result_ex = GetOverlappedResultEx
+ if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
+ self._get_overlapped_result_ex is None):
+ self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
def _raise_win_err(self, msg, err):
raise IOError('%s win32 error code: %d %s' %
- (msg, err, self._win32_strerror(err)))
+ (msg, err, _win32_strerror(err)))
def close(self):
if self.pipe:
+ log('Closing pipe')
CloseHandle(self.pipe)
self.pipe = None
+ if self._waitable is not None:
+ # We release the handle for the event
+ CloseHandle(self._waitable)
+ self._waitable = None
+
+ def setTimeout(self, value):
+ # convert to milliseconds
+ self.timeout = int(value * 1000)
+
def readBytes(self, size):
""" A read can block for an unbounded amount of time, even if the
kernel reports that the pipe handle is signalled, so we need to
@@ -325,6 +450,7 @@
# We need to initiate a read
buf = ctypes.create_string_buffer(size)
olap = OVERLAPPED()
+ olap.hEvent = self._waitable
log('made read buff of size %d', size)
@@ -339,8 +465,9 @@
GetLastError())
nread = wintypes.DWORD()
- if not GetOverlappedResultEx(self.pipe, olap, nread,
- 0 if immediate else self.timeout, True):
+ if not self._get_overlapped_result_ex(self.pipe, olap, nread,
+ 0 if immediate else self.timeout,
+ True):
err = GetLastError()
CancelIoEx(self.pipe, olap)
@@ -374,6 +501,8 @@
def write(self, data):
olap = OVERLAPPED()
+ olap.hEvent = self._waitable
+
immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
None, olap)
@@ -385,8 +514,10 @@
# Obtain results, waiting if needed
nwrote = wintypes.DWORD()
- if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else
- self.timeout, True):
+ if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
+ 0 if immediate else self.timeout,
+ True):
+ log('made write of %d bytes', nwrote.value)
return nwrote.value
err = GetLastError()
@@ -430,7 +561,10 @@
def close(self):
if self.proc:
- self.proc.kill()
+ if self.proc.pid is not None:
+ self.proc.kill()
+ self.proc.stdin.close()
+ self.proc.stdout.close()
self.proc = None
def _connect(self):
@@ -438,7 +572,7 @@
return self.proc
args = [
'watchman',
- '--sockname={}'.format(self.sockpath),
+ '--sockname={0}'.format(self.sockpath),
'--logfile=/BOGUS',
'--statefile=/BOGUS',
'--no-spawn',
@@ -460,8 +594,8 @@
def write(self, data):
if self.closed:
+ self.close()
self.closed = False
- self.proc = None
self._connect()
res = self.proc.stdin.write(data)
self.proc.stdin.close()
@@ -473,21 +607,21 @@
""" use the BSER encoding. This is the default, preferred codec """
def _loads(self, response):
- return bser.loads(response)
+ return bser.loads(response) # Defaults to BSER v1
def receive(self):
buf = [self.transport.readBytes(sniff_len)]
if not buf[0]:
raise WatchmanError('empty watchman response')
- elen = bser.pdu_len(buf[0])
+ _1, _2, elen = bser.pdu_info(buf[0])
rlen = len(buf[0])
while elen > rlen:
buf.append(self.transport.readBytes(elen - rlen))
rlen += len(buf[-1])
- response = ''.join(buf)
+ response = b''.join(buf)
try:
res = self._loads(response)
return res
@@ -495,7 +629,7 @@
raise WatchmanError('watchman response decode error: %s' % e)
def send(self, *args):
- cmd = bser.dumps(*args)
+ cmd = bser.dumps(*args) # Defaults to BSER v1
self.transport.write(cmd)
@@ -504,7 +638,64 @@
immutable object support """
def _loads(self, response):
- return bser.loads(response, False)
+ return bser.loads(response, False) # Defaults to BSER v1
+
+
+class Bser2WithFallbackCodec(BserCodec):
+ """ use BSER v2 encoding """
+
+ def __init__(self, transport):
+ super(Bser2WithFallbackCodec, self).__init__(transport)
+ # Once the server advertises support for bser-v2 we should switch this
+ # to 'required' on Python 3.
+ self.send(["version", {"optional": ["bser-v2"]}])
+
+ capabilities = self.receive()
+
+ if 'error' in capabilities:
+ raise Exception('Unsupported BSER version')
+
+ if capabilities['capabilities']['bser-v2']:
+ self.bser_version = 2
+ self.bser_capabilities = 0
+ else:
+ self.bser_version = 1
+ self.bser_capabilities = 0
+
+ def _loads(self, response):
+ return bser.loads(response)
+
+ def receive(self):
+ buf = [self.transport.readBytes(sniff_len)]
+ if not buf[0]:
+ raise WatchmanError('empty watchman response')
+
+ recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
+
+ if hasattr(self, 'bser_version'):
+ # Readjust BSER version and capabilities if necessary
+ self.bser_version = max(self.bser_version, recv_bser_version)
+ self.capabilities = self.bser_capabilities & recv_bser_capabilities
+
+ rlen = len(buf[0])
+ while elen > rlen:
+ buf.append(self.transport.readBytes(elen - rlen))
+ rlen += len(buf[-1])
+
+ response = b''.join(buf)
+ try:
+ res = self._loads(response)
+ return res
+ except ValueError as e:
+ raise WatchmanError('watchman response decode error: %s' % e)
+
+ def send(self, *args):
+ if hasattr(self, 'bser_version'):
+ cmd = bser.dumps(*args, version=self.bser_version,
+ capabilities=self.bser_capabilities)
+ else:
+ cmd = bser.dumps(*args)
+ self.transport.write(cmd)
class JsonCodec(Codec):
@@ -520,6 +711,13 @@
def receive(self):
line = self.transport.readLine()
try:
+ # In Python 3, json.loads is a transformation from Unicode string to
+ # objects possibly containing Unicode strings. We typically expect
+ # the JSON blob to be ASCII-only with non-ASCII characters escaped,
+ # but it's possible we might get non-ASCII bytes that are valid
+ # UTF-8.
+ if compat.PYTHON3:
+ line = line.decode('utf-8')
return self.json.loads(line)
except Exception as e:
print(e, line)
@@ -527,7 +725,12 @@
def send(self, *args):
cmd = self.json.dumps(*args)
- self.transport.write(cmd + "\n")
+ # In Python 3, json.dumps is a transformation from objects possibly
+ # containing Unicode strings to Unicode string. Even with (the default)
+ # ensure_ascii=True, dumps returns a Unicode string.
+ if compat.PYTHON3:
+ cmd = cmd.encode('ascii')
+ self.transport.write(cmd + b"\n")
class client(object):
@@ -556,22 +759,27 @@
self.timeout = timeout
self.useImmutableBser = useImmutableBser
- transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
- if transport == 'local' and os.name == 'nt':
- self.transport = WindowsNamedPipeTransport
- elif transport == 'local':
- self.transport = UnixSocketTransport
- elif transport == 'cli':
- self.transport = CLIProcessTransport
- if sendEncoding is None:
- sendEncoding = 'json'
- if recvEncoding is None:
- recvEncoding = sendEncoding
+ if inspect.isclass(transport) and issubclass(transport, Transport):
+ self.transport = transport
else:
- raise WatchmanError('invalid transport %s' % transport)
+ transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
+ if transport == 'local' and os.name == 'nt':
+ self.transport = WindowsNamedPipeTransport
+ elif transport == 'local':
+ self.transport = UnixSocketTransport
+ elif transport == 'cli':
+ self.transport = CLIProcessTransport
+ if sendEncoding is None:
+ sendEncoding = 'json'
+ if recvEncoding is None:
+ recvEncoding = sendEncoding
+ else:
+ raise WatchmanError('invalid transport %s' % transport)
- sendEncoding = sendEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
- recvEncoding = recvEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
+ sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
+ 'bser')
+ recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
+ 'bser')
self.recvCodec = self._parseEncoding(recvEncoding)
self.sendCodec = self._parseEncoding(sendEncoding)
@@ -581,6 +789,8 @@
if self.useImmutableBser:
return ImmutableBserCodec
return BserCodec
+ elif enc == 'experimental-bser-v2':
+ return Bser2WithFallbackCodec
elif enc == 'json':
return JsonCodec
else:
@@ -600,10 +810,20 @@
cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
try:
- p = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- close_fds=os.name != 'nt')
+ args = dict(stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=os.name != 'nt')
+
+ if os.name == 'nt':
+ # if invoked via an application with graphical user interface,
+ # this call will cause a brief command window pop-up.
+ # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+ args['startupinfo'] = startupinfo
+
+ p = subprocess.Popen(cmd, **args)
+
except OSError as e:
raise WatchmanError('"watchman" executable not in PATH (%s)', e)
@@ -614,10 +834,10 @@
raise WatchmanError("watchman exited with code %d" % exitcode)
result = bser.loads(stdout)
- if 'error' in result:
+ if b'error' in result:
raise WatchmanError('get-sockname error: %s' % result['error'])
- return result['sockname']
+ return result[b'sockname']
def _connect(self):
""" establish transport connection """
@@ -660,10 +880,16 @@
self._connect()
result = self.recvConn.receive()
if self._hasprop(result, 'error'):
- raise CommandError(result['error'])
+ error = result['error']
+ if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
+ error = result['error'].decode('utf-8', 'surrogateescape')
+ raise CommandError(error)
if self._hasprop(result, 'log'):
- self.logs.append(result['log'])
+ log = result['log']
+ if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
+ log = log.decode('utf-8', 'surrogateescape')
+ self.logs.append(log)
if self._hasprop(result, 'subscription'):
sub = result['subscription']
@@ -682,6 +908,9 @@
return result
def isUnilateralResponse(self, res):
+ if 'unilateral' in res and res['unilateral']:
+ return True
+ # Fall back to checking for known unilateral responses
for k in self.unilateral:
if k in res:
return True
@@ -712,6 +941,13 @@
remove processing impacts both the unscoped and scoped stores
for the subscription data.
"""
+ if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
+ # People may pass in Unicode strings here -- but currently BSER only
+ # returns bytestrings. Deal with that.
+ if isinstance(root, str):
+ root = encoding.encode_local(root)
+ if isinstance(name, str):
+ name = name.encode('utf-8')
if root is not None:
if not root in self.sub_by_root:
@@ -752,9 +988,17 @@
res = self.receive()
return res
- except CommandError as ex:
+ except EnvironmentError as ee:
+ # When we can depend on Python 3, we can use PEP 3134
+ # exception chaining here.
+ raise WatchmanEnvironmentError(
+ 'I/O error communicating with watchman daemon',
+ ee.errno,
+ ee.strerror,
+ args)
+ except WatchmanError as ex:
ex.setCommand(args)
- raise ex
+ raise
def capabilityCheck(self, optional=None, required=None):
""" Perform a server capability check """
@@ -775,5 +1019,3 @@
def setTimeout(self, value):
self.recvConn.setTimeout(value)
self.sendConn.setTimeout(value)
-
-# no-check-code -- this is a 3rd party library