QueueScheduler: timeout compat for GlibEventLoop
[portage.git] / pym / _emerge / QueueScheduler.py
1 # Copyright 1999-2012 Gentoo Foundation
2 # Distributed under the terms of the GNU General Public License v2
3
4 from _emerge.PollScheduler import PollScheduler
5
6 class QueueScheduler(PollScheduler):
7
8         """
9         Add instances of SequentialTaskQueue and then call run(). The
10         run() method returns when no tasks remain.
11         """
12
13         def __init__(self, max_jobs=None, max_load=None):
14                 PollScheduler.__init__(self)
15
16                 if max_jobs is None:
17                         max_jobs = 1
18
19                 self._max_jobs = max_jobs
20                 self._max_load = max_load
21
22                 self._queues = []
23                 self._schedule_listeners = []
24
25         def add(self, q):
26                 self._queues.append(q)
27
28         def remove(self, q):
29                 self._queues.remove(q)
30
31         def clear(self):
32                 for q in self._queues:
33                         q.clear()
34
35         def run(self, timeout=None):
36
37                 timeout_callback = None
38                 if timeout is not None:
39                         def timeout_callback():
40                                 timeout_callback.timed_out = True
41                                 return False
42                         timeout_callback.timed_out = False
43                         timeout_callback.timeout_id = self.sched_iface.timeout_add(
44                                 timeout, timeout_callback)
45
46                 try:
47                         self._schedule()
48
49                         while self._keep_scheduling() and \
50                                 not (timeout_callback is not None and
51                                 timeout_callback.timed_out):
52                                 # We don't have any callbacks to trigger _schedule(),
53                                 # so we have to call it explicitly here.
54                                 self._schedule()
55                                 self.sched_iface.iteration()
56
57                         while self._is_work_scheduled() and \
58                                 not (timeout_callback is not None and
59                                 timeout_callback.timed_out):
60                                 self.sched_iface.iteration()
61                 finally:
62                         if timeout_callback is not None:
63                                 self.sched_iface.unregister(timeout_callback.timeout_id)
64
65         def _schedule_tasks(self):
66                 """
67                 @rtype: bool
68                 @returns: True if there may be remaining tasks to schedule,
69                         False otherwise.
70                 """
71                 if self._terminated_tasks:
72                         return
73
74                 while self._can_add_job():
75                         n = self._max_jobs - self._running_job_count()
76                         if n < 1:
77                                 break
78
79                         if not self._start_next_job(n):
80                                 return
81
82         def _keep_scheduling(self):
83                 return not self._terminated_tasks and any(self._queues)
84
85         def _running_job_count(self):
86                 job_count = 0
87                 for q in self._queues:
88                         job_count += len(q.running_tasks)
89                 self._jobs = job_count
90                 return job_count
91
92         def _start_next_job(self, n=1):
93                 started_count = 0
94                 for q in self._queues:
95                         initial_job_count = len(q.running_tasks)
96                         q.schedule()
97                         final_job_count = len(q.running_tasks)
98                         if final_job_count > initial_job_count:
99                                 started_count += (final_job_count - initial_job_count)
100                         if started_count >= n:
101                                 break
102                 return started_count
103