* Implement CompositeTask._poll().
authorZac Medico <zmedico@gentoo.org>
Mon, 7 Jul 2008 05:35:17 +0000 (05:35 -0000)
committerZac Medico <zmedico@gentoo.org>
Mon, 7 Jul 2008 05:35:17 +0000 (05:35 -0000)
* Make AsynchronousTask classes call self.wait() to notify
  exit listeners.
* Rewrite Scheduler._main_loop() to bring it closer to allowing
  parallel build scheduling.

svn path=/main/trunk/; revision=10966

pym/_emerge/__init__.py

index 93684885e8a83eca0f76fadf5a5a82a29fe4024b..0037f988d50989e8abd4de75d2ccf73ee9eed9c1 100644 (file)
@@ -1459,6 +1459,9 @@ class AsynchronousTask(SlotObject):
        Subclasses override _wait() and _poll() so that calls
        to public methods can be wrapped for implementing
        hooks such as exit listener notification.
+
+       Sublasses should call self.wait() to notify exit listeners after
+       the task is complete and self.returncode has been set.
        """
 
        __slots__ = ("cancelled", "returncode") + ("_exit_listeners",)
@@ -1528,6 +1531,29 @@ class CompositeTask(AsynchronousTask):
                if self._current_task is not None:
                        self._current_task.cancel()
 
+       def _poll(self):
+               """
+               This does a loop calling self._current_task.poll()
+               repeatedly as long as the value of self._current_task
+               keeps changing. It calls poll() a maximum of one time
+               for a given self._current_task instance. This is useful
+               since calling poll() on a task can trigger advance to
+               the next task could eventually lead to the returncode
+               being set in cases when polling only a single task would
+               not have the same effect.
+               """
+
+               prev = None
+               while True:
+                       task = self._current_task
+                       if task is None or task is prev:
+                               # don't poll the same task more than once
+                               break
+                       task.poll()
+                       prev = task
+
+               return self.returncode
+
        def _wait(self):
 
                while True:
@@ -1626,6 +1652,7 @@ class TaskSequence(CompositeTask):
                        self._start_next_task()
                else:
                        self._final_exit(task)
+                       self.wait()
 
 class SubProcess(AsynchronousTask):
        __slots__ = ("pid",)
@@ -1997,6 +2024,7 @@ class EbuildBuild(CompositeTask):
                        pkg = self.pkg
                        eerror("!!! Fetch for %s failed, continuing..." % pkg.cpv,
                                phase="unpack", key=pkg.cpv)
+               self.wait()
 
        def _unlock_builddir(self):
                portage.elog.elog_process(self.pkg.cpv, self.settings)
@@ -2012,6 +2040,7 @@ class EbuildBuild(CompositeTask):
 
                if not buildpkg:
                        self._final_exit(build)
+                       self.wait()
                        return
 
                packager = EbuildBinpkg(pkg=self.pkg,
@@ -2039,11 +2068,13 @@ class EbuildBuild(CompositeTask):
                if self._final_exit(packager) != os.EX_OK or \
                        self.opts.buildpkgonly:
                        self._unlock_builddir()
+               self.wait()
 
        def _clean_exit(self, clean_phase):
                if self._final_exit(clean_phase) != os.EX_OK or \
                        self.opts.buildpkgonly:
                        self._unlock_builddir()
+               self.wait()
 
        def install(self):
                """
@@ -2480,6 +2511,7 @@ class Binpkg(CompositeTask):
                        self._fetched_pkg = True
                        if self.opts.fetchonly:
                                self._final_exit(fetcher)
+                               self.wait()
                                return
                        elif self._default_exit(fetcher) != os.EX_OK:
                                return
@@ -2610,6 +2642,7 @@ class Binpkg(CompositeTask):
                        self._unlock_builddir()
                        writemsg("!!! Error Extracting '%s'\n" % self._pkg_path,
                                noiselevel=-1)
+               self.wait()
 
        def _unlock_builddir(self):
                portage.elog.elog_process(self.pkg.cpv, self.settings)
@@ -2840,6 +2873,7 @@ class MergeListItem(CompositeTask):
 
                        self._install_task = build
                        self._start_task(build, self._ebuild_exit)
+                       self.wait()
                        return
 
                elif pkg.type_name == "binary":
@@ -2852,12 +2886,14 @@ class MergeListItem(CompositeTask):
 
                        self._install_task = binpkg
                        self._start_task(binpkg, self._final_exit)
+                       self.wait()
                        return
 
        def _ebuild_exit(self, build):
                if self._final_exit(build) != os.EX_OK:
                        if self.build_opts.fetchonly:
                                self.failed_fetches.append(self.pkg.cpv)
+               self.wait()
 
        def _poll(self):
                self._install_task.poll()
@@ -7438,13 +7474,19 @@ class SequentialTaskQueue(SlotObject):
                self._task_queue.append(task)
 
        def schedule(self):
+
+               if not self:
+                       return False
+
                task_queue = self._task_queue
                running_tasks = self.running_tasks
                max_jobs = self.max_jobs
                state_changed = False
 
                for task in list(running_tasks):
-                       if not task.registered and task.poll() is not None:
+                       if hasattr(task, "registered") and task.registered:
+                               continue
+                       if task.poll() is not None:
                                running_tasks.remove(task)
                                state_changed = True
 
@@ -7465,6 +7507,12 @@ class SequentialTaskQueue(SlotObject):
                        task = running_tasks.pop()
                        task.cancel()
 
+       def __nonzero__(self):
+               return bool(self._task_queue or self.running_tasks)
+
+       def __len__(self):
+               return len(self._task_queue) + len(self.running_tasks)
+
 class Scheduler(object):
 
        _opts_ignore_blockers = \
@@ -7480,6 +7528,9 @@ class Scheduler(object):
        class _iface_class(SlotObject):
                __slots__ = ("register", "schedule")
 
+       _task_queues_class = slot_dict_class(
+               ("build", "extract", "merge", "prefetch",), prefix="")
+
        class _build_opts_class(SlotObject):
                __slots__ = ("buildpkg", "buildpkgonly",
                        "fetch_all_uri", "fetchonly", "pretend")
@@ -7539,12 +7590,11 @@ class Scheduler(object):
                except AttributeError:
                        self._poll = PollSelectAdapter()
 
-               self._task_queues = slot_dict_class(("build", "prefetch"), prefix="")
+               self._task_queues = self._task_queues_class()
                for k in self._task_queues.allowed_keys:
                        setattr(self._task_queues, k, SequentialTaskQueue())
 
                self._add_task = self._task_queues.prefetch.add
-               self._schedule_tasks = self._task_queues.prefetch.schedule
                self._prefetchers = weakref.WeakValueDictionary()
                self._pkg_queue = deque()
                self._failed_pkgs = []
@@ -7554,6 +7604,8 @@ class Scheduler(object):
                        if isinstance(x, Package) and x.operation == "merge"])
                self._pkg_count = self._pkg_count_class(
                        curval=0, maxval=merge_count)
+               self._max_jobs = 1
+               self._jobs = 0
 
                features = self.settings.features
                if "parallel-fetch" in features and \
@@ -7852,48 +7904,40 @@ class Scheduler(object):
                        elif isinstance(pkg, Blocker):
                                pass
 
-       def _choose_pkg(self):
-               return self._pkg_queue.popleft()
-
-       def _main_loop(self):
-
-               pkg_queue = self._pkg_queue
-
-               while pkg_queue:
-                       pkg = self._choose_pkg()
-
-                       if not pkg.installed:
-                               self._pkg_count.curval += 1
-
-                       task = self._task(pkg)
-                       task.start()
-                       retval = task.wait()
+       def _merge_exit(self, merge):
+               self._jobs -= 1
+               pkg = merge.merge.pkg
+               if merge.returncode != os.EX_OK:
+                       self._failed_pkgs.append((pkg, retval))
+                       return
 
-                       if retval == os.EX_OK:
-                               task = PackageMerge(merge=task)
-                               task.start()
-                               retval = task.wait()
+               if pkg.installed:
+                       return
 
-                       if retval == os.EX_OK:
-                               self.curval += 1
-                       else:
-                               self._failed_pkgs.append((pkg, retval))
-                               if not self._build_opts.fetchonly:
-                                       return
+               self._restart_if_necessary(pkg)
 
-                       if pkg.installed:
-                               continue
+               # Call mtimedb.commit() after each merge so that
+               # --resume still works after being interrupted
+               # by reboot, sigkill or similar.
+               mtimedb = self._mtimedb
+               del mtimedb["resume"]["mergelist"][0]
+               if not mtimedb["resume"]["mergelist"]:
+                       del mtimedb["resume"]
+               mtimedb.commit()
 
-                       self._restart_if_necessary(pkg)
+       def _build_exit(self, build):
+               if build.returncode == os.EX_OK:
+                       self.curval += 1
+                       merge = PackageMerge(merge=build)
+                       merge.addExitListener(self._merge_exit)
+                       self._task_queues.merge.add(merge)
+                       self._task_queues.merge.schedule()
+               else:
+                       self._failed_pkgs.append((build.pkg, build.returncode))
+                       self._jobs -= 1
 
-                       # Call mtimedb.commit() after each merge so that
-                       # --resume still works after being interrupted
-                       # by reboot, sigkill or similar.
-                       mtimedb = self._mtimedb
-                       del mtimedb["resume"]["mergelist"][0]
-                       if not mtimedb["resume"]["mergelist"]:
-                               del mtimedb["resume"]
-                       mtimedb.commit()
+       def _extract_exit(self, build):
+               self._build_exit(build)
 
        def _merge(self):
 
@@ -7921,6 +7965,65 @@ class Scheduler(object):
 
                return rval
 
+       def _choose_pkg(self):
+               return self._pkg_queue.popleft()
+
+       def _main_loop(self):
+
+               pkg_queue = self._pkg_queue
+               failed_pkgs = self._failed_pkgs
+               task_queues = self._task_queues
+
+               while pkg_queue and not failed_pkgs:
+
+                       pkg = self._choose_pkg()
+
+                       if not pkg.installed:
+                               self._pkg_count.curval += 1
+
+                       task = self._task(pkg)
+
+                       self._jobs += 1
+                       if pkg.installed:
+                               merge = PackageMerge(merge=task)
+                               merge.addExitListener(self._merge_exit)
+                               task_queues.merge.add(merge)
+                       elif pkg.built:
+                               task.addExitListener(self._extract_exit)
+                               task_queues.extract.add(task)
+                       else:
+                               task.addExitListener(self._build_exit)
+                               task_queues.build.add(task)
+
+                       self._schedule_main()
+
+               while self._jobs:
+                       self._schedule_main(wait=True)
+
+       def _schedule_main(self, wait=False):
+
+               event_handlers = self._poll_event_handlers
+               poll = self._poll.poll
+               max_jobs = self._max_jobs
+
+               self._schedule_tasks()
+
+               while event_handlers:
+                       jobs = self._jobs
+
+                       for f, event in poll():
+                               handler, reg_id = event_handlers[f]
+                               if not handler(f, event):
+                                       self._unregister(reg_id)
+
+                       if jobs == self._jobs:
+                               continue
+
+                       self._schedule_tasks()
+
+                       if not wait and self._jobs < max_jobs:
+                               break
+
        def _task(self, pkg):
 
                task = MergeListItem(args_set=self._args_set,
@@ -8025,6 +8128,10 @@ class Scheduler(object):
                del self._poll_event_handler_ids[reg_id]
                self._schedule_tasks()
 
+       def _schedule_tasks(self):
+               for x in self._task_queues.values():
+                       x.schedule()
+
        def _schedule(self, wait_id):
                """
                Schedule until wait_id is not longer registered