AsynchronousTask: don't wait for exit status
authorZac Medico <zmedico@gentoo.org>
Tue, 14 Feb 2012 02:35:03 +0000 (18:35 -0800)
committerZac Medico <zmedico@gentoo.org>
Tue, 14 Feb 2012 02:35:03 +0000 (18:35 -0800)
Synchronous waiting for status is not supported, since it would be
vulnerable to hitting the recursion limit when a large number of tasks
need to be terminated simultaneously, like in bug #402335.

pym/_emerge/AbstractEbuildProcess.py
pym/_emerge/AbstractPollTask.py
pym/_emerge/AsynchronousTask.py
pym/_emerge/MetadataRegen.py
pym/_emerge/PollScheduler.py
pym/_emerge/Scheduler.py
pym/_emerge/SequentialTaskQueue.py
pym/_emerge/TaskScheduler.py
pym/portage/tests/ebuild/test_ipc_daemon.py

index 5742cb2a77847199a08767636271c4a5963ebb52..c7b8f83cab444775899d167db787ff9ecf639781 100644 (file)
@@ -167,8 +167,7 @@ class AbstractEbuildProcess(SpawnProcess):
                        # of time, kill it (solves bug #278895). We try to avoid
                        # this when possible since it makes sandbox complain about
                        # being killed by a signal.
-                       self.cancelled = True
-                       self._cancel()
+                       self.cancel()
                        self._exit_timeout_id = \
                                self.scheduler.timeout_add(self._cancel_timeout,
                                        self._cancel_timeout_cb)
index af1c3ffe819f014a16943446d1842678cb43cc1e..2c847092525e06cde96e0840b97ca9126dd432a0 100644 (file)
@@ -123,6 +123,7 @@ class AbstractPollTask(AsynchronousTask):
                                self._log_poll_exception(event)
                                self._unregister()
                                self.cancel()
+                               self.wait()
                        elif event & self.scheduler.IO_HUP:
                                self._unregister()
                                self.wait()
index d57ccab2b6e10ac915bf143b962a873f34243aad..a1467b0b52e286b547f6c4cd50d83ca691fe2e82 100644 (file)
@@ -56,10 +56,17 @@ class AsynchronousTask(SlotObject):
                return self.returncode
 
        def cancel(self):
+               """
+               Cancel the task, but do not wait for exit status. If asynchronous exit
+               notification is desired, then use addExitListener to add a listener
+               before calling this method.
+               NOTE: Synchronous waiting for status is not supported, since it would
+               be vulnerable to hitting the recursion limit when a large number of
+               tasks need to be terminated simultaneously, like in bug #402335.
+               """
                if not self.cancelled:
                        self.cancelled = True
                        self._cancel()
-                       self.wait()
 
        def _cancel(self):
                """
index b4c98dc7e64ee848826f4cdd9181fee5373bcdd6..07fea73c4996be6cebe2dad0d013acec146f35ed 100644 (file)
@@ -37,8 +37,8 @@ class MetadataRegen(PollScheduler):
                self._remaining_tasks = True
 
        def _terminate_tasks(self):
-               while self._running_tasks:
-                       self._running_tasks.pop().cancel()
+               for task in list(self._running_tasks):
+                       task.cancel()
 
        def _iter_every_cp(self):
                portage.writemsg_stdout("Listing available packages...\n")
index 1db68072c2f8d314ec4f2a2108990921e8cbff91..6e416c300f73dd57e127d5b8fc19f66822835076 100644 (file)
@@ -86,6 +86,9 @@ class PollScheduler(object):
                implementation is running, in order to avoid potential
                interference. All tasks should be cleaned up at the earliest
                opportunity, but not necessarily before this method returns.
+               Typically, this method will send kill signals and return without
+               waiting for exit status. This allows basic cleanup to occur, such as
+               flushing of buffered output to logs.
                """
                raise NotImplementedError()
 
index b84f7bb6696d05d0776b3edc8025124e22144726..4b37026675704d803fbd48688864d8d2b5cf772e 100644 (file)
@@ -313,8 +313,7 @@ class Scheduler(PollScheduler):
 
        def _terminate_tasks(self):
                self._status_display.quiet = True
-               while self._running_tasks:
-                       task_id, task = self._running_tasks.popitem()
+               for task in list(self._running_tasks.values()):
                        task.cancel()
                for q in self._task_queues.values():
                        q.clear()
@@ -904,6 +903,7 @@ class Scheduler(PollScheduler):
                        finally:
                                if current_task is not None and current_task.isAlive():
                                        current_task.cancel()
+                                       current_task.wait()
                                clean_phase = EbuildPhase(background=False,
                                        phase='clean', scheduler=sched_iface, settings=settings)
                                clean_phase.start()
index 3cd56d2d657709149f6ea30bf755829707a41b0b..ebff430e3960e245ba6a7a6a0ce325d932d1adb6 100644 (file)
@@ -55,13 +55,20 @@ class SequentialTaskQueue(SlotObject):
                        self.schedule()
 
        def clear(self):
+               """
+               Clear the task queue and asynchronously terminate any running tasks.
+               """
                self._task_queue.clear()
-               running_tasks = self.running_tasks
-               while running_tasks:
-                       task = running_tasks.pop()
-                       task.removeExitListener(self._task_exit)
+               for task in list(self.running_tasks):
                        task.cancel()
 
+       def wait(self):
+               """
+               Synchronously wait for all running tasks to exit.
+               """
+               while self.running_tasks:
+                       next(iter(self.running_tasks)).wait()
+
        def __bool__(self):
                return bool(self._task_queue or self.running_tasks)
 
index 83c0cbe9650b42cedcfe33dcf73d53c97b8704f9..71ac80f1432c2ff7268b9cc52e0cf6079267618d 100644 (file)
@@ -18,6 +18,7 @@ class TaskScheduler(object):
                self.sched_iface = self._scheduler.sched_iface
                self.run = self._scheduler.run
                self.clear = self._scheduler.clear
+               self.wait = self._queue.wait
                self._scheduler.add(self._queue)
 
        def add(self, task):
index edfc058d7e2a93f87b9c0c43d8cdb9508bcc4547..0efab6584a991c423b5569f44a020ef56d832a66 100644 (file)
@@ -71,8 +71,8 @@ class IpcDaemonTestCase(TestCase):
                                self.received_command = False
                                def exit_command_callback():
                                        self.received_command = True
-                                       proc.cancel()
-                                       daemon.cancel()
+                                       task_scheduler.clear()
+                                       task_scheduler.wait()
 
                                exit_command.reply_hook = exit_command_callback
                                start_time = time.time()
@@ -80,6 +80,7 @@ class IpcDaemonTestCase(TestCase):
                                task_scheduler.add(proc)
                                task_scheduler.run(timeout=self._SCHEDULE_TIMEOUT)
                                task_scheduler.clear()
+                               task_scheduler.wait()
                                hardlock_cleanup(env['PORTAGE_BUILDDIR'],
                                        remove_all_locks=True)
 
@@ -108,8 +109,8 @@ class IpcDaemonTestCase(TestCase):
                                self.received_command = False
                                def exit_command_callback():
                                        self.received_command = True
-                                       proc.cancel()
-                                       daemon.cancel()
+                                       task_scheduler.clear()
+                                       task_scheduler.wait()
 
                                exit_command.reply_hook = exit_command_callback
                                start_time = time.time()
@@ -117,6 +118,7 @@ class IpcDaemonTestCase(TestCase):
                                task_scheduler.add(proc)
                                task_scheduler.run(timeout=short_timeout_ms)
                                task_scheduler.clear()
+                               task_scheduler.wait()
                                hardlock_cleanup(env['PORTAGE_BUILDDIR'],
                                        remove_all_locks=True)