Test PopenProcess.
authorZac Medico <zmedico@gentoo.org>
Tue, 16 Oct 2012 19:41:00 +0000 (12:41 -0700)
committerZac Medico <zmedico@gentoo.org>
Tue, 16 Oct 2012 19:41:00 +0000 (12:41 -0700)
pym/portage/tests/process/test_PopenProcess.py [new file with mode: 0644]

diff --git a/pym/portage/tests/process/test_PopenProcess.py b/pym/portage/tests/process/test_PopenProcess.py
new file mode 100644 (file)
index 0000000..e7654fc
--- /dev/null
@@ -0,0 +1,48 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import subprocess
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PopenProcess import PopenProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from _emerge.PipeReader import PipeReader
+
+class PopenPipeReaderTestCase(TestCase):
+       """
+       Test PopenProcess, which can be useful for Jython support, since it
+       uses the subprocess.Popen instead of os.fork().
+       """
+
+       _echo_cmd = "echo -n '%s'"
+
+       def _testPipeReader(self, test_string):
+               """
+               Use a poll loop to read data from a pipe and assert that
+               the data written to the pipe is identical to the data
+               read from the pipe.
+               """
+
+               producer = PopenProcess(proc=subprocess.Popen(
+                       ["bash", "-c", self._echo_cmd % test_string],
+                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+                       pipe_reader=PipeReader(), scheduler=global_event_loop())
+
+               consumer = producer.pipe_reader
+               consumer.input_files = {"producer" : producer.proc.stdout}
+
+               producer.start()
+               producer.wait()
+
+               self.assertEqual(producer.returncode, os.EX_OK)
+               self.assertEqual(consumer.returncode, os.EX_OK)
+
+               return consumer.getvalue().decode('ascii', 'replace')
+
+       def testPipeReader(self):
+               for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+                       test_string = x * "a"
+                       output = self._testPipeReader(test_string)
+                       self.assertEqual(test_string, output,
+                               "x = %s, len(output) = %s" % (x, len(output)))