EventLoop: thread-safe idle_add and timeout_add
authorZac Medico <zmedico@gentoo.org>
Thu, 27 Dec 2012 02:31:18 +0000 (18:31 -0800)
committerZac Medico <zmedico@gentoo.org>
Thu, 27 Dec 2012 02:39:20 +0000 (18:39 -0800)
This may be useful for using threads to handle blocking IO with Jython,
since Jython lacks the fcntl module which is needed for non-blocking IO
(see http://bugs.jython.org/issue1074).

pym/portage/util/_eventloop/EventLoop.py

index 17a468f28852d9771293b6f1b6a7b35c42160f9c..37e600792f0e7976c6af6770e725d43420f6c643 100644 (file)
@@ -9,12 +9,23 @@ import select
 import signal
 import time
 
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
 from portage.util import writemsg_level
 from ..SlotObject import SlotObject
 from .PollConstants import PollConstants
 from .PollSelectAdapter import PollSelectAdapter
 
 class EventLoop(object):
+       """
+       An event loop, intended to be compatible with the GLib event loop.
+       Call the iteration method in order to execute one iteration of the
+       loop. The idle_add and timeout_add methods serve as thread-safe
+       means to interact with the loop's thread.
+       """
 
        supports_multiprocessing = True
 
@@ -44,6 +55,7 @@ class EventLoop(object):
                @type main: bool
                """
                self._use_signal = main
+               self._thread_rlock = threading.RLock()
                self._poll_event_queue = []
                self._poll_event_handlers = {}
                self._poll_event_handler_ids = {}
@@ -89,6 +101,14 @@ class EventLoop(object):
                self._sigchld_src_id = None
                self._pid = os.getpid()
 
+       def _new_source_id(self):
+               """
+               Generate a new source id. This method is thread-safe.
+               """
+               with self._thread_rlock:
+                       self._event_handler_id += 1
+                       return self._event_handler_id
+
        def _poll(self, timeout=None):
                """
                All poll() calls pass through here. The poll events
@@ -199,14 +219,17 @@ class EventLoop(object):
                return bool(events_handled)
 
        def _get_poll_timeout(self):
-               if self._child_handlers:
-                       if self._timeout_interval is None:
-                               timeout = self._sigchld_interval
+
+               with self._thread_rlock:
+                       if self._child_handlers:
+                               if self._timeout_interval is None:
+                                       timeout = self._sigchld_interval
+                               else:
+                                       timeout = min(self._sigchld_interval,
+                                               self._timeout_interval)
                        else:
-                               timeout = min(self._sigchld_interval,
-                                       self._timeout_interval)
-               else:
-                       timeout = self._timeout_interval
+                               timeout = self._timeout_interval
+
                return timeout
 
        def child_watch_add(self, pid, callback, data=None):
@@ -229,8 +252,7 @@ class EventLoop(object):
                @rtype: int
                @return: an integer ID
                """
-               self._event_handler_id += 1
-               source_id = self._event_handler_id
+               source_id = self._new_source_id()
                self._child_handlers[source_id] = self._child_callback_class(
                        callback=callback, data=data, pid=pid, source_id=source_id)
 
@@ -304,20 +326,21 @@ class EventLoop(object):
                """
                Like glib.idle_add(), if callback returns False it is
                automatically removed from the list of event sources and will
-               not be called again.
+               not be called again. This method is thread-safe.
 
                @type callback: callable
                @param callback: a function to call
                @rtype: int
                @return: an integer ID
                """
-               self._event_handler_id += 1
-               source_id = self._event_handler_id
-               self._idle_callbacks[source_id] = self._idle_callback_class(
-                       args=args, callback=callback, source_id=source_id)
+               with self._thread_rlock:
+                       source_id = self._new_source_id()
+                       self._idle_callbacks[source_id] = self._idle_callback_class(
+                               args=args, callback=callback, source_id=source_id)
                return source_id
 
        def _run_idle_callbacks(self):
+               # assumes caller has acquired self._thread_rlock
                if not self._idle_callbacks:
                        return
                # Iterate of our local list, since self._idle_callbacks can be
@@ -342,16 +365,18 @@ class EventLoop(object):
                milliseconds between calls to your function, and your function
                should return False to stop being called, or True to continue
                being called. Any additional positional arguments given here
-               are passed to your function when it's called.
+               are passed to your function when it's called. This method is
+               thread-safe.
                """
-               self._event_handler_id += 1
-               source_id = self._event_handler_id
-               self._timeout_handlers[source_id] = \
-                       self._timeout_handler_class(
-                               interval=interval, function=function, args=args,
-                               source_id=source_id, timestamp=time.time())
-               if self._timeout_interval is None or self._timeout_interval > interval:
-                       self._timeout_interval = interval
+               with self._thread_rlock:
+                       source_id = self._new_source_id()
+                       self._timeout_handlers[source_id] = \
+                               self._timeout_handler_class(
+                                       interval=interval, function=function, args=args,
+                                       source_id=source_id, timestamp=time.time())
+                       if self._timeout_interval is None or \
+                               self._timeout_interval > interval:
+                               self._timeout_interval = interval
                return source_id
 
        def _run_timeouts(self):
@@ -361,37 +386,39 @@ class EventLoop(object):
                        if self._poll_child_processes():
                                calls += 1
 
-               self._run_idle_callbacks()
-
-               if not self._timeout_handlers:
-                       return bool(calls)
-
-               ready_timeouts = []
-               current_time = time.time()
-               for x in self._timeout_handlers.values():
-                       elapsed_seconds = current_time - x.timestamp
-                       # elapsed_seconds < 0 means the system clock has been adjusted
-                       if elapsed_seconds < 0 or \
-                               (x.interval - 1000 * elapsed_seconds) <= 0:
-                               ready_timeouts.append(x)
-
-               # Iterate of our local list, since self._timeout_handlers can be
-               # modified during the exection of these callbacks.
-               for x in ready_timeouts:
-                       if x.source_id not in self._timeout_handlers:
-                               # it got cancelled while executing another timeout
-                               continue
-                       if x.calling:
-                               # don't call it recursively
-                               continue
-                       calls += 1
-                       x.calling = True
-                       try:
-                               x.timestamp = time.time()
-                               if not x.function(*x.args):
-                                       self.source_remove(x.source_id)
-                       finally:
-                               x.calling = False
+               with self._thread_rlock:
+
+                       self._run_idle_callbacks()
+
+                       if not self._timeout_handlers:
+                               return bool(calls)
+
+                       ready_timeouts = []
+                       current_time = time.time()
+                       for x in self._timeout_handlers.values():
+                               elapsed_seconds = current_time - x.timestamp
+                               # elapsed_seconds < 0 means the system clock has been adjusted
+                               if elapsed_seconds < 0 or \
+                                       (x.interval - 1000 * elapsed_seconds) <= 0:
+                                       ready_timeouts.append(x)
+
+                       # Iterate of our local list, since self._timeout_handlers can be
+                       # modified during the exection of these callbacks.
+                       for x in ready_timeouts:
+                               if x.source_id not in self._timeout_handlers:
+                                       # it got cancelled while executing another timeout
+                                       continue
+                               if x.calling:
+                                       # don't call it recursively
+                                       continue
+                               calls += 1
+                               x.calling = True
+                               try:
+                                       x.timestamp = time.time()
+                                       if not x.function(*x.args):
+                                               self.source_remove(x.source_id)
+                               finally:
+                                       x.calling = False
 
                return bool(calls)
 
@@ -413,8 +440,7 @@ class EventLoop(object):
                """
                if f in self._poll_event_handlers:
                        raise AssertionError("fd %d is already registered" % f)
-               self._event_handler_id += 1
-               source_id = self._event_handler_id
+               source_id = self._new_source_id()
                self._poll_event_handler_ids[source_id] = f
                self._poll_event_handlers[f] = self._io_handler_class(
                        args=args, callback=callback, f=f, source_id=source_id)
@@ -434,18 +460,21 @@ class EventLoop(object):
                                self.source_remove(self._sigchld_src_id)
                                self._sigchld_src_id = None
                        return True
-               idle_callback = self._idle_callbacks.pop(reg_id, None)
-               if idle_callback is not None:
-                       return True
-               timeout_handler = self._timeout_handlers.pop(reg_id, None)
-               if timeout_handler is not None:
-                       if timeout_handler.interval == self._timeout_interval:
-                               if self._timeout_handlers:
-                                       self._timeout_interval = \
-                                               min(x.interval for x in self._timeout_handlers.values())
-                               else:
-                                       self._timeout_interval = None
-                       return True
+
+               with self._thread_rlock:
+                       idle_callback = self._idle_callbacks.pop(reg_id, None)
+                       if idle_callback is not None:
+                               return True
+                       timeout_handler = self._timeout_handlers.pop(reg_id, None)
+                       if timeout_handler is not None:
+                               if timeout_handler.interval == self._timeout_interval:
+                                       if self._timeout_handlers:
+                                               self._timeout_interval = min(x.interval
+                                                       for x in self._timeout_handlers.values())
+                                       else:
+                                               self._timeout_interval = None
+                               return True
+
                f = self._poll_event_handler_ids.pop(reg_id, None)
                if f is None:
                        return False