self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
setattr(self._task_queues, k,
- SequentialTaskQueue(auto_schedule=True))
-
- # Merge tasks currently run synchronously which makes
- # it necessary to disable auto_schedule in order to
- # avoid excess recursion which prevents tasks from
- # being marked complete as soon as they should be.
- self._task_queues.merge.auto_schedule = False
+ SequentialTaskQueue())
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
self._poll_loop()
def _schedule_tasks(self):
- self._task_queues.merge.schedule()
+ remaining, state_change = self._schedule_tasks_imp()
+ for q in self._task_queues.values():
+ q.schedule()
# Cancel prefetchers if they're the only reason
# the main poll loop is still running.