Change the way the way things that have to call the scheduler interact
authorZac Medico <zmedico@gentoo.org>
Fri, 4 Jul 2008 06:15:43 +0000 (06:15 -0000)
committerZac Medico <zmedico@gentoo.org>
Fri, 4 Jul 2008 06:15:43 +0000 (06:15 -0000)
with it:

 * Return a unique integer id from scheduler.register(), to be passed back
   into other scheduler methods.

 * Control handler unregistration with the handler's return value, like
   some other frameworks do for similar callbacks.

 * Add a SpawnProcess.reg_id attribute to store the id returned from
   scheduler.register()

 * Pass the SpawnProcess.reg_id value into scheduler.schedule() calls,
   so the scheduler knows to return when the callback referred to by
   the given id unregisters itself by returning False.

svn path=/main/trunk/; revision=10921

pym/_emerge/__init__.py

index b02d347bc03a6430bc37191df912743c052d9d87..a903e52a6aebdb89f4784191a39be010d56d5a9a 100644 (file)
@@ -1507,7 +1507,7 @@ class SpawnProcess(SubProcess):
                "uid", "gid", "groups", "umask", "logfile",
                "path_lookup", "pre_exec")
 
-       __slots__ = ("args", "files", "register", "unregister", "registered") + \
+       __slots__ = ("args", "files", "registered", "reg_id", "scheduler") + \
                _spawn_kwarg_names
 
        _file_names = ("process", "out")
@@ -1573,9 +1573,9 @@ class SpawnProcess(SubProcess):
 
                os.close(slave_fd)
                files.process = os.fdopen(master_fd, 'r')
-               self.registered = True
-               self.register(files.process.fileno(),
+               self.reg_id = self.scheduler.register(files.process.fileno(),
                        select.POLLIN, self._output_handler)
+               self.registered = True
 
        def _output_handler(self, fd, event):
                files = self.files
@@ -1593,7 +1593,7 @@ class SpawnProcess(SubProcess):
                                f.flush()
                                f.close()
                        self.registered = False
-                       self.unregister(fd)
+               return self.registered
 
 class EbuildFetcherAsync(SpawnProcess):
 
@@ -1766,9 +1766,8 @@ class EbuildBuild(SlotObject):
                                        (pkg_count.curval, pkg_count.maxval, pkg.cpv)
                                logger.log(msg, short_msg=short_msg)
 
-                               build = EbuildExecuter(pkg=pkg, register=scheduler.register,
-                                       schedule=scheduler.schedule, settings=settings,
-                                       unregister=scheduler.unregister)
+                               build = EbuildExecuter(pkg=pkg, scheduler=scheduler,
+                                       settings=settings)
                                retval = build.execute()
                                if retval != os.EX_OK:
                                        return retval
@@ -1804,9 +1803,8 @@ class EbuildBuild(SlotObject):
                                        (pkg_count.curval, pkg_count.curval, pkg.cpv)
                                logger.log(msg, short_msg=short_msg)
 
-                               build = EbuildExecuter(pkg=pkg, register=scheduler.register,
-                                       schedule=scheduler.schedule, settings=settings,
-                                       unregister=scheduler.unregister)
+                               build = EbuildExecuter(pkg=pkg, scheduler=scheduler,
+                                       settings=settings)
                                retval = build.execute()
                                if retval != os.EX_OK:
                                        return retval
@@ -1827,7 +1825,7 @@ class EbuildBuild(SlotObject):
 
 class EbuildExecuter(SlotObject):
 
-       __slots__ = ("pkg", "register", "schedule", "settings", "unregister")
+       __slots__ = ("pkg", "scheduler", "settings")
 
        _phases = ("setup", "unpack", "compile", "test", "install")
 
@@ -1857,14 +1855,12 @@ class EbuildExecuter(SlotObject):
 
                for mydo in self._phases:
                        ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
-                               pkg=self.pkg, phase=mydo, register=self.register,
-                               settings=settings, tree=tree, unregister=self.unregister)
+                               pkg=self.pkg, phase=mydo, scheduler=self.scheduler,
+                               settings=settings, tree=tree)
 
                        ebuild_phase.start()
-                       retval = None
-                       while retval is None:
-                               self.schedule()
-                               retval = ebuild_phase.poll()
+                       self.scheduler.schedule(ebuild_phase.reg_id)
+                       retval = ebuild_phase.wait()
 
                        if retval != os.EX_OK:
                                return retval
@@ -1874,8 +1870,8 @@ class EbuildExecuter(SlotObject):
 class EbuildPhase(SubProcess):
 
        __slots__ = ("fd_pipes", "phase", "pkg",
-               "register", "settings", "tree", "unregister",
-               "files", "registered")
+               "scheduler", "settings", "tree",
+               "files", "registered", "reg_id")
 
        _file_names = ("log", "stdout", "ebuild")
        _files_dict = slot_dict_class(_file_names, prefix="")
@@ -1976,8 +1972,9 @@ class EbuildPhase(SubProcess):
 
                os.close(slave_fd)
                files.ebuild = os.fdopen(master_fd, 'r')
+               self.reg_id = self.scheduler.register(files.ebuild.fileno(),
+                       select.POLLIN, output_handler)
                self.registered = True
-               self.register(files.ebuild.fileno(), select.POLLIN, output_handler)
 
        def _output_handler(self, fd, event):
                files = self.files
@@ -1996,7 +1993,7 @@ class EbuildPhase(SubProcess):
                        for f in files.values():
                                f.close()
                        self.registered = False
-                       self.unregister(fd)
+               return self.registered
 
        def _dummy_handler(self, fd, event):
                """
@@ -2017,7 +2014,7 @@ class EbuildPhase(SubProcess):
                        for f in files.values():
                                f.close()
                        self.registered = False
-                       self.unregister(fd)
+               return self.registered
 
        def _set_returncode(self, wait_retval):
                SubProcess._set_returncode(self, wait_retval)
@@ -2195,9 +2192,8 @@ class Binpkg(SlotObject):
                                                for line in wrap(waiting_msg, 65))
                                        writemsg(waiting_msg, noiselevel=-1)
 
-                               while retval is None:
-                                       scheduler.schedule()
-                                       retval = prefetcher.poll()
+                                       scheduler.schedule(prefetcher.reg_id)
+                                       retval = prefetcher.wait()
                        del prefetcher
 
                fetcher = BinpkgFetcher(pkg=pkg, pretend=opts.pretend,
@@ -2306,27 +2302,22 @@ class Binpkg(SlotObject):
 
                        phase = "setup"
                        ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
-                               pkg=pkg, phase=phase, register=scheduler.register,
-                               settings=settings, tree=tree, unregister=scheduler.unregister)
+                               pkg=pkg, phase=phase, scheduler=scheduler,
+                               settings=settings, tree=tree)
 
                        ebuild_phase.start()
-                       retval = None
-                       while retval is None:
-                               scheduler.schedule()
-                               retval = ebuild_phase.poll()
+                       scheduler.schedule(ebuild_phase.reg_id)
+                       retval = ebuild_phase.wait()
 
                        if retval != os.EX_OK:
                                return retval
 
                        extractor = BinpkgExtractorAsync(image_dir=image_dir,
-                               pkg=pkg, pkg_path=pkg_path, register=scheduler.register,
-                               unregister=scheduler.unregister)
+                               pkg=pkg, pkg_path=pkg_path, scheduler=scheduler)
                        portage.writemsg_stdout(">>> Extracting %s\n" % pkg.cpv)
                        extractor.start()
-                       retval = None
-                       while retval is None:
-                               scheduler.schedule()
-                               retval = extractor.poll()
+                       scheduler.schedule(extractor.reg_id)
+                       retval = extractor.wait()
 
                        if retval != os.EX_OK:
                                writemsg("!!! Error Extracting '%s'\n" % pkg_path,
@@ -7061,7 +7052,7 @@ class Scheduler(object):
        _fetch_log = "/var/log/emerge-fetch.log"
 
        class _iface_class(SlotObject):
-               __slots__ = ("register", "schedule", "unregister")
+               __slots__ = ("register", "schedule")
 
        class _build_opts_class(SlotObject):
                __slots__ = ("buildpkg", "buildpkgonly",
@@ -7111,9 +7102,11 @@ class Scheduler(object):
                self._logger = self._emerge_log_class(
                        xterm_titles=("notitles" not in settings.features))
                self._sched_iface = self._iface_class(
-                       register=self._register, schedule=self._schedule,
-                               unregister=self._unregister)
+                       register=self._register, schedule=self._schedule)
                self._poll_event_handlers = {}
+               self._poll_event_handler_ids = {}
+               # Increment id for each new handler.
+               self._event_handler_id = 0
 
                try:
                        self._poll = select.poll()
@@ -7264,14 +7257,14 @@ class Scheduler(object):
                elif pkg.type_name == "ebuild":
 
                        prefetcher = EbuildFetcherAsync(logfile=self._fetch_log, pkg=pkg,
-                               register=self._register, unregister=self._unregister)
+                               scheduler=self._sched_iface)
 
                elif pkg.type_name == "binary" and \
                        "--getbinpkg" in self.myopts and \
                        pkg.root_config.trees["bintree"].isremote(pkg.cpv):
 
                        prefetcher = BinpkgFetcherAsync(logfile=self._fetch_log,
-                               pkg=pkg, register=self._register, unregister=self._unregister)
+                               pkg=pkg, scheduler=self._sched_iface)
 
                return prefetcher
 
@@ -7476,30 +7469,43 @@ class Scheduler(object):
                return (mylist, dropped_tasks)
 
        def _register(self, f, eventmask, handler):
-               self._poll_event_handlers[f] = handler
+               """
+               @rtype: Integer
+               @return: A unique registration id, for use in schedule() or
+                       unregister() calls.
+               """
+               self._event_handler_id += 1
+               reg_id = self._event_handler_id
+               self._poll_event_handler_ids[reg_id] = f
+               self._poll_event_handlers[f] = (handler, reg_id)
                self._poll.register(f, eventmask)
+               return reg_id
 
-       def _unregister(self, f):
+       def _unregister(self, reg_id):
+               f = self._poll_event_handler_ids[reg_id]
                self._poll.unregister(f)
                del self._poll_event_handlers[f]
+               del self._poll_event_handler_ids[reg_id]
                self._schedule_tasks()
 
-       def _schedule(self):
+       def _schedule(self, wait_id):
+               """
+               Schedule until wait_id is not longer registered
+               for poll() events.
+               @type wait_id: int
+               @param wait_id: a task id to wait for
+               """
                event_handlers = self._poll_event_handlers
-               running_tasks = self._prefetch_queue.running_tasks
+               handler_ids = self._poll_event_handler_ids
                poll = self._poll.poll
 
                self._schedule_tasks()
 
-               while event_handlers:
+               while wait_id in handler_ids:
                        for f, event in poll():
-                               event_handlers[f](f, event)
-
-                       if len(event_handlers) <= len(running_tasks):
-                               # Assuming one handler per task, this
-                               # means the caller has unregistered it's
-                               # handler, so it's time to yield.
-                               break
+                               handler, reg_id = event_handlers[f]
+                               if not handler(f, event):
+                                       self._unregister(reg_id)
 
        def _world_atom(self, pkg):
                """
@@ -7565,8 +7571,6 @@ class Scheduler(object):
                                self._logger.log(" >>> emerge (%s of %s) %s to %s" % \
                                        (pkg_count.curval, pkg_count.maxval, pkg.cpv, pkg.root))
 
-                       self._schedule()
-
                        if pkg.type_name == "ebuild":
                                build = EbuildBuild(args_set=self._args_set,
                                        find_blockers=self._find_blockers(pkg),