Add a new BinpkgFetcherAsync class and use it to implement parellel-fetch
authorZac Medico <zmedico@gentoo.org>
Tue, 1 Jul 2008 09:47:32 +0000 (09:47 -0000)
committerZac Medico <zmedico@gentoo.org>
Tue, 1 Jul 2008 09:47:32 +0000 (09:47 -0000)
for --getbinpkg.

svn path=/main/trunk/; revision=10868

pym/_emerge/__init__.py

index acaf1628717f2482548075a88a401748f7401bae..9aea4415f1361b1fe2e75d6355629f4b27ab1270 100644 (file)
@@ -21,7 +21,11 @@ except KeyboardInterrupt:
        sys.exit(1)
 
 import array
+import fcntl
 import select
+import shlex
+import urlparse
+import weakref
 import gc
 import os, stat
 import platform
@@ -1991,6 +1995,195 @@ class BinpkgFetcher(Task):
                        rval = 1
                return rval
 
+class BinpkgFetcherAsync(SlotObject):
+
+       __slots__ = ("cancelled", "log_file", "fd_pipes", "pkg",
+               "register", "unregister",
+               "locked", "files", "pid", "pkg_path", "returncode", "_lock_obj")
+
+       _file_names = ("fetcher", "out")
+       _files_dict = slot_dict_class(_file_names)
+       _bufsize = 4096
+
+       def __init__(self, **kwargs):
+               SlotObject.__init__(self, **kwargs)
+               pkg = self.pkg
+               self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
+
+       def start(self):
+
+               if self.cancelled:
+                       self.pid = -1
+                       return
+
+               fd_pipes = self.fd_pipes
+               if fd_pipes is None:
+                       fd_pipes = {
+                               0 : sys.stdin.fileno(),
+                               1 : sys.stdout.fileno(),
+                               2 : sys.stderr.fileno(),
+                       }
+
+               log_file = self.log_file
+               self.files = self._files_dict()
+               files = self.files
+
+               if log_file is not None:
+                       files["out"] = open(log_file, "a")
+                       portage.util.apply_secpass_permissions(log_file,
+                               uid=portage.portage_uid, gid=portage.portage_gid,
+                               mode=0660)
+               else:
+                       # flush any pending output
+                       for fd in fd_pipes.itervalues():
+                               if fd == sys.stdout.fileno():
+                                       sys.stdout.flush()
+                               if fd == sys.stderr.fileno():
+                                       sys.stderr.flush()
+
+                       files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w')
+
+               master_fd, slave_fd = os.pipe()
+               fcntl.fcntl(master_fd, fcntl.F_SETFL,
+                       fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+               fd_pipes.setdefault(0, sys.stdin.fileno())
+               fd_pipes_orig = fd_pipes.copy()
+               fd_pipes[0] = fd_pipes_orig[0]
+               fd_pipes[1] = slave_fd
+               fd_pipes[2] = slave_fd
+
+               pkg = self.pkg
+               bintree = pkg.root_config.trees["bintree"]
+               settings = bintree.settings
+               use_locks = "distlocks" in settings.features
+               pkg_path = self.pkg_path
+               resume = os.path.exists(pkg_path)
+
+               # urljoin doesn't work correctly with
+               # unrecognized protocols like sftp
+               if bintree._remote_has_index:
+                       rel_uri = bintree._remotepkgs[pkg.cpv].get("PATH")
+                       if not rel_uri:
+                               rel_uri = pkg.cpv + ".tbz2"
+                       uri = bintree._remote_base_uri.rstrip("/") + \
+                               "/" + rel_uri.lstrip("/")
+               else:
+                       uri = settings["PORTAGE_BINHOST"].rstrip("/") + \
+                               "/" + pkg.pf + ".tbz2"
+
+               protocol = urlparse.urlparse(uri)[0]
+               fcmd_prefix = "FETCHCOMMAND"
+               if resume:
+                       fcmd_prefix = "RESUMECOMMAND"
+               fcmd = settings.get(fcmd_prefix + "_" + protocol.upper())
+               if not fcmd:
+                       fcmd = settings.get(fcmd_prefix)
+
+               fcmd_vars = {
+                       "DISTDIR" : os.path.dirname(pkg_path),
+                       "URI"     : uri,
+                       "FILE"    : os.path.basename(pkg_path)
+               }
+
+               fetch_env = dict((k, settings[k]) for k in settings)
+               fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \
+                       for x in shlex.split(fcmd)]
+
+               portage.util.ensure_dirs(os.path.dirname(pkg_path))
+               if use_locks:
+                       self.lock()
+
+               retval = portage.process.spawn(fetch_args, env=fetch_env,
+                       fd_pipes=fd_pipes, returnpid=True)
+
+               self.pid = retval[0]
+
+               os.close(slave_fd)
+               files["fetcher"] = os.fdopen(master_fd, 'r')
+               self.register(files["fetcher"].fileno(),
+                       select.POLLIN, self._output_handler)
+
+       def _output_handler(self, fd, event):
+               files = self.files
+               buf = array.array('B')
+               try:
+                       buf.fromfile(files["fetcher"], self._bufsize)
+               except EOFError:
+                       pass
+               if buf:
+                       buf.tofile(files["out"])
+                       files["out"].flush()
+               else:
+                       self.unregister(files["fetcher"].fileno())
+                       for f in files.values():
+                               f.close()
+                       if self.locked:
+                               self.unlock()
+
+       def lock(self):
+               """
+               This raises an AlreadyLocked exception if lock() is called
+               while a lock is already held. In order to avoid this, call
+               unlock() or check whether the "locked" attribute is True
+               or False before calling lock().
+               """
+               if self._lock_obj is not None:
+                       raise self.AlreadyLocked((self._lock_obj,))
+
+               self._lock_obj = portage.locks.lockfile(
+                       self.pkg_path, wantnewlockfile=1)
+               self.locked = True
+
+       class AlreadyLocked(portage.exception.PortageException):
+               pass
+
+       def unlock(self):
+               if self._lock_obj is None:
+                       return
+               portage.locks.unlockfile(self._lock_obj)
+               self._lock_obj = None
+               self.locked = False
+
+       def poll(self):
+               if self.returncode is not None:
+                       return self.returncode
+               retval = os.waitpid(self.pid, os.WNOHANG)
+               if retval == (0, 0):
+                       return None
+               self._set_returncode(retval)
+               return self.returncode
+
+       def cancel(self):
+               if self.isAlive():
+                       os.kill(self.pid, signal.SIGTERM)
+               self.cancelled = True
+               if self.pid is not None:
+                       self.wait()
+               return self.returncode
+
+       def isAlive(self):
+               return self.pid is not None and \
+                       self.returncode is None
+
+       def wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               self._set_returncode(os.waitpid(self.pid, 0))
+               return self.returncode
+
+       def _set_returncode(self, wait_retval):
+
+               retval = wait_retval[1]
+               portage.process.spawned_pids.remove(self.pid)
+               if retval != os.EX_OK:
+                       if retval & 0xff:
+                               retval = (retval & 0xff) << 8
+                       else:
+                               retval = retval >> 8
+
+               self.returncode = retval
+
 class BinpkgMerge(Task):
 
        __slots__ = ("find_blockers", "ldpath_mtimes",
@@ -6573,9 +6766,7 @@ class Scheduler(object):
                                self._task_queue.clear()
                                while running_tasks:
                                        task = running_tasks.pop()
-                                       if task.poll() is None:
-                                               os.kill(task.pid, signal.SIGTERM)
-                                               task.wait()
+                                       task.cancel()
 
                        if rval == os.EX_OK or not keep_going:
                                break
@@ -6694,8 +6885,10 @@ class Scheduler(object):
 
                while task_queue and (len(running_tasks) < max_jobs):
                        task = task_queue.popleft()
-                       task.start()
-                       running_tasks.add(task)
+                       cancelled = getattr(task, "cancelled", None)
+                       if not cancelled:
+                               task.start()
+                               running_tasks.add(task)
                        state_changed = True
 
                return state_changed
@@ -6726,15 +6919,27 @@ class Scheduler(object):
                        if isinstance(x, Package) and x.operation == "merge"]
                mtimedb.commit()
 
+               prefetchers = weakref.WeakValueDictionary()
+               getbinpkg = "--getbinpkg" in self.myopts
+
                if self._parallel_fetch:
                        for pkg in mylist:
-                               if not isinstance(pkg, Package) or \
-                                       not pkg.type_name == "ebuild":
+                               if not isinstance(pkg, Package):
                                        continue
-
-                               self._add_task(EbuildFetcherAsync(log_file=self._fetch_log,
-                                       pkg=pkg, register=self._register,
-                                       unregister=self._unregister))
+                               if pkg.type_name == "ebuild":
+                                       self._add_task(EbuildFetcherAsync(
+                                               log_file=self._fetch_log,
+                                               pkg=pkg, register=self._register,
+                                               unregister=self._unregister))
+                               elif pkg.type_name == "binary" and getbinpkg and \
+                                       pkg.root_config.trees["bintree"].isremote(pkg.cpv):
+                                       prefetcher = BinpkgFetcherAsync(
+                                               log_file=self._fetch_log,
+                                               pkg=pkg, register=self._register,
+                                               unregister=self._unregister)
+                                       prefetchers[pkg] = prefetcher
+                                       self._add_task(prefetcher)
+                                       del prefetcher
 
                # Verify all the manifests now so that the user is notified of failure
                # as soon as possible.
@@ -6805,14 +7010,15 @@ class Scheduler(object):
                                self._execute_task(bad_resume_opts,
                                        failed_fetches,
                                        mydbapi, mergecount,
-                                       myfeat, mymergelist, x, xterm_titles)
+                                       myfeat, mymergelist, x,
+                                       prefetchers, xterm_titles)
                        except self._pkg_failure, e:
                                return e.status
                return self._post_merge(mtimedb, xterm_titles, failed_fetches)
 
        def _execute_task(self, bad_resume_opts,
                failed_fetches, mydbapi, mergecount, myfeat,
-               mymergelist, pkg, xterm_titles):
+               mymergelist, pkg, prefetchers, xterm_titles):
                        favorites = self._favorites
                        mtimedb = self._mtimedb
                        from portage.elog import elog_process
@@ -6963,8 +7169,27 @@ class Scheduler(object):
                                                        phasefilter=filter_mergephases)
                                                build_dir.unlock()
 
-                       elif x[0]=="binary":
-                               #merge the tbz2
+                       elif x.type_name == "binary":
+                               # The prefetcher have already completed or it
+                               # could be running now. If it's running now,
+                               # wait for it to complete since it holds
+                               # a lock on the file being fetched. The
+                               # portage.locks functions are only designed
+                               # to work between separate processes. Since
+                               # the lock is held by the current process,
+                               # use the scheduler and fetcher methods to
+                               # synchronize with the fetcher.
+                               prefetcher = prefetchers.get(pkg)
+                               if prefetcher is not None:
+                                       if not prefetcher.isAlive():
+                                               prefetcher.cancel()
+                                       else:
+                                               retval = None
+                                               while retval is None:
+                                                       self._schedule()
+                                                       retval = prefetcher.poll()
+                                       del prefetcher
+
                                fetcher = BinpkgFetcher(pkg=pkg, pretend=pretend,
                                        use_locks=("distlocks" in pkgsettings.features))
                                mytbz2 = fetcher.pkg_path