Centralize select.poll() event handling in MergeTask._schedule(). This will
authorZac Medico <zmedico@gentoo.org>
Sun, 29 Jun 2008 17:27:37 +0000 (17:27 -0000)
committerZac Medico <zmedico@gentoo.org>
Sun, 29 Jun 2008 17:27:37 +0000 (17:27 -0000)
allow the parent process to handle output of multiple child processes
running in parllel.

svn path=/main/trunk/; revision=10851

pym/_emerge/__init__.py

index 84b5769a33cd7c706a010d984cc006954c8135f0..8115bdf5253e25e38ce6c6a9c40dfdb901958ea5 100644 (file)
@@ -20,6 +20,8 @@ try:
 except KeyboardInterrupt:
        sys.exit(1)
 
+import array
+import select
 import gc
 import os, stat
 import platform
@@ -1527,7 +1529,7 @@ class EbuildBuild(Task):
        """
        TODO: Support asynchronous execution, to implement parallel builds.
        """
-       __slots__ = ("pkg", "settings")
+       __slots__ = ("pkg", "register", "schedule", "settings", "unregister")
 
        _phases = ("setup", "unpack", "compile", "test", "install")
 
@@ -1562,9 +1564,10 @@ class EbuildBuild(Task):
 
                for mydo in self._phases:
                        ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
-                               pkg=self.pkg, phase=mydo, settings=settings)
+                               pkg=self.pkg, phase=mydo, register=self.register,
+                               settings=settings, unregister=self.unregister)
                        ebuild_phase.start()
-                       ebuild_phase._output_handler()
+                       self.schedule()
                        retval = ebuild_phase.wait()
 
                        portage._post_phase_userpriv_perms(settings)
@@ -1580,7 +1583,8 @@ class EbuildBuild(Task):
 
 class EbuildPhase(SlotObject):
 
-       __slots__ = ("fd_pipes", "phase", "pkg", "settings",
+       __slots__ = ("fd_pipes", "phase", "pkg",
+               "register", "settings", "unregister",
                "pid", "returncode", "files")
 
        _file_names = ("log", "stdout", "ebuild")
@@ -1662,40 +1666,25 @@ class EbuildPhase(SlotObject):
                        files["log"] = open(logfile, 'a')
                        files["stdout"] = os.fdopen(os.dup(fd_pipes_orig[1]), 'w')
                        files["ebuild"] = os.fdopen(master_fd, 'r')
+                       self.register(files["ebuild"].fileno(),
+                               select.POLLIN, self._output_handler)
 
-       def _output_handler(self):
-               log_file = self.files.get("log")
-               if log_file is None:
-                       return
-               ebuild_file = self.files["ebuild"]
-               stdout_file = self.files["stdout"]
-               iwtd = [ebuild_file]
-               owtd = []
-               ewtd = []
-               import array, select
-               buffsize = self._bufsize
-               eof = False
-               while not eof:
-                       events = select.select(iwtd, owtd, ewtd)
-                       for f in events[0]:
-                               # Use non-blocking mode to prevent read
-                               # calls from blocking indefinitely.
-                               buf = array.array('B')
-                               try:
-                                       buf.fromfile(f, buffsize)
-                               except EOFError:
-                                       pass
-                               if not buf:
-                                       eof = True
-                                       break
-                               if f is ebuild_file:
-                                       buf.tofile(stdout_file)
-                                       stdout_file.flush()
-                                       buf.tofile(log_file)
-                                       log_file.flush()
-               log_file.close()
-               stdout_file.close()
-               ebuild_file.close()
+       def _output_handler(self, fd, event):
+               files = self.files
+               buf = array.array('B')
+               try:
+                       buf.fromfile(files["ebuild"], self._bufsize)
+               except EOFError:
+                       pass
+               if buf:
+                       buf.tofile(files["stdout"])
+                       files["stdout"].flush()
+                       buf.tofile(files["log"])
+                       files["log"].flush()
+               else:
+                       self.unregister(files["ebuild"].fileno())
+                       for f in files.values():
+                               f.close()
 
        def wait(self):
                pid = self.pid
@@ -6357,6 +6346,8 @@ class MergeTask(object):
                                clone=trees[root]["vartree"].settings)
                self.curval = 0
                self._spawned_pids = []
+               self._poll_event_handlers = {}
+               self._poll = select.poll()
 
        class _pkg_failure(portage.exception.PortageException):
                """
@@ -6521,6 +6512,19 @@ class MergeTask(object):
                                pass
                        spawned_pids.remove(pid)
 
+       def _register(self, f, eventmask, handler):
+               self._poll_event_handlers[f] = handler
+               self._poll.register(f, eventmask)
+
+       def _unregister(self, f):
+               self._poll.unregister(f)
+               del self._poll_event_handlers[f]
+
+       def _schedule(self):
+               while self._poll_event_handlers:
+                       for f, event in self._poll.poll():
+                               self._poll_event_handlers[f](f, event)
+
        def _merge(self):
                mylist = self._mergelist
                favorites = self._favorites
@@ -6759,7 +6763,9 @@ class MergeTask(object):
                                                        (mergecount, len(mymergelist), pkg_key)
                                                emergelog(xterm_titles, msg, short_msg=short_msg)
 
-                                               build = EbuildBuild(pkg=pkg, settings=pkgsettings)
+                                               build = EbuildBuild(pkg=pkg, register=self._register,
+                                                       schedule=self._schedule, settings=pkgsettings,
+                                                       unregister=self._unregister)
                                                retval = build.execute()
                                                if retval != os.EX_OK:
                                                        raise self._pkg_failure(retval)
@@ -6794,7 +6800,9 @@ class MergeTask(object):
                                                        (mergecount, len(mymergelist), pkg_key)
                                                emergelog(xterm_titles, msg, short_msg=short_msg)
 
-                                               build = EbuildBuild(pkg=pkg, settings=pkgsettings)
+                                               build = EbuildBuild(pkg=pkg, register=self._register,
+                                                       schedule=self._schedule, settings=pkgsettings,
+                                                       unregister=self._unregister)
                                                retval = build.execute()
                                                if retval != os.EX_OK:
                                                        raise self._pkg_failure(retval)