* Split out a SequentialTaskQueue class to encapsulate the parallel-fetch
authorZac Medico <zmedico@gentoo.org>
Fri, 4 Jul 2008 00:11:03 +0000 (00:11 -0000)
committerZac Medico <zmedico@gentoo.org>
Fri, 4 Jul 2008 00:11:03 +0000 (00:11 -0000)
  prefetcher queue.

* Fix broken return value status handling in Scheduler.merge().

svn path=/main/trunk/; revision=10918

pym/_emerge/__init__.py

index bff2e3a04e0be08a78f021fa8918d3b3d230a224..25cbc3bb5270cbe2323ff8a245bc5a354a856c47 100644 (file)
@@ -6966,6 +6966,49 @@ class PollSelectFallback(object):
                        poll_events.append((fd, select.POLLIN))
                return poll_events
 
+class SequentialTaskQueue(SlotObject):
+
+       __slots__ = ("max_jobs", "running_tasks", "_task_queue")
+
+       def __init__(self, **kwargs):
+               SlotObject.__init__(self, **kwargs)
+               from collections import deque
+               self._task_queue = deque()
+               self.running_tasks = set()
+               if self.max_jobs is None:
+                       self.max_jobs = 1
+
+       def add(self, task):
+               self._task_queue.append(task)
+
+       def schedule(self):
+               task_queue = self._task_queue
+               running_tasks = self.running_tasks
+               max_jobs = self.max_jobs
+               state_changed = False
+
+               for task in list(running_tasks):
+                       if not task.registered and task.poll() is not None:
+                               running_tasks.remove(task)
+                               state_changed = True
+
+               while task_queue and (len(running_tasks) < max_jobs):
+                       task = task_queue.popleft()
+                       cancelled = getattr(task, "cancelled", None)
+                       if not cancelled:
+                               task.start()
+                               running_tasks.add(task)
+                       state_changed = True
+
+               return state_changed
+
+       def clear(self):
+               self._task_queue.clear()
+               running_tasks = self.running_tasks
+               while running_tasks:
+                       task = running_tasks.pop()
+                       task.cancel()
+
 class Scheduler(object):
 
        _opts_ignore_blockers = \
@@ -7038,10 +7081,10 @@ class Scheduler(object):
                except AttributeError:
                        self._poll = PollSelectFallback()
 
-               from collections import deque
-               self._task_queue = deque()
-               self._running_tasks = set()
-               self._max_jobs = 1
+               self._prefetch_queue = SequentialTaskQueue()
+               self._add_task = self._prefetch_queue.add
+               self._schedule_tasks = self._prefetch_queue.schedule
+
                self._prefetchers = weakref.WeakValueDictionary()
                self._failed_fetches = []
                self._parallel_fetch = False
@@ -7071,9 +7114,6 @@ class Scheduler(object):
                                except EnvironmentError:
                                        pass
 
-       def _add_task(self, task):
-               self._task_queue.append(task)
-
        class _pkg_failure(portage.exception.PortageException):
                """
                An instance of this class is raised by unmerge() when
@@ -7282,7 +7322,7 @@ class Scheduler(object):
                mtimedb = self._mtimedb
 
                while True:
-                       self._merge()
+                       rval = self._merge()
                        self._show_failed_fetches()
                        del self._failed_fetches[:]
 
@@ -7350,11 +7390,7 @@ class Scheduler(object):
                                        return e.status
                finally:
                        # clean up child process if necessary
-                       self._task_queue.clear()
-                       running_tasks = self._running_tasks
-                       while running_tasks:
-                               task = running_tasks.pop()
-                               task.cancel()
+                       self._prefetch_queue.clear()
                return os.EX_OK
 
        def _save_resume_list(self):
@@ -7411,7 +7447,7 @@ class Scheduler(object):
 
        def _schedule(self):
                event_handlers = self._poll_event_handlers
-               running_tasks = self._running_tasks
+               running_tasks = self._prefetch_queue.running_tasks
                poll = self._poll.poll
 
                self._schedule_tasks()
@@ -7426,27 +7462,6 @@ class Scheduler(object):
                                # handler, so it's time to yield.
                                break
 
-       def _schedule_tasks(self):
-               task_queue = self._task_queue
-               running_tasks = self._running_tasks
-               max_jobs = self._max_jobs
-               state_changed = False
-
-               for task in list(running_tasks):
-                       if not task.registered and task.poll() is not None:
-                               running_tasks.remove(task)
-                               state_changed = True
-
-               while task_queue and (len(running_tasks) < max_jobs):
-                       task = task_queue.popleft()
-                       cancelled = getattr(task, "cancelled", None)
-                       if not cancelled:
-                               task.start()
-                               running_tasks.add(task)
-                       state_changed = True
-
-               return state_changed
-
        def _world_atom(self, pkg):
                """
                Add the package to the world file, but only if