class SequentialTaskQueue(SlotObject):
__slots__ = ("max_jobs", "running_tasks") + \
- ("_dirty", "_scheduling", "_task_queue")
+ ("_scheduling", "_task_queue")
def __init__(self, **kwargs):
SlotObject.__init__(self, **kwargs)
self.running_tasks = set()
if self.max_jobs is None:
self.max_jobs = 1
- self._dirty = True
def add(self, task):
self._task_queue.append(task)
- self._dirty = True
self.schedule()
def addFront(self, task):
self._task_queue.appendleft(task)
- self._dirty = True
self.schedule()
def schedule(self):
- if not self._dirty:
- return False
-
- if not self:
- return False
-
if self._scheduling:
# Ignore any recursive schedule() calls triggered via
# self._task_exit().
- return False
+ return
self._scheduling = True
-
- task_queue = self._task_queue
- running_tasks = self.running_tasks
- max_jobs = self.max_jobs
- state_changed = False
-
- while task_queue and \
- (max_jobs is True or len(running_tasks) < max_jobs):
- task = task_queue.popleft()
- cancelled = getattr(task, "cancelled", None)
- if not cancelled:
- running_tasks.add(task)
- task.addExitListener(self._task_exit)
- task.start()
- state_changed = True
-
- self._dirty = False
- self._scheduling = False
-
- return state_changed
+ try:
+ while self._task_queue and (self.max_jobs is True or
+ len(self.running_tasks) < self.max_jobs):
+ task = self._task_queue.popleft()
+ cancelled = getattr(task, "cancelled", None)
+ if not cancelled:
+ self.running_tasks.add(task)
+ task.addExitListener(self._task_exit)
+ task.start()
+ finally:
+ self._scheduling = False
def _task_exit(self, task):
"""
"""
self.running_tasks.remove(task)
if self._task_queue:
- self._dirty = True
self.schedule()
def clear(self):
task = running_tasks.pop()
task.removeExitListener(self._task_exit)
task.cancel()
- self._dirty = False
def __bool__(self):
return bool(self._task_queue or self.running_tasks)