From 7535cabdf2fab76fc55df83643157613dfd66be9 Mon Sep 17 00:00:00 2001 From: David James Date: Thu, 24 Mar 2011 19:36:33 -0700 Subject: [PATCH] Merge packages asynchronously in Portage. This allows for the scheduler to continue to run while packages are being merged and installed, allowing for additional parallelism and making better use of the CPUs. Review URL: http://codereview.chromium.org/6713043 --- pym/_emerge/Binpkg.py | 21 +++---- pym/_emerge/EbuildBuild.py | 13 +++-- pym/_emerge/EbuildMerge.py | 47 +++++++++------ pym/_emerge/MergeListItem.py | 21 ++++--- pym/_emerge/PackageMerge.py | 12 ++-- pym/portage/dbapi/_MergeProcess.py | 82 +++++++++++++++++++++----- pym/portage/dbapi/vartree.py | 94 +++++++++++++++++------------- 7 files changed, 182 insertions(+), 108 deletions(-) diff --git a/pym/_emerge/Binpkg.py b/pym/_emerge/Binpkg.py index 00587451a..62d44c48f 100644 --- a/pym/_emerge/Binpkg.py +++ b/pym/_emerge/Binpkg.py @@ -307,7 +307,7 @@ class Binpkg(CompositeTask): portage.elog.elog_process(self.pkg.cpv, self.settings) self._build_dir.unlock() - def install(self): + def install(self, handler): # This gives bashrc users an opportunity to do various things # such as remove binary packages after they're installed. @@ -320,19 +320,20 @@ class Binpkg(CompositeTask): pkg=self.pkg, pkg_count=self.pkg_count, pkg_path=self._pkg_path, scheduler=self.scheduler, settings=settings, tree=self._tree, world_atom=self.world_atom) + task = merge.create_task() + task.addExitListener(self._install_exit) + self._start_task(task, handler) - try: - retval = merge.execute() - finally: - settings.pop("PORTAGE_BINPKG_FILE", None) - self._unlock_builddir() + def _install_exit(self, task): + self.settings.pop("PORTAGE_BINPKG_FILE", None) + self._unlock_builddir() - if retval == os.EX_OK and \ - 'binpkg-logs' not in self.settings.features and \ + if self._default_final_exit(task) != os.EX_OK: + return + + if 'binpkg-logs' not in self.settings.features and \ self.settings.get("PORTAGE_LOG_FILE"): try: os.unlink(self.settings["PORTAGE_LOG_FILE"]) except OSError: pass - return retval - diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py index 98ab24522..c7a5f5cdb 100644 --- a/pym/_emerge/EbuildBuild.py +++ b/pym/_emerge/EbuildBuild.py @@ -314,7 +314,7 @@ class EbuildBuild(CompositeTask): self._unlock_builddir() self.wait() - def install(self): + def install(self, exit_handler): """ Install the package and then clean up and release locks. Only call this after the build has completed successfully @@ -343,10 +343,11 @@ class EbuildBuild(CompositeTask): (pkg_count.curval, pkg_count.maxval, pkg.cpv) logger.log(msg, short_msg=short_msg) - try: - rval = merge.execute() - finally: - self._unlock_builddir() + task = merge.create_task() + task.addExitListener(self._install_exit) + self._start_task(task, exit_handler) - return rval + def _install_exit(self, task): + self._unlock_builddir() + self._default_final_exit(task) diff --git a/pym/_emerge/EbuildMerge.py b/pym/_emerge/EbuildMerge.py index d73a262b3..6a5869270 100644 --- a/pym/_emerge/EbuildMerge.py +++ b/pym/_emerge/EbuildMerge.py @@ -4,6 +4,8 @@ from _emerge.SlotObject import SlotObject import portage from portage import os +from portage.dbapi._MergeProcess import MergeProcess +from portage.dbapi.vartree import dblink class EbuildMerge(SlotObject): @@ -11,28 +13,35 @@ class EbuildMerge(SlotObject): "pkg", "pkg_count", "pkg_path", "pretend", "scheduler", "settings", "tree", "world_atom") - def execute(self): + def create_task(self): root_config = self.pkg.root_config settings = self.settings - retval = portage.merge(settings["CATEGORY"], - settings["PF"], settings["D"], - os.path.join(settings["PORTAGE_BUILDDIR"], - "build-info"), root_config.root, settings, - myebuild=settings["EBUILD"], - mytree=self.tree, mydbapi=root_config.trees[self.tree].dbapi, - vartree=root_config.trees["vartree"], - prev_mtimes=self.ldpath_mtimes, - scheduler=self.scheduler, - blockers=self.find_blockers) - - if retval == os.EX_OK: - self.world_atom(self.pkg) - self._log_success() - - return retval - - def _log_success(self): + mycat = settings["CATEGORY"] + mypkg = settings["PF"] + pkgloc = settings["D"] + infloc = os.path.join(settings["PORTAGE_BUILDDIR"], "build-info") + myroot = root_config.root + myebuild = settings["EBUILD"] + mydbapi = root_config.trees[self.tree].dbapi + vartree = root_config.trees["vartree"] + background = (settings.get('PORTAGE_BACKGROUND') == '1') + logfile = settings.get('PORTAGE_LOG_FILE') + + merge_task = MergeProcess( + dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings, + treetype=self.tree, vartree=vartree, scheduler=self.scheduler, + background=background, blockers=self.find_blockers, pkgloc=pkgloc, + infloc=infloc, myebuild=myebuild, mydbapi=mydbapi, + prev_mtimes=self.ldpath_mtimes, logfile=logfile) + merge_task.addExitListener(self._log_exit) + return merge_task + + def _log_exit(self, task): + if task.returncode != os.EX_OK: + return + pkg = self.pkg + self.world_atom(pkg) pkg_count = self.pkg_count pkg_path = self.pkg_path logger = self.logger diff --git a/pym/_emerge/MergeListItem.py b/pym/_emerge/MergeListItem.py index 1dcc1780a..768865e6b 100644 --- a/pym/_emerge/MergeListItem.py +++ b/pym/_emerge/MergeListItem.py @@ -111,7 +111,7 @@ class MergeListItem(CompositeTask): self._install_task.wait() return self.returncode - def merge(self): + def merge(self, exit_handler): pkg = self.pkg build_opts = self.build_opts @@ -135,15 +135,14 @@ class MergeListItem(CompositeTask): world_atom=world_atom) uninstall.start() - retval = uninstall.wait() - if retval != os.EX_OK: - return retval - return os.EX_OK - - if build_opts.fetchonly or \ + self.returncode = uninstall.wait() + else: + self.returncode = os.EX_OK + exit_handler(self) + elif build_opts.fetchonly or \ build_opts.buildpkgonly: - return self.returncode - - retval = self._install_task.install() - return retval + exit_handler(self) + else: + self._current_task = self._install_task + self._install_task.install(exit_handler) diff --git a/pym/_emerge/PackageMerge.py b/pym/_emerge/PackageMerge.py index 4aecf8adb..45d2e7dc6 100644 --- a/pym/_emerge/PackageMerge.py +++ b/pym/_emerge/PackageMerge.py @@ -4,11 +4,6 @@ from _emerge.AsynchronousTask import AsynchronousTask from portage.output import colorize class PackageMerge(AsynchronousTask): - """ - TODO: Implement asynchronous merge so that the scheduler can - run while a merge is executing. - """ - __slots__ = ("merge",) def _start(self): @@ -40,6 +35,9 @@ class PackageMerge(AsynchronousTask): not self.merge.build_opts.buildpkgonly: self.merge.statusMessage(msg) - self.returncode = self.merge.merge() - self.wait() + self.merge.merge(self.exit_handler) + + def exit_handler(self, task): + self.returncode = task.returncode + self._wait_hook() diff --git a/pym/portage/dbapi/_MergeProcess.py b/pym/portage/dbapi/_MergeProcess.py index f717d12df..6e63f84fd 100644 --- a/pym/portage/dbapi/_MergeProcess.py +++ b/pym/portage/dbapi/_MergeProcess.py @@ -4,30 +4,72 @@ import signal import traceback +import errno +import fcntl import portage -from portage import os +from portage import os, StringIO +import portage.elog.messages +from _emerge.PollConstants import PollConstants from _emerge.SpawnProcess import SpawnProcess class MergeProcess(SpawnProcess): """ - Merge package files in a subprocess, so the Scheduler can run in the - main thread while files are moved or copied asynchronously. + Merge packages in a subprocess, so the Scheduler can run in the main + thread while files are moved or copied asynchronously. """ - __slots__ = ('cfgfiledict', 'conf_mem_file', \ - 'destroot', 'dblink', 'srcroot',) + __slots__ = ('dblink', 'mycat', 'mypkg', 'settings', 'treetype', + 'vartree', 'scheduler', 'blockers', 'pkgloc', 'infloc', 'myebuild', + 'mydbapi', 'prev_mtimes', '_elog_reader_fd', '_elog_reg_id', + '_buf') - def _spawn(self, args, fd_pipes=None, **kwargs): + def _elog_output_handler(self, fd, event): + output = None + if event & PollConstants.POLLIN: + try: + output = os.read(fd, self._bufsize) + except OSError as e: + if e.errno not in (errno.EAGAIN, errno.EINTR): + raise + if output: + lines = output.split('\n') + if len(lines) == 1: + self._buf += lines[0] + else: + lines[0] = self._buf + lines[0] + self._buf = lines.pop() + out = StringIO() + for line in lines: + funcname, phase, key, msg = line.split(' ', 3) + reporter = getattr(portage.elog.messages, funcname) + reporter(msg, phase=phase, key=key, out=out) + + def _spawn(self, args, fd_pipes, **kwargs): """ Fork a subprocess, apply local settings, and call - dblink._merge_process(). + dblink.merge(). """ + files = self._files + elog_reader_fd, elog_writer_fd = os.pipe() + fcntl.fcntl(elog_reader_fd, fcntl.F_SETFL, + fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + mylink = self.dblink(self.mycat, self.mypkg, settings=self.settings, + treetype=self.treetype, vartree=self.vartree, + blockers=self.blockers, scheduler=self.scheduler, + pipe=elog_writer_fd) + fd_pipes[elog_writer_fd] = elog_writer_fd + self._elog_reg_id = self.scheduler.register(elog_reader_fd, + self._registered_events, self._elog_output_handler) + pid = os.fork() if pid != 0: + self._elog_reader_fd = elog_reader_fd + self._buf = "" portage.process.spawned_pids.append(pid) return [pid] + os.close(elog_reader_fd) portage.process._setup_pipes(fd_pipes) # Use default signal handlers since the ones inherited @@ -35,18 +77,19 @@ class MergeProcess(SpawnProcess): signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) - portage.output.havecolor = self.dblink.settings.get('NOCOLOR') \ + portage.output.havecolor = self.settings.get('NOCOLOR') \ not in ('yes', 'true') - # In this subprocess we want dblink._display_merge() to use + # In this subprocess we want mylink._display_merge() to use # stdout/stderr directly since they are pipes. This behavior - # is triggered when dblink._scheduler is None. - self.dblink._scheduler = None + # is triggered when mylink._scheduler is None. + mylink._scheduler = None rval = 1 try: - rval = self.dblink._merge_process(self.srcroot, self.destroot, - self.cfgfiledict, self.conf_mem_file) + rval = mylink.merge(self.pkgloc, self.infloc, + myebuild=self.myebuild, mydbapi=self.mydbapi, + prev_mtimes=self.prev_mtimes) except SystemExit: raise except: @@ -55,3 +98,16 @@ class MergeProcess(SpawnProcess): # Call os._exit() from finally block, in order to suppress any # finally blocks from earlier in the call stack. See bug #345289. os._exit(rval) + + def _unregister(self): + """ + Unregister from the scheduler and close open files. + """ + if self._elog_reg_id is not None: + self.scheduler.unregister(self._elog_reg_id) + self._elog_reg_id = None + if self._elog_reader_fd: + os.close(self._elog_reader_fd) + self._elog_reader_fd = None + + super(MergeProcess, self)._unregister() diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py index bf48b1508..66e2955a6 100644 --- a/pym/portage/dbapi/vartree.py +++ b/pym/portage/dbapi/vartree.py @@ -13,7 +13,8 @@ portage.proxy.lazyimport.lazyimport(globals(), 'portage.dbapi._MergeProcess:MergeProcess', 'portage.dep:dep_getkey,isjustname,match_from_list,' + \ 'use_reduce,_slot_re', - 'portage.elog:elog_process,_preload_elog_modules', + 'portage.elog:collect_ebuild_messages,collect_messages,' + \ + 'elog_process,_merge_logentries,_preload_elog_modules', 'portage.locks:lockdir,unlockdir', 'portage.output:bold,colorize', 'portage.package.ebuild.doebuild:doebuild_environment,' + \ @@ -1200,7 +1201,7 @@ class dblink(object): _file_merge_yield_interval = 20 def __init__(self, cat, pkg, myroot=None, settings=None, treetype=None, - vartree=None, blockers=None, scheduler=None): + vartree=None, blockers=None, scheduler=None, pipe=None): """ Creates a DBlink object for a given CPV. The given CPV may not be present in the database already. @@ -1259,6 +1260,7 @@ class dblink(object): self._md5_merge_map = {} self._hash_key = (self.myroot, self.mycpv) self._protect_obj = None + self._pipe = pipe def __hash__(self): return hash(self._hash_key) @@ -1502,7 +1504,7 @@ class dblink(object): continue others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1], settings=self.settings, vartree=self.vartree, - treetype="vartree")) + treetype="vartree", pipe=self._pipe)) retval = self._security_check([self] + others_in_slot) if retval: @@ -1666,9 +1668,7 @@ class dblink(object): self._eerror(ebuild_phase, msg_lines) - # process logs created during pre/postrm - elog_process(self.mycpv, self.settings, - phasefilter=('prerm', 'postrm')) + self._elog_process() if retval == os.EX_OK: # myebuildpath might be None, so ensure @@ -1764,7 +1764,7 @@ class dblink(object): continue others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1], settings=self.settings, - vartree=self.vartree, treetype="vartree")) + vartree=self.vartree, treetype="vartree", pipe=self._pipe)) dest_root = self._eroot dest_root_len = len(dest_root) - 1 @@ -2784,19 +2784,34 @@ class dblink(object): self._scheduler.dblinkElog(self, phase, _eerror, lines) - def _elog_subprocess(self, funcname, phase, lines): - """ - Subprocesses call this in order to create elog messages in - $T, for collection by the main process. - """ - cmd = "source %s/isolated-functions.sh ; " % \ - portage._shell_quote(self.settings["PORTAGE_BIN_PATH"]) - for line in lines: - cmd += "%s %s ; " % (funcname, portage._shell_quote(line)) - env = self.settings.environ() - env['EBUILD_PHASE'] = phase - subprocess.call([portage.const.BASH_BINARY, "-c", cmd], - env=env) + def _elog_process(self): + cpv = self.mycpv + if self._pipe is None: + elog_process(cpv, self.settings) + else: + logdir = os.path.join(self.settings["T"], "logging") + ebuild_logentries = collect_ebuild_messages(logdir) + py_logentries = collect_messages(key=cpv).get(cpv, {}) + logentries = _merge_logentries(py_logentries, ebuild_logentries) + funcnames = { + "INFO": "einfo", + "LOG": "elog", + "WARN": "ewarn", + "QA": "eqawarn", + "ERROR": "eerror" + } + buffer = [] + for phase, messages in logentries.items(): + for key, lines in messages: + funcname = funcnames[key] + if isinstance(lines, basestring): + lines = [lines] + for line in lines: + fields = (funcname, phase, cpv, line.rstrip('\n')) + buffer.append(' '.join(fields)) + buffer.append('\n') + if buffer: + os.write(self._pipe, ''.join(buffer)) def treewalk(self, srcroot, destroot, inforoot, myebuild, cleanup=0, mydbapi=None, prev_mtimes=None): @@ -2811,7 +2826,6 @@ class dblink(object): unmerges old version (if required) calls doebuild(mydo=pkg_postinst) calls env_update - calls elog_process @param srcroot: Typically this is ${D} @type srcroot: String (Path) @@ -2921,7 +2935,7 @@ class dblink(object): others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1], settings=config(clone=self.settings), vartree=self.vartree, treetype="vartree", - scheduler=self._scheduler)) + scheduler=self._scheduler, pipe=self._pipe)) retval = self._security_check(others_in_slot) if retval: @@ -3069,8 +3083,6 @@ class dblink(object): # check for package collisions blockers = None if self._blockers is not None: - # This is only supposed to be called when - # the vdb is locked, like it is here. blockers = self._blockers() if blockers is None: blockers = [] @@ -3242,16 +3254,8 @@ class dblink(object): cfgfiledict["IGNORE"] = 1 break - merge_task = MergeProcess( - background=(self.settings.get('PORTAGE_BACKGROUND') == '1'), - cfgfiledict=cfgfiledict, conf_mem_file=conf_mem_file, dblink=self, - destroot=destroot, - logfile=self.settings.get('PORTAGE_LOG_FILE'), - scheduler=(scheduler or PollScheduler().sched_iface), - srcroot=srcroot) - - merge_task.start() - rval = merge_task.wait() + rval = self._merge_contents(srcroot, destroot, cfgfiledict, + conf_mem_file) if rval != os.EX_OK: return rval @@ -3438,7 +3442,7 @@ class dblink(object): return backup_p - def _merge_process(self, srcroot, destroot, cfgfiledict, conf_mem_file): + def _merge_contents(self, srcroot, destroot, cfgfiledict, conf_mem_file): cfgfiledict_orig = cfgfiledict.copy() @@ -3667,7 +3671,7 @@ class dblink(object): msg.append(_("This file will be renamed to a different name:")) msg.append(" '%s'" % backup_dest) msg.append("") - self._elog_subprocess("eerror", "preinst", msg) + self._eerror("preinst", msg) if movefile(mydest, backup_dest, mysettings=self.settings, encoding=_encodings['merge']) is None: @@ -3745,7 +3749,7 @@ class dblink(object): msg.append(_("This file will be merged with a different name:")) msg.append(" '%s'" % newdest) msg.append("") - self._elog_subprocess("eerror", "preinst", msg) + self._eerror("preinst", msg) mydest = newdest elif stat.S_ISREG(mydmode) or (stat.S_ISLNK(mydmode) and os.path.exists(mydest) and stat.S_ISREG(os.stat(mydest)[stat.ST_MODE])): @@ -3929,7 +3933,7 @@ class dblink(object): self._scheduler.dblinkEbuildPhase( self, mydbapi, myebuild, phase) - elog_process(self.mycpv, self.settings) + self._elog_process() if 'noclean' not in self.settings.features and \ (retval == os.EX_OK or \ @@ -4029,10 +4033,16 @@ def merge(mycat, mypkg, pkgloc, infloc, writemsg(_("Permission denied: access('%s', W_OK)\n") % settings['EROOT'], noiselevel=-1) return errno.EACCES - mylink = dblink(mycat, mypkg, settings=settings, treetype=mytree, - vartree=vartree, blockers=blockers, scheduler=scheduler) - return mylink.merge(pkgloc, infloc, myebuild=myebuild, - mydbapi=mydbapi, prev_mtimes=prev_mtimes) + background = (settings.get('PORTAGE_BACKGROUND') == '1') + merge_task = MergeProcess( + dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings, + treetype=mytree, vartree=vartree, scheduler=scheduler, + background=background, blockers=blockers, pkgloc=pkgloc, + infloc=infloc, myebuild=myebuild, mydbapi=mydbapi, + prev_mtimes=prev_mtimes, logfile=settings.get('PORTAGE_LOG_FILE')) + merge_task.start() + retcode = merge_task.wait() + return retcode def unmerge(cat, pkg, myroot=None, settings=None, mytrimworld=None, vartree=None, -- 2.26.2