From d85c316ae2277076845c589b9a84656cfa967c98 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Sat, 5 Jul 2008 12:21:43 +0000 Subject: [PATCH] Add a new CompositeTask class which can be used to combine separate AsynchronousTask instances into a single instance. The CompositeTask instance used task exit listeners as a means to (asynchronously) trigger progression from one subtask to the next. This technique is used to group together all the ebuild phases executed by EbuildExecuter, and should be useful for grouping many more sets of tasks into similar composite tasks. svn path=/main/trunk/; revision=10940 --- pym/_emerge/__init__.py | 101 +++++++++++++++++++++++++++++++++++----- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 71df47dd5..44f7e53e0 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1455,7 +1455,7 @@ class EbuildFetchPretend(SlotObject): return retval class AsynchronousTask(SlotObject): - __slots__ = ("cancelled", "returncode") + __slots__ = ("cancelled", "returncode") + ("_exit_listeners",) def start(self): """ @@ -1470,11 +1470,91 @@ class AsynchronousTask(SlotObject): return self.returncode def wait(self): + self._wait_hook() return self.returncode def cancel(self): pass + def addExitListener(self, f): + """ + The function will be called with one argument, a reference to self. + """ + if self._exit_listeners is None: + self._exit_listeners = [] + self._exit_listeners.append(f) + + def removeExitListener(self, f): + self._exit_listeners.remove(f) + + def _wait_hook(self): + """ + Call this method before returning from wait. This hook is + used to trigger exit listeners when the returncode first + becomes available. + """ + if self._exit_listeners is not None: + for f in self._exit_listeners: + f(self) + self._exit_listeners = None + +class CompositeTask(AsynchronousTask): + """ + A collection of tasks that executes sequentially. Each task + must have a _set_returncode() method that can be wrapped as + a means to trigger movement from one task to the next. + """ + + __slots__ = ("scheduler",) + ("_current_task", "_task_queue") + + def __init__(self, **kwargs): + AsynchronousTask.__init__(self, **kwargs) + self._task_queue = deque() + + def add(self, task): + self._task_queue.append(task) + + def start(self): + self._start_next_task() + + def isAlive(self): + return self._current_task is not None + + def cancel(self): + self._task_queue.clear() + self.cancelled = True + self._current_task.cancel() + + def wait(self): + + while True: + task = self._current_task + if task is None: + break + self.scheduler.schedule(task.reg_id) + task.wait() + + self._wait_hook() + return self.returncode + + def _start_next_task(self): + self._current_task = self._task_queue.popleft() + task = self._current_task + task.addExitListener(self._task_exit_handler) + task.start() + + def _task_exit_handler(self, task): + if task is not self._current_task: + raise AssertionError("Unrecognized task: %s" % (task,)) + + if self._task_queue and \ + task.returncode == os.EX_OK: + self._start_next_task() + return + + self._current_task = None + self.returncode = task.returncode + class SubProcess(AsynchronousTask): __slots__ = ("pid",) @@ -1503,6 +1583,7 @@ class SubProcess(AsynchronousTask): if self.returncode is not None: return self.returncode self._set_returncode(os.waitpid(self.pid, 0)) + self._wait_hook() return self.returncode def _set_returncode(self, wait_retval): @@ -1899,19 +1980,17 @@ class EbuildExecuter(SlotObject): 2 : sys.stderr.fileno(), } - for mydo in self._phases: - ebuild_phase = EbuildPhase(fd_pipes=fd_pipes, - pkg=self.pkg, phase=mydo, scheduler=self.scheduler, - settings=settings, tree=tree) + composite_task = CompositeTask(scheduler=scheduler) - ebuild_phase.start() - self.scheduler.schedule(ebuild_phase.reg_id) - retval = ebuild_phase.wait() + for phase in self._phases: + composite_task.add(EbuildPhase(fd_pipes=fd_pipes, + pkg=self.pkg, phase=phase, scheduler=self.scheduler, + settings=settings, tree=tree)) - if retval != os.EX_OK: - return retval + composite_task.start() + retval = composite_task.wait() - return os.EX_OK + return retval class EbuildPhase(SubProcess): -- 2.26.2