From e4edadf5ae7063f375d76be151c6d0e949980ecf Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Mon, 30 Jun 2008 12:08:16 +0000 Subject: [PATCH] Reimplement parallel-fetch by spawning the `ebuild fetch` command for each ebuild. The benefit of using this approach is that it can be integrated together with parallel build scheduling that's planned. Parallel-fetch support for binhost is not implemented yet, though it worked previously. svn path=/main/trunk/; revision=10855 --- pym/_emerge/__init__.py | 322 ++++++++++++++++++++++++++++------------ pym/portage/__init__.py | 5 +- 2 files changed, 234 insertions(+), 93 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index d2f34d8b9..b6c74b586 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1433,26 +1433,141 @@ class _PackageMetadataWrapper(_PackageMetadataWrapperBase): v = 0 self._pkg.mtime = v -class EbuildFetcher(Task): +class EbuildFetcher(SlotObject): __slots__ = ("fetch_all", "pkg", "pretend", "settings") - def _get_hash_key(self): - hash_key = getattr(self, "_hash_key", None) - if hash_key is None: - self._hash_key = ("EbuildFetcher", self.pkg._get_hash_key()) - return self._hash_key - def execute(self): portdb = self.pkg.root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(self.pkg.cpv) debug = self.settings.get("PORTAGE_DEBUG") == "1" + retval = portage.doebuild(ebuild_path, "fetch", - self.settings["ROOT"], self.settings, debug, - self.pretend, fetchonly=1, fetchall=self.fetch_all, + self.settings["ROOT"], self.settings, debug=debug, + listonly=self.pretend, fetchonly=1, fetchall=self.fetch_all, mydbapi=portdb, tree="porttree") return retval +class EbuildFetcherAsync(SlotObject): + + __slots__ = ("log_file", "fd_pipes", "pkg", + "register", "unregister", + "pid", "returncode", "files") + + _file_names = ("fetcher", "out") + _files_dict = slot_dict_class(_file_names) + _bufsize = 4096 + + def start(self): + # flush any pending output + fd_pipes = self.fd_pipes + if fd_pipes is None: + fd_pipes = { + 0 : sys.stdin.fileno(), + 1 : sys.stdout.fileno(), + 2 : sys.stderr.fileno(), + } + + log_file = self.log_file + self.files = self._files_dict() + files = self.files + + if log_file is not None: + files["out"] = open(log_file, "a") + portage.util.apply_secpass_permissions(log_file, + uid=portage.portage_uid, gid=portage.portage_gid, + mode=0660) + else: + for fd in fd_pipes.itervalues(): + if fd == sys.stdout.fileno(): + sys.stdout.flush() + if fd == sys.stderr.fileno(): + sys.stderr.flush() + + files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w') + + master_fd, slave_fd = os.pipe() + + import fcntl + fcntl.fcntl(master_fd, fcntl.F_SETFL, + fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + + fd_pipes.setdefault(0, sys.stdin.fileno()) + fd_pipes_orig = fd_pipes.copy() + fd_pipes[0] = fd_pipes_orig[0] + fd_pipes[1] = slave_fd + fd_pipes[2] = slave_fd + + root_config = self.pkg.root_config + portdb = root_config.trees["porttree"].dbapi + ebuild_path = portdb.findname(self.pkg.cpv) + settings = root_config.settings + + fetch_env = dict((k, settings[k]) for k in settings) + fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs" + fetch_env["PORTAGE_NICENESS"] = "0" + fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1" + + ebuild_binary = os.path.join( + settings["EBUILD_BIN_PATH"], "ebuild") + + fetch_args = [ebuild_binary, ebuild_path, "fetch"] + debug = settings.get("PORTAGE_DEBUG") == "1" + if debug: + fetch_args.append("--debug") + + retval = portage.process.spawn(fetch_args, env=fetch_env, + fd_pipes=fd_pipes, returnpid=True) + + self.pid = retval[0] + + os.close(slave_fd) + files["fetcher"] = os.fdopen(master_fd, 'r') + self.register(files["fetcher"].fileno(), + select.POLLIN, self._output_handler) + + def _output_handler(self, fd, event): + files = self.files + buf = array.array('B') + try: + buf.fromfile(files["fetcher"], self._bufsize) + except EOFError: + pass + if buf: + buf.tofile(files["out"]) + files["out"].flush() + else: + self.unregister(files["fetcher"].fileno()) + for f in files.values(): + f.close() + + def poll(self): + if self.returncode is not None: + return self.returncode + retval = os.waitpid(self.pid, os.WNOHANG) + if retval == (0, 0): + return None + self._set_returncode(retval) + return self.returncode + + def wait(self): + if self.returncode is not None: + return self.returncode + self._set_returncode(os.waitpid(self.pid, 0)) + return self.returncode + + def _set_returncode(self, wait_retval): + + retval = wait_retval[1] + portage.process.spawned_pids.remove(self.pid) + if retval != os.EX_OK: + if retval & 0xff: + retval = (retval & 0xff) << 8 + else: + retval = retval >> 8 + + self.returncode = retval + class EbuildBuildDir(SlotObject): __slots__ = ("pkg", "settings", @@ -1566,9 +1681,12 @@ class EbuildBuild(Task): ebuild_phase = EbuildPhase(fd_pipes=fd_pipes, pkg=self.pkg, phase=mydo, register=self.register, settings=settings, unregister=self.unregister) + ebuild_phase.start() - self.schedule() - retval = ebuild_phase.wait() + retval = None + while retval is None: + self.schedule() + retval = ebuild_phase.poll() portage._post_phase_userpriv_perms(settings) if mydo == "install": @@ -1686,10 +1804,25 @@ class EbuildPhase(SlotObject): for f in files.values(): f.close() + def poll(self): + if self.returncode is not None: + return self.returncode + retval = os.waitpid(self.pid, os.WNOHANG) + if retval == (0, 0): + return None + self._set_returncode(retval) + return self.returncode + def wait(self): - pid = self.pid - retval = os.waitpid(pid, 0)[1] - portage.process.spawned_pids.remove(pid) + if self.returncode is not None: + return self.returncode + self._set_returncode(os.waitpid(self.pid, 0)) + return self.returncode + + def _set_returncode(self, wait_retval): + + retval = wait_retval[1] + portage.process.spawned_pids.remove(self.pid) if retval != os.EX_OK: if retval & 0xff: retval = (retval & 0xff) << 8 @@ -1706,7 +1839,6 @@ class EbuildPhase(SlotObject): eerror(l, phase=self.phase, key=self.pkg.cpv) self.returncode = retval - return self.returncode class EbuildBinpkg(Task): """ @@ -6327,6 +6459,8 @@ class Scheduler(object): "--fetchonly", "--fetch-all-uri", "--nodeps", "--pretend"]) + _fetch_log = "/var/log/emerge-fetch.log" + def __init__(self, settings, trees, mtimedb, myopts, spinner, mergelist, favorites, digraph): self.settings = settings @@ -6345,9 +6479,38 @@ class Scheduler(object): self.pkgsettings[root] = portage.config( clone=trees[root]["vartree"].settings) self.curval = 0 - self._spawned_pids = [] self._poll_event_handlers = {} self._poll = select.poll() + from collections import deque + self._task_queue = deque() + self._running_tasks = set() + self._max_jobs = 1 + self._parallel_fetch = False + features = self.settings.features + if "parallel-fetch" in features and \ + not ("--pretend" in self.myopts or \ + "--fetch-all-uri" in self.myopts or \ + "--fetchonly" in self.myopts): + if "distlocks" not in features: + portage.writemsg(red("!!!")+"\n", noiselevel=-1) + portage.writemsg(red("!!!")+" parallel-fetching " + \ + "requires the distlocks feature enabled"+"\n", + noiselevel=-1) + portage.writemsg(red("!!!")+" you have it disabled, " + \ + "thus parallel-fetching is being disabled"+"\n", + noiselevel=-1) + portage.writemsg(red("!!!")+"\n", noiselevel=-1) + elif len(mergelist) > 1: + self._parallel_fetch = True + + # clear out existing fetch log if it exists + try: + open(self._fetch_log, 'w') + except EnvironmentError: + pass + + def _add_task(self, task): + self._task_queue.append(task) class _pkg_failure(portage.exception.PortageException): """ @@ -6400,20 +6563,18 @@ class Scheduler(object): def merge(self): keep_going = "--keep-going" in self.myopts + running_tasks = self._running_tasks while True: try: rval = self._merge() finally: - spawned_pids = self._spawned_pids - while spawned_pids: - pid = spawned_pids.pop() - try: - if os.waitpid(pid, os.WNOHANG) == (0, 0): - os.kill(pid, signal.SIGTERM) - os.waitpid(pid, 0) - except OSError: - pass # cleaned up elsewhere. + # clean up child process if necessary + while running_tasks: + task = running_tasks.pop() + if task.poll() is None: + os.kill(task.pid, signal.SIGTERM) + task.wait() if rval == os.EX_OK or not keep_going: break @@ -6493,25 +6654,6 @@ class Scheduler(object): mydepgraph.break_refs(dropped_tasks) return (mylist, dropped_tasks) - def _poll_child_processes(self): - """ - After each merge, collect status from child processes - in order to clean up zombies (such as the parallel-fetch - process). - """ - spawned_pids = self._spawned_pids - if not spawned_pids: - return - for pid in list(spawned_pids): - try: - if os.waitpid(pid, os.WNOHANG) == (0, 0): - continue - except OSError: - # This pid has been cleaned up elsewhere, - # so remove it from our list. - pass - spawned_pids.remove(pid) - def _register(self, f, eventmask, handler): self._poll_event_handlers[f] = handler self._poll.register(f, eventmask) @@ -6519,11 +6661,43 @@ class Scheduler(object): def _unregister(self, f): self._poll.unregister(f) del self._poll_event_handlers[f] + self._schedule_tasks() def _schedule(self): - while self._poll_event_handlers: - for f, event in self._poll.poll(): - self._poll_event_handlers[f](f, event) + event_handlers = self._poll_event_handlers + running_tasks = self._running_tasks + poll = self._poll.poll + + self._schedule_tasks() + + while event_handlers: + for f, event in poll(): + event_handlers[f](f, event) + + if len(event_handlers) <= len(running_tasks): + # Assuming one handler per task, this + # means the caller has unregistered it's + # handler, so it's time to yield. + break + + def _schedule_tasks(self): + task_queue = self._task_queue + running_tasks = self._running_tasks + max_jobs = self._max_jobs + state_changed = False + + for task in list(running_tasks): + if task.poll() is not None: + running_tasks.remove(task) + state_changed = True + + while task_queue and (len(running_tasks) < max_jobs): + task = task_queue.popleft() + task.start() + running_tasks.add(task) + state_changed = True + + return state_changed def _merge(self): mylist = self._mergelist @@ -6551,6 +6725,16 @@ class Scheduler(object): if isinstance(x, Package) and x.operation == "merge"] mtimedb.commit() + if self._parallel_fetch: + for pkg in mylist: + if not isinstance(pkg, Package) or \ + not pkg.type_name == "ebuild": + continue + + self._add_task(EbuildFetcherAsync(log_file=self._fetch_log, + pkg=pkg, register=self._register, + unregister=self._unregister)) + # Verify all the manifests now so that the user is notified of failure # as soon as possible. if "--fetchonly" not in self.myopts and \ @@ -6584,49 +6768,6 @@ class Scheduler(object): myfeat = self.settings.features[:] bad_resume_opts = set(["--ask", "--changelog", "--skipfirst", "--resume"]) - if "parallel-fetch" in myfeat and \ - not ("--pretend" in self.myopts or \ - "--fetch-all-uri" in self.myopts or \ - "--fetchonly" in self.myopts): - if "distlocks" not in myfeat: - print red("!!!") - print red("!!!")+" parallel-fetching requires the distlocks feature enabled" - print red("!!!")+" you have it disabled, thus parallel-fetching is being disabled" - print red("!!!") - elif len(mymergelist) > 1: - fetch_log = "/var/log/emerge-fetch.log" - logfile = open(fetch_log, "w") - fd_pipes = {1:logfile.fileno(), 2:logfile.fileno()} - portage.util.apply_secpass_permissions(fetch_log, - uid=portage.portage_uid, gid=portage.portage_gid, - mode=0660) - fetch_env = os.environ.copy() - fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs" - fetch_env["PORTAGE_NICENESS"] = "0" - fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1" - fetch_args = [sys.argv[0], "--resume", - "--fetchonly", "--nodeps"] - resume_opts = self.myopts.copy() - # For automatic resume, we need to prevent - # any of bad_resume_opts from leaking in - # via EMERGE_DEFAULT_OPTS. - resume_opts["--ignore-default-opts"] = True - for myopt, myarg in resume_opts.iteritems(): - if myopt not in bad_resume_opts: - if myarg is True: - fetch_args.append(myopt) - else: - fetch_args.append(myopt +"="+ myarg) - self._spawned_pids.extend( - portage.process.spawn( - fetch_args, env=fetch_env, - fd_pipes=fd_pipes, returnpid=True)) - logfile.close() # belongs to the spawned process - del fetch_log, logfile, fd_pipes, fetch_env, fetch_args, \ - resume_opts - print ">>> starting parallel fetching pid %d" % \ - self._spawned_pids[-1] - metadata_keys = [k for k in portage.auxdbkeys \ if not k.startswith("UNUSED_")] + ["USE"] @@ -6926,7 +7067,6 @@ class Scheduler(object): # due to power failure, SIGKILL, etc... mtimedb.commit() self.curval += 1 - self._poll_child_processes() def _post_merge(self, mtimedb, xterm_titles, failed_fetches): if "--pretend" not in self.myopts: diff --git a/pym/portage/__init__.py b/pym/portage/__init__.py index 456f4e013..50711da28 100644 --- a/pym/portage/__init__.py +++ b/pym/portage/__init__.py @@ -3274,8 +3274,9 @@ def fetch(myuris, mysettings, listonly=0, fetchonly=0, locks_in_subdir=".locks", # file size. The parent process will verify their checksums prior to # the unpack phase. - parallel_fetchonly = fetchonly and \ - "PORTAGE_PARALLEL_FETCHONLY" in mysettings + parallel_fetchonly = "PORTAGE_PARALLEL_FETCHONLY" in mysettings + if parallel_fetchonly: + fetchonly = 1 check_config_instance(mysettings) -- 2.26.2