SpawnProcess: work around array.fromfile() bugs
authorZac Medico <zmedico@gentoo.org>
Fri, 16 Dec 2011 02:02:32 +0000 (18:02 -0800)
committerZac Medico <zmedico@gentoo.org>
Fri, 16 Dec 2011 02:02:32 +0000 (18:02 -0800)
When I extended test_poll to test SpawnProcess array.fromfile() usage,
it exposed bugs in array.fromfile() that I couldn't find a way to
handle. So, use os.read() instead.

pym/_emerge/AbstractPollTask.py
pym/_emerge/SpawnProcess.py
pym/portage/tests/process/test_poll.py

index f7f3a9526d03cc3cf3be647f97d2981948352964..ea13587bc240f5b635fa810a00498062d374268d 100644 (file)
@@ -2,7 +2,9 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import array
+import errno
 import logging
+import os
 
 from portage.util import writemsg_level
 from _emerge.AsynchronousTask import AsynchronousTask
@@ -20,7 +22,7 @@ class AbstractPollTask(AsynchronousTask):
        def isAlive(self):
                return bool(self._registered)
 
-       def _read_buf(self, f, event):
+       def _read_buf(self, fd, event):
                """
                | POLLIN | RETURN
                | BIT    | VALUE
@@ -32,13 +34,26 @@ class AbstractPollTask(AsynchronousTask):
                | ---------------------------------------------------
                | 0      | None
                """
+               # NOTE: array.fromfile() is no longer used here because it has
+               # bugs in all known versions of Python (including Python 2.7
+               # and Python 3.2).
                buf = None
                if event & PollConstants.POLLIN:
                        buf = array.array('B')
                        try:
-                               buf.fromfile(f, self._bufsize)
-                       except (EOFError, IOError):
-                               pass
+                               # Python >=3.2
+                               frombytes = buf.frombytes
+                       except AttributeError:
+                               frombytes = buf.fromstring
+                       try:
+                               frombytes(os.read(fd, self._bufsize))
+                       except OSError as e:
+                               # EIO happens with pty on Linux after the
+                               # slave end of the pty has been closed.
+                               if e.errno not in (errno.EAGAIN, errno.EIO):
+                                       raise
+                               buf = None
+
                return buf
 
        def _unregister(self):
index 84493fe4206a6d614e7ae88814851f3ff8a93137..c2f4928734c7e6b9d7ec0e00dff5c4f50c525459 100644 (file)
@@ -165,11 +165,20 @@ class SpawnProcess(SubProcess):
        def _output_handler(self, fd, event):
 
                files = self._files
-               buf = self._read_buf(files.process, event)
+               while True:
+                       buf = self._read_buf(fd, event)
 
-               if buf is not None:
+                       if buf is None:
+                               # not a POLLIN event, EAGAIN, etc...
+                               break
 
-                       if buf:
+                       if not buf:
+                               # EOF
+                               self._unregister()
+                               self.wait()
+                               break
+
+                       else:
                                if not self.background:
                                        write_successful = False
                                        failures = 0
@@ -217,9 +226,6 @@ class SpawnProcess(SubProcess):
                                                data = buf.tostring()
                                        files.log.write(data)
                                files.log.flush()
-                       else:
-                               self._unregister()
-                               self.wait()
 
                self._unregister_if_appropriate(event)
 
@@ -230,15 +236,18 @@ class SpawnProcess(SubProcess):
                monitor the process from inside a poll() loop.
                """
 
-               buf = self._read_buf(self._files.process, event)
+               while True:
+                       buf = self._read_buf(fd, event)
 
-               if buf is not None:
+                       if buf is None:
+                               # not a POLLIN event, EAGAIN, etc...
+                               break
 
-                       if buf:
-                               pass
-                       else:
+                       if not buf:
+                               # EOF
                                self._unregister()
                                self.wait()
+                               break
 
                self._unregister_if_appropriate(event)
 
index e7a47028fa5f28874b6d290b4095619a9381aa72..9b1f9cb5553d0ffdf3b528bbc9d9d32f7ac42751 100644 (file)
@@ -1,6 +1,8 @@
 # Copyright 1998-2011 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import tempfile
+
 from portage import os
 from portage.tests import TestCase
 from portage.util._pty import _create_pty_or_pipe
@@ -8,6 +10,13 @@ from _emerge.PollScheduler import PollScheduler
 from _emerge.PipeReader import PipeReader
 from _emerge.SpawnProcess import SpawnProcess
 
+class _SpawnProcessPty(SpawnProcess):
+       __slots__ = ("got_pty",)
+       def _pipe(self, fd_pipes):
+               got_pty, master_fd, slave_fd = _create_pty_or_pipe()
+               self.got_pty = got_pty
+               return (master_fd, slave_fd)
+
 class PipeReaderTestCase(TestCase):
 
        def _testPipeReader(self, test_string, use_pty):
@@ -49,10 +58,57 @@ class PipeReaderTestCase(TestCase):
                output = consumer.getvalue().decode('ascii', 'replace')
                return (output, got_pty)
 
+       def _testPipeReaderArray(self, test_string, use_pty):
+               """
+               Use a poll loop to read data from a pipe and assert that
+               the data written to the pipe is identical to the data
+               read from the pipe.
+               """
+
+               scheduler = PollScheduler().sched_iface
+               if use_pty:
+                       spawn_process = _SpawnProcessPty
+               else:
+                       spawn_process = SpawnProcess
+
+               fd, logfile = tempfile.mkstemp()
+               os.close(fd)
+               producer = spawn_process(
+                       background=True,
+                       args=["bash", "-c", "echo -n '%s'" % test_string],
+                       env=os.environ,
+                       scheduler=scheduler, logfile=logfile)
+
+               try:
+                       producer.start()
+                       scheduler.schedule()
+                       self.assertEqual(producer.returncode, os.EX_OK)
+
+                       if use_pty:
+                               got_pty = producer.got_pty
+                       else:
+                               got_pty = False
+
+                       with open(logfile, 'rb') as f:
+                               output = f.read().decode('ascii')
+                       return (output, got_pty)
+               finally:
+                       try:
+                               os.unlink(logfile)
+                       except OSError:
+                               pass
+
        def testPipeReader(self):
                for use_pty in (False, True):
-                       for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
-                               test_string = x * "a"
-                               output, got_pty = self._testPipeReader(test_string, use_pty)
-                               self.assertEqual(test_string, output,
-                                       "x = %s, use_pty = %s, got_pty = %s" % (x, use_pty, got_pty))
+                       for use_array in (False, True):
+                               for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+                                       test_string = x * "a"
+                                       if use_array:
+                                               method = self._testPipeReaderArray
+                                       else:
+                                               method = self._testPipeReader
+                                       output, got_pty = method(test_string, use_pty)
+                                       self.assertEqual(test_string, output,
+                                               "x = %s, len(output) = %s, use_array = %s, "
+                                               "use_pty = %s, got_pty = %s" %
+                                               (x, len(output), use_array, use_pty, got_pty))