Add PollScheduler.terminate() for interruption.
authorZac Medico <zmedico@gentoo.org>
Sun, 16 Jan 2011 00:00:35 +0000 (16:00 -0800)
committerZac Medico <zmedico@gentoo.org>
Sun, 16 Jan 2011 20:33:32 +0000 (12:33 -0800)
This allows PollScheduler instances to do basic cleanup and terminate
gracefully when SIGINT or SIGTERM signals are received.

bin/egencache
pym/_emerge/AbstractEbuildProcess.py
pym/_emerge/MetadataRegen.py
pym/_emerge/PollScheduler.py
pym/_emerge/Scheduler.py
pym/_emerge/actions.py

index 2fb30a07c586c155bbcdcd101d44aaf7c34e7699..bf729c380bd50958b97355d9370ccc4883a05a94 100755 (executable)
@@ -258,7 +258,30 @@ class GenCache(object):
                                        level=logging.ERROR, noiselevel=-1)
 
        def run(self):
-               self._regen.run()
+
+               received_signal = []
+
+               def sighandler(signum, frame):
+                       signal.signal(signal.SIGINT, signal.SIG_IGN)
+                       signal.signal(signal.SIGTERM, signal.SIG_IGN)
+                       self._regen.terminate()
+                       received_signal.append(128 + signum)
+
+               earlier_sigint_handler = signal.signal(signal.SIGINT, sighandler)
+               earlier_sigterm_handler = signal.signal(signal.SIGTERM, sighandler)
+
+               try:
+                       self._regen.run()
+
+                       if received_signal:
+                               sys.exit(received_signal[0])
+               finally:
+                       # Restore previous handlers
+                       if earlier_sigint_handler is not None:
+                               signal.signal(signal.SIGINT, earlier_sigint_handler)
+                       if earlier_sigterm_handler is not None:
+                               signal.signal(signal.SIGTERM, earlier_sigterm_handler)
+
                self.returncode |= self._regen.returncode
                cp_missing = self._cp_missing
 
index 46c8f938f7fd89ed4004ae46fad8b2f2871afb9c..601aafe0e39ef2b5ad1d0fd4795645b69797257e 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 1999-2010 Gentoo Foundation
+# Copyright 1999-2011 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import platform
@@ -237,11 +237,12 @@ class AbstractEbuildProcess(SpawnProcess):
                                self.returncode = self._exit_command.exitcode
                        else:
                                self.returncode = 1
-                               self._unexpected_exit()
+                               if not self.cancelled:
+                                       self._unexpected_exit()
                        if self._build_dir is not None:
                                self._build_dir.unlock()
                                self._build_dir = None
-               else:
+               elif not self.cancelled:
                        exit_file = self.settings.get('PORTAGE_EBUILD_EXIT_FILE')
                        if exit_file and not os.path.exists(exit_file):
                                self.returncode = 1
index bb7bb149c7ddc6a728c2e05a2217e39757531be9..45c4f4d29f0c978b9743653ba48f08004759df6d 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 1999-2010 Gentoo Foundation
+# Copyright 1999-2011 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import portage
@@ -32,6 +32,11 @@ class MetadataRegen(PollScheduler):
                self._process_iter = self._iter_metadata_processes()
                self.returncode = os.EX_OK
                self._error_count = 0
+               self._running_tasks = set()
+
+       def _terminate_tasks(self):
+               while self._running_tasks:
+                       self._running_tasks.pop().cancel()
 
        def _iter_every_cp(self):
                portage.writemsg_stdout("Listing available packages...\n")
@@ -39,7 +44,7 @@ class MetadataRegen(PollScheduler):
                portage.writemsg_stdout("Regenerating cache entries...\n")
                every_cp.sort(reverse=True)
                try:
-                       while True:
+                       while not self._terminated.is_set():
                                yield every_cp.pop()
                except IndexError:
                        pass
@@ -51,10 +56,14 @@ class MetadataRegen(PollScheduler):
                consumer = self._consumer
 
                for cp in self._cp_iter:
+                       if self._terminated.is_set():
+                               break
                        cp_set.add(cp)
                        portage.writemsg_stdout("Processing %s\n" % cp)
                        cpv_list = portdb.cp_list(cp)
                        for cpv in cpv_list:
+                               if self._terminated.is_set():
+                                       break
                                valid_pkgs.add(cpv)
                                ebuild_path, repo_path = portdb.findname2(cpv)
                                if ebuild_path is None:
@@ -85,6 +94,10 @@ class MetadataRegen(PollScheduler):
                while self._jobs:
                        self._poll_loop()
 
+               if self._terminated.is_set():
+                       self.returncode = 1
+                       return
+
                if self._global_cleanse:
                        for mytree in portdb.porttrees:
                                try:
@@ -133,12 +146,15 @@ class MetadataRegen(PollScheduler):
                        False otherwise.
                """
                while self._can_add_job():
+                       if self._terminated.is_set():
+                               return False
                        try:
                                metadata_process = next(self._process_iter)
                        except StopIteration:
                                return False
 
                        self._jobs += 1
+                       self._running_tasks.add(metadata_process)
                        metadata_process.scheduler = self.sched_iface
                        metadata_process.addExitListener(self._metadata_exit)
                        metadata_process.start()
@@ -146,12 +162,14 @@ class MetadataRegen(PollScheduler):
 
        def _metadata_exit(self, metadata_process):
                self._jobs -= 1
+               self._running_tasks.discard(metadata_process)
                if metadata_process.returncode != os.EX_OK:
                        self.returncode = 1
                        self._error_count += 1
                        self._valid_pkgs.discard(metadata_process.cpv)
-                       portage.writemsg("Error processing %s, continuing...\n" % \
-                               (metadata_process.cpv,), noiselevel=-1)
+                       if not self._terminated.is_set():
+                               portage.writemsg("Error processing %s, continuing...\n" % \
+                                       (metadata_process.cpv,), noiselevel=-1)
 
                if self._consumer is not None:
                        # On failure, still notify the consumer (in this case the metadata
index e71350e5bb182d94acea6128e1ef94b99e0a02e7..a319066c0f611ad8270b48fa9aa55818ceba5044 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 1999-2010 Gentoo Foundation
+# Copyright 1999-2011 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import gzip
@@ -6,6 +6,11 @@ import logging
 import select
 import time
 
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
 from portage import _encodings
 from portage import _unicode_encode
 from portage.util import writemsg_level
@@ -21,6 +26,8 @@ class PollScheduler(object):
                __slots__ = ("output", "register", "schedule", "unregister")
 
        def __init__(self):
+               self._terminated = threading.Event()
+               self._terminated_tasks = False
                self._max_jobs = 1
                self._max_load = None
                self._jobs = 0
@@ -38,6 +45,24 @@ class PollScheduler(object):
                        schedule=self._schedule_wait,
                        unregister=self._unregister)
 
+       def terminate(self):
+               """
+               Schedules asynchronous, graceful termination of the scheduler
+               at the earliest opportunity.
+
+               This method is thread-safe (and safe for signal handlers).
+               """
+               self._terminated.set()
+
+       def _terminate_tasks(self):
+               """
+               Send signals to terminate all tasks. This is called once
+               from the event dispatching thread. All task should be
+               cleaned up at the earliest opportunity, but not necessarily
+               before this method returns.
+               """
+               raise NotImplementedError()
+
        def _schedule(self):
                """
                Calls _schedule_tasks() and automatically returns early from
@@ -120,6 +145,10 @@ class PollScheduler(object):
                raises StopIteration if timeout is None and there are
                no file descriptors to poll.
                """
+               if self._terminated.is_set() and \
+                       not self._terminated_tasks:
+                       self._terminated_tasks = True
+                       self._terminate_tasks()
                if not self._poll_event_queue:
                        self._poll(timeout)
                        if not self._poll_event_queue:
index 11589ff40a265293dffebfbe6da8cb3b96feafa9..c8301a2985c173b6daaba6da5a598b4cdd31176f 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 1999-2010 Gentoo Foundation
+# Copyright 1999-2011 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from __future__ import print_function
@@ -292,6 +292,11 @@ class Scheduler(PollScheduler):
                        self._running_portage = self._pkg(cpv, "installed",
                                self._running_root, installed=True)
 
+       def _terminate_tasks(self):
+               self._status_display.quiet = True
+               for q in self._task_queues.values():
+                       q.clear()
+
        def _init_graph(self, graph_config):
                """
                Initialization structures used for dependency calculations
@@ -1160,6 +1165,10 @@ class Scheduler(PollScheduler):
 
                while True:
                        rval = self._merge()
+
+                       if self._terminated.is_set():
+                               return 1
+
                        if rval == os.EX_OK or fetchonly or not keep_going:
                                break
                        if "resume" not in mtimedb:
@@ -1398,9 +1407,9 @@ class Scheduler(PollScheduler):
                                build_dir=build_dir, build_log=build_log,
                                pkg=pkg,
                                returncode=merge.returncode))
-                       self._failed_pkg_msg(self._failed_pkgs[-1], "install", "to")
-
-                       self._status_display.failed = len(self._failed_pkgs)
+                       if not self._terminated.is_set():
+                               self._failed_pkg_msg(self._failed_pkgs[-1], "install", "to")
+                               self._status_display.failed = len(self._failed_pkgs)
                        return
 
                self._task_complete(pkg)
@@ -1452,9 +1461,9 @@ class Scheduler(PollScheduler):
                                build_dir=build_dir, build_log=build_log,
                                pkg=build.pkg,
                                returncode=build.returncode))
-                       self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for")
-
-                       self._status_display.failed = len(self._failed_pkgs)
+                       if not self._terminated.is_set():
+                               self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for")
+                               self._status_display.failed = len(self._failed_pkgs)
                        self._deallocate_config(build.settings)
                self._jobs -= 1
                self._status_display.running = self._jobs
@@ -1631,7 +1640,7 @@ class Scheduler(PollScheduler):
                                self._poll_loop()
 
        def _keep_scheduling(self):
-               return bool(self._pkg_queue and \
+               return bool(not self._terminated.is_set() and self._pkg_queue and \
                        not (self._failed_pkgs and not self._build_opts.fetchonly))
 
        def _is_work_scheduled(self):
index 39bfdbfca18f6cef5a1be28067478fd398323939..2caa4740df2938720f5e2a75e32dda8773116092 100644 (file)
@@ -420,11 +420,7 @@ def action_build(settings, trees, mtimedb,
                        mergetask = Scheduler(settings, trees, mtimedb, myopts,
                                spinner, favorites=favorites,
                                graph_config=mydepgraph.schedulerGraph())
-                       del mydepgraph
-                       clear_caches(trees)
 
-                       retval = mergetask.merge()
-                       merge_count = mergetask.curval
                else:
                        if "resume" in mtimedb and \
                        "mergelist" in mtimedb["resume"] and \
@@ -434,14 +430,40 @@ def action_build(settings, trees, mtimedb,
                                mtimedb.commit()
 
                        mydepgraph.saveNomergeFavorites()
-                       mergetask = Scheduler(settings, trees, mtimedb, myopts,
-                               spinner, favorites=favorites,
-                               graph_config=mydepgraph.schedulerGraph())
-                       del mydepgraph
-                       clear_caches(trees)
 
+               mergetask = Scheduler(settings, trees, mtimedb, myopts,
+                       spinner, favorites=favorites,
+                       graph_config=mydepgraph.schedulerGraph())
+
+               del mydepgraph
+               clear_caches(trees)
+
+               received_signal = []
+
+               def emergeexitsig(signum, frame):
+                       signal.signal(signal.SIGINT, signal.SIG_IGN)
+                       signal.signal(signal.SIGTERM, signal.SIG_IGN)
+                       portage.util.writemsg("\n\nExiting on signal %(signal)s\n" % \
+                               {"signal":signum})
+                       mergetask.terminate()
+                       received_signal.append(128 + signum)
+
+               earlier_sigint_handler = signal.signal(signal.SIGINT, emergeexitsig)
+               earlier_sigterm_handler = signal.signal(signal.SIGTERM, emergeexitsig)
+
+               try:
                        retval = mergetask.merge()
-                       merge_count = mergetask.curval
+
+                       if received_signal:
+                               sys.exit(received_signal[0])
+               finally:
+                       # Restore previous handlers
+                       if earlier_sigint_handler is not None:
+                               signal.signal(signal.SIGINT, earlier_sigint_handler)
+                       if earlier_sigterm_handler is not None:
+                               signal.signal(signal.SIGTERM, earlier_sigterm_handler)
+
+               merge_count = mergetask.curval
 
                if retval == os.EX_OK and not (buildpkgonly or fetchonly or pretend):
                        if "yes" == settings.get("AUTOCLEAN"):
@@ -1809,7 +1831,30 @@ def action_regen(settings, portdb, max_jobs, max_load):
        sys.stdout.flush()
 
        regen = MetadataRegen(portdb, max_jobs=max_jobs, max_load=max_load)
-       regen.run()
+       received_signal = []
+
+       def emergeexitsig(signum, frame):
+               signal.signal(signal.SIGINT, signal.SIG_IGN)
+               signal.signal(signal.SIGTERM, signal.SIG_IGN)
+               portage.util.writemsg("\n\nExiting on signal %(signal)s\n" % \
+                       {"signal":signum})
+               regen.terminate()
+               received_signal.append(128 + signum)
+
+       earlier_sigint_handler = signal.signal(signal.SIGINT, emergeexitsig)
+       earlier_sigterm_handler = signal.signal(signal.SIGTERM, emergeexitsig)
+
+       try:
+               regen.run()
+
+               if received_signal:
+                       sys.exit(received_signal[0])
+       finally:
+               # Restore previous handlers
+               if earlier_sigint_handler is not None:
+                       signal.signal(signal.SIGINT, earlier_sigint_handler)
+               if earlier_sigterm_handler is not None:
+                       signal.signal(signal.SIGTERM, earlier_sigterm_handler)
 
        portage.writemsg_stdout("done!\n")
        return regen.returncode