mercurial/worker.py
branchstable
changeset 44248 12491abf93bd
parent 44226 cb52e619c99e
child 45376 26eb62bd0550
--- a/mercurial/worker.py	Thu Jan 02 11:04:18 2020 -0800
+++ b/mercurial/worker.py	Tue Feb 04 22:07:36 2020 +0100
@@ -65,6 +65,41 @@
     return min(max(countcpus(), 4), 32)
 
 
+if pycompat.ispy3:
+
+    class _blockingreader(object):
+        def __init__(self, wrapped):
+            self._wrapped = wrapped
+
+        def __getattr__(self, attr):
+            return getattr(self._wrapped, attr)
+
+        # issue multiple reads until size is fulfilled
+        def read(self, size=-1):
+            if size < 0:
+                return self._wrapped.readall()
+
+            buf = bytearray(size)
+            view = memoryview(buf)
+            pos = 0
+
+            while pos < size:
+                ret = self._wrapped.readinto(view[pos:])
+                if not ret:
+                    break
+                pos += ret
+
+            del view
+            del buf[pos:]
+            return buf
+
+
+else:
+
+    def _blockingreader(wrapped):
+        return wrapped
+
+
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -226,7 +261,7 @@
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
@@ -240,7 +275,7 @@
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    res = util.pickle.load(key.fileobj)
+                    res = util.pickle.load(_blockingreader(key.fileobj))
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else: