author    Zac Medico <zmedico@gentoo.org>
          Mon, 30 Jun 2008 12:08:16 +0000 (12:08 -0000)
committer Zac Medico <zmedico@gentoo.org>
          Mon, 30 Jun 2008 12:08:16 +0000 (12:08 -0000)

Reimplement parallel-fetch by spawning the `ebuild fetch` command for each
ebuild. The benefit of this approach is that it can be integrated with the
parallel build scheduling that is planned. Parallel-fetch support for binhost
is not implemented yet, though it worked previously.

svn path=/main/trunk/; revision=10855
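For illustration, here is a minimal standalone sketch of the spawn-per-ebuild
approach that the new EbuildFetcherAsync class below implements. It is not the
patch's code: it uses the standard-library subprocess module in place of
portage.process.spawn, appends child output directly to the fetch log instead
of relaying it through a non-blocking pipe and poll loop, and the helper name
spawn_fetcher and the example ebuild path are hypothetical. The environment
adjustments and the `ebuild <path> fetch` invocation mirror what the patch does.

import os
import subprocess
import time

def spawn_fetcher(ebuild_path, log_path="/var/log/emerge-fetch.log"):
    """Start `ebuild <ebuild_path> fetch` in the background and return the
    Popen handle so the caller can poll() it instead of blocking on it."""
    env = os.environ.copy()
    # Same environment adjustments the patch applies for the fetcher child:
    # no cvs fetching, neutral niceness, and the parallel-fetch-only marker.
    env["FEATURES"] = env.get("FEATURES", "") + " -cvs"
    env["PORTAGE_NICENESS"] = "0"
    env["PORTAGE_PARALLEL_FETCHONLY"] = "1"
    log_file = open(log_path, "a")
    # Both stdout and stderr of the child are appended to the shared fetch log.
    return subprocess.Popen(["ebuild", ebuild_path, "fetch"],
        env=env, stdout=log_file, stderr=subprocess.STDOUT)

# Usage: start one fetcher per ebuild in the merge list, then poll them --
# roughly what the Scheduler's new task queue does with max_jobs = 1.
fetchers = [spawn_fetcher(p) for p in
    ["/usr/portage/app-misc/foo/foo-1.0.ebuild"]]  # hypothetical path
while any(f.poll() is None for f in fetchers):
    time.sleep(1)  # the patch waits in a select.poll() loop instead of sleeping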

pym/_emerge/__init__.py
pym/portage/__init__.py

pym/_emerge/__init__.py
index d2f34d8b9e630115097ad77570957d6206e3c314..b6c74b58677a3fccc860e4ddded04bcb470adad5 100644
@@ -1433,26 +1433,141 @@ class _PackageMetadataWrapper(_PackageMetadataWrapperBase):
                                v = 0
                self._pkg.mtime = v
 
-class EbuildFetcher(Task):
+class EbuildFetcher(SlotObject):
 
        __slots__ = ("fetch_all", "pkg", "pretend", "settings")
 
-       def _get_hash_key(self):
-               hash_key = getattr(self, "_hash_key", None)
-               if hash_key is None:
-                       self._hash_key = ("EbuildFetcher", self.pkg._get_hash_key())
-               return self._hash_key
-
        def execute(self):
                portdb = self.pkg.root_config.trees["porttree"].dbapi
                ebuild_path = portdb.findname(self.pkg.cpv)
                debug = self.settings.get("PORTAGE_DEBUG") == "1"
+
                retval = portage.doebuild(ebuild_path, "fetch",
-                       self.settings["ROOT"], self.settings, debug,
-                       self.pretend, fetchonly=1, fetchall=self.fetch_all,
+                       self.settings["ROOT"], self.settings, debug=debug,
+                       listonly=self.pretend, fetchonly=1, fetchall=self.fetch_all,
                        mydbapi=portdb, tree="porttree")
                return retval
 
+class EbuildFetcherAsync(SlotObject):
+
+       __slots__ = ("log_file", "fd_pipes", "pkg",
+               "register", "unregister",
+               "pid", "returncode", "files")
+
+       _file_names = ("fetcher", "out")
+       _files_dict = slot_dict_class(_file_names)
+       _bufsize = 4096
+
+       def start(self):
+               # flush any pending output
+               fd_pipes = self.fd_pipes
+               if fd_pipes is None:
+                       fd_pipes = {
+                               0 : sys.stdin.fileno(),
+                               1 : sys.stdout.fileno(),
+                               2 : sys.stderr.fileno(),
+                       }
+
+               log_file = self.log_file
+               self.files = self._files_dict()
+               files = self.files
+
+               if log_file is not None:
+                       files["out"] = open(log_file, "a")
+                       portage.util.apply_secpass_permissions(log_file,
+                               uid=portage.portage_uid, gid=portage.portage_gid,
+                               mode=0660)
+               else:
+                       for fd in fd_pipes.itervalues():
+                               if fd == sys.stdout.fileno():
+                                       sys.stdout.flush()
+                               if fd == sys.stderr.fileno():
+                                       sys.stderr.flush()
+
+                       files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w')
+
+               master_fd, slave_fd = os.pipe()
+
+               import fcntl
+               fcntl.fcntl(master_fd, fcntl.F_SETFL,
+                       fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+               fd_pipes.setdefault(0, sys.stdin.fileno())
+               fd_pipes_orig = fd_pipes.copy()
+               fd_pipes[0] = fd_pipes_orig[0]
+               fd_pipes[1] = slave_fd
+               fd_pipes[2] = slave_fd
+
+               root_config = self.pkg.root_config
+               portdb = root_config.trees["porttree"].dbapi
+               ebuild_path = portdb.findname(self.pkg.cpv)
+               settings = root_config.settings
+
+               fetch_env = dict((k, settings[k]) for k in settings)
+               fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs"
+               fetch_env["PORTAGE_NICENESS"] = "0"
+               fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1"
+
+               ebuild_binary = os.path.join(
+                       settings["EBUILD_BIN_PATH"], "ebuild")
+
+               fetch_args = [ebuild_binary, ebuild_path, "fetch"]
+               debug = settings.get("PORTAGE_DEBUG") == "1"
+               if debug:
+                       fetch_args.append("--debug")
+
+               retval = portage.process.spawn(fetch_args, env=fetch_env,
+                       fd_pipes=fd_pipes, returnpid=True)
+
+               self.pid = retval[0]
+
+               os.close(slave_fd)
+               files["fetcher"] = os.fdopen(master_fd, 'r')
+               self.register(files["fetcher"].fileno(),
+                       select.POLLIN, self._output_handler)
+
+       def _output_handler(self, fd, event):
+               files = self.files
+               buf = array.array('B')
+               try:
+                       buf.fromfile(files["fetcher"], self._bufsize)
+               except EOFError:
+                       pass
+               if buf:
+                       buf.tofile(files["out"])
+                       files["out"].flush()
+               else:
+                       self.unregister(files["fetcher"].fileno())
+                       for f in files.values():
+                               f.close()
+
+       def poll(self):
+               if self.returncode is not None:
+                       return self.returncode
+               retval = os.waitpid(self.pid, os.WNOHANG)
+               if retval == (0, 0):
+                       return None
+               self._set_returncode(retval)
+               return self.returncode
+
+       def wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               self._set_returncode(os.waitpid(self.pid, 0))
+               return self.returncode
+
+       def _set_returncode(self, wait_retval):
+
+               retval = wait_retval[1]
+               portage.process.spawned_pids.remove(self.pid)
+               if retval != os.EX_OK:
+                       if retval & 0xff:
+                               retval = (retval & 0xff) << 8
+                       else:
+                               retval = retval >> 8
+
+               self.returncode = retval
+
 class EbuildBuildDir(SlotObject):
 
        __slots__ = ("pkg", "settings",
@@ -1566,9 +1681,12 @@ class EbuildBuild(Task):
                        ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
                                pkg=self.pkg, phase=mydo, register=self.register,
                                settings=settings, unregister=self.unregister)
+
                        ebuild_phase.start()
-                       self.schedule()
-                       retval = ebuild_phase.wait()
+                       retval = None
+                       while retval is None:
+                               self.schedule()
+                               retval = ebuild_phase.poll()
 
                        portage._post_phase_userpriv_perms(settings)
                        if mydo == "install":
@@ -1686,10 +1804,25 @@ class EbuildPhase(SlotObject):
                        for f in files.values():
                                f.close()
 
+       def poll(self):
+               if self.returncode is not None:
+                       return self.returncode
+               retval = os.waitpid(self.pid, os.WNOHANG)
+               if retval == (0, 0):
+                       return None
+               self._set_returncode(retval)
+               return self.returncode
+
        def wait(self):
-               pid = self.pid
-               retval = os.waitpid(pid, 0)[1]
-               portage.process.spawned_pids.remove(pid)
+               if self.returncode is not None:
+                       return self.returncode
+               self._set_returncode(os.waitpid(self.pid, 0))
+               return self.returncode
+
+       def _set_returncode(self, wait_retval):
+
+               retval = wait_retval[1]
+               portage.process.spawned_pids.remove(self.pid)
                if retval != os.EX_OK:
                        if retval & 0xff:
                                retval = (retval & 0xff) << 8
@@ -1706,7 +1839,6 @@ class EbuildPhase(SlotObject):
                                eerror(l, phase=self.phase, key=self.pkg.cpv)
 
                self.returncode = retval
-               return self.returncode
 
 class EbuildBinpkg(Task):
        """
@@ -6327,6 +6459,8 @@ class Scheduler(object):
                "--fetchonly", "--fetch-all-uri",
                "--nodeps", "--pretend"])
 
+       _fetch_log = "/var/log/emerge-fetch.log"
+
        def __init__(self, settings, trees, mtimedb, myopts,
                spinner, mergelist, favorites, digraph):
                self.settings = settings
@@ -6345,9 +6479,38 @@ class Scheduler(object):
                        self.pkgsettings[root] = portage.config(
                                clone=trees[root]["vartree"].settings)
                self.curval = 0
-               self._spawned_pids = []
                self._poll_event_handlers = {}
                self._poll = select.poll()
+               from collections import deque
+               self._task_queue = deque()
+               self._running_tasks = set()
+               self._max_jobs = 1
+               self._parallel_fetch = False
+               features = self.settings.features
+               if "parallel-fetch" in features and \
+                       not ("--pretend" in self.myopts or \
+                       "--fetch-all-uri" in self.myopts or \
+                       "--fetchonly" in self.myopts):
+                       if "distlocks" not in features:
+                               portage.writemsg(red("!!!")+"\n", noiselevel=-1)
+                               portage.writemsg(red("!!!")+" parallel-fetching " + \
+                                       "requires the distlocks feature enabled"+"\n",
+                                       noiselevel=-1)
+                               portage.writemsg(red("!!!")+" you have it disabled, " + \
+                                       "thus parallel-fetching is being disabled"+"\n",
+                                       noiselevel=-1)
+                               portage.writemsg(red("!!!")+"\n", noiselevel=-1)
+                       elif len(mergelist) > 1:
+                               self._parallel_fetch = True
+
+                               # clear out existing fetch log if it exists
+                               try:
+                                       open(self._fetch_log, 'w')
+                               except EnvironmentError:
+                                       pass
+
+       def _add_task(self, task):
+               self._task_queue.append(task)
 
        class _pkg_failure(portage.exception.PortageException):
                """
@@ -6400,20 +6563,18 @@ class Scheduler(object):
        def merge(self):
 
                keep_going = "--keep-going" in self.myopts
+               running_tasks = self._running_tasks
 
                while True:
                        try:
                                rval = self._merge()
                        finally:
-                               spawned_pids = self._spawned_pids
-                               while spawned_pids:
-                                       pid = spawned_pids.pop()
-                                       try:
-                                               if os.waitpid(pid, os.WNOHANG) == (0, 0):
-                                                       os.kill(pid, signal.SIGTERM)
-                                                       os.waitpid(pid, 0)
-                                       except OSError:
-                                               pass # cleaned up elsewhere.
+                               # clean up child process if necessary
+                               while running_tasks:
+                                       task = running_tasks.pop()
+                                       if task.poll() is None:
+                                               os.kill(task.pid, signal.SIGTERM)
+                                               task.wait()
 
                        if rval == os.EX_OK or not keep_going:
                                break
@@ -6493,25 +6654,6 @@ class Scheduler(object):
                mydepgraph.break_refs(dropped_tasks)
                return (mylist, dropped_tasks)
 
-       def _poll_child_processes(self):
-               """
-               After each merge, collect status from child processes
-               in order to clean up zombies (such as the parallel-fetch
-               process).
-               """
-               spawned_pids = self._spawned_pids
-               if not spawned_pids:
-                       return
-               for pid in list(spawned_pids):
-                       try:
-                               if os.waitpid(pid, os.WNOHANG) == (0, 0):
-                                       continue
-                       except OSError:
-                               # This pid has been cleaned up elsewhere,
-                               # so remove it from our list.
-                               pass
-                       spawned_pids.remove(pid)
-
        def _register(self, f, eventmask, handler):
                self._poll_event_handlers[f] = handler
                self._poll.register(f, eventmask)
@@ -6519,11 +6661,43 @@ class Scheduler(object):
        def _unregister(self, f):
                self._poll.unregister(f)
                del self._poll_event_handlers[f]
+               self._schedule_tasks()
 
        def _schedule(self):
-               while self._poll_event_handlers:
-                       for f, event in self._poll.poll():
-                               self._poll_event_handlers[f](f, event)
+               event_handlers = self._poll_event_handlers
+               running_tasks = self._running_tasks
+               poll = self._poll.poll
+
+               self._schedule_tasks()
+
+               while event_handlers:
+                       for f, event in poll():
+                               event_handlers[f](f, event)
+
+                       if len(event_handlers) <= len(running_tasks):
+                               # Assuming one handler per task, this
+                               # means the caller has unregistered its
+                               # handler, so it's time to yield.
+                               break
+
+       def _schedule_tasks(self):
+               task_queue = self._task_queue
+               running_tasks = self._running_tasks
+               max_jobs = self._max_jobs
+               state_changed = False
+
+               for task in list(running_tasks):
+                       if task.poll() is not None:
+                               running_tasks.remove(task)
+                               state_changed = True
+
+               while task_queue and (len(running_tasks) < max_jobs):
+                       task = task_queue.popleft()
+                       task.start()
+                       running_tasks.add(task)
+                       state_changed = True
+
+               return state_changed
 
        def _merge(self):
                mylist = self._mergelist
@@ -6551,6 +6725,16 @@ class Scheduler(object):
                        if isinstance(x, Package) and x.operation == "merge"]
                mtimedb.commit()
 
+               if self._parallel_fetch:
+                       for pkg in mylist:
+                               if not isinstance(pkg, Package) or \
+                                       not pkg.type_name == "ebuild":
+                                       continue
+
+                               self._add_task(EbuildFetcherAsync(log_file=self._fetch_log,
+                                       pkg=pkg, register=self._register,
+                                       unregister=self._unregister))
+
                # Verify all the manifests now so that the user is notified of failure
                # as soon as possible.
                if "--fetchonly" not in self.myopts and \
@@ -6584,49 +6768,6 @@ class Scheduler(object):
                myfeat = self.settings.features[:]
                bad_resume_opts = set(["--ask", "--changelog", "--skipfirst",
                        "--resume"])
-               if "parallel-fetch" in myfeat and \
-                       not ("--pretend" in self.myopts or \
-                       "--fetch-all-uri" in self.myopts or \
-                       "--fetchonly" in self.myopts):
-                       if "distlocks" not in myfeat:
-                               print red("!!!")
-                               print red("!!!")+" parallel-fetching requires the distlocks feature enabled"
-                               print red("!!!")+" you have it disabled, thus parallel-fetching is being disabled"
-                               print red("!!!")
-                       elif len(mymergelist) > 1:
-                               fetch_log = "/var/log/emerge-fetch.log"
-                               logfile = open(fetch_log, "w")
-                               fd_pipes = {1:logfile.fileno(), 2:logfile.fileno()}
-                               portage.util.apply_secpass_permissions(fetch_log,
-                                       uid=portage.portage_uid, gid=portage.portage_gid,
-                                       mode=0660)
-                               fetch_env = os.environ.copy()
-                               fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs"
-                               fetch_env["PORTAGE_NICENESS"] = "0"
-                               fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1"
-                               fetch_args = [sys.argv[0], "--resume",
-                                       "--fetchonly", "--nodeps"]
-                               resume_opts = self.myopts.copy()
-                               # For automatic resume, we need to prevent
-                               # any of bad_resume_opts from leaking in
-                               # via EMERGE_DEFAULT_OPTS.
-                               resume_opts["--ignore-default-opts"] = True
-                               for myopt, myarg in resume_opts.iteritems():
-                                       if myopt not in bad_resume_opts:
-                                               if myarg is True:
-                                                       fetch_args.append(myopt)
-                                               else:
-                                                       fetch_args.append(myopt +"="+ myarg)
-                               self._spawned_pids.extend(
-                                       portage.process.spawn(
-                                       fetch_args, env=fetch_env,
-                                       fd_pipes=fd_pipes, returnpid=True))
-                               logfile.close() # belongs to the spawned process
-                               del fetch_log, logfile, fd_pipes, fetch_env, fetch_args, \
-                                       resume_opts
-                               print ">>> starting parallel fetching pid %d" % \
-                                       self._spawned_pids[-1]
-
                metadata_keys = [k for k in portage.auxdbkeys \
                        if not k.startswith("UNUSED_")] + ["USE"]
 
@@ -6926,7 +7067,6 @@ class Scheduler(object):
                        # due to power failure, SIGKILL, etc...
                        mtimedb.commit()
                        self.curval += 1
-                       self._poll_child_processes()
 
        def _post_merge(self, mtimedb, xterm_titles, failed_fetches):
                if "--pretend" not in self.myopts:
pym/portage/__init__.py
index 456f4e0133a5cdc80e1fa6da79f5033ad1a63702..50711da2895e4590363bece15e46da885bb1f2d1 100644
@@ -3274,8 +3274,9 @@ def fetch(myuris, mysettings, listonly=0, fetchonly=0, locks_in_subdir=".locks",
        # file size. The parent process will verify their checksums prior to
        # the unpack phase.
 
-       parallel_fetchonly = fetchonly and \
-               "PORTAGE_PARALLEL_FETCHONLY" in mysettings
+       parallel_fetchonly = "PORTAGE_PARALLEL_FETCHONLY" in mysettings
+       if parallel_fetchonly:
+               fetchonly = 1
 
        check_config_instance(mysettings)