SpawnProcess: split out a PipeLogger class
author Zac Medico <zmedico@gentoo.org>
Fri, 19 Oct 2012 01:15:00 +0000 (18:15 -0700)
committer Zac Medico <zmedico@gentoo.org>
Fri, 19 Oct 2012 01:15:00 +0000 (18:15 -0700)
The copyright dates for these classes begin in 2008, since SpawnProcess
code is derived from the EbuildFetcherAsync class which was added in
commit e4edadf5ae7063f375d76be151c6d0e949980ecf in 2008.

pym/_emerge/SpawnProcess.py
pym/portage/util/_async/PipeLogger.py [new file with mode: 0644]

index ab152c3c37a6c7960f62e276df6080ab13ce90d4..d18512b348d54be6e08e5fef5742d24b87e12482 100644 (file)
@@ -1,17 +1,12 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 2008-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from _emerge.SubProcess import SubProcess
 import sys
-from portage.cache.mappings import slot_dict_class
 import portage
-from portage import _encodings
-from portage import _unicode_encode
 from portage import os
 from portage.const import BASH_BINARY
-import fcntl
-import errno
-import gzip
+from portage.util._async.PipeLogger import PipeLogger
 
 class SpawnProcess(SubProcess):
 
@@ -26,10 +21,7 @@ class SpawnProcess(SubProcess):
                "path_lookup", "pre_exec")
 
        __slots__ = ("args",) + \
-               _spawn_kwarg_names + ("_log_file_real", "_selinux_type",)
-
-       _file_names = ("log", "process", "stdout")
-       _files_dict = slot_dict_class(_file_names, prefix="")
+               _spawn_kwarg_names + ("_pipe_logger", "_selinux_type",)
 
        def _start(self):
 
@@ -37,17 +29,13 @@ class SpawnProcess(SubProcess):
                        self.fd_pipes = {}
                fd_pipes = self.fd_pipes
 
-               self._files = self._files_dict()
-               files = self._files
-
                master_fd, slave_fd = self._pipe(fd_pipes)
-               fcntl.fcntl(master_fd, fcntl.F_SETFL,
-                       fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
-               files.process = master_fd
 
-               logfile = None
-               if self._can_log(slave_fd):
-                       logfile = self.logfile
+               can_log = self._can_log(slave_fd)
+               if can_log:
+                       log_file_path = self.logfile
+               else:
+                       log_file_path = None
 
                null_input = None
                if not self.background or 0 in fd_pipes:
@@ -74,37 +62,19 @@ class SpawnProcess(SubProcess):
                                sys.__stderr__.flush()
                                break
 
-               if logfile is not None:
+               fd_pipes_orig = fd_pipes.copy()
 
-                       fd_pipes_orig = fd_pipes.copy()
+               if log_file_path is not None:
                        fd_pipes[1] = slave_fd
                        fd_pipes[2] = slave_fd
 
-                       files.log = open(_unicode_encode(logfile,
-                               encoding=_encodings['fs'], errors='strict'), mode='ab')
-                       if logfile.endswith('.gz'):
-                               self._log_file_real = files.log
-                               files.log = gzip.GzipFile(filename='', mode='ab',
-                                       fileobj=files.log)
-
-                       portage.util.apply_secpass_permissions(logfile,
-                               uid=portage.portage_uid, gid=portage.portage_gid,
-                               mode=0o660)
-
-                       if not self.background:
-                               files.stdout = os.dup(fd_pipes_orig[1])
-
-                       output_handler = self._output_handler
-
                else:
-
                        # Create a dummy pipe so the scheduler can monitor
                        # the process from inside a poll() loop.
                        fd_pipes[self._dummy_pipe_fd] = slave_fd
                        if self.background:
                                fd_pipes[1] = slave_fd
                                fd_pipes[2] = slave_fd
-                       output_handler = self._dummy_handler
 
                kwargs = {}
                for k in self._spawn_kwarg_names:
@@ -116,10 +86,6 @@ class SpawnProcess(SubProcess):
                kwargs["returnpid"] = True
                kwargs.pop("logfile", None)
 
-               self._reg_id = self.scheduler.io_add_watch(files.process,
-                       self._registered_events, output_handler)
-               self._registered = True
-
                retval = self._spawn(self.args, **kwargs)
 
                os.close(slave_fd)
@@ -136,6 +102,18 @@ class SpawnProcess(SubProcess):
                self.pid = retval[0]
                portage.process.spawned_pids.remove(self.pid)
 
+               stdout_fd = None
+               if can_log and not self.background:
+                       stdout_fd = os.dup(fd_pipes_orig[1])
+
+               self._pipe_logger = PipeLogger(background=self.background,
+                       scheduler=self.scheduler, input_fd=master_fd,
+                       log_file_path=log_file_path,
+                       stdout_fd=stdout_fd)
+               self._pipe_logger.addExitListener(self._pipe_logger_exit)
+               self._pipe_logger.start()
+               self._registered = True
+
        def _can_log(self, slave_fd):
                return True
 
@@ -158,92 +136,17 @@ class SpawnProcess(SubProcess):
 
                return spawn_func(args, **kwargs)
 
-       def _output_handler(self, fd, event):
-
-               files = self._files
-               while True:
-                       buf = self._read_buf(fd, event)
-
-                       if buf is None:
-                               # not a POLLIN event, EAGAIN, etc...
-                               break
+       def _pipe_logger_exit(self, pipe_logger):
+               self._pipe_logger = None
+               self._unregister()
+               self.wait()
 
-                       if not buf:
-                               # EOF
-                               self._unregister()
-                               self.wait()
-                               break
-
-                       else:
-                               if not self.background:
-                                       write_successful = False
-                                       failures = 0
-                                       while True:
-                                               try:
-                                                       if not write_successful:
-                                                               os.write(files.stdout, buf)
-                                                               write_successful = True
-                                                       break
-                                               except OSError as e:
-                                                       if e.errno != errno.EAGAIN:
-                                                               raise
-                                                       del e
-                                                       failures += 1
-                                                       if failures > 50:
-                                                               # Avoid a potentially infinite loop. In
-                                                               # most cases, the failure count is zero
-                                                               # and it's unlikely to exceed 1.
-                                                               raise
-
-                                                       # This means that a subprocess has put an inherited
-                                                       # stdio file descriptor (typically stdin) into
-                                                       # O_NONBLOCK mode. This is not acceptable (see bug
-                                                       # #264435), so revert it. We need to use a loop
-                                                       # here since there's a race condition due to
-                                                       # parallel processes being able to change the
-                                                       # flags on the inherited file descriptor.
-                                                       # TODO: When possible, avoid having child processes
-                                                       # inherit stdio file descriptors from portage
-                                                       # (maybe it can't be avoided with
-                                                       # PROPERTIES=interactive).
-                                                       fcntl.fcntl(files.stdout, fcntl.F_SETFL,
-                                                               fcntl.fcntl(files.stdout,
-                                                               fcntl.F_GETFL) ^ os.O_NONBLOCK)
-
-                               files.log.write(buf)
-                               files.log.flush()
-
-               self._unregister_if_appropriate(event)
-
-               return True
-
-       def _dummy_handler(self, fd, event):
-               """
-               This method is mainly interested in detecting EOF, since
-               the only purpose of the pipe is to allow the scheduler to
-               monitor the process from inside a poll() loop.
-               """
-
-               while True:
-                       buf = self._read_buf(fd, event)
-
-                       if buf is None:
-                               # not a POLLIN event, EAGAIN, etc...
-                               break
-
-                       if not buf:
-                               # EOF
-                               self._unregister()
-                               self.wait()
-                               break
-
-               self._unregister_if_appropriate(event)
-
-               return True
+       def _waitpid_loop(self):
+               SubProcess._waitpid_loop(self)
 
-       def _unregister(self):
-               super(SpawnProcess, self)._unregister()
-               if self._log_file_real is not None:
-                       # Avoid "ResourceWarning: unclosed file" since python 3.2.
-                       self._log_file_real.close()
-                       self._log_file_real = None
+               pipe_logger = self._pipe_logger
+               if pipe_logger is not None:
+                       self._pipe_logger = None
+                       pipe_logger.removeExitListener(self._pipe_logger_exit)
+                       pipe_logger.cancel()
+                       pipe_logger.wait()
diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py
new file mode 100644 (file)
index 0000000..dbdd56f
--- /dev/null
@@ -0,0 +1,149 @@
+# Copyright 2008-2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import fcntl
+import errno
+import gzip
+
+import portage
+from portage import os, _encodings, _unicode_encode
+from _emerge.AbstractPollTask import AbstractPollTask
+
+class PipeLogger(AbstractPollTask):
+
+       """
+       This can be used for logging output of a child process,
+       optionally outputting to log_file_path and/or stdout_fd.  It can
+       also monitor for EOF on input_fd, which may be used to detect
+       termination of a child process. If log_file_path ends with
+       '.gz' then the log file is written with compression.
+       """
+
+       __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
+               ("_log_file", "_log_file_real", "_reg_id")
+
+       def _start(self):
+
+               log_file_path = self.log_file_path
+               if log_file_path is not None:
+
+                       self._log_file = open(_unicode_encode(log_file_path,
+                               encoding=_encodings['fs'], errors='strict'), mode='ab')
+                       if log_file_path.endswith('.gz'):
+                               self._log_file_real = self._log_file
+                               self._log_file = gzip.GzipFile(filename='', mode='ab',
+                                       fileobj=self._log_file)
+
+                       portage.util.apply_secpass_permissions(log_file_path,
+                               uid=portage.portage_uid, gid=portage.portage_gid,
+                               mode=0o660)
+
+               fcntl.fcntl(self.input_fd, fcntl.F_SETFL,
+                       fcntl.fcntl(self.input_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+               self._reg_id = self.scheduler.io_add_watch(self.input_fd,
+                       self._registered_events, self._output_handler)
+               self._registered = True
+
+       def isAlive(self):
+               return self._registered
+
+       def _cancel(self):
+               self._unregister()
+               if self.returncode is None:
+                       self.returncode = self._cancelled_returncode
+
+       def _wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               self._wait_loop()
+               self.returncode = os.EX_OK
+               return self.returncode
+
+       def _output_handler(self, fd, event):
+
+               background = self.background
+               stdout_fd = self.stdout_fd
+               log_file = self._log_file 
+
+               while True:
+                       buf = self._read_buf(fd, event)
+
+                       if buf is None:
+                               # not a POLLIN event, EAGAIN, etc...
+                               break
+
+                       if not buf:
+                               # EOF
+                               self._unregister()
+                               self.wait()
+                               break
+
+                       else:
+                               if not background and stdout_fd is not None:
+                                       write_successful = False
+                                       failures = 0
+                                       while True:
+                                               try:
+                                                       if not write_successful:
+                                                               os.write(stdout_fd, buf)
+                                                               write_successful = True
+                                                       break
+                                               except OSError as e:
+                                                       if e.errno != errno.EAGAIN:
+                                                               raise
+                                                       del e
+                                                       failures += 1
+                                                       if failures > 50:
+                                                               # Avoid a potentially infinite loop. In
+                                                               # most cases, the failure count is zero
+                                                               # and it's unlikely to exceed 1.
+                                                               raise
+
+                                                       # This means that a subprocess has put an inherited
+                                                       # stdio file descriptor (typically stdin) into
+                                                       # O_NONBLOCK mode. This is not acceptable (see bug
+                                                       # #264435), so revert it. We need to use a loop
+                                                       # here since there's a race condition due to
+                                                       # parallel processes being able to change the
+                                                       # flags on the inherited file descriptor.
+                                                       # TODO: When possible, avoid having child processes
+                                                       # inherit stdio file descriptors from portage
+                                                       # (maybe it can't be avoided with
+                                                       # PROPERTIES=interactive).
+                                                       fcntl.fcntl(stdout_fd, fcntl.F_SETFL,
+                                                               fcntl.fcntl(stdout_fd,
+                                                               fcntl.F_GETFL) ^ os.O_NONBLOCK)
+
+                               if log_file is not None:
+                                       log_file.write(buf)
+                                       log_file.flush()
+
+               self._unregister_if_appropriate(event)
+
+               return True
+
+       def _unregister(self):
+
+               if self._reg_id is not None:
+                       self.scheduler.source_remove(self._reg_id)
+                       self._reg_id = None
+
+               if self.input_fd is not None:
+                       os.close(self.input_fd)
+                       self.input_fd = None
+
+               if self.stdout_fd is not None:
+                       os.close(self.stdout_fd)
+                       self.stdout_fd = None
+
+               if self._log_file is not None:
+                       self._log_file.close()
+                       self._log_file = None
+
+               if self._log_file_real is not None:
+                       # Avoid "ResourceWarning: unclosed file" since python 3.2.
+                       self._log_file_real.close()
+                       self._log_file_real = None
+
+               self._registered = False