Merge packages asynchronously in Portage.
authorDavid James <davidjames@google.com>
Fri, 25 Mar 2011 02:36:33 +0000 (19:36 -0700)
committerZac Medico <zmedico@gentoo.org>
Fri, 25 Mar 2011 02:36:33 +0000 (19:36 -0700)
This allows for the scheduler to continue to run while packages are
being merged and installed, allowing for additional parallelism and
making better use of the CPUs.

Review URL: http://codereview.chromium.org/6713043

pym/_emerge/Binpkg.py
pym/_emerge/EbuildBuild.py
pym/_emerge/EbuildMerge.py
pym/_emerge/MergeListItem.py
pym/_emerge/PackageMerge.py
pym/portage/dbapi/_MergeProcess.py
pym/portage/dbapi/vartree.py

index 00587451aeb9169d5320232b2a7dabb895408464..62d44c48f575c2f28c448a48973963565189d286 100644 (file)
@@ -307,7 +307,7 @@ class Binpkg(CompositeTask):
                portage.elog.elog_process(self.pkg.cpv, self.settings)
                self._build_dir.unlock()
 
-       def install(self):
+       def install(self, handler):
 
                # This gives bashrc users an opportunity to do various things
                # such as remove binary packages after they're installed.
@@ -320,19 +320,20 @@ class Binpkg(CompositeTask):
                        pkg=self.pkg, pkg_count=self.pkg_count,
                        pkg_path=self._pkg_path, scheduler=self.scheduler,
                        settings=settings, tree=self._tree, world_atom=self.world_atom)
+               task = merge.create_task()
+               task.addExitListener(self._install_exit)
+               self._start_task(task, handler)
 
-               try:
-                       retval = merge.execute()
-               finally:
-                       settings.pop("PORTAGE_BINPKG_FILE", None)
-                       self._unlock_builddir()
+       def _install_exit(self, task):
+               self.settings.pop("PORTAGE_BINPKG_FILE", None)
+               self._unlock_builddir()
 
-               if retval == os.EX_OK and \
-                       'binpkg-logs' not in self.settings.features and \
+               if self._default_final_exit(task) != os.EX_OK:
+                       return
+
+               if 'binpkg-logs' not in self.settings.features and \
                        self.settings.get("PORTAGE_LOG_FILE"):
                        try:
                                os.unlink(self.settings["PORTAGE_LOG_FILE"])
                        except OSError:
                                pass
-               return retval
-
index 98ab24522ee882a93fa1e3313b47a251c2682a78..c7a5f5cdb7b1739d75e316c5e9e3ecc61358af52 100644 (file)
@@ -314,7 +314,7 @@ class EbuildBuild(CompositeTask):
                        self._unlock_builddir()
                self.wait()
 
-       def install(self):
+       def install(self, exit_handler):
                """
                Install the package and then clean up and release locks.
                Only call this after the build has completed successfully
@@ -343,10 +343,11 @@ class EbuildBuild(CompositeTask):
                        (pkg_count.curval, pkg_count.maxval, pkg.cpv)
                logger.log(msg, short_msg=short_msg)
 
-               try:
-                       rval = merge.execute()
-               finally:
-                       self._unlock_builddir()
+               task = merge.create_task()
+               task.addExitListener(self._install_exit)
+               self._start_task(task, exit_handler)
 
-               return rval
+       def _install_exit(self, task):
+               self._unlock_builddir()
+               self._default_final_exit(task)
 
index d73a262b321cf7c8f3489bcca3b901c614f76380..6a586927023ab5fc30539b2c981fc989d4c23b93 100644 (file)
@@ -4,6 +4,8 @@
 from _emerge.SlotObject import SlotObject
 import portage
 from portage import os
+from portage.dbapi._MergeProcess import MergeProcess
+from portage.dbapi.vartree import dblink
 
 class EbuildMerge(SlotObject):
 
@@ -11,28 +13,35 @@ class EbuildMerge(SlotObject):
                "pkg", "pkg_count", "pkg_path", "pretend",
                "scheduler", "settings", "tree", "world_atom")
 
-       def execute(self):
+       def create_task(self):
                root_config = self.pkg.root_config
                settings = self.settings
-               retval = portage.merge(settings["CATEGORY"],
-                       settings["PF"], settings["D"],
-                       os.path.join(settings["PORTAGE_BUILDDIR"],
-                       "build-info"), root_config.root, settings,
-                       myebuild=settings["EBUILD"],
-                       mytree=self.tree, mydbapi=root_config.trees[self.tree].dbapi,
-                       vartree=root_config.trees["vartree"],
-                       prev_mtimes=self.ldpath_mtimes,
-                       scheduler=self.scheduler,
-                       blockers=self.find_blockers)
-
-               if retval == os.EX_OK:
-                       self.world_atom(self.pkg)
-                       self._log_success()
-
-               return retval
-
-       def _log_success(self):
+               mycat = settings["CATEGORY"]
+               mypkg = settings["PF"]
+               pkgloc = settings["D"]
+               infloc = os.path.join(settings["PORTAGE_BUILDDIR"], "build-info")
+               myroot = root_config.root
+               myebuild = settings["EBUILD"]
+               mydbapi = root_config.trees[self.tree].dbapi
+               vartree = root_config.trees["vartree"]
+               background = (settings.get('PORTAGE_BACKGROUND') == '1')
+               logfile = settings.get('PORTAGE_LOG_FILE')
+
+               merge_task = MergeProcess(
+                       dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings,
+                       treetype=self.tree, vartree=vartree, scheduler=self.scheduler,
+                       background=background, blockers=self.find_blockers, pkgloc=pkgloc,
+                       infloc=infloc, myebuild=myebuild, mydbapi=mydbapi,
+                       prev_mtimes=self.ldpath_mtimes, logfile=logfile)
+               merge_task.addExitListener(self._log_exit)
+               return merge_task
+
+       def _log_exit(self, task):
+               if task.returncode != os.EX_OK:
+                       return
+
                pkg = self.pkg
+               self.world_atom(pkg)
                pkg_count = self.pkg_count
                pkg_path = self.pkg_path
                logger = self.logger
index 1dcc1780a2b24a41d7f3458443c2f0d77c14ce0d..768865e6b26e6ff1dd3f7cbea02e4151865a695c 100644 (file)
@@ -111,7 +111,7 @@ class MergeListItem(CompositeTask):
                self._install_task.wait()
                return self.returncode
 
-       def merge(self):
+       def merge(self, exit_handler):
 
                pkg = self.pkg
                build_opts = self.build_opts
@@ -135,15 +135,14 @@ class MergeListItem(CompositeTask):
                                        world_atom=world_atom)
 
                                uninstall.start()
-                               retval = uninstall.wait()
-                               if retval != os.EX_OK:
-                                       return retval
-                       return os.EX_OK
-
-               if build_opts.fetchonly or \
+                               self.returncode = uninstall.wait()
+                       else:
+                               self.returncode = os.EX_OK
+                       exit_handler(self)
+               elif build_opts.fetchonly or \
                        build_opts.buildpkgonly:
-                       return self.returncode
-
-               retval = self._install_task.install()
-               return retval
+                       exit_handler(self)
+               else:
+                       self._current_task = self._install_task
+                       self._install_task.install(exit_handler)
 
index 4aecf8adbb0dd196e1cbcb9d8327da4c94540a58..45d2e7dc67c05fe7510907f89e989485acf968ef 100644 (file)
@@ -4,11 +4,6 @@
 from _emerge.AsynchronousTask import AsynchronousTask
 from portage.output import colorize
 class PackageMerge(AsynchronousTask):
-       """
-       TODO: Implement asynchronous merge so that the scheduler can
-       run while a merge is executing.
-       """
-
        __slots__ = ("merge",)
 
        def _start(self):
@@ -40,6 +35,9 @@ class PackageMerge(AsynchronousTask):
                        not self.merge.build_opts.buildpkgonly:
                        self.merge.statusMessage(msg)
 
-               self.returncode = self.merge.merge()
-               self.wait()
+               self.merge.merge(self.exit_handler)
+
+       def exit_handler(self, task):
+               self.returncode = task.returncode
+               self._wait_hook()
 
index f717d12df81b5bc7ca513cafee26afcc5d63daa1..6e63f84fda3c78d4eec64b5820d17b3878478781 100644 (file)
@@ -4,30 +4,72 @@
 import signal
 import traceback
 
+import errno
+import fcntl
 import portage
-from portage import os
+from portage import os, StringIO
+import portage.elog.messages
+from _emerge.PollConstants import PollConstants
 from _emerge.SpawnProcess import SpawnProcess
 
 class MergeProcess(SpawnProcess):
        """
-       Merge package files in a subprocess, so the Scheduler can run in the
-       main thread while files are moved or copied asynchronously.
+       Merge packages in a subprocess, so the Scheduler can run in the main
+       thread while files are moved or copied asynchronously.
        """
 
-       __slots__ = ('cfgfiledict', 'conf_mem_file', \
-               'destroot', 'dblink', 'srcroot',)
+       __slots__ = ('dblink', 'mycat', 'mypkg', 'settings', 'treetype',
+               'vartree', 'scheduler', 'blockers', 'pkgloc', 'infloc', 'myebuild',
+               'mydbapi', 'prev_mtimes', '_elog_reader_fd', '_elog_reg_id',
+               '_buf')
 
-       def _spawn(self, args, fd_pipes=None, **kwargs):
+       def _elog_output_handler(self, fd, event):
+               output = None
+               if event & PollConstants.POLLIN:
+                       try:
+                               output = os.read(fd, self._bufsize)
+                       except OSError as e:
+                               if e.errno not in (errno.EAGAIN, errno.EINTR):
+                                       raise
+               if output:
+                       lines = output.split('\n')
+                       if len(lines) == 1:
+                               self._buf += lines[0]
+                       else:
+                               lines[0] = self._buf + lines[0]
+                               self._buf = lines.pop()
+                               out = StringIO()
+                               for line in lines:
+                                       funcname, phase, key, msg = line.split(' ', 3)
+                                       reporter = getattr(portage.elog.messages, funcname)
+                                       reporter(msg, phase=phase, key=key, out=out)
+
+       def _spawn(self, args, fd_pipes, **kwargs):
                """
                Fork a subprocess, apply local settings, and call
-               dblink._merge_process().
+               dblink.merge().
                """
 
+               files = self._files
+               elog_reader_fd, elog_writer_fd = os.pipe()
+               fcntl.fcntl(elog_reader_fd, fcntl.F_SETFL,
+                       fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+               mylink = self.dblink(self.mycat, self.mypkg, settings=self.settings,
+                       treetype=self.treetype, vartree=self.vartree,
+                       blockers=self.blockers, scheduler=self.scheduler,
+                       pipe=elog_writer_fd)
+               fd_pipes[elog_writer_fd] = elog_writer_fd
+               self._elog_reg_id = self.scheduler.register(elog_reader_fd,
+                       self._registered_events, self._elog_output_handler)
+
                pid = os.fork()
                if pid != 0:
+                       self._elog_reader_fd = elog_reader_fd
+                       self._buf = ""
                        portage.process.spawned_pids.append(pid)
                        return [pid]
 
+               os.close(elog_reader_fd)
                portage.process._setup_pipes(fd_pipes)
 
                # Use default signal handlers since the ones inherited
@@ -35,18 +77,19 @@ class MergeProcess(SpawnProcess):
                signal.signal(signal.SIGINT, signal.SIG_DFL)
                signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
-               portage.output.havecolor = self.dblink.settings.get('NOCOLOR') \
+               portage.output.havecolor = self.settings.get('NOCOLOR') \
                        not in ('yes', 'true')
 
-               # In this subprocess we want dblink._display_merge() to use
+               # In this subprocess we want mylink._display_merge() to use
                # stdout/stderr directly since they are pipes. This behavior
-               # is triggered when dblink._scheduler is None.
-               self.dblink._scheduler = None
+               # is triggered when mylink._scheduler is None.
+               mylink._scheduler = None
 
                rval = 1
                try:
-                       rval = self.dblink._merge_process(self.srcroot, self.destroot,
-                               self.cfgfiledict, self.conf_mem_file)
+                       rval = mylink.merge(self.pkgloc, self.infloc,
+                               myebuild=self.myebuild, mydbapi=self.mydbapi,
+                               prev_mtimes=self.prev_mtimes)
                except SystemExit:
                        raise
                except:
@@ -55,3 +98,16 @@ class MergeProcess(SpawnProcess):
                        # Call os._exit() from finally block, in order to suppress any
                        # finally blocks from earlier in the call stack. See bug #345289.
                        os._exit(rval)
+
+       def _unregister(self):
+               """
+               Unregister from the scheduler and close open files.
+               """
+               if self._elog_reg_id is not None:
+                       self.scheduler.unregister(self._elog_reg_id)
+                       self._elog_reg_id = None
+               if self._elog_reader_fd:
+                       os.close(self._elog_reader_fd)
+                       self._elog_reader_fd = None
+
+               super(MergeProcess, self)._unregister()
index bf48b15081ed2a8928188548213d8c6632341ce1..66e2955a62e2950b52c91732d7262d1acc9a911d 100644 (file)
@@ -13,7 +13,8 @@ portage.proxy.lazyimport.lazyimport(globals(),
        'portage.dbapi._MergeProcess:MergeProcess',
        'portage.dep:dep_getkey,isjustname,match_from_list,' + \
                'use_reduce,_slot_re',
-       'portage.elog:elog_process,_preload_elog_modules',
+       'portage.elog:collect_ebuild_messages,collect_messages,' + \
+               'elog_process,_merge_logentries,_preload_elog_modules',
        'portage.locks:lockdir,unlockdir',
        'portage.output:bold,colorize',
        'portage.package.ebuild.doebuild:doebuild_environment,' + \
@@ -1200,7 +1201,7 @@ class dblink(object):
        _file_merge_yield_interval = 20
 
        def __init__(self, cat, pkg, myroot=None, settings=None, treetype=None,
-               vartree=None, blockers=None, scheduler=None):
+               vartree=None, blockers=None, scheduler=None, pipe=None):
                """
                Creates a DBlink object for a given CPV.
                The given CPV may not be present in the database already.
@@ -1259,6 +1260,7 @@ class dblink(object):
                self._md5_merge_map = {}
                self._hash_key = (self.myroot, self.mycpv)
                self._protect_obj = None
+               self._pipe = pipe
 
        def __hash__(self):
                return hash(self._hash_key)
@@ -1502,7 +1504,7 @@ class dblink(object):
                                        continue
                                others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
                                        settings=self.settings, vartree=self.vartree,
-                                       treetype="vartree"))
+                                       treetype="vartree", pipe=self._pipe))
 
                        retval = self._security_check([self] + others_in_slot)
                        if retval:
@@ -1666,9 +1668,7 @@ class dblink(object):
 
                                                        self._eerror(ebuild_phase, msg_lines)
 
-                                               # process logs created during pre/postrm
-                                               elog_process(self.mycpv, self.settings,
-                                                       phasefilter=('prerm', 'postrm'))
+                                       self._elog_process()
 
                                        if retval == os.EX_OK:
                                                # myebuildpath might be None, so ensure
@@ -1764,7 +1764,7 @@ class dblink(object):
                                        continue
                                others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
                                        settings=self.settings,
-                                       vartree=self.vartree, treetype="vartree"))
+                                       vartree=self.vartree, treetype="vartree", pipe=self._pipe))
 
                dest_root = self._eroot
                dest_root_len = len(dest_root) - 1
@@ -2784,19 +2784,34 @@ class dblink(object):
                        self._scheduler.dblinkElog(self,
                                phase, _eerror, lines)
 
-       def _elog_subprocess(self, funcname, phase, lines):
-               """
-               Subprocesses call this in order to create elog messages in
-               $T, for collection by the main process.
-               """
-               cmd = "source %s/isolated-functions.sh ; " % \
-                       portage._shell_quote(self.settings["PORTAGE_BIN_PATH"])
-               for line in lines:
-                       cmd += "%s %s ; " % (funcname, portage._shell_quote(line))
-               env = self.settings.environ()
-               env['EBUILD_PHASE'] = phase
-               subprocess.call([portage.const.BASH_BINARY, "-c", cmd],
-                       env=env)
+       def _elog_process(self):
+               cpv = self.mycpv
+               if self._pipe is None:
+                       elog_process(cpv, self.settings)
+               else:
+                       logdir = os.path.join(self.settings["T"], "logging")
+                       ebuild_logentries = collect_ebuild_messages(logdir)
+                       py_logentries = collect_messages(key=cpv).get(cpv, {})
+                       logentries = _merge_logentries(py_logentries, ebuild_logentries)
+                       funcnames = {
+                               "INFO": "einfo",
+                               "LOG": "elog",
+                               "WARN": "ewarn",
+                               "QA": "eqawarn",
+                               "ERROR": "eerror"
+                       }
+                       buffer = []
+                       for phase, messages in logentries.items():
+                               for key, lines in messages:
+                                       funcname = funcnames[key]
+                                       if isinstance(lines, basestring):
+                                               lines = [lines]
+                                       for line in lines:
+                                               fields = (funcname, phase, cpv, line.rstrip('\n'))
+                                               buffer.append(' '.join(fields))
+                                               buffer.append('\n')
+                       if buffer:
+                               os.write(self._pipe, ''.join(buffer))
 
        def treewalk(self, srcroot, destroot, inforoot, myebuild, cleanup=0,
                mydbapi=None, prev_mtimes=None):
@@ -2811,7 +2826,6 @@ class dblink(object):
                unmerges old version (if required)
                calls doebuild(mydo=pkg_postinst)
                calls env_update
-               calls elog_process
                
                @param srcroot: Typically this is ${D}
                @type srcroot: String (Path)
@@ -2921,7 +2935,7 @@ class dblink(object):
                        others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
                                settings=config(clone=self.settings),
                                vartree=self.vartree, treetype="vartree",
-                               scheduler=self._scheduler))
+                               scheduler=self._scheduler, pipe=self._pipe))
 
                retval = self._security_check(others_in_slot)
                if retval:
@@ -3069,8 +3083,6 @@ class dblink(object):
                # check for package collisions
                blockers = None
                if self._blockers is not None:
-                       # This is only supposed to be called when
-                       # the vdb is locked, like it is here.
                        blockers = self._blockers()
                if blockers is None:
                        blockers = []
@@ -3242,16 +3254,8 @@ class dblink(object):
                                cfgfiledict["IGNORE"] = 1
                                break
 
-               merge_task = MergeProcess(
-                       background=(self.settings.get('PORTAGE_BACKGROUND') == '1'),
-                       cfgfiledict=cfgfiledict, conf_mem_file=conf_mem_file, dblink=self,
-                       destroot=destroot,
-                       logfile=self.settings.get('PORTAGE_LOG_FILE'),
-                       scheduler=(scheduler or PollScheduler().sched_iface),
-                       srcroot=srcroot)
-
-               merge_task.start()
-               rval = merge_task.wait()
+               rval = self._merge_contents(srcroot, destroot, cfgfiledict,
+                       conf_mem_file)
                if rval != os.EX_OK:
                        return rval
 
@@ -3438,7 +3442,7 @@ class dblink(object):
 
                return backup_p
 
-       def _merge_process(self, srcroot, destroot, cfgfiledict, conf_mem_file):
+       def _merge_contents(self, srcroot, destroot, cfgfiledict, conf_mem_file):
 
                cfgfiledict_orig = cfgfiledict.copy()
 
@@ -3667,7 +3671,7 @@ class dblink(object):
                                                msg.append(_("This file will be renamed to a different name:"))
                                                msg.append("  '%s'" % backup_dest)
                                                msg.append("")
-                                               self._elog_subprocess("eerror", "preinst", msg)
+                                               self._eerror("preinst", msg)
                                                if movefile(mydest, backup_dest,
                                                        mysettings=self.settings,
                                                        encoding=_encodings['merge']) is None:
@@ -3745,7 +3749,7 @@ class dblink(object):
                                                msg.append(_("This file will be merged with a different name:"))
                                                msg.append("  '%s'" % newdest)
                                                msg.append("")
-                                               self._elog_subprocess("eerror", "preinst", msg)
+                                               self._eerror("preinst", msg)
                                                mydest = newdest
 
                                        elif stat.S_ISREG(mydmode) or (stat.S_ISLNK(mydmode) and os.path.exists(mydest) and stat.S_ISREG(os.stat(mydest)[stat.ST_MODE])):
@@ -3929,7 +3933,7 @@ class dblink(object):
                                        self._scheduler.dblinkEbuildPhase(
                                                self, mydbapi, myebuild, phase)
 
-                               elog_process(self.mycpv, self.settings)
+                               self._elog_process()
 
                                if 'noclean' not in self.settings.features and \
                                        (retval == os.EX_OK or \
@@ -4029,10 +4033,16 @@ def merge(mycat, mypkg, pkgloc, infloc,
                writemsg(_("Permission denied: access('%s', W_OK)\n") % settings['EROOT'],
                        noiselevel=-1)
                return errno.EACCES
-       mylink = dblink(mycat, mypkg, settings=settings, treetype=mytree,
-               vartree=vartree, blockers=blockers, scheduler=scheduler)
-       return mylink.merge(pkgloc, infloc, myebuild=myebuild,
-               mydbapi=mydbapi, prev_mtimes=prev_mtimes)
+       background = (settings.get('PORTAGE_BACKGROUND') == '1')
+       merge_task = MergeProcess(
+               dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings,
+               treetype=mytree, vartree=vartree, scheduler=scheduler,
+               background=background, blockers=blockers, pkgloc=pkgloc,
+               infloc=infloc, myebuild=myebuild, mydbapi=mydbapi,
+               prev_mtimes=prev_mtimes, logfile=settings.get('PORTAGE_LOG_FILE'))
+       merge_task.start()
+       retcode = merge_task.wait()
+       return retcode
 
 def unmerge(cat, pkg, myroot=None, settings=None,
        mytrimworld=None, vartree=None,