From c4451a1e94212025e060cfd8e6a2341527202086 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Sat, 15 Jan 2011 16:00:35 -0800 Subject: [PATCH] Add PollScheduler.terminate() for interruption. This allows PollScheduler instances to do basic cleanup and terminate gracefully when SIGINT or SIGTERM signals are received. --- bin/egencache | 25 ++++++++++- pym/_emerge/AbstractEbuildProcess.py | 7 +-- pym/_emerge/MetadataRegen.py | 26 +++++++++-- pym/_emerge/PollScheduler.py | 31 ++++++++++++- pym/_emerge/Scheduler.py | 25 +++++++---- pym/_emerge/actions.py | 67 +++++++++++++++++++++++----- 6 files changed, 153 insertions(+), 28 deletions(-) diff --git a/bin/egencache b/bin/egencache index 2fb30a07c..bf729c380 100755 --- a/bin/egencache +++ b/bin/egencache @@ -258,7 +258,30 @@ class GenCache(object): level=logging.ERROR, noiselevel=-1) def run(self): - self._regen.run() + + received_signal = [] + + def sighandler(signum, frame): + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + self._regen.terminate() + received_signal.append(128 + signum) + + earlier_sigint_handler = signal.signal(signal.SIGINT, sighandler) + earlier_sigterm_handler = signal.signal(signal.SIGTERM, sighandler) + + try: + self._regen.run() + + if received_signal: + sys.exit(received_signal[0]) + finally: + # Restore previous handlers + if earlier_sigint_handler is not None: + signal.signal(signal.SIGINT, earlier_sigint_handler) + if earlier_sigterm_handler is not None: + signal.signal(signal.SIGTERM, earlier_sigterm_handler) + self.returncode |= self._regen.returncode cp_missing = self._cp_missing diff --git a/pym/_emerge/AbstractEbuildProcess.py b/pym/_emerge/AbstractEbuildProcess.py index 46c8f938f..601aafe0e 100644 --- a/pym/_emerge/AbstractEbuildProcess.py +++ b/pym/_emerge/AbstractEbuildProcess.py @@ -1,4 +1,4 @@ -# Copyright 1999-2010 Gentoo Foundation +# Copyright 1999-2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import platform @@ -237,11 +237,12 @@ class AbstractEbuildProcess(SpawnProcess): self.returncode = self._exit_command.exitcode else: self.returncode = 1 - self._unexpected_exit() + if not self.cancelled: + self._unexpected_exit() if self._build_dir is not None: self._build_dir.unlock() self._build_dir = None - else: + elif not self.cancelled: exit_file = self.settings.get('PORTAGE_EBUILD_EXIT_FILE') if exit_file and not os.path.exists(exit_file): self.returncode = 1 diff --git a/pym/_emerge/MetadataRegen.py b/pym/_emerge/MetadataRegen.py index bb7bb149c..45c4f4d29 100644 --- a/pym/_emerge/MetadataRegen.py +++ b/pym/_emerge/MetadataRegen.py @@ -1,4 +1,4 @@ -# Copyright 1999-2010 Gentoo Foundation +# Copyright 1999-2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import portage @@ -32,6 +32,11 @@ class MetadataRegen(PollScheduler): self._process_iter = self._iter_metadata_processes() self.returncode = os.EX_OK self._error_count = 0 + self._running_tasks = set() + + def _terminate_tasks(self): + while self._running_tasks: + self._running_tasks.pop().cancel() def _iter_every_cp(self): portage.writemsg_stdout("Listing available packages...\n") @@ -39,7 +44,7 @@ class MetadataRegen(PollScheduler): portage.writemsg_stdout("Regenerating cache entries...\n") every_cp.sort(reverse=True) try: - while True: + while not self._terminated.is_set(): yield every_cp.pop() except IndexError: pass @@ -51,10 +56,14 @@ class MetadataRegen(PollScheduler): consumer = self._consumer for cp in self._cp_iter: + if self._terminated.is_set(): + break cp_set.add(cp) portage.writemsg_stdout("Processing %s\n" % cp) cpv_list = portdb.cp_list(cp) for cpv in cpv_list: + if self._terminated.is_set(): + break valid_pkgs.add(cpv) ebuild_path, repo_path = portdb.findname2(cpv) if ebuild_path is None: @@ -85,6 +94,10 @@ class MetadataRegen(PollScheduler): while self._jobs: self._poll_loop() + if self._terminated.is_set(): + self.returncode = 1 + return + if self._global_cleanse: for mytree in portdb.porttrees: try: @@ -133,12 +146,15 @@ class MetadataRegen(PollScheduler): False otherwise. """ while self._can_add_job(): + if self._terminated.is_set(): + return False try: metadata_process = next(self._process_iter) except StopIteration: return False self._jobs += 1 + self._running_tasks.add(metadata_process) metadata_process.scheduler = self.sched_iface metadata_process.addExitListener(self._metadata_exit) metadata_process.start() @@ -146,12 +162,14 @@ class MetadataRegen(PollScheduler): def _metadata_exit(self, metadata_process): self._jobs -= 1 + self._running_tasks.discard(metadata_process) if metadata_process.returncode != os.EX_OK: self.returncode = 1 self._error_count += 1 self._valid_pkgs.discard(metadata_process.cpv) - portage.writemsg("Error processing %s, continuing...\n" % \ - (metadata_process.cpv,), noiselevel=-1) + if not self._terminated.is_set(): + portage.writemsg("Error processing %s, continuing...\n" % \ + (metadata_process.cpv,), noiselevel=-1) if self._consumer is not None: # On failure, still notify the consumer (in this case the metadata diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index e71350e5b..a319066c0 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -1,4 +1,4 @@ -# Copyright 1999-2010 Gentoo Foundation +# Copyright 1999-2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import gzip @@ -6,6 +6,11 @@ import logging import select import time +try: + import threading +except ImportError: + import dummy_threading as threading + from portage import _encodings from portage import _unicode_encode from portage.util import writemsg_level @@ -21,6 +26,8 @@ class PollScheduler(object): __slots__ = ("output", "register", "schedule", "unregister") def __init__(self): + self._terminated = threading.Event() + self._terminated_tasks = False self._max_jobs = 1 self._max_load = None self._jobs = 0 @@ -38,6 +45,24 @@ class PollScheduler(object): schedule=self._schedule_wait, unregister=self._unregister) + def terminate(self): + """ + Schedules asynchronous, graceful termination of the scheduler + at the earliest opportunity. + + This method is thread-safe (and safe for signal handlers). + """ + self._terminated.set() + + def _terminate_tasks(self): + """ + Send signals to terminate all tasks. This is called once + from the event dispatching thread. All task should be + cleaned up at the earliest opportunity, but not necessarily + before this method returns. + """ + raise NotImplementedError() + def _schedule(self): """ Calls _schedule_tasks() and automatically returns early from @@ -120,6 +145,10 @@ class PollScheduler(object): raises StopIteration if timeout is None and there are no file descriptors to poll. """ + if self._terminated.is_set() and \ + not self._terminated_tasks: + self._terminated_tasks = True + self._terminate_tasks() if not self._poll_event_queue: self._poll(timeout) if not self._poll_event_queue: diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py index f637dd075..ac48f853e 100644 --- a/pym/_emerge/Scheduler.py +++ b/pym/_emerge/Scheduler.py @@ -1,4 +1,4 @@ -# Copyright 1999-2010 Gentoo Foundation +# Copyright 1999-2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 from __future__ import print_function @@ -292,6 +292,11 @@ class Scheduler(PollScheduler): self._running_portage = self._pkg(cpv, "installed", self._running_root, installed=True) + def _terminate_tasks(self): + self._status_display.quiet = True + for q in self._task_queues.values(): + q.clear() + def _init_graph(self, graph_config): """ Initialization structures used for dependency calculations @@ -1162,6 +1167,10 @@ class Scheduler(PollScheduler): while True: rval = self._merge() + + if self._terminated.is_set(): + return 1 + if rval == os.EX_OK or fetchonly or not keep_going: break if "resume" not in mtimedb: @@ -1400,9 +1409,9 @@ class Scheduler(PollScheduler): build_dir=build_dir, build_log=build_log, pkg=pkg, returncode=merge.returncode)) - self._failed_pkg_msg(self._failed_pkgs[-1], "install", "to") - - self._status_display.failed = len(self._failed_pkgs) + if not self._terminated.is_set(): + self._failed_pkg_msg(self._failed_pkgs[-1], "install", "to") + self._status_display.failed = len(self._failed_pkgs) return self._task_complete(pkg) @@ -1454,9 +1463,9 @@ class Scheduler(PollScheduler): build_dir=build_dir, build_log=build_log, pkg=build.pkg, returncode=build.returncode)) - self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for") - - self._status_display.failed = len(self._failed_pkgs) + if not self._terminated.is_set(): + self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for") + self._status_display.failed = len(self._failed_pkgs) self._deallocate_config(build.settings) self._jobs -= 1 self._status_display.running = self._jobs @@ -1633,7 +1642,7 @@ class Scheduler(PollScheduler): self._poll_loop() def _keep_scheduling(self): - return bool(self._pkg_queue and \ + return bool(not self._terminated.is_set() and self._pkg_queue and \ not (self._failed_pkgs and not self._build_opts.fetchonly)) def _is_work_scheduled(self): diff --git a/pym/_emerge/actions.py b/pym/_emerge/actions.py index 8d30752f3..37debe0b1 100644 --- a/pym/_emerge/actions.py +++ b/pym/_emerge/actions.py @@ -420,11 +420,7 @@ def action_build(settings, trees, mtimedb, mergetask = Scheduler(settings, trees, mtimedb, myopts, spinner, favorites=favorites, graph_config=mydepgraph.schedulerGraph()) - del mydepgraph - clear_caches(trees) - retval = mergetask.merge() - merge_count = mergetask.curval else: if "resume" in mtimedb and \ "mergelist" in mtimedb["resume"] and \ @@ -434,14 +430,40 @@ def action_build(settings, trees, mtimedb, mtimedb.commit() mydepgraph.saveNomergeFavorites() - mergetask = Scheduler(settings, trees, mtimedb, myopts, - spinner, favorites=favorites, - graph_config=mydepgraph.schedulerGraph()) - del mydepgraph - clear_caches(trees) + mergetask = Scheduler(settings, trees, mtimedb, myopts, + spinner, favorites=favorites, + graph_config=mydepgraph.schedulerGraph()) + + del mydepgraph + clear_caches(trees) + + received_signal = [] + + def emergeexitsig(signum, frame): + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + portage.util.writemsg("\n\nExiting on signal %(signal)s\n" % \ + {"signal":signum}) + mergetask.terminate() + received_signal.append(128 + signum) + + earlier_sigint_handler = signal.signal(signal.SIGINT, emergeexitsig) + earlier_sigterm_handler = signal.signal(signal.SIGTERM, emergeexitsig) + + try: retval = mergetask.merge() - merge_count = mergetask.curval + + if received_signal: + sys.exit(received_signal[0]) + finally: + # Restore previous handlers + if earlier_sigint_handler is not None: + signal.signal(signal.SIGINT, earlier_sigint_handler) + if earlier_sigterm_handler is not None: + signal.signal(signal.SIGTERM, earlier_sigterm_handler) + + merge_count = mergetask.curval if retval == os.EX_OK and not (buildpkgonly or fetchonly or pretend): if "yes" == settings.get("AUTOCLEAN"): @@ -1819,7 +1841,30 @@ def action_regen(settings, portdb, max_jobs, max_load): sys.stdout.flush() regen = MetadataRegen(portdb, max_jobs=max_jobs, max_load=max_load) - regen.run() + received_signal = [] + + def emergeexitsig(signum, frame): + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + portage.util.writemsg("\n\nExiting on signal %(signal)s\n" % \ + {"signal":signum}) + regen.terminate() + received_signal.append(128 + signum) + + earlier_sigint_handler = signal.signal(signal.SIGINT, emergeexitsig) + earlier_sigterm_handler = signal.signal(signal.SIGTERM, emergeexitsig) + + try: + regen.run() + + if received_signal: + sys.exit(received_signal[0]) + finally: + # Restore previous handlers + if earlier_sigint_handler is not None: + signal.signal(signal.SIGINT, earlier_sigint_handler) + if earlier_sigterm_handler is not None: + signal.signal(signal.SIGTERM, earlier_sigterm_handler) portage.writemsg_stdout("done!\n") return regen.returncode -- 2.26.2