Add PipeReaderBlockingIO, Jython experimentation.
authorZac Medico <zmedico@gentoo.org>
Thu, 27 Dec 2012 08:15:22 +0000 (00:15 -0800)
committerZac Medico <zmedico@gentoo.org>
Thu, 27 Dec 2012 08:17:37 +0000 (00:17 -0800)
.gitignore
pym/portage/tests/process/test_PopenProcessBlockingIO.py [new file with mode: 0644]
pym/portage/util/_async/PipeReaderBlockingIO.py [new file with mode: 0644]
pym/portage/util/_eventloop/EventLoop.py

index 8da61905af9e3b4612bad1ce9d0c1373b60cec33..2236c6379cb5bfc4eda22be355639fc410c23161 100644 (file)
@@ -1,2 +1,3 @@
 *.py[co]
+*.class
 /tags
diff --git a/pym/portage/tests/process/test_PopenProcessBlockingIO.py b/pym/portage/tests/process/test_PopenProcessBlockingIO.py
new file mode 100644 (file)
index 0000000..9cdad32
--- /dev/null
@@ -0,0 +1,62 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import subprocess
+
+try:
+       import threading
+except ImportError:
+       threading = None
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PopenProcess import PopenProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util._async.PipeReaderBlockingIO import PipeReaderBlockingIO
+
+class PopenPipeBlockingIOTestCase(TestCase):
+       """
+       Test PopenProcess, which can be useful for Jython support:
+               * use subprocess.Popen since Jython does not support os.fork()
+               * use blocking IO with threads, since Jython does not support
+                 fcntl non-blocking IO)
+       """
+
+       _echo_cmd = "echo -n '%s'"
+
+       def _testPipeReader(self, test_string):
+               """
+               Use a poll loop to read data from a pipe and assert that
+               the data written to the pipe is identical to the data
+               read from the pipe.
+               """
+
+               producer = PopenProcess(proc=subprocess.Popen(
+                       ["bash", "-c", self._echo_cmd % test_string],
+                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+                       pipe_reader=PipeReaderBlockingIO(), scheduler=global_event_loop())
+
+               consumer = producer.pipe_reader
+               consumer.input_files = {"producer" : producer.proc.stdout}
+
+               producer.start()
+               producer.wait()
+
+               self.assertEqual(producer.returncode, os.EX_OK)
+               self.assertEqual(consumer.returncode, os.EX_OK)
+
+               return consumer.getvalue().decode('ascii', 'replace')
+
+       def testPopenPipeBlockingIO(self):
+
+               if threading is None:
+                       skip_reason = "threading disabled"
+                       self.portage_skip = "threading disabled"
+                       self.assertFalse(True, skip_reason)
+                       return
+
+               for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+                       test_string = x * "a"
+                       output = self._testPipeReader(test_string)
+                       self.assertEqual(test_string, output,
+                               "x = %s, len(output) = %s" % (x, len(output)))
diff --git a/pym/portage/util/_async/PipeReaderBlockingIO.py b/pym/portage/util/_async/PipeReaderBlockingIO.py
new file mode 100644 (file)
index 0000000..8ce2ec5
--- /dev/null
@@ -0,0 +1,87 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import threading
+
+from portage import os
+from _emerge.AbstractPollTask import AbstractPollTask
+
+class PipeReaderBlockingIO(AbstractPollTask):
+       """
+       Reads output from one or more files and saves it in memory, for
+       retrieval via the getvalue() method. This is driven by a thread
+       for each input file, in order to support blocking IO.  This may
+       be useful for using threads to handle blocking IO with Jython,
+       since Jython lacks the fcntl module which is needed for
+       non-blocking IO (see http://bugs.jython.org/issue1074).
+       """
+
+       __slots__ = ("input_files", "_read_data", "_terminate",
+               "_threads", "_thread_rlock")
+
+       def _start(self):
+               self._terminate = threading.Event()
+               self._threads = {}
+               self._read_data = []
+
+               self._registered = True
+               self._thread_rlock = threading.RLock()
+               with self._thread_rlock:
+                       for f in self.input_files.values():
+                               t = threading.Thread(target=self._reader_thread, args=(f,))
+                               t.daemon = True
+                               t.start()
+                               self._threads[f] = t
+
+       def _reader_thread(self, f):
+               try:
+                       terminated = self._terminate.is_set
+               except AttributeError:
+                       # Jython 2.7.0a2
+                       terminated = self._terminate.isSet
+               bufsize = self._bufsize
+               while not terminated():
+                       buf = f.read(bufsize)
+                       with self._thread_rlock:
+                               if terminated():
+                                       break
+                               elif buf:
+                                       self._read_data.append(buf)
+                               else:
+                                       del self._threads[f]
+                                       if not self._threads:
+                                               # Thread-safe callback to EventLoop
+                                               self.scheduler.idle_add(self._eof)
+                                       break
+               f.close()
+
+       def _eof(self):
+               self._registered = False
+               if self.returncode is None:
+                       self.returncode = os.EX_OK
+               self.wait()
+               return False
+
+       def _cancel(self):
+               self._terminate.set()
+               self._registered = False
+               if self.returncode is None:
+                       self.returncode = self._cancelled_returncode
+               self.wait()
+
+       def _wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               self._wait_loop()
+               self.returncode = os.EX_OK
+               return self.returncode
+
+       def getvalue(self):
+               """Retrieve the entire contents"""
+               with self._thread_rlock:
+                       return b''.join(self._read_data)
+
+       def close(self):
+               """Free the memory buffer."""
+               with self._thread_rlock:
+                       self._read_data = None
index 37e600792f0e7976c6af6770e725d43420f6c643..efd1f13763e4d93514a5b4b39f9ec305add58fc8 100644 (file)
@@ -2,13 +2,18 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import errno
-import fcntl
 import logging
 import os
 import select
 import signal
 import time
 
+try:
+       import fcntl
+except ImportError:
+       #  http://bugs.jython.org/issue1074
+       fcntl = None
+
 try:
        import threading
 except ImportError:
@@ -54,7 +59,7 @@ class EventLoop(object):
                        that global_event_loop does not need constructor arguments)
                @type main: bool
                """
-               self._use_signal = main
+               self._use_signal = main and fcntl is not None
                self._thread_rlock = threading.RLock()
                self._poll_event_queue = []
                self._poll_event_handlers = {}
@@ -524,7 +529,12 @@ def can_poll_device():
                return _can_poll_device
 
        p = select.poll()
-       p.register(dev_null.fileno(), PollConstants.POLLIN)
+       try:
+               p.register(dev_null.fileno(), PollConstants.POLLIN)
+       except TypeError:
+               # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable
+               _can_poll_device = False
+               return _can_poll_device
 
        invalid_request = False
        for f, event in p.poll():