PollScheduler: use local EventLoop (thread safe)
authorZac Medico <zmedico@gentoo.org>
Fri, 11 May 2012 06:32:26 +0000 (23:32 -0700)
committerZac Medico <zmedico@gentoo.org>
Fri, 11 May 2012 06:54:29 +0000 (23:54 -0700)
For API consumers, this makes the doebuild() function compatible with
threads, avoiding a ValueError raised by the signal module, as reported
at http://bugs.sabayon.org/show_bug.cgi?id=3305. Classes derived from
PollScheduler still use the signal module when possible.

pym/_emerge/MetadataRegen.py
pym/_emerge/PollScheduler.py
pym/_emerge/QueueScheduler.py
pym/_emerge/Scheduler.py
pym/_emerge/TaskScheduler.py
pym/portage/util/_eventloop/EventLoop.py

index 79446ee799aa440945edaee20ed4a190cf43a14b..e82015fd12eb6d879c579c76431d7b9bee1c0729 100644 (file)
@@ -11,7 +11,7 @@ class MetadataRegen(PollScheduler):
 
        def __init__(self, portdb, cp_iter=None, consumer=None,
                max_jobs=None, max_load=None):
-               PollScheduler.__init__(self)
+               PollScheduler.__init__(self, main=True)
                self._portdb = portdb
                self._global_cleanse = False
                if cp_iter is None:
index 1c631c3f27b45d70c9387408d4cc871cc6665c34..965dc20ee03e74de8680bad50a0edc5ad46ce263 100644 (file)
@@ -13,6 +13,7 @@ from portage import _encodings
 from portage import _unicode_encode
 from portage.util import writemsg_level
 from portage.util.SlotObject import SlotObject
+from portage.util._eventloop.EventLoop import EventLoop
 from portage.util._eventloop.global_event_loop import global_event_loop
 
 from _emerge.getloadavg import getloadavg
@@ -26,7 +27,13 @@ class PollScheduler(object):
                        "output", "register", "run",
                        "source_remove", "timeout_add", "unregister")
 
-       def __init__(self):
+       def __init__(self, main=False):
+               """
+               @param main: If True then use global_event_loop(), otherwise use
+                       a local EventLoop instance (default is False, for safe use in
+                       a non-main thread)
+               @type main: bool
+               """
                self._terminated = threading.Event()
                self._terminated_tasks = False
                self._max_jobs = 1
@@ -34,7 +41,10 @@ class PollScheduler(object):
                self._jobs = 0
                self._scheduling = False
                self._background = False
-               self._event_loop = global_event_loop()
+               if main:
+                       self._event_loop = global_event_loop()
+               else:
+                       self._event_loop = EventLoop(main=False)
                self.sched_iface = self._sched_iface_class(
                        IO_ERR=self._event_loop.IO_ERR,
                        IO_HUP=self._event_loop.IO_HUP,
index 9d73b782631a56a1c0c9aa180f2b5fac9a0f4866..206087c7af4ebbf7e166ea8da9a306b92b01f678 100644 (file)
@@ -10,8 +10,8 @@ class QueueScheduler(PollScheduler):
        run() method returns when no tasks remain.
        """
 
-       def __init__(self, max_jobs=None, max_load=None):
-               PollScheduler.__init__(self)
+       def __init__(self, main=True, max_jobs=None, max_load=None):
+               PollScheduler.__init__(self, main=main)
 
                if max_jobs is None:
                        max_jobs = 1
index 5500acfbb385a65b4f48d011d076453675ac0537..30a7e101b693b02b4cfbac126c5fe06bcaf083f1 100644 (file)
@@ -137,7 +137,7 @@ class Scheduler(PollScheduler):
        def __init__(self, settings, trees, mtimedb, myopts,
                spinner, mergelist=None, favorites=None, graph_config=None,
                uninstall_only=False):
-               PollScheduler.__init__(self)
+               PollScheduler.__init__(self, main=True)
 
                if mergelist is not None:
                        warnings.warn("The mergelist parameter of the " + \
index 71ac80f1432c2ff7268b9cc52e0cf6079267618d..583bfe323940707e1b45fdcea2e95faf6b914db7 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 1999-2009 Gentoo Foundation
+# Copyright 1999-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from _emerge.QueueScheduler import QueueScheduler
@@ -11,9 +11,9 @@ class TaskScheduler(object):
        add tasks and call run(). The run() method returns when no tasks remain.
        """
 
-       def __init__(self, max_jobs=None, max_load=None):
+       def __init__(self, main=True, max_jobs=None, max_load=None):
                self._queue = SequentialTaskQueue(max_jobs=max_jobs)
-               self._scheduler = QueueScheduler(
+               self._scheduler = QueueScheduler(main=main,
                        max_jobs=max_jobs, max_load=max_load)
                self.sched_iface = self._scheduler.sched_iface
                self.run = self._scheduler.run
index ef20ce40106737323bc84e111bc9137afbab5bbc..bbbce5261c0ca12f00bfb83f800f87f6183b7545 100644 (file)
@@ -35,7 +35,15 @@ class EventLoop(object):
                __slots__ = ("args", "function", "calling", "interval", "source_id",
                        "timestamp")
 
-       def __init__(self):
+       def __init__(self, main=True):
+               """
+               @param main: If True then this is a singleton instance for use
+                       in the main thread, otherwise it is a local instance which
+                       can safely be use in a non-main thread (default is True, so
+                       that global_event_loop does not need constructor arguments)
+               @type main: bool
+               """
+               self._use_signal = main
                self._poll_event_queue = []
                self._poll_event_handlers = {}
                self._poll_event_handler_ids = {}
@@ -198,20 +206,22 @@ class EventLoop(object):
                self._child_handlers[source_id] = self._child_callback_class(
                        callback=callback, data=data, pid=pid, source_id=source_id)
 
-               if self._sigchld_read is None:
-                       self._sigchld_read, self._sigchld_write = os.pipe()
-                       fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL,
-                               fcntl.fcntl(self._sigchld_read, fcntl.F_GETFL) | os.O_NONBLOCK)
-
-               # The IO watch is dynamically registered and unregistered as
-               # needed, since we don't want to consider it as a valid source
-               # of events when there are no child listeners. It's important
-               # to distinguish when there are no valid sources of IO events,
-               # in order to avoid an endless poll call if there's no timeout.
-               if self._sigchld_src_id is None:
-                       self._sigchld_src_id = self.io_add_watch(
-                               self._sigchld_read, self.IO_IN, self._sigchld_io_cb)
-                       signal.signal(signal.SIGCHLD, self._sigchld_sig_cb)
+               if self._use_signal:
+                       if self._sigchld_read is None:
+                               self._sigchld_read, self._sigchld_write = os.pipe()
+                               fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL,
+                                       fcntl.fcntl(self._sigchld_read,
+                                       fcntl.F_GETFL) | os.O_NONBLOCK)
+
+                       # The IO watch is dynamically registered and unregistered as
+                       # needed, since we don't want to consider it as a valid source
+                       # of events when there are no child listeners. It's important
+                       # to distinguish when there are no valid sources of IO events,
+                       # in order to avoid an endless poll call if there's no timeout.
+                       if self._sigchld_src_id is None:
+                               self._sigchld_src_id = self.io_add_watch(
+                                       self._sigchld_read, self.IO_IN, self._sigchld_io_cb)
+                               signal.signal(signal.SIGCHLD, self._sigchld_sig_cb)
 
                # poll now, in case the SIGCHLD has already arrived
                self._poll_child_processes()
@@ -318,10 +328,15 @@ class EventLoop(object):
 
        def _run_timeouts(self):
 
+               calls = 0
+               if not self._use_signal:
+                       if self._poll_child_processes():
+                               calls += 1
+
                self._run_idle_callbacks()
 
                if not self._timeout_handlers:
-                       return False
+                       return bool(calls)
 
                ready_timeouts = []
                current_time = time.time()
@@ -334,7 +349,6 @@ class EventLoop(object):
 
                # Iterate of our local list, since self._timeout_handlers can be
                # modified during the exection of these callbacks.
-               calls = 0
                for x in ready_timeouts:
                        if x.source_id not in self._timeout_handlers:
                                # it got cancelled while executing another timeout
@@ -387,7 +401,7 @@ class EventLoop(object):
                """
                x = self._child_handlers.pop(reg_id, None)
                if x is not None:
-                       if not self._child_handlers:
+                       if not self._child_handlers and self._use_signal:
                                signal.signal(signal.SIGCHLD, signal.SIG_DFL)
                                self.source_remove(self._sigchld_src_id)
                                self._sigchld_src_id = None