From 5228eb4ce60765cd421a5e363e51a71626739267 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 10 May 2012 23:32:26 -0700 Subject: [PATCH] PollScheduler: use local EventLoop (thread safe) For API consumers, this makes the doebuild() function compatible with threads, avoiding a ValueError raised by the signal module, as reported at http://bugs.sabayon.org/show_bug.cgi?id=3305. Classes derived from PollScheduler still use the signal module when possible. --- pym/_emerge/MetadataRegen.py | 2 +- pym/_emerge/PollScheduler.py | 14 ++++++- pym/_emerge/QueueScheduler.py | 4 +- pym/_emerge/Scheduler.py | 2 +- pym/_emerge/TaskScheduler.py | 6 +-- pym/portage/util/_eventloop/EventLoop.py | 50 +++++++++++++++--------- 6 files changed, 51 insertions(+), 27 deletions(-) diff --git a/pym/_emerge/MetadataRegen.py b/pym/_emerge/MetadataRegen.py index 79446ee79..e82015fd1 100644 --- a/pym/_emerge/MetadataRegen.py +++ b/pym/_emerge/MetadataRegen.py @@ -11,7 +11,7 @@ class MetadataRegen(PollScheduler): def __init__(self, portdb, cp_iter=None, consumer=None, max_jobs=None, max_load=None): - PollScheduler.__init__(self) + PollScheduler.__init__(self, main=True) self._portdb = portdb self._global_cleanse = False if cp_iter is None: diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index 1c631c3f2..965dc20ee 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -13,6 +13,7 @@ from portage import _encodings from portage import _unicode_encode from portage.util import writemsg_level from portage.util.SlotObject import SlotObject +from portage.util._eventloop.EventLoop import EventLoop from portage.util._eventloop.global_event_loop import global_event_loop from _emerge.getloadavg import getloadavg @@ -26,7 +27,13 @@ class PollScheduler(object): "output", "register", "run", "source_remove", "timeout_add", "unregister") - def __init__(self): + def __init__(self, main=False): + """ + @param main: If True then use global_event_loop(), otherwise use + a local EventLoop instance (default is False, for safe use in + a non-main thread) + @type main: bool + """ self._terminated = threading.Event() self._terminated_tasks = False self._max_jobs = 1 @@ -34,7 +41,10 @@ class PollScheduler(object): self._jobs = 0 self._scheduling = False self._background = False - self._event_loop = global_event_loop() + if main: + self._event_loop = global_event_loop() + else: + self._event_loop = EventLoop(main=False) self.sched_iface = self._sched_iface_class( IO_ERR=self._event_loop.IO_ERR, IO_HUP=self._event_loop.IO_HUP, diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py index 9d73b7826..206087c7a 100644 --- a/pym/_emerge/QueueScheduler.py +++ b/pym/_emerge/QueueScheduler.py @@ -10,8 +10,8 @@ class QueueScheduler(PollScheduler): run() method returns when no tasks remain. """ - def __init__(self, max_jobs=None, max_load=None): - PollScheduler.__init__(self) + def __init__(self, main=True, max_jobs=None, max_load=None): + PollScheduler.__init__(self, main=main) if max_jobs is None: max_jobs = 1 diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py index 5500acfbb..30a7e101b 100644 --- a/pym/_emerge/Scheduler.py +++ b/pym/_emerge/Scheduler.py @@ -137,7 +137,7 @@ class Scheduler(PollScheduler): def __init__(self, settings, trees, mtimedb, myopts, spinner, mergelist=None, favorites=None, graph_config=None, uninstall_only=False): - PollScheduler.__init__(self) + PollScheduler.__init__(self, main=True) if mergelist is not None: warnings.warn("The mergelist parameter of the " + \ diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py index 71ac80f14..583bfe323 100644 --- a/pym/_emerge/TaskScheduler.py +++ b/pym/_emerge/TaskScheduler.py @@ -1,4 +1,4 @@ -# Copyright 1999-2009 Gentoo Foundation +# Copyright 1999-2012 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 from _emerge.QueueScheduler import QueueScheduler @@ -11,9 +11,9 @@ class TaskScheduler(object): add tasks and call run(). The run() method returns when no tasks remain. """ - def __init__(self, max_jobs=None, max_load=None): + def __init__(self, main=True, max_jobs=None, max_load=None): self._queue = SequentialTaskQueue(max_jobs=max_jobs) - self._scheduler = QueueScheduler( + self._scheduler = QueueScheduler(main=main, max_jobs=max_jobs, max_load=max_load) self.sched_iface = self._scheduler.sched_iface self.run = self._scheduler.run diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index ef20ce401..bbbce5261 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -35,7 +35,15 @@ class EventLoop(object): __slots__ = ("args", "function", "calling", "interval", "source_id", "timestamp") - def __init__(self): + def __init__(self, main=True): + """ + @param main: If True then this is a singleton instance for use + in the main thread, otherwise it is a local instance which + can safely be use in a non-main thread (default is True, so + that global_event_loop does not need constructor arguments) + @type main: bool + """ + self._use_signal = main self._poll_event_queue = [] self._poll_event_handlers = {} self._poll_event_handler_ids = {} @@ -198,20 +206,22 @@ class EventLoop(object): self._child_handlers[source_id] = self._child_callback_class( callback=callback, data=data, pid=pid, source_id=source_id) - if self._sigchld_read is None: - self._sigchld_read, self._sigchld_write = os.pipe() - fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL, - fcntl.fcntl(self._sigchld_read, fcntl.F_GETFL) | os.O_NONBLOCK) - - # The IO watch is dynamically registered and unregistered as - # needed, since we don't want to consider it as a valid source - # of events when there are no child listeners. It's important - # to distinguish when there are no valid sources of IO events, - # in order to avoid an endless poll call if there's no timeout. - if self._sigchld_src_id is None: - self._sigchld_src_id = self.io_add_watch( - self._sigchld_read, self.IO_IN, self._sigchld_io_cb) - signal.signal(signal.SIGCHLD, self._sigchld_sig_cb) + if self._use_signal: + if self._sigchld_read is None: + self._sigchld_read, self._sigchld_write = os.pipe() + fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL, + fcntl.fcntl(self._sigchld_read, + fcntl.F_GETFL) | os.O_NONBLOCK) + + # The IO watch is dynamically registered and unregistered as + # needed, since we don't want to consider it as a valid source + # of events when there are no child listeners. It's important + # to distinguish when there are no valid sources of IO events, + # in order to avoid an endless poll call if there's no timeout. + if self._sigchld_src_id is None: + self._sigchld_src_id = self.io_add_watch( + self._sigchld_read, self.IO_IN, self._sigchld_io_cb) + signal.signal(signal.SIGCHLD, self._sigchld_sig_cb) # poll now, in case the SIGCHLD has already arrived self._poll_child_processes() @@ -318,10 +328,15 @@ class EventLoop(object): def _run_timeouts(self): + calls = 0 + if not self._use_signal: + if self._poll_child_processes(): + calls += 1 + self._run_idle_callbacks() if not self._timeout_handlers: - return False + return bool(calls) ready_timeouts = [] current_time = time.time() @@ -334,7 +349,6 @@ class EventLoop(object): # Iterate of our local list, since self._timeout_handlers can be # modified during the exection of these callbacks. - calls = 0 for x in ready_timeouts: if x.source_id not in self._timeout_handlers: # it got cancelled while executing another timeout @@ -387,7 +401,7 @@ class EventLoop(object): """ x = self._child_handlers.pop(reg_id, None) if x is not None: - if not self._child_handlers: + if not self._child_handlers and self._use_signal: signal.signal(signal.SIGCHLD, signal.SIG_DFL) self.source_remove(self._sigchld_src_id) self._sigchld_src_id = None -- 2.26.2