From 8bcd4a33e914f362cec4b7421c019a0969aa7276 Mon Sep 17 00:00:00 2001
From: Zac Medico
Date: Thu, 10 Jul 2008 15:00:17 +0000
Subject: [PATCH] Add support for parallel --regen, using the existing
 --jobs and --load-average options:

* Split out a PollLoop base class from Scheduler and derive a
  MetadataRegen class to handle parallel scheduling for --regen.

* Add portdbapi._metadata_process() and _metadata_callback() methods
  to implement asynchronous metadata generation. One method returns an
  EbuildMetadataPhase instance to encapsulate the async task. The other
  method is called to save the metadata when the process completes
  successfully. These methods share code with aux_get to avoid
  duplicate code.

svn path=/main/trunk/; revision=11012
---
 pym/_emerge/__init__.py       | 461 ++++++++++++++++++++++++----------
 pym/portage/__init__.py       |   7 +-
 pym/portage/dbapi/porttree.py | 168 ++++++++-----
 3 files changed, 434 insertions(+), 202 deletions(-)

diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 0cd94208d..7f122bb4d 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -38,7 +38,7 @@ except ImportError:
 	from os import path as osp
 	sys.path.insert(0, osp.join(osp.dirname(osp.dirname(osp.realpath(__file__))), "pym"))
 	import portage
-portage._disable_legacy_globals()
+
 from portage import digraph, portdbapi
 from portage.const import NEWS_LIB_PATH, CACHE_PATH, PRIVATE_PATH, USER_CONFIG_PATH, GLOBAL_CONFIG_PATH
@@ -2292,6 +2292,95 @@ class EbuildExecuter(CompositeTask):
 
 		self._start_task(ebuild_phases, self._default_final_exit)
 
+class EbuildMetadataPhase(SubProcess):
+
+	"""
+	Asynchronous interface for the ebuild "depend" phase which is
+	used to extract metadata from the ebuild.
+	"""
+
+	__slots__ = ("cpv", "ebuild_path", "fd_pipes", "metadata_callback",
+		"ebuild_mtime", "portdb", "repo_path", "settings") + \
+		("files", "_raw_metadata")
+
+	_file_names = ("ebuild",)
+	_files_dict = slot_dict_class(_file_names, prefix="")
+	_bufsize = SpawnProcess._bufsize
+	_metadata_fd = 9
+
+	def start(self):
+		settings = self.settings
+		ebuild_path = self.ebuild_path
+		debug = settings.get("PORTAGE_DEBUG") == "1"
+		master_fd = None
+		slave_fd = None
+		fd_pipes = None
+		if self.fd_pipes is not None:
+			fd_pipes = self.fd_pipes.copy()
+		else:
+			fd_pipes = {}
+
+		fd_pipes.setdefault(0, sys.stdin.fileno())
+		fd_pipes.setdefault(1, sys.stdout.fileno())
+		fd_pipes.setdefault(2, sys.stderr.fileno())
+
+		# flush any pending output
+		for fd in fd_pipes.itervalues():
+			if fd == sys.stdout.fileno():
+				sys.stdout.flush()
+			if fd == sys.stderr.fileno():
+				sys.stderr.flush()
+
+		fd_pipes_orig = fd_pipes.copy()
+		self.files = self._files_dict()
+		files = self.files
+
+		master_fd, slave_fd = os.pipe()
+		fcntl.fcntl(master_fd, fcntl.F_SETFL,
+			fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+		fd_pipes[self._metadata_fd] = slave_fd
+
+		retval = portage.doebuild(ebuild_path, "depend",
+			settings["ROOT"], settings, debug,
+			mydbapi=self.portdb, tree="porttree",
+			fd_pipes=fd_pipes, returnpid=True)
+
+		os.close(slave_fd)
+
+		if isinstance(retval, int):
+			# doebuild failed before spawning
+			os.close(master_fd)
+			self.returncode = retval
+			self.wait()
+			return
+
+		self.pid = retval[0]
+		portage.process.spawned_pids.remove(self.pid)
+
+		self._raw_metadata = []
+		files.ebuild = os.fdopen(master_fd, 'r')
+		self._reg_id = self.scheduler.register(files.ebuild.fileno(),
+			PollConstants.POLLIN, self._output_handler)
+		self.registered = True
+
+	def _output_handler(self, fd, event):
+		files = self.files
+		self._raw_metadata.append(files.ebuild.read())
+		if not self._raw_metadata[-1]:
+			for f in files.values():
+				f.close()
+			self.registered = False
+			self._wait()
+
+			if self.returncode == os.EX_OK:
+				metadata = izip(portage.auxdbkeys,
+					"".join(self._raw_metadata).splitlines())
+				self.metadata_callback(self.cpv, self.ebuild_path,
+					self.repo_path, metadata, self.ebuild_mtime)
+
+		return self.registered
+
 class EbuildPhase(SubProcess):
 
 	__slots__ = ("fd_pipes", "phase", "pkg",
@@ -2387,6 +2476,15 @@ class EbuildPhase(SubProcess):
 			mydbapi=mydbapi, tree=tree,
 			fd_pipes=fd_pipes, returnpid=True)
 
+		os.close(slave_fd)
+
+		if isinstance(retval, int):
+			# doebuild failed before spawning
+			os.close(master_fd)
+			self.returncode = retval
+			self.wait()
+			return
+
 		self.pid = retval[0]
 		portage.process.spawned_pids.remove(self.pid)
 
@@ -2398,7 +2496,6 @@ class EbuildPhase(SubProcess):
 		else:
 			output_handler = self._dummy_handler
 
-		os.close(slave_fd)
 		files.ebuild = os.fdopen(master_fd, 'r')
 		self._reg_id = self.scheduler.register(files.ebuild.fileno(),
 			PollConstants.POLLIN, output_handler)
@@ -7680,7 +7777,118 @@ class SequentialTaskQueue(SlotObject):
 	def __len__(self):
 		return len(self._task_queue) + len(self.running_tasks)
 
-class Scheduler(object):
+class PollLoop(object):
+
+	def __init__(self):
+		self._max_jobs = 1
+		self._max_load = None
+		self._jobs = 0
+		self._poll_event_handlers = {}
+		self._poll_event_handler_ids = {}
+		# Increment id for each new handler.
+		self._event_handler_id = 0
+		try:
+			self._poll = select.poll()
+		except AttributeError:
+			self._poll = PollSelectAdapter()
+
+	def _can_add_job(self):
+		jobs = self._jobs
+		max_jobs = self._max_jobs
+		max_load = self._max_load
+
+		if self._jobs >= self._max_jobs:
+			return False
+
+		if max_load is not None and max_jobs > 1 and self._jobs > 1:
+			try:
+				avg1, avg5, avg15 = os.getloadavg()
+			except OSError, e:
+				writemsg("!!! getloadavg() failed: %s\n" % (e,),
+					noiselevel=-1)
+				del e
+				return False
+
+			if avg1 >= max_load:
+				return False
+
+		return True
+
+	def _register(self, f, eventmask, handler):
+		"""
+		@rtype: Integer
+		@return: A unique registration id, for use in schedule() or
+			unregister() calls.
+		"""
+		self._event_handler_id += 1
+		reg_id = self._event_handler_id
+		self._poll_event_handler_ids[reg_id] = f
+		self._poll_event_handlers[f] = (handler, reg_id)
+		self._poll.register(f, eventmask)
+		return reg_id
+
+	def _unregister(self, reg_id):
+		f = self._poll_event_handler_ids[reg_id]
+		self._poll.unregister(f)
+		del self._poll_event_handlers[f]
+		del self._poll_event_handler_ids[reg_id]
+		self._schedule_tasks()
+
+	def _schedule(self, wait_id):
+		"""
+		Schedule until wait_id is no longer registered
+		for poll() events.
+		@type wait_id: int
+		@param wait_id: a task id to wait for
+		"""
+		event_handlers = self._poll_event_handlers
+		handler_ids = self._poll_event_handler_ids
+		poll = self._poll.poll
+
+		self._schedule_tasks()
+
+		while wait_id in handler_ids:
+			for f, event in poll():
+				handler, reg_id = event_handlers[f]
+				if not handler(f, event):
+					self._unregister(reg_id)
+
+	def _schedule_tasks(self):
+		return False
+
+	def _schedule_main(self, wait=False):
+
+		event_handlers = self._poll_event_handlers
+		poll = self._poll.poll
+		max_jobs = self._max_jobs
+
+		state_change = 0
+
+		if self._schedule_tasks():
+			state_change += 1
+
+		while event_handlers:
+			jobs = self._jobs
+
+			for f, event in poll():
+				handler, reg_id = event_handlers[f]
+				if not handler(f, event):
+					state_change += 1
+					self._unregister(reg_id)
+
+			if jobs == self._jobs:
+				continue
+
+			if self._schedule_tasks():
+				state_change += 1
+
+			if not wait and self._jobs < max_jobs:
+				break
+
+		if not state_change:
+			raise AssertionError("tight loop")
+
+class Scheduler(PollLoop):
 
 	_opts_ignore_blockers = \
 		frozenset(["--buildpkgonly",
@@ -7722,6 +7930,7 @@ class Scheduler(object):
 
 	def __init__(self, settings, trees, mtimedb, myopts,
 		spinner, mergelist, favorites, digraph):
+		PollLoop.__init__(self)
 		self.settings = settings
 		self.target_root = settings["ROOT"]
 		self.trees = trees
@@ -7758,15 +7967,6 @@ class Scheduler(object):
 		self._sched_iface = self._iface_class(
 			fetch=fetch_iface, register=self._register,
 			schedule=self._schedule)
-		self._poll_event_handlers = {}
-		self._poll_event_handler_ids = {}
-		# Increment id for each new handler.
-		self._event_handler_id = 0
-
-		try:
-			self._poll = select.poll()
-		except AttributeError:
-			self._poll = PollSelectAdapter()
 
 		self._task_queues = self._task_queues_class()
 		for k in self._task_queues.allowed_keys:
@@ -7792,7 +7992,6 @@ class Scheduler(object):
 		self._max_load = myopts.get("--load-average")
 		self._set_digraph(digraph)
 
-		self._jobs = 0
 
 		features = self.settings.features
 		if "parallel-fetch" in features and \
@@ -8345,24 +8544,10 @@ class Scheduler(object):
 
 		while pkg_queue and not failed_pkgs:
 
-			if self._jobs >= max_jobs:
+			if not self._can_add_job():
 				self._schedule_main()
 				continue
 
-			if max_load is not None and max_jobs > 1 and self._jobs > 1:
-				try:
-					avg1, avg5, avg15 = os.getloadavg()
-				except OSError, e:
-					writemsg("!!! getloadavg() failed: %s\n" % (e,),
-						noiselevel=-1)
-					del e
-					self._schedule_main()
-					continue
-
-				if avg1 >= max_load:
-					self._schedule_main()
-					continue
-
 			pkg = self._choose_pkg()
 
 			if pkg is None:
@@ -8389,38 +8574,6 @@ class Scheduler(object):
 		while self._jobs:
 			self._schedule_main(wait=True)
 
-	def _schedule_main(self, wait=False):
-
-		event_handlers = self._poll_event_handlers
-		poll = self._poll.poll
-		max_jobs = self._max_jobs
-
-		state_change = 0
-
-		if self._schedule_tasks():
-			state_change += 1
-
-		while event_handlers:
-			jobs = self._jobs
-
-			for f, event in poll():
-				handler, reg_id = event_handlers[f]
-				if not handler(f, event):
-					state_change += 1
-					self._unregister(reg_id)
-
-			if jobs == self._jobs:
-				continue
-
-			if self._schedule_tasks():
-				state_change += 1
-
-			if not wait and self._jobs < max_jobs:
-				break
-
-		if not state_change:
-			raise AssertionError("tight loop")
-
 	def _schedule_tasks(self):
 		state_change = 0
 		for x in self._task_queues.values():
@@ -8524,45 +8677,6 @@ class Scheduler(object):
 				return True
 		return False
 
-	def _register(self, f, eventmask, handler):
-		"""
-		@rtype: Integer
-		@return: A unique registration id, for use in schedule() or
-			unregister() calls.
-		"""
-		self._event_handler_id += 1
-		reg_id = self._event_handler_id
-		self._poll_event_handler_ids[reg_id] = f
-		self._poll_event_handlers[f] = (handler, reg_id)
-		self._poll.register(f, eventmask)
-		return reg_id
-
-	def _unregister(self, reg_id):
-		f = self._poll_event_handler_ids[reg_id]
-		self._poll.unregister(f)
-		del self._poll_event_handlers[f]
-		del self._poll_event_handler_ids[reg_id]
-		self._schedule_tasks()
-
-	def _schedule(self, wait_id):
-		"""
-		Schedule until wait_id is not longer registered
-		for poll() events.
-		@type wait_id: int
-		@param wait_id: a task id to wait for
-		"""
-		event_handlers = self._poll_event_handlers
-		handler_ids = self._poll_event_handler_ids
-		poll = self._poll.poll
-
-		self._schedule_tasks()
-
-		while wait_id in handler_ids:
-			for f, event in poll():
-				handler, reg_id = event_handlers[f]
-				if not handler(f, event):
-					self._unregister(reg_id)
-
 	def _world_atom(self, pkg):
 		"""
 		Add the package to the world file, but only if
@@ -8630,6 +8744,111 @@ class Scheduler(object):
 
 		return pkg
 
+class MetadataRegen(PollLoop):
+
+	class _sched_iface_class(SlotObject):
+		__slots__ = ("register", "schedule")
+
+	def __init__(self, portdb, max_jobs=None, max_load=None):
+		PollLoop.__init__(self)
+		self._portdb = portdb
+
+		if max_jobs is None:
+			max_jobs = 1
+
+		self._job_queue = SequentialTaskQueue(max_jobs=max_jobs)
+		self._max_jobs = max_jobs
+		self._max_load = max_load
+		self._sched_iface = self._sched_iface_class(
+			register=self._register,
+			schedule=self._schedule)
+
+		self._valid_pkgs = set()
+
+	def _iter_metadata_processes(self):
+		portdb = self._portdb
+		valid_pkgs = self._valid_pkgs
+		every_cp = portdb.cp_all()
+		every_cp.sort(reverse=True)
+
+		while every_cp:
+			cp = every_cp.pop()
+			portage.writemsg_stdout("Processing %s\n" % cp)
+			cpv_list = portdb.cp_list(cp)
+			for cpv in cpv_list:
+				valid_pkgs.add(cpv)
+				ebuild_path, repo_path = portdb.findname2(cpv)
+				metadata_process = portdb._metadata_process(
+					cpv, ebuild_path, repo_path)
+				if metadata_process is None:
+					continue
+				yield metadata_process
+
+	def run(self):
+
+		portdb = self._portdb
+		from portage.cache.cache_errors import CacheError
+		dead_nodes = {}
+
+		for mytree in portdb.porttrees:
+			try:
+				dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys())
+			except CacheError, e:
+				portage.writemsg("Error listing cache entries for " + \
+					"'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1)
+				del e
+				dead_nodes = None
+				break
+
+		self._main_loop()
+
+		if dead_nodes:
+			for y in self._valid_pkgs:
+				for mytree in portdb.porttrees:
+					if portdb.findname2(y, mytree=mytree)[0]:
+						dead_nodes[mytree].discard(y)
+
+			for mytree, nodes in dead_nodes.iteritems():
+				auxdb = portdb.auxdb[mytree]
+				for y in nodes:
+					try:
+						del auxdb[y]
+					except (KeyError, CacheError):
+						pass
+
+	def _main_loop(self):
+
+		process_iter = self._iter_metadata_processes()
+
+		while True:
+
+			if not self._can_add_job():
+				self._schedule_main()
+				continue
+
+			try:
+				metadata_process = process_iter.next()
+			except StopIteration:
+				break
+
+			self._jobs += 1
+			metadata_process.scheduler = self._sched_iface
+			metadata_process.addExitListener(self._metadata_exit)
+			self._job_queue.add(metadata_process)
+
+		while self._jobs:
+			self._schedule_main(wait=True)
+
+	def _schedule_tasks(self):
+		return self._job_queue.schedule()
+
+	def _metadata_exit(self, metadata_process):
+		self._jobs -= 1
+		if metadata_process.returncode != os.EX_OK:
+			self._valid_pkgs.discard(metadata_process.cpv)
+			portage.writemsg("Error processing %s, continuing...\n" % \
+				(metadata_process.cpv,))
+
 class UninstallFailure(portage.exception.PortageException):
 	"""
 	An instance of this class is raised by unmerge() when
@@ -9934,7 +10153,7 @@ def action_metadata(settings, portdb, myopts):
 		sys.stdout.flush()
 	os.umask(old_umask)
 
-def action_regen(settings, portdb):
+def action_regen(settings, portdb, max_jobs, max_load):
 	xterm_titles = "notitles" not in settings.features
 	emergelog(xterm_titles, " === regen")
 	#regenerate cache entries
@@ -9946,40 +10165,10 @@ def action_regen(settings, portdb):
 	except:
 		pass
 	sys.stdout.flush()
-	mynodes = portdb.cp_all()
-	from portage.cache.cache_errors import CacheError
-	dead_nodes = {}
-	for mytree in portdb.porttrees:
-		try:
-			dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys())
-		except CacheError, e:
-			portage.writemsg("Error listing cache entries for " + \
-				"'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1)
-			del e
-			dead_nodes = None
-			break
-	for x in mynodes:
-		mymatches = portdb.cp_list(x)
-		portage.writemsg_stdout("Processing %s\n" % x)
-		for y in mymatches:
-			try:
-				foo = portdb.aux_get(y,["DEPEND"])
-			except (KeyError, portage.exception.PortageException), e:
-				portage.writemsg(
-					"Error processing %(cpv)s, continuing... (%(e)s)\n" % \
-					{"cpv":y,"e":str(e)}, noiselevel=-1)
-			if dead_nodes:
-				for mytree in portdb.porttrees:
-					if portdb.findname2(y, mytree=mytree)[0]:
-						dead_nodes[mytree].discard(y)
-	if dead_nodes:
-		for mytree, nodes in dead_nodes.iteritems():
-			auxdb = portdb.auxdb[mytree]
-			for y in nodes:
-				try:
-					del auxdb[y]
-				except (KeyError, CacheError):
-					pass
+
+	regen = MetadataRegen(portdb, max_jobs=max_jobs, max_load=max_load)
+	regen.run()
+
 	portage.writemsg_stdout("done!\n")
 
 def action_config(settings, trees, myopts, myfiles):
@@ -11408,6 +11597,7 @@ def adjust_config(myopts, settings):
 
 def emerge_main():
 	global portage	# NFC why this is necessary now - genone
+	portage._disable_legacy_globals()
 	# Disable color until we're sure that it should be enabled (after
 	# EMERGE_DEFAULT_OPTS has been parsed).
 	portage.output.havecolor = 0
@@ -11789,7 +11979,8 @@ def emerge_main():
 		action_metadata(settings, portdb, myopts)
 	elif myaction=="regen":
 		validate_ebuild_environment(trees)
-		action_regen(settings, portdb)
+		action_regen(settings, portdb, myopts.get("--jobs"),
+			myopts.get("--load-average"))
 	# HELP action
 	elif "config"==myaction:
 		validate_ebuild_environment(trees)
diff --git a/pym/portage/__init__.py b/pym/portage/__init__.py
index bd800b4e2..453096f67 100644
--- a/pym/portage/__init__.py
+++ b/pym/portage/__init__.py
@@ -5011,7 +5011,12 @@ def doebuild(myebuild, mydo, myroot, mysettings, debug=0, listonly=0,
 	if mydo == "depend":
 		writemsg("!!! DEBUG: dbkey: %s\n" % str(dbkey), 2)
 		droppriv = "userpriv" in mysettings.features
-		if isinstance(dbkey, dict):
+		if returnpid:
+			mypids = spawn(_shell_quote(ebuild_sh_binary) + " depend",
+				mysettings, fd_pipes=fd_pipes, returnpid=True,
+				droppriv=droppriv)
+			return mypids
+		elif isinstance(dbkey, dict):
 			mysettings["dbkey"] = ""
 			pr, pw = os.pipe()
 			fd_pipes = {
diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
index dd8015cc1..797c886f2 100644
--- a/pym/portage/dbapi/porttree.py
+++ b/pym/portage/dbapi/porttree.py
@@ -229,6 +229,104 @@ class portdbapi(dbapi):
 				return[file, x]
 		return None, 0
 
+	def _metadata_process(self, cpv, ebuild_path, repo_path):
+		"""
+		Create an EbuildMetadataPhase instance to generate metadata for the
+		given ebuild.
+		@rtype: EbuildMetadataPhase
+		@return: A new EbuildMetadataPhase instance, or None if the
+			metadata cache is already valid.
+		"""
+		metadata, st, emtime = self._pull_valid_cache(cpv, ebuild_path, repo_path)
+		if metadata is not None:
+			return None
+
+		import _emerge
+		process = _emerge.EbuildMetadataPhase(cpv=cpv, ebuild_path=ebuild_path,
+			ebuild_mtime=emtime, metadata_callback=self._metadata_callback,
+			portdb=self, repo_path=repo_path, settings=self.doebuild_settings)
+		return process
+
+	def _metadata_callback(self, cpv, ebuild_path, repo_path, metadata, mtime):
+
+		i = metadata
+		if hasattr(metadata, "iteritems"):
+			i = metadata.iteritems()
+		metadata = dict(i)
+
+		if "EAPI" not in metadata or not metadata["EAPI"].strip():
+			metadata["EAPI"] = "0"
+
+		if not eapi_is_supported(metadata["EAPI"]):
+			# if newer version, wipe everything and negate eapi
+			eapi = metadata["EAPI"]
+			metadata = {}
+			map(lambda x: metadata.setdefault(x, ""), auxdbkeys)
+			metadata["EAPI"] = "-" + eapi
+
+		if metadata.get("INHERITED", False):
+			metadata["_eclasses_"] = \
+				self.eclassdb.get_eclass_data(metadata["INHERITED"].split())
+		else:
+			metadata["_eclasses_"] = {}
+
+		metadata.pop("INHERITED", None)
+		metadata["_mtime_"] = mtime
+		self.auxdb[repo_path][cpv] = metadata
+
+	def _pull_valid_cache(self, cpv, ebuild_path, repo_path):
+
+		try:
+			st = os.stat(ebuild_path)
+			emtime = st[stat.ST_MTIME]
+		except OSError:
+			writemsg("!!! aux_get(): ebuild for " + \
+				"'%s' does not exist at:\n" % (cpv,), noiselevel=-1)
+			writemsg("!!! %s\n" % ebuild_path, noiselevel=-1)
+			raise KeyError(cpv)
+
+		# Pull pre-generated metadata from the metadata/cache/
+		# directory if it exists and is valid, otherwise fall
+		# back to the normal writable cache.
+		auxdbs = []
+		pregen_auxdb = self._pregen_auxdb.get(repo_path)
+		if pregen_auxdb is not None:
+			auxdbs.append(pregen_auxdb)
+		auxdbs.append(self.auxdb[repo_path])
+
+		doregen = True
+		for auxdb in auxdbs:
+			try:
+				metadata = auxdb[cpv]
+				eapi = metadata.get("EAPI","").strip()
+				if not eapi:
+					eapi = "0"
+				if eapi.startswith("-") and eapi_is_supported(eapi[1:]):
+					pass
+				elif emtime != int(metadata.get("_mtime_", 0)):
+					pass
+				elif len(metadata.get("_eclasses_", [])) > 0:
+					if self.eclassdb.is_eclass_data_valid(
+						metadata["_eclasses_"]):
+						doregen = False
+				else:
+					doregen = False
+			except KeyError:
+				pass
+			except CacheError:
+				if auxdb is not pregen_auxdb:
+					try:
+						del auxdb[cpv]
+					except KeyError:
+						pass
+			if not doregen:
+				break
+
+		if doregen:
+			metadata = None
+
+		return (metadata, st, emtime)
+
 	def aux_get(self, mycpv, mylist, mytree=None):
 		"stub code for returning auxilliary db information, such as SLOT, DEPEND, etc."
 		'input: "sys-apps/foo-1.0",["SLOT","DEPEND","HOMEPAGE"]'
@@ -294,52 +392,8 @@ class portdbapi(dbapi):
 				writemsg("!!! Manifest is missing or inaccessable: %(manifest)s\n" % {"manifest":myManifestPath},
 					noiselevel=-1)
 
-		try:
-			st = os.stat(myebuild)
-			emtime = st[stat.ST_MTIME]
-		except OSError:
-			writemsg("!!! aux_get(): ebuild for '%(cpv)s' does not exist at:\n" % {"cpv":mycpv},
-				noiselevel=-1)
-			writemsg("!!! %s\n" % myebuild,
-				noiselevel=-1)
-			raise KeyError(mycpv)
-
-		# Pull pre-generated metadata from the metadata/cache/
-		# directory if it exists and is valid, otherwise fall
-		# back to the normal writable cache.
-		auxdbs = []
-		pregen_auxdb = self._pregen_auxdb.get(mylocation)
-		if pregen_auxdb is not None:
-			auxdbs.append(pregen_auxdb)
-		auxdbs.append(self.auxdb[mylocation])
-
-		doregen = True
-		for auxdb in auxdbs:
-			try:
-				mydata = auxdb[mycpv]
-				eapi = mydata.get("EAPI","").strip()
-				if not eapi:
-					eapi = "0"
-				if eapi.startswith("-") and eapi_is_supported(eapi[1:]):
-					pass
-				elif emtime != long(mydata.get("_mtime_", 0)):
-					pass
-				elif len(mydata.get("_eclasses_", [])) > 0:
-					if self.eclassdb.is_eclass_data_valid(mydata["_eclasses_"]):
-						doregen = False
-				else:
-					doregen = False
-			except KeyError:
-				pass
-			except CacheError:
-				if auxdb is not pregen_auxdb:
-					try:
-						del auxdb[mycpv]
-					except KeyError:
-						pass
-			if not doregen:
-				break
+		mydata, st, emtime = self._pull_valid_cache(mycpv, myebuild, mylocation)
+		doregen = mydata is None
 
 		writemsg("auxdb is valid: "+str(not doregen)+" "+str(pkg)+"\n", 2)
 
@@ -360,26 +414,8 @@ class portdbapi(dbapi):
 			self._broken_ebuilds.add(myebuild)
 			raise KeyError(mycpv)
 
-		if "EAPI" not in mydata or not mydata["EAPI"].strip():
-			mydata["EAPI"] = "0"
-
-		if not eapi_is_supported(mydata["EAPI"]):
-			# if newer version, wipe everything and negate eapi
-			eapi = mydata["EAPI"]
-			mydata = {}
-			map(lambda x: mydata.setdefault(x, ""), auxdbkeys)
-			mydata["EAPI"] = "-"+eapi
-
-		if mydata.get("INHERITED", False):
-			mydata["_eclasses_"] = self.eclassdb.get_eclass_data(mydata["INHERITED"].split())
-		else:
-			mydata["_eclasses_"] = {}
-
-		del mydata["INHERITED"]
-
-		mydata["_mtime_"] = emtime
-
-		self.auxdb[mylocation][mycpv] = mydata
+		self._metadata_callback(
+			mycpv, myebuild, mylocation, mydata, emtime)
 
 		if not mydata.setdefault("EAPI", "0"):
 			mydata["EAPI"] = "0"
-- 
2.26.2
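The core of the new PollLoop class is the contract between _register() and the dispatch loops (_schedule() and _schedule_main()): a handler stays registered for as long as it returns True, and is unregistered as soon as it returns False, which is why EbuildMetadataPhase._output_handler() ends with "return self.registered". Below is a minimal standalone sketch of that contract; TinyPollLoop and the reader callback are illustrative names, not the Portage API.

import os
import select

class TinyPollLoop(object):
	def __init__(self):
		self._poll = select.poll()
		self._handlers = {}  # fd -> handler callback

	def register(self, fd, eventmask, handler):
		self._handlers[fd] = handler
		self._poll.register(fd, eventmask)

	def unregister(self, fd):
		self._poll.unregister(fd)
		del self._handlers[fd]

	def run(self):
		# Dispatch poll() events until every handler has unregistered
		# itself by returning False, mirroring PollLoop._schedule().
		while self._handlers:
			for fd, event in self._poll.poll():
				if not self._handlers[fd](fd, event):
					self.unregister(fd)

loop = TinyPollLoop()
read_end, write_end = os.pipe()

def reader(fd, event):
	data = os.read(fd, 4096)
	if data:
		print("read: %r" % (data,))
	return bool(data)  # an empty read means EOF: unregister

loop.register(read_end, select.POLLIN, reader)
os.write(write_end, b"metadata")
os.close(write_end)  # EOF wakes the reader one last time
loop.run()
os.close(read_end)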
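EbuildMetadataPhase relies on doebuild(..., returnpid=True) to hand the child process an extra pipe on a fixed descriptor (_metadata_fd = 9); ebuild.sh writes one metadata value per line there, and _output_handler() pairs the lines with portage.auxdbkeys once the pipe reaches EOF. The sketch below reproduces only that transport, under the assumption that os.fork() can stand in for doebuild()/spawn() and that a three-key tuple can stand in for auxdbkeys.

import os

METADATA_FD = 9
KEYS = ("DEPEND", "RDEPEND", "SLOT")  # stand-in for portage.auxdbkeys

master_fd, slave_fd = os.pipe()
pid = os.fork()
if pid == 0:
	# Child: stand-in for ebuild.sh, writing one metadata value per
	# line to the agreed-upon descriptor.
	os.close(master_fd)
	os.dup2(slave_fd, METADATA_FD)
	os.write(METADATA_FD, b"dev-lang/python\ndev-lang/python\n0\n")
	os._exit(0)

# Parent: close the child's end of the pipe, then read until EOF.
os.close(slave_fd)
chunks = []
while True:
	chunk = os.read(master_fd, 4096)
	if not chunk:
		break
	chunks.append(chunk)
os.close(master_fd)
os.waitpid(pid, 0)

# One value per key, in a fixed order -- the same pairing that
# _output_handler() performs with izip(portage.auxdbkeys, ...).
metadata = dict(zip(KEYS, b"".join(chunks).decode("utf-8").splitlines()))
print(metadata)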
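The job-admission policy that Scheduler and MetadataRegen now share is small enough to restate on its own. With, for example, `emerge --regen --jobs=4 --load-average=3` (values chosen for illustration), up to four concurrent "depend" phases run, and no new job is started while the 1-minute load average is at or above 3. The following is a direct restatement of PollLoop._can_add_job() from the hunk above, written as a free function:

import os

def can_add_job(jobs, max_jobs, max_load):
	# Never exceed --jobs.
	if jobs >= max_jobs:
		return False
	# With --load-average set, also hold back while the 1-minute
	# load average is at or above the ceiling (the check only
	# applies once more than one job is running, as in the patch).
	if max_load is not None and max_jobs > 1 and jobs > 1:
		try:
			avg1, avg5, avg15 = os.getloadavg()
		except OSError:
			# If the load average is unavailable, err on the
			# side of not starting another job.
			return False
		if avg1 >= max_load:
			return False
	return True

print(can_add_job(jobs=2, max_jobs=4, max_load=3.0))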