self._max_jobs = 1
self._max_load = None
self._jobs = 0
+ self._poll_event_queue = []
self._poll_event_handlers = {}
self._poll_event_handler_ids = {}
# Increment id for each new handler.
return True
+ def _next_poll_event(self):
+ """
+ Since the _schedule_wait() loop is called by event
+ handlers from _poll_loop(), maintain a central event
+ queue for both of them to share events from a single
+ poll() call.
+ """
+ if not self._poll_event_queue:
+ self._poll_event_queue.extend(self._poll.poll())
+ return self._poll_event_queue.pop()
+
def _poll_loop(self):
event_handlers = self._poll_event_handlers
- poll = self._poll.poll
- state_change = 0
+ event_handled = False
while event_handlers:
- for f, event in poll():
- handler, reg_id = event_handlers[f]
- if not handler(f, event):
- state_change += 1
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ event_handled = True
- if not state_change:
+ if not event_handled:
raise AssertionError("tight loop")
def _register(self, f, eventmask, handler):
"""
event_handlers = self._poll_event_handlers
handler_ids = self._poll_event_handler_ids
- poll = self._poll.poll
+ event_handled = False
if isinstance(wait_ids, int):
wait_ids = frozenset([wait_ids])
while wait_ids.intersection(handler_ids):
- for f, event in poll():
- handler, reg_id = event_handlers[f]
- handler(f, event)
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ event_handled = True
+
+ return event_handled
class QueueScheduler(PollScheduler):