Add a new CompositeTask class which can be used to combine separate
authorZac Medico <zmedico@gentoo.org>
Sat, 5 Jul 2008 12:21:43 +0000 (12:21 -0000)
committerZac Medico <zmedico@gentoo.org>
Sat, 5 Jul 2008 12:21:43 +0000 (12:21 -0000)
AsynchronousTask instances into a single instance. The CompositeTask
instance used task exit listeners as a means to (asynchronously) trigger
progression from one subtask to the next. This technique is used to
group together all the ebuild phases executed by EbuildExecuter, and
should be useful for grouping many more sets of tasks into similar
composite tasks.

svn path=/main/trunk/; revision=10940

pym/_emerge/__init__.py

index 71df47dd51247cc9b197a02e289f74b0fa3461c2..44f7e53e063d00cdec65553297c5159fc720036a 100644 (file)
@@ -1455,7 +1455,7 @@ class EbuildFetchPretend(SlotObject):
                return retval
 
 class AsynchronousTask(SlotObject):
-       __slots__ = ("cancelled", "returncode")
+       __slots__ = ("cancelled", "returncode") + ("_exit_listeners",)
 
        def start(self):
                """
@@ -1470,11 +1470,91 @@ class AsynchronousTask(SlotObject):
                return self.returncode
 
        def wait(self):
+               self._wait_hook()
                return self.returncode
 
        def cancel(self):
                pass
 
+       def addExitListener(self, f):
+               """
+               The function will be called with one argument, a reference to self.
+               """
+               if self._exit_listeners is None:
+                       self._exit_listeners = []
+               self._exit_listeners.append(f)
+
+       def removeExitListener(self, f):
+               self._exit_listeners.remove(f)
+
+       def _wait_hook(self):
+               """
+               Call this method before returning from wait. This hook is
+               used to trigger exit listeners when the returncode first
+               becomes available.
+               """
+               if self._exit_listeners is not None:
+                       for f in self._exit_listeners:
+                               f(self)
+                       self._exit_listeners = None
+
+class CompositeTask(AsynchronousTask):
+       """
+       A collection of tasks that executes sequentially. Each task
+       must have a _set_returncode() method that can be wrapped as
+       a means to trigger movement from one task to the next.
+       """
+
+       __slots__ = ("scheduler",) + ("_current_task", "_task_queue")
+
+       def __init__(self, **kwargs):
+               AsynchronousTask.__init__(self, **kwargs)
+               self._task_queue = deque()
+
+       def add(self, task):
+               self._task_queue.append(task)
+
+       def start(self):
+               self._start_next_task()
+
+       def isAlive(self):
+               return self._current_task is not None
+
+       def cancel(self):
+               self._task_queue.clear()
+               self.cancelled = True
+               self._current_task.cancel()
+
+       def wait(self):
+
+               while True:
+                       task = self._current_task
+                       if task is None:
+                               break
+                       self.scheduler.schedule(task.reg_id)
+                       task.wait()
+
+               self._wait_hook()
+               return self.returncode
+
+       def _start_next_task(self):
+               self._current_task = self._task_queue.popleft()
+               task = self._current_task
+               task.addExitListener(self._task_exit_handler)
+               task.start()
+
+       def _task_exit_handler(self, task):
+               if task is not self._current_task:
+                       raise AssertionError("Unrecognized task: %s" % (task,))
+
+               if self._task_queue and \
+                       task.returncode == os.EX_OK:
+                       self._start_next_task()
+                       return
+
+               self._current_task = None
+               self.returncode = task.returncode
+
 class SubProcess(AsynchronousTask):
        __slots__ = ("pid",)
 
@@ -1503,6 +1583,7 @@ class SubProcess(AsynchronousTask):
                if self.returncode is not None:
                        return self.returncode
                self._set_returncode(os.waitpid(self.pid, 0))
+               self._wait_hook()
                return self.returncode
 
        def _set_returncode(self, wait_retval):
@@ -1899,19 +1980,17 @@ class EbuildExecuter(SlotObject):
                        2 : sys.stderr.fileno(),
                }
 
-               for mydo in self._phases:
-                       ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
-                               pkg=self.pkg, phase=mydo, scheduler=self.scheduler,
-                               settings=settings, tree=tree)
+               composite_task = CompositeTask(scheduler=scheduler)
 
-                       ebuild_phase.start()
-                       self.scheduler.schedule(ebuild_phase.reg_id)
-                       retval = ebuild_phase.wait()
+               for phase in self._phases:
+                       composite_task.add(EbuildPhase(fd_pipes=fd_pipes,
+                               pkg=self.pkg, phase=phase, scheduler=self.scheduler,
+                               settings=settings, tree=tree))
 
-                       if retval != os.EX_OK:
-                               return retval
+               composite_task.start()
+               retval = composite_task.wait()
 
-               return os.EX_OK
+               return retval
 
 class EbuildPhase(SubProcess):