author     Zac Medico <zmedico@gentoo.org>
           Tue, 8 Jul 2008 09:50:37 +0000 (09:50 -0000)
committer  Zac Medico <zmedico@gentoo.org>
           Tue, 8 Jul 2008 09:50:37 +0000 (09:50 -0000)

Implement parallel build support by adding new --jobs and --load-average
options that are analogous to the corresponding `make` options. Input and
output handling still need work to make it look better and act more friendly
for things like interactive ebuilds that require input.

svn path=/main/trunk/; revision=10983
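
To illustrate the intended semantics of the two new options, here is a minimal
standalone sketch of the throttle that the Scheduler changes below implement.
The helper name can_start_new_job and its parameters are hypothetical; only
os.getloadavg() is an actual stdlib call, and the load average is consulted
only once more than one build is already running, as in the patch itself.

import os

def can_start_new_job(jobs_running, max_jobs, max_load):
    """
    Hypothetical helper: decide whether another build may start,
    given the --jobs limit and the optional --load-average cap.
    """
    if jobs_running >= max_jobs:
        return False
    # Only consult the load average when a cap was given and other
    # builds are already running (mirrors the check in _main_loop).
    if max_load is not None and max_jobs > 1 and jobs_running > 1:
        try:
            avg1, avg5, avg15 = os.getloadavg()
        except OSError:
            # If the load average is unreadable, err on the side of
            # not starting another build right now.
            return False
        if avg1 >= max_load:
            return False
    return True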

pym/_emerge/__init__.py

index 0071fd25b5dd3efbcbcd4cc48e7ecec86317290f..8d7a925d7f20e321d26aaab201b2024d7ffbb85c 100644
@@ -855,6 +855,28 @@ class SlotObject(object):
                                myvalue = kwargs.get(myattr, None)
                                setattr(self, myattr, myvalue)
 
+       def copy(self):
+               """
+               Create a new instance and copy all attributes
+               defined in __slots__ (including those from
+               inherited classes).
+               """
+               obj = self.__class__()
+
+               classes = [self.__class__]
+               while classes:
+                       c = classes.pop()
+                       if c is SlotObject:
+                               continue
+                       classes.extend(c.__bases__)
+                       slots = getattr(c, "__slots__", None)
+                       if not slots:
+                               continue
+                       for myattr in slots:
+                               setattr(obj, myattr, getattr(self, myattr))
+
+               return obj
+
 class AbstractDepPriority(SlotObject):
        __slots__ = ("buildtime", "runtime", "runtime_post")
 
@@ -7539,6 +7561,9 @@ class Scheduler(object):
                "--fetchonly", "--fetch-all-uri",
                "--nodeps", "--pretend"])
 
+       _opts_no_restart = frozenset(["--buildpkgonly",
+               "--fetchonly", "--fetch-all-uri", "--pretend"])
+
        _bad_resume_opts = set(["--ask", "--changelog",
                "--resume", "--skipfirst"])
 
@@ -7591,9 +7616,11 @@ class Scheduler(object):
                if settings.get("PORTAGE_DEBUG", "") == "1":
                        self.edebug = 1
                self.pkgsettings = {}
+               self._config_pool = {}
                for root in trees:
                        self.pkgsettings[root] = portage.config(
                                clone=trees[root]["vartree"].settings)
+                       self._config_pool[root] = []
                self.curval = 0
                self._logger = self._emerge_log_class(
                        xterm_titles=("notitles" not in settings.features))
@@ -7616,6 +7643,7 @@ class Scheduler(object):
                self._add_task = self._task_queues.prefetch.add
                self._prefetchers = weakref.WeakValueDictionary()
                self._pkg_queue = deque()
+               self._completed_tasks = set()
                self._failed_pkgs = []
                self._failed_fetches = []
                self._parallel_fetch = False
@@ -7623,7 +7651,14 @@ class Scheduler(object):
                        if isinstance(x, Package) and x.operation == "merge"])
                self._pkg_count = self._pkg_count_class(
                        curval=0, maxval=merge_count)
-               self._max_jobs = 1
+
+               max_jobs = myopts.get("--jobs")
+               if max_jobs is None:
+                       max_jobs = 1
+               self._set_max_jobs(max_jobs)
+
+               self._max_load = myopts.get("--load-average")
+
                self._set_digraph(digraph)
                self._jobs = 0
 
@@ -7650,12 +7685,38 @@ class Scheduler(object):
                                except EnvironmentError:
                                        pass
 
+       def _set_max_jobs(self, max_jobs):
+               self._max_jobs = max_jobs
+               self._task_queues.build.max_jobs = max_jobs
+
        def _set_digraph(self, digraph):
                if self._max_jobs < 2:
                        # save some memory
                        self._digraph = None
-               else:
-                       self._digraph = digraph
+                       return
+
+               self._digraph = digraph
+               self._prune_digraph()
+
+       def _prune_digraph(self):
+               """
+               Prune any root nodes that are irrelevant.
+               """
+
+               graph = self._digraph
+               completed_tasks = self._completed_tasks
+               removed_nodes = set()
+               while True:
+                       for node in graph.root_nodes():
+                               if not isinstance(node, Package) or \
+                                       node.installed or node.onlydeps or \
+                                       node in completed_tasks:
+                                       removed_nodes.add(node)
+                       if removed_nodes:
+                               graph.difference_update(removed_nodes)
+                       if not removed_nodes:
+                               break
+                       removed_nodes.clear()
 
        class _pkg_failure(portage.exception.PortageException):
                """
@@ -7797,6 +7858,39 @@ class Scheduler(object):
 
                sys.stderr.write("\n")
 
+       def _is_restart_scheduled(self):
+               """
+               Check if the merge list contains a replacement
+               for the currently running instance, which will
+               result in a restart after it is merged.
+               @rtype: bool
+               @returns: True if a restart is scheduled, False otherwise.
+               """
+               if self._opts_no_restart.intersection(self.myopts):
+                       return False
+
+               mergelist = self._mergelist
+
+               for i, pkg in enumerate(mergelist):
+                       if self._is_restart_necessary(pkg) and \
+                               i != len(mergelist) - 1:
+                               return True
+
+               return False
+
+       def _is_restart_necessary(self, pkg):
+               """
+               @return: True if merging the given package
+                       requires restart, False otherwise.
+               """
+
+               # Figure out if we need a restart.
+               if pkg.root == self._running_root.root and \
+                       portage.match_from_list(
+                       portage.const.PORTAGE_PACKAGE_ATOM, [pkg]):
+                       return True
+               return False
+
        def _restart_if_necessary(self, pkg):
                """
                Use execv() to restart emerge. This happens
@@ -7804,15 +7898,10 @@ class Scheduler(object):
                remaining packages in the list.
                """
 
-               if "--pretend" in self.myopts or \
-                       "--fetchonly" in self.myopts or \
-                       "--fetch-all-uri" in self.myopts:
+               if self._opts_no_restart.intersection(self.myopts):
                        return
 
-               # Figure out if we need a restart.
-               if pkg.root != self._running_root.root or \
-                       not portage.match_from_list(
-                       portage.const.PORTAGE_PACKAGE_ATOM, [pkg]):
+               if not self._is_restart_necessary(pkg):
                        return
 
                if self._pkg_count.curval >= self._pkg_count.maxval:
@@ -7844,7 +7933,7 @@ class Scheduler(object):
                                if myarg is True:
                                        mynewargv.append(myopt)
                                else:
-                                       mynewargv.append(myopt +"="+ myarg)
+                                       mynewargv.append(myopt +"="+ str(myarg))
                # priority only needs to be adjusted on the first run
                os.environ["PORTAGE_NICENESS"] = "0"
                os.execv(mynewargv[0], mynewargv)
@@ -7932,12 +8021,14 @@ class Scheduler(object):
                                pass
 
        def _merge_exit(self, merge):
-               self._jobs -= 1
+               self._job_exit(merge.merge)
                pkg = merge.merge.pkg
                if merge.returncode != os.EX_OK:
                        self._failed_pkgs.append((pkg, merge.returncode))
                        return
 
+               self._completed_tasks.add(pkg)
+
                if pkg.installed:
                        return
 
@@ -7961,11 +8052,15 @@ class Scheduler(object):
                        self._task_queues.merge.schedule()
                else:
                        self._failed_pkgs.append((build.pkg, build.returncode))
-                       self._jobs -= 1
+                       self._job_exit(build)
 
        def _extract_exit(self, build):
                self._build_exit(build)
 
+       def _job_exit(self, job):
+               self._jobs -= 1
+               self._deallocate_config(job.settings)
+
        def _merge(self):
 
                self._add_prefetchers()
@@ -7979,8 +8074,8 @@ class Scheduler(object):
                finally:
                        # discard remaining packages if necessary
                        pkg_queue.clear()
-
-                       # clean up child process if necessary
+                       self._completed_tasks.clear()
+                       self._digraph = None
                        self._task_queues.prefetch.clear()
 
                        # discard any failures and return the
@@ -7993,14 +8088,75 @@ class Scheduler(object):
                return rval
 
        def _choose_pkg(self):
-               return self._pkg_queue.popleft()
+               if self._max_jobs < 2:
+                       return self._pkg_queue.popleft()
+
+               self._prune_digraph()
+
+               chosen_pkg = None
+               for pkg in self._pkg_queue:
+                       if pkg.operation == "uninstall":
+                               continue
+                       if not self._dependent_on_scheduled_merges(pkg):
+                               chosen_pkg = pkg
+                               break
+
+               if chosen_pkg is not None:
+                       self._pkg_queue.remove(chosen_pkg)
+               return chosen_pkg
+
+       def _dependent_on_scheduled_merges(self, pkg):
+               """
+               Traverse the subgraph of the given package's deep dependencies
+               to see if it contains any scheduled merges.
+               @rtype: bool
+               @returns: True if the package is dependent, False otherwise.
+               """
+
+               graph = self._digraph
+               completed_tasks = self._completed_tasks
+
+               dependent = False
+               traversed_nodes = set()
+               node_stack = graph.child_nodes(pkg)
+               while node_stack:
+                       node = node_stack.pop()
+                       if node in traversed_nodes:
+                               continue
+                       traversed_nodes.add(node)
+                       if not node.installed and \
+                               node not in completed_tasks:
+                               dependent = True
+                               break
+                       node_stack.extend(graph.child_nodes(node))
+
+               return dependent
+
+       def _allocate_config(self, root):
+               """
+               Allocate a unique config instance for a task in order
+               to prevent interference between parallel tasks.
+               """
+               if self._config_pool[root]:
+                       temp_settings = self._config_pool[root].pop()
+               else:
+                       temp_settings = portage.config(clone=self.pkgsettings[root])
+               return temp_settings
+
+       def _deallocate_config(self, settings):
+               self._config_pool[settings["ROOT"]].append(settings)
 
        def _main_loop(self):
 
+               # Only allow 1 job max if a restart is scheduled
+               # due to portage update.
+               if self._is_restart_scheduled():
+                       self._set_max_jobs(1)
+
                pkg_queue = self._pkg_queue
                failed_pkgs = self._failed_pkgs
                task_queues = self._task_queues
                max_jobs = self._max_jobs
+               max_load = self._max_load
                background = max_jobs > 1
 
                while pkg_queue and not failed_pkgs:
@@ -8009,8 +8165,26 @@ class Scheduler(object):
                                self._schedule_main()
                                continue
 
+                       if max_load is not None and max_jobs > 1 and self._jobs > 1:
+                               try:
+                                       avg1, avg5, avg15 = os.getloadavg()
+                               except OSError, e:
+                                       writemsg("!!! getloadavg() failed: %s\n" % (e,),
+                                               noiselevel=-1)
+                                       del e
+                                       self._schedule_main()
+                                       continue
+
+                               if avg1 >= max_load:
+                                       self._schedule_main()
+                                       continue
+
                        pkg = self._choose_pkg()
 
+                       if pkg is None:
+                               self._schedule_main()
+                               continue
+
                        if not pkg.installed:
                                self._pkg_count.curval += 1
 
@@ -8065,10 +8239,10 @@ class Scheduler(object):
                        emerge_opts=self.myopts,
                        failed_fetches=self._failed_fetches,
                        find_blockers=self._find_blockers(pkg), logger=self._logger,
-                       mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count,
+                       mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count.copy(),
                        prefetcher=self._prefetchers.get(pkg),
                        scheduler=self._sched_iface,
-                       settings=self.pkgsettings[pkg.root],
+                       settings=self._allocate_config(pkg.root),
                        world_atom=self._world_atom)
 
                return task
@@ -10752,6 +10926,24 @@ def parse_opts(tmpcmdline, silent=False):
                        "type":"choice",
                        "choices":("y", "n")
                },
+
+               "--jobs": {
+
+                       "help"   : "Specifies the number of packages to build " + \
+                               "simultaneously.",
+
+                       "action" : "store"
+               },
+
+               "--load-average": {
+
+                       "help"   :"Specifies that no new builds should be started " + \
+                               "if there are other builds running and the load average " + \
+                               "is at least LOAD (a floating-point number).",
+
+                       "action" : "store"
+               },
+
                "--with-bdeps": {
                        "help":"include unnecessary build time dependencies",
                        "type":"choice",
@@ -10788,6 +10980,34 @@ def parse_opts(tmpcmdline, silent=False):
 
        myoptions, myargs = parser.parse_args(args=tmpcmdline)
 
+       if myoptions.jobs:
+               try:
+                       jobs = int(myoptions.jobs)
+               except ValueError:
+                       jobs = 0
+
+               if jobs < 1:
+                       jobs = None
+                       if not silent:
+                               writemsg("!!! Invalid --jobs parameter: '%s'\n" % \
+                                       (myoptions.jobs,), noiselevel=-1)
+
+               myoptions.jobs = jobs
+
+       if myoptions.load_average:
+               try:
+                       load_average = float(myoptions.load_average)
+               except ValueError:
+                       load_average = 0.0
+
+               if load_average <= 0.0:
+                       load_average = None
+                       if not silent:
+                               writemsg("!!! Invalid --load-average parameter: '%s'\n" % \
+                                       (myoptions.load_average,), noiselevel=-1)
+
+               myoptions.load_average = load_average
+
        for myopt in options:
                v = getattr(myoptions, myopt.lstrip("--").replace("-", "_"))
                if v:
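
As a closing illustration of the scheduling rule implemented by _choose_pkg and
_dependent_on_scheduled_merges above, here is a rough self-contained sketch.
The graph.child_nodes() method, the installed attribute, and the completed set
are stand-ins for the digraph API and Package objects used in the patch, so
treat this as an approximation rather than the actual implementation.

def choose_next_package(pkg_queue, graph, completed):
    """
    Return the first queued package whose deep dependencies contain
    no merge that is still scheduled (not installed and not yet
    completed), or None if nothing can be started safely yet.
    """
    for pkg in pkg_queue:
        stack = list(graph.child_nodes(pkg))
        seen = set()
        blocked = False
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            if not node.installed and node not in completed:
                # A dependency of pkg is still waiting to be built or
                # merged, so starting pkg now could race with it.
                blocked = True
                break
            stack.extend(graph.child_nodes(node))
        if not blocked:
            return pkg
    return None

Returning None here corresponds to the "pkg is None" branch added to
_main_loop, which simply yields back to the event loop until a running job
completes and frees up a dependency.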