From 19c9cc88c3a3569e57eac5b846340141a42e991e Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 18 Oct 2012 19:18:14 -0700 Subject: [PATCH] Test PopenProcess + PipeLogger. --- .../tests/process/test_PopenProcess.py | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/pym/portage/tests/process/test_PopenProcess.py b/pym/portage/tests/process/test_PopenProcess.py index e7654fc8e..5ede164af 100644 --- a/pym/portage/tests/process/test_PopenProcess.py +++ b/pym/portage/tests/process/test_PopenProcess.py @@ -2,14 +2,16 @@ # Distributed under the terms of the GNU General Public License v2 import subprocess +import tempfile from portage import os from portage.tests import TestCase +from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from portage.util._eventloop.global_event_loop import global_event_loop from _emerge.PipeReader import PipeReader -class PopenPipeReaderTestCase(TestCase): +class PopenPipeTestCase(TestCase): """ Test PopenProcess, which can be useful for Jython support, since it uses the subprocess.Popen instead of os.fork(). @@ -40,9 +42,47 @@ class PopenPipeReaderTestCase(TestCase): return consumer.getvalue().decode('ascii', 'replace') - def testPipeReader(self): + def _testPipeLogger(self, test_string): + + producer = PopenProcess(proc=subprocess.Popen( + ["bash", "-c", self._echo_cmd % test_string], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT), + scheduler=global_event_loop()) + + fd, log_file_path = tempfile.mkstemp() + try: + + consumer = PipeLogger(background=True, + input_fd=os.dup(producer.proc.stdout.fileno()), + log_file_path=log_file_path) + + # Close the stdout pipe, since we duplicated it, and it + # must be closed in order to avoid a ResourceWarning. + producer.proc.stdout.close() + producer.pipe_reader = consumer + + producer.start() + producer.wait() + + self.assertEqual(producer.returncode, os.EX_OK) + self.assertEqual(consumer.returncode, os.EX_OK) + + with open(log_file_path, 'rb') as f: + content = f.read() + + finally: + os.close(fd) + os.unlink(log_file_path) + + return content.decode('ascii', 'replace') + + def testPopenPipe(self): for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): test_string = x * "a" output = self._testPipeReader(test_string) self.assertEqual(test_string, output, "x = %s, len(output) = %s" % (x, len(output))) + + output = self._testPipeLogger(test_string) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) -- 2.26.2