Test PopenProcess + PipeLogger.
authorZac Medico <zmedico@gentoo.org>
Fri, 19 Oct 2012 02:18:14 +0000 (19:18 -0700)
committerZac Medico <zmedico@gentoo.org>
Fri, 19 Oct 2012 02:18:14 +0000 (19:18 -0700)
pym/portage/tests/process/test_PopenProcess.py

index e7654fc8ebc5152a483d7ca03e959afd2f1f126d..5ede164af920a58dc5f460f5b752a9c066c57886 100644 (file)
@@ -2,14 +2,16 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import subprocess
+import tempfile
 
 from portage import os
 from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
 from _emerge.PipeReader import PipeReader
 
-class PopenPipeReaderTestCase(TestCase):
+class PopenPipeTestCase(TestCase):
        """
        Test PopenProcess, which can be useful for Jython support, since it
        uses the subprocess.Popen instead of os.fork().
@@ -40,9 +42,47 @@ class PopenPipeReaderTestCase(TestCase):
 
                return consumer.getvalue().decode('ascii', 'replace')
 
-       def testPipeReader(self):
+       def _testPipeLogger(self, test_string):
+
+               producer = PopenProcess(proc=subprocess.Popen(
+                       ["bash", "-c", self._echo_cmd % test_string],
+                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+                       scheduler=global_event_loop())
+
+               fd, log_file_path = tempfile.mkstemp()
+               try:
+
+                       consumer = PipeLogger(background=True,
+                               input_fd=os.dup(producer.proc.stdout.fileno()),
+                               log_file_path=log_file_path)
+
+                       # Close the stdout pipe, since we duplicated it, and it
+                       # must be closed in order to avoid a ResourceWarning.
+                       producer.proc.stdout.close()
+                       producer.pipe_reader = consumer
+
+                       producer.start()
+                       producer.wait()
+
+                       self.assertEqual(producer.returncode, os.EX_OK)
+                       self.assertEqual(consumer.returncode, os.EX_OK)
+
+                       with open(log_file_path, 'rb') as f:
+                               content = f.read()
+
+               finally:
+                       os.close(fd)
+                       os.unlink(log_file_path)
+
+               return content.decode('ascii', 'replace')
+
+       def testPopenPipe(self):
                for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
                        test_string = x * "a"
                        output = self._testPipeReader(test_string)
                        self.assertEqual(test_string, output,
                                "x = %s, len(output) = %s" % (x, len(output)))
+
+                       output = self._testPipeLogger(test_string)
+                       self.assertEqual(test_string, output,
+                               "x = %s, len(output) = %s" % (x, len(output)))