dead_nodes = {}
while self._schedule():
- self._poll_loop()
+ self.sched_iface.run()
while self._jobs:
- self._poll_loop()
+ self.sched_iface.run()
if self._terminated_tasks:
self.returncode = 1
del self._poll_event_handlers[f]
return True
-class PollScheduler(EventLoop):
+class PollScheduler(object):
class _sched_iface_class(SlotObject):
__slots__ = ("idle_add", "io_add_watch", "iteration",
- "output", "register", "schedule",
+ "output", "register", "run",
"source_remove", "timeout_add", "unregister")
def __init__(self):
self._jobs = 0
self._scheduling = False
self._background = False
+ self._event_loop = EventLoop()
+ self._event_loop._schedule = self._schedule
self.sched_iface = self._sched_iface_class(
- idle_add=self._idle_add,
- io_add_watch=self._register,
- iteration=self._iteration,
+ idle_add=self._event_loop._idle_add,
+ io_add_watch=self._event_loop._register,
+ iteration=self._event_loop._iteration,
output=self._task_output,
- register=self._register,
- schedule=self._poll_loop,
- source_remove=self._unregister,
- timeout_add=self._timeout_add,
- unregister=self._unregister)
+ register=self._event_loop._register,
+ run=self._event_loop._poll_loop,
+ source_remove=self._event_loop._unregister,
+ timeout_add=self._event_loop._timeout_add,
+ unregister=self._event_loop._unregister)
def terminate(self):
"""
timeout_callback.timed_out = True
return False
timeout_callback.timed_out = False
- timeout_callback.timeout_id = self._timeout_add(
+ timeout_callback.timeout_id = self.sched_iface.timeout_add(
timeout, timeout_callback)
try:
while not (timeout_callback is not None and
timeout_callback.timed_out) and self._schedule():
- self._iteration()
+ self.sched_iface.iteration()
while not (timeout_callback is not None and
timeout_callback.timed_out) and self._running_job_count():
- self._iteration()
+ self.sched_iface.iteration()
finally:
if timeout_callback is not None:
- self._unregister(timeout_callback.timeout_id)
+ self.sched_iface.unregister(timeout_callback.timeout_id)
def _schedule_tasks(self):
"""
self._status_display = JobStatusDisplay(
xterm_titles=('notitles' not in settings.features))
- self._idle_add(self._idle_schedule)
- self._timeout_add(self._max_display_latency,
+ self.sched_iface.idle_add(self._idle_schedule)
+ self.sched_iface.timeout_add(self._max_display_latency,
self._status_display.display)
self._max_load = myopts.get("--load-average")
max_jobs = myopts.get("--jobs")
schedule=self._schedule_fetch)
self._sched_iface = self._iface_class(
fetch=fetch_iface, output=self._task_output,
- idle_add=self._idle_add,
- io_add_watch=self._register,
- iteration=self._iteration,
- register=self._register,
- schedule=self._poll_loop,
+ idle_add=self._event_loop._idle_add,
+ io_add_watch=self._event_loop._register,
+ iteration=self._event_loop._iteration,
+ register=self._event_loop._register,
+ schedule=self._event_loop._poll_loop,
scheduleSetup=self._schedule_setup,
scheduleUnpack=self._schedule_unpack,
- source_remove=self._unregister,
- timeout_add=self._timeout_add,
- unregister=self._unregister)
+ source_remove=self._event_loop._unregister,
+ timeout_add=self._event_loop._timeout_add,
+ unregister=self._event_loop._unregister)
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
self._set_max_jobs(1)
while self._schedule():
- self._poll_loop()
+ self.sched_iface.run()
while True:
self._schedule()
if not self._is_work_scheduled():
break
- self._poll_loop()
+ self.sched_iface.run()
def _keep_scheduling(self):
return bool(not self._terminated_tasks and self._pkg_queue and \
# is necessary to avoid "ResourceWarning: unclosed file"
# warnings since Python 3.2 (and also ensures that we
# don't leave any zombie child processes).
- scheduler.schedule()
+ scheduler.run()
self.assertEqual(producer.returncode, os.EX_OK)
self.assertEqual(consumer.returncode, os.EX_OK)