"uid", "gid", "groups", "umask", "logfile",
"path_lookup", "pre_exec")
- __slots__ = ("args", "files", "register", "unregister", "registered") + \
+ __slots__ = ("args", "files", "registered", "reg_id", "scheduler") + \
_spawn_kwarg_names
_file_names = ("process", "out")
os.close(slave_fd)
files.process = os.fdopen(master_fd, 'r')
- self.registered = True
- self.register(files.process.fileno(),
+ self.reg_id = self.scheduler.register(files.process.fileno(),
select.POLLIN, self._output_handler)
+ self.registered = True
def _output_handler(self, fd, event):
files = self.files
f.flush()
f.close()
self.registered = False
- self.unregister(fd)
+ return self.registered
class EbuildFetcherAsync(SpawnProcess):
(pkg_count.curval, pkg_count.maxval, pkg.cpv)
logger.log(msg, short_msg=short_msg)
- build = EbuildExecuter(pkg=pkg, register=scheduler.register,
- schedule=scheduler.schedule, settings=settings,
- unregister=scheduler.unregister)
+ build = EbuildExecuter(pkg=pkg, scheduler=scheduler,
+ settings=settings)
retval = build.execute()
if retval != os.EX_OK:
return retval
(pkg_count.curval, pkg_count.curval, pkg.cpv)
logger.log(msg, short_msg=short_msg)
- build = EbuildExecuter(pkg=pkg, register=scheduler.register,
- schedule=scheduler.schedule, settings=settings,
- unregister=scheduler.unregister)
+ build = EbuildExecuter(pkg=pkg, scheduler=scheduler,
+ settings=settings)
retval = build.execute()
if retval != os.EX_OK:
return retval
class EbuildExecuter(SlotObject):
- __slots__ = ("pkg", "register", "schedule", "settings", "unregister")
+ __slots__ = ("pkg", "scheduler", "settings")
_phases = ("setup", "unpack", "compile", "test", "install")
for mydo in self._phases:
ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
- pkg=self.pkg, phase=mydo, register=self.register,
- settings=settings, tree=tree, unregister=self.unregister)
+ pkg=self.pkg, phase=mydo, scheduler=self.scheduler,
+ settings=settings, tree=tree)
ebuild_phase.start()
- retval = None
- while retval is None:
- self.schedule()
- retval = ebuild_phase.poll()
+ self.scheduler.schedule(ebuild_phase.reg_id)
+ retval = ebuild_phase.wait()
if retval != os.EX_OK:
return retval
class EbuildPhase(SubProcess):
__slots__ = ("fd_pipes", "phase", "pkg",
- "register", "settings", "tree", "unregister",
- "files", "registered")
+ "scheduler", "settings", "tree",
+ "files", "registered", "reg_id")
_file_names = ("log", "stdout", "ebuild")
_files_dict = slot_dict_class(_file_names, prefix="")
os.close(slave_fd)
files.ebuild = os.fdopen(master_fd, 'r')
+ self.reg_id = self.scheduler.register(files.ebuild.fileno(),
+ select.POLLIN, output_handler)
self.registered = True
- self.register(files.ebuild.fileno(), select.POLLIN, output_handler)
def _output_handler(self, fd, event):
files = self.files
for f in files.values():
f.close()
self.registered = False
- self.unregister(fd)
+ return self.registered
def _dummy_handler(self, fd, event):
"""
for f in files.values():
f.close()
self.registered = False
- self.unregister(fd)
+ return self.registered
def _set_returncode(self, wait_retval):
SubProcess._set_returncode(self, wait_retval)
for line in wrap(waiting_msg, 65))
writemsg(waiting_msg, noiselevel=-1)
- while retval is None:
- scheduler.schedule()
- retval = prefetcher.poll()
+ scheduler.schedule(prefetcher.reg_id)
+ retval = prefetcher.wait()
del prefetcher
fetcher = BinpkgFetcher(pkg=pkg, pretend=opts.pretend,
phase = "setup"
ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
- pkg=pkg, phase=phase, register=scheduler.register,
- settings=settings, tree=tree, unregister=scheduler.unregister)
+ pkg=pkg, phase=phase, scheduler=scheduler,
+ settings=settings, tree=tree)
ebuild_phase.start()
- retval = None
- while retval is None:
- scheduler.schedule()
- retval = ebuild_phase.poll()
+ scheduler.schedule(ebuild_phase.reg_id)
+ retval = ebuild_phase.wait()
if retval != os.EX_OK:
return retval
extractor = BinpkgExtractorAsync(image_dir=image_dir,
- pkg=pkg, pkg_path=pkg_path, register=scheduler.register,
- unregister=scheduler.unregister)
+ pkg=pkg, pkg_path=pkg_path, scheduler=scheduler)
portage.writemsg_stdout(">>> Extracting %s\n" % pkg.cpv)
extractor.start()
- retval = None
- while retval is None:
- scheduler.schedule()
- retval = extractor.poll()
+ scheduler.schedule(extractor.reg_id)
+ retval = extractor.wait()
if retval != os.EX_OK:
writemsg("!!! Error Extracting '%s'\n" % pkg_path,
_fetch_log = "/var/log/emerge-fetch.log"
class _iface_class(SlotObject):
- __slots__ = ("register", "schedule", "unregister")
+ __slots__ = ("register", "schedule")
class _build_opts_class(SlotObject):
__slots__ = ("buildpkg", "buildpkgonly",
self._logger = self._emerge_log_class(
xterm_titles=("notitles" not in settings.features))
self._sched_iface = self._iface_class(
- register=self._register, schedule=self._schedule,
- unregister=self._unregister)
+ register=self._register, schedule=self._schedule)
self._poll_event_handlers = {}
+ self._poll_event_handler_ids = {}
+ # Increment id for each new handler.
+ self._event_handler_id = 0
try:
self._poll = select.poll()
elif pkg.type_name == "ebuild":
prefetcher = EbuildFetcherAsync(logfile=self._fetch_log, pkg=pkg,
- register=self._register, unregister=self._unregister)
+ scheduler=self._sched_iface)
elif pkg.type_name == "binary" and \
"--getbinpkg" in self.myopts and \
pkg.root_config.trees["bintree"].isremote(pkg.cpv):
prefetcher = BinpkgFetcherAsync(logfile=self._fetch_log,
- pkg=pkg, register=self._register, unregister=self._unregister)
+ pkg=pkg, scheduler=self._sched_iface)
return prefetcher
return (mylist, dropped_tasks)
def _register(self, f, eventmask, handler):
- self._poll_event_handlers[f] = handler
+ """
+ @rtype: Integer
+ @return: A unique registration id, for use in schedule() or
+ unregister() calls.
+ """
+ self._event_handler_id += 1
+ reg_id = self._event_handler_id
+ self._poll_event_handler_ids[reg_id] = f
+ self._poll_event_handlers[f] = (handler, reg_id)
self._poll.register(f, eventmask)
+ return reg_id
- def _unregister(self, f):
+ def _unregister(self, reg_id):
+ f = self._poll_event_handler_ids[reg_id]
self._poll.unregister(f)
del self._poll_event_handlers[f]
+ del self._poll_event_handler_ids[reg_id]
self._schedule_tasks()
- def _schedule(self):
+ def _schedule(self, wait_id):
+ """
+ Schedule until wait_id is not longer registered
+ for poll() events.
+ @type wait_id: int
+ @param wait_id: a task id to wait for
+ """
event_handlers = self._poll_event_handlers
- running_tasks = self._prefetch_queue.running_tasks
+ handler_ids = self._poll_event_handler_ids
poll = self._poll.poll
self._schedule_tasks()
- while event_handlers:
+ while wait_id in handler_ids:
for f, event in poll():
- event_handlers[f](f, event)
-
- if len(event_handlers) <= len(running_tasks):
- # Assuming one handler per task, this
- # means the caller has unregistered it's
- # handler, so it's time to yield.
- break
+ handler, reg_id = event_handlers[f]
+ if not handler(f, event):
+ self._unregister(reg_id)
def _world_atom(self, pkg):
"""
self._logger.log(" >>> emerge (%s of %s) %s to %s" % \
(pkg_count.curval, pkg_count.maxval, pkg.cpv, pkg.root))
- self._schedule()
-
if pkg.type_name == "ebuild":
build = EbuildBuild(args_set=self._args_set,
find_blockers=self._find_blockers(pkg),