From: Zac Medico <zmedico@gentoo.org>
Date: Tue, 8 Jul 2008 09:50:37 +0000 (-0000)
Subject: Implement parallel build support by adding new --jobs and --load-average
X-Git-Tag: v2.2_rc2~158
X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=00731b49c42442b4d15e375c874491cf4780eead;p=portage.git

Implement parallel build support by adding new --jobs and --load-average
options that are analogous to the corresponding `make` options. Input and
output handling still needs work to make the output look better and to behave
more gracefully for things like interactive ebuilds that require input.

svn path=/main/trunk/; revision=10983
---

diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 0071fd25b..8d7a925d7 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -855,6 +855,28 @@ class SlotObject(object):
 				myvalue = kwargs.get(myattr, None)
 				setattr(self, myattr, myvalue)
 
+	def copy(self):
+		"""
+		Create a new instance and copy all attributes
+		defined from __slots__ (including those from
+		inherited classes).
+		"""
+		obj = self.__class__()
+
+		classes = [self.__class__]
+		while classes:
+			c = classes.pop()
+			if c is SlotObject:
+				continue
+			classes.extend(c.__bases__)
+			slots = getattr(c, "__slots__", None)
+			if not slots:
+				continue
+			for myattr in slots:
+				setattr(obj, myattr, getattr(self, myattr))
+
+		return obj
+
 class AbstractDepPriority(SlotObject):
 	__slots__ = ("buildtime", "runtime", "runtime_post")
 
@@ -7539,6 +7561,9 @@ class Scheduler(object):
 		"--fetchonly", "--fetch-all-uri",
 		"--nodeps", "--pretend"])
 
+	_opts_no_restart = frozenset(["--buildpkgonly",
+		"--fetchonly", "--fetch-all-uri", "--pretend"])
+
 	_bad_resume_opts = set(["--ask", "--changelog",
 		"--resume", "--skipfirst"])
 
@@ -7591,9 +7616,11 @@ class Scheduler(object):
 		if settings.get("PORTAGE_DEBUG", "") == "1":
 			self.edebug = 1
 		self.pkgsettings = {}
+		self._config_pool = {}
 		for root in trees:
 			self.pkgsettings[root] = portage.config(
 				clone=trees[root]["vartree"].settings)
+			self._config_pool[root] = []
 		self.curval = 0
 		self._logger = self._emerge_log_class(
 			xterm_titles=("notitles" not in settings.features))
@@ -7616,6 +7643,7 @@ class Scheduler(object):
 		self._add_task = self._task_queues.prefetch.add
 		self._prefetchers = weakref.WeakValueDictionary()
 		self._pkg_queue = deque()
+		self._completed_tasks = set()
 		self._failed_pkgs = []
 		self._failed_fetches = []
 		self._parallel_fetch = False
@@ -7623,7 +7651,14 @@ class Scheduler(object):
 			if isinstance(x, Package) and x.operation == "merge"])
 		self._pkg_count = self._pkg_count_class(
 			curval=0, maxval=merge_count)
-		self._max_jobs = 1
+
+		max_jobs = myopts.get("--jobs")
+		if max_jobs is None:
+			max_jobs = 1
+		self._set_max_jobs(max_jobs)
+
+		self._max_load = myopts.get("--load-average")
+
 		self._set_digraph(digraph)
 		self._jobs = 0
 
@@ -7650,12 +7685,38 @@ class Scheduler(object):
 				except EnvironmentError:
 					pass
 
+	def _set_max_jobs(self, max_jobs):
+		self._max_jobs = max_jobs
+		self._task_queues.build.max_jobs = max_jobs
+
 	def _set_digraph(self, digraph):
 		if self._max_jobs < 2:
 			# save some memory
 			self._digraph = None
-		else:
-			self._digraph = digraph
+			return
+
+		self._digraph = digraph
+		self._prune_digraph()
+
+	def _prune_digraph(self):
+		"""
+		Prune any root nodes that are irrelevant.
+		"""
+
+		graph = self._digraph
+		completed_tasks = self._completed_tasks
+		removed_nodes = set()
+		while True:
+			for node in graph.root_nodes():
+				if not isinstance(node, Package) or \
+					node.installed or node.onlydeps or \
+					node in completed_tasks:
+					removed_nodes.add(node)
+			if removed_nodes:
+				graph.difference_update(removed_nodes)
+			if not removed_nodes:
+				break
+			removed_nodes.clear()
 
 	class _pkg_failure(portage.exception.PortageException):
 		"""
@@ -7797,6 +7858,39 @@ class Scheduler(object):
 
 		sys.stderr.write("\n")
 
+	def _is_restart_scheduled(self):
+		"""
+		Check if the merge list contains a replacement
+		for the current running instance, that will result
+		in restart after merge.
+		@rtype: bool
+		@returns: True if a restart is scheduled, False otherwise.
+		"""
+		if self._opts_no_restart.intersection(self.myopts):
+			return False
+
+		mergelist = self._mergelist
+
+		for i, pkg in enumerate(mergelist):
+			if self._is_restart_necessary(pkg) and \
+				i != len(mergelist) - 1:
+				return True
+
+		return False
+
+	def _is_restart_necessary(self, pkg):
+		"""
+		@return: True if merging the given package
+			requires restart, False otherwise.
+		"""
+
+		# Figure out if we need a restart.
+		if pkg.root == self._running_root.root and \
+			portage.match_from_list(
+			portage.const.PORTAGE_PACKAGE_ATOM, [pkg]):
+			return True
+		return False
+
 	def _restart_if_necessary(self, pkg):
 		"""
 		Use execv() to restart emerge. This happens
@@ -7804,15 +7898,10 @@ class Scheduler(object):
 		remaining packages in the list.
 		"""
 
-		if "--pretend" in self.myopts or \
-			"--fetchonly" in self.myopts or \
-			"--fetch-all-uri" in self.myopts:
+		if self._opts_no_restart.intersection(self.myopts):
 			return
 
-		# Figure out if we need a restart.
-		if pkg.root != self._running_root.root or \
-			not portage.match_from_list(
-			portage.const.PORTAGE_PACKAGE_ATOM, [pkg]):
+		if not self._is_restart_necessary(pkg):
 			return
 
 		if self._pkg_count.curval >= self._pkg_count.maxval:
@@ -7844,7 +7933,7 @@ class Scheduler(object):
 				if myarg is True:
 					mynewargv.append(myopt)
 				else:
-					mynewargv.append(myopt +"="+ myarg)
+					mynewargv.append(myopt +"="+ str(myarg))
 		# priority only needs to be adjusted on the first run
 		os.environ["PORTAGE_NICENESS"] = "0"
 		os.execv(mynewargv[0], mynewargv)
@@ -7932,12 +8021,14 @@ class Scheduler(object):
 				pass
 
 	def _merge_exit(self, merge):
-		self._jobs -= 1
+		self._job_exit(merge.merge)
 		pkg = merge.merge.pkg
 		if merge.returncode != os.EX_OK:
 			self._failed_pkgs.append((pkg, retval))
 			return
 
+		self._completed_tasks.add(pkg)
+
 		if pkg.installed:
 			return
 
@@ -7961,11 +8052,15 @@ class Scheduler(object):
 			self._task_queues.merge.schedule()
 		else:
 			self._failed_pkgs.append((build.pkg, build.returncode))
-			self._jobs -= 1
+			self._job_exit(build)
 
 	def _extract_exit(self, build):
 		self._build_exit(build)
 
+	def _job_exit(self, job):
+		self._jobs -= 1
+		self._deallocate_config(job.settings)
+
 	def _merge(self):
 
 		self._add_prefetchers()
@@ -7979,8 +8074,8 @@ class Scheduler(object):
 		finally:
 			# discard remaining packages if necessary
 			pkg_queue.clear()
-
-			# clean up child process if necessary
+			self._completed_tasks.clear()
+			self._digraph = None
 			self._task_queues.prefetch.clear()
 
 			# discard any failures and return the
@@ -7993,14 +8088,75 @@ class Scheduler(object):
 		return rval
 
 	def _choose_pkg(self):
-		return self._pkg_queue.popleft()
+		if self._max_jobs < 2:
+			return self._pkg_queue.popleft()
+
+		self._prune_digraph()
+
+		chosen_pkg = None
+		for pkg in self._pkg_queue:
+			if pkg.operation == "uninstall":
+				continue
+			if not self._dependent_on_scheduled_merges(pkg):
+				chosen_pkg = pkg
+				break
+
+		self._pkg_queue.remove(chosen_pkg)
+		return chosen_pkg
+
+	def _dependent_on_scheduled_merges(self, pkg):
+		"""
+		Traverse the subgraph of the given packages deep dependencies
+		to see if it contains any scheduled merges.
+		@rtype: bool
+		@returns: True if the package is dependent, False otherwise.
+		"""
+
+		graph = self._digraph
+		completed_tasks = self._completed_tasks
+
+		dependent = False
+		traversed_nodes = set()
+		node_stack = graph.child_nodes(pkg)
+		while node_stack:
+			node = node_stack.pop()
+			if node in traversed_nodes:
+				continue
+			traversed_nodes.add(node)
+			if not node.installed and \
+				node not in completed_tasks:
+				dependent = True
+				break
+			node_stack.extend(graph.child_nodes(node))
+
+		return dependent
+
+	def _allocate_config(self, root):
+		"""
+		Allocate a unique config instance for a task in order
+		to prevent interference between parallel tasks.
+		"""
+		if self._config_pool[root]:
+			temp_settings = self._config_pool[root].pop()
+		else:
+			temp_settings = portage.config(clone=self.pkgsettings[root])
+		return temp_settings
+
+	def _deallocate_config(self, settings):
+		self._config_pool[settings["ROOT"]].append(settings)
 
 	def _main_loop(self):
 
+		# Only allow 1 job max if a restart is scheduled
+		# due to portage update.
+		if self._is_restart_scheduled():
+			self._set_max_jobs(1)
+
 		pkg_queue = self._pkg_queue
 		failed_pkgs = self._failed_pkgs
 		task_queues = self._task_queues
 		max_jobs = self._max_jobs
+		max_load = self._max_load
 		background = max_jobs > 1
 
 		while pkg_queue and not failed_pkgs:
@@ -8009,8 +8165,26 @@ class Scheduler(object):
 				self._schedule_main()
 				continue
 
+			if max_load is not None and max_jobs > 1 and self._jobs > 1:
+				try:
+					avg1, avg5, avg15 = os.getloadavg()
+				except OSError, e:
+					writemsg("!!! getloadavg() failed: %s\n" % (e,),
+						noiselevel=-1)
+					del e
+					self._schedule_main()
+					continue
+
+				if avg1 >= max_load:
+					self._schedule_main()
+					continue
+
 			pkg = self._choose_pkg()
 
+			if pkg is None:
+				self._schedule_main()
+				continue
+
 			if not pkg.installed:
 				self._pkg_count.curval += 1
 
@@ -8065,10 +8239,10 @@ class Scheduler(object):
 			emerge_opts=self.myopts,
 			failed_fetches=self._failed_fetches,
 			find_blockers=self._find_blockers(pkg), logger=self._logger,
-			mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count,
+			mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count.copy(),
 			prefetcher=self._prefetchers.get(pkg),
 			scheduler=self._sched_iface,
-			settings=self.pkgsettings[pkg.root],
+			settings=self._allocate_config(pkg.root),
 			world_atom=self._world_atom)
 
 		return task
@@ -10752,6 +10926,24 @@ def parse_opts(tmpcmdline, silent=False):
 			"type":"choice",
 			"choices":("y", "n")
 		},
+
+		"--jobs": {
+
+			"help"   : "Specifies the number of packages to build " + \
+				"simultaneously.",
+
+			"action" : "store"
+		},
+
+		"--load-average": {
+
+			"help"   :"Specifies that no new builds should be started " + \
+				"if there are other builds running and the load average " + \
+				"is at least LOAD (a floating-point number).",
+
+			"action" : "store"
+		},
+
 		"--with-bdeps": {
 			"help":"include unnecessary build time dependencies",
 			"type":"choice",
@@ -10788,6 +10980,34 @@ def parse_opts(tmpcmdline, silent=False):
 
 	myoptions, myargs = parser.parse_args(args=tmpcmdline)
 
+	if myoptions.jobs:
+		try:
+			jobs = int(myoptions.jobs)
+		except ValueError:
+			jobs = 0
+
+		if jobs < 1:
+			jobs = None
+			if not silent:
+				writemsg("!!! Invalid --jobs parameter: '%s'\n" % \
+					(myoptions.jobs,), noiselevel=-1)
+
+		myoptions.jobs = jobs
+
+	if myoptions.load_average:
+		try:
+			load_average = float(myoptions.load_average)
+		except ValueError:
+			load_average = 0.0
+
+		if load_average <= 0.0:
+			load_average = None
+			if not silent:
+				writemsg("!!! Invalid --load-average parameter: '%s'\n" % \
+					(myoptions.load_average,), noiselevel=-1)
+
+		myoptions.load_average = load_average
+
 	for myopt in options:
 		v = getattr(myoptions, myopt.lstrip("--").replace("-", "_"))
 		if v: