import signal
import time
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
from portage.util import writemsg_level
from ..SlotObject import SlotObject
from .PollConstants import PollConstants
from .PollSelectAdapter import PollSelectAdapter
class EventLoop(object):
+ """
+ An event loop, intended to be compatible with the GLib event loop.
+ Call the iteration method in order to execute one iteration of the
+ loop. The idle_add and timeout_add methods serve as thread-safe
+ means to interact with the loop's thread.
+ """
supports_multiprocessing = True
@type main: bool
"""
self._use_signal = main
+ self._thread_rlock = threading.RLock()
self._poll_event_queue = []
self._poll_event_handlers = {}
self._poll_event_handler_ids = {}
self._sigchld_src_id = None
self._pid = os.getpid()
+ def _new_source_id(self):
+ """
+ Generate a new source id. This method is thread-safe.
+ """
+ with self._thread_rlock:
+ self._event_handler_id += 1
+ return self._event_handler_id
+
def _poll(self, timeout=None):
"""
All poll() calls pass through here. The poll events
return bool(events_handled)
def _get_poll_timeout(self):
- if self._child_handlers:
- if self._timeout_interval is None:
- timeout = self._sigchld_interval
+
+ with self._thread_rlock:
+ if self._child_handlers:
+ if self._timeout_interval is None:
+ timeout = self._sigchld_interval
+ else:
+ timeout = min(self._sigchld_interval,
+ self._timeout_interval)
else:
- timeout = min(self._sigchld_interval,
- self._timeout_interval)
- else:
- timeout = self._timeout_interval
+ timeout = self._timeout_interval
+
return timeout
def child_watch_add(self, pid, callback, data=None):
@rtype: int
@return: an integer ID
"""
- self._event_handler_id += 1
- source_id = self._event_handler_id
+ source_id = self._new_source_id()
self._child_handlers[source_id] = self._child_callback_class(
callback=callback, data=data, pid=pid, source_id=source_id)
"""
Like glib.idle_add(), if callback returns False it is
automatically removed from the list of event sources and will
- not be called again.
+ not be called again. This method is thread-safe.
@type callback: callable
@param callback: a function to call
@rtype: int
@return: an integer ID
"""
- self._event_handler_id += 1
- source_id = self._event_handler_id
- self._idle_callbacks[source_id] = self._idle_callback_class(
- args=args, callback=callback, source_id=source_id)
+ with self._thread_rlock:
+ source_id = self._new_source_id()
+ self._idle_callbacks[source_id] = self._idle_callback_class(
+ args=args, callback=callback, source_id=source_id)
return source_id
def _run_idle_callbacks(self):
+ # assumes caller has acquired self._thread_rlock
if not self._idle_callbacks:
return
# Iterate of our local list, since self._idle_callbacks can be
milliseconds between calls to your function, and your function
should return False to stop being called, or True to continue
being called. Any additional positional arguments given here
- are passed to your function when it's called.
+ are passed to your function when it's called. This method is
+ thread-safe.
"""
- self._event_handler_id += 1
- source_id = self._event_handler_id
- self._timeout_handlers[source_id] = \
- self._timeout_handler_class(
- interval=interval, function=function, args=args,
- source_id=source_id, timestamp=time.time())
- if self._timeout_interval is None or self._timeout_interval > interval:
- self._timeout_interval = interval
+ with self._thread_rlock:
+ source_id = self._new_source_id()
+ self._timeout_handlers[source_id] = \
+ self._timeout_handler_class(
+ interval=interval, function=function, args=args,
+ source_id=source_id, timestamp=time.time())
+ if self._timeout_interval is None or \
+ self._timeout_interval > interval:
+ self._timeout_interval = interval
return source_id
def _run_timeouts(self):
if self._poll_child_processes():
calls += 1
- self._run_idle_callbacks()
-
- if not self._timeout_handlers:
- return bool(calls)
-
- ready_timeouts = []
- current_time = time.time()
- for x in self._timeout_handlers.values():
- elapsed_seconds = current_time - x.timestamp
- # elapsed_seconds < 0 means the system clock has been adjusted
- if elapsed_seconds < 0 or \
- (x.interval - 1000 * elapsed_seconds) <= 0:
- ready_timeouts.append(x)
-
- # Iterate of our local list, since self._timeout_handlers can be
- # modified during the exection of these callbacks.
- for x in ready_timeouts:
- if x.source_id not in self._timeout_handlers:
- # it got cancelled while executing another timeout
- continue
- if x.calling:
- # don't call it recursively
- continue
- calls += 1
- x.calling = True
- try:
- x.timestamp = time.time()
- if not x.function(*x.args):
- self.source_remove(x.source_id)
- finally:
- x.calling = False
+ with self._thread_rlock:
+
+ self._run_idle_callbacks()
+
+ if not self._timeout_handlers:
+ return bool(calls)
+
+ ready_timeouts = []
+ current_time = time.time()
+ for x in self._timeout_handlers.values():
+ elapsed_seconds = current_time - x.timestamp
+ # elapsed_seconds < 0 means the system clock has been adjusted
+ if elapsed_seconds < 0 or \
+ (x.interval - 1000 * elapsed_seconds) <= 0:
+ ready_timeouts.append(x)
+
+ # Iterate of our local list, since self._timeout_handlers can be
+ # modified during the exection of these callbacks.
+ for x in ready_timeouts:
+ if x.source_id not in self._timeout_handlers:
+ # it got cancelled while executing another timeout
+ continue
+ if x.calling:
+ # don't call it recursively
+ continue
+ calls += 1
+ x.calling = True
+ try:
+ x.timestamp = time.time()
+ if not x.function(*x.args):
+ self.source_remove(x.source_id)
+ finally:
+ x.calling = False
return bool(calls)
"""
if f in self._poll_event_handlers:
raise AssertionError("fd %d is already registered" % f)
- self._event_handler_id += 1
- source_id = self._event_handler_id
+ source_id = self._new_source_id()
self._poll_event_handler_ids[source_id] = f
self._poll_event_handlers[f] = self._io_handler_class(
args=args, callback=callback, f=f, source_id=source_id)
self.source_remove(self._sigchld_src_id)
self._sigchld_src_id = None
return True
- idle_callback = self._idle_callbacks.pop(reg_id, None)
- if idle_callback is not None:
- return True
- timeout_handler = self._timeout_handlers.pop(reg_id, None)
- if timeout_handler is not None:
- if timeout_handler.interval == self._timeout_interval:
- if self._timeout_handlers:
- self._timeout_interval = \
- min(x.interval for x in self._timeout_handlers.values())
- else:
- self._timeout_interval = None
- return True
+
+ with self._thread_rlock:
+ idle_callback = self._idle_callbacks.pop(reg_id, None)
+ if idle_callback is not None:
+ return True
+ timeout_handler = self._timeout_handlers.pop(reg_id, None)
+ if timeout_handler is not None:
+ if timeout_handler.interval == self._timeout_interval:
+ if self._timeout_handlers:
+ self._timeout_interval = min(x.interval
+ for x in self._timeout_handlers.values())
+ else:
+ self._timeout_interval = None
+ return True
+
f = self._poll_event_handler_ids.pop(reg_id, None)
if f is None:
return False