Refactor and simplify the main task scheduling and poll loops:
author: Zac Medico <zmedico@gentoo.org>
Fri, 11 Jul 2008 02:38:35 +0000 (02:38 -0000)
committer: Zac Medico <zmedico@gentoo.org>
Fri, 11 Jul 2008 02:38:35 +0000 (02:38 -0000)
* Make output handlers unregister themselves and call wait() to notify
  exit listeners immediately. This makes the exit listeners more useful
  for scheduling tasks. This makes the poll loop nice and clean because
  it just calls the handlers and then the handlers can do the scheduling
  when necessary.

* Make SequentialTaskQueue.add() and addFront() trigger scheduling
  internally, so that it's more of a chain reaction than something that has
  to be done explicitly.

svn path=/main/trunk/; revision=11013

pym/_emerge/__init__.py

index 7f122bb4d555abbe0961acbab7fd7ceb34b9978e..dce4ef8d8092f17a0185f96cd5a9e99164ea52c0 100644 (file)
@@ -1886,7 +1886,8 @@ class SpawnProcess(SubProcess):
                                f.flush()
                                f.close()
                        self.registered = False
-                       self._wait()
+                       self.scheduler.unregister(self._reg_id)
+                       self.wait()
                return self.registered
 
        def _dummy_handler(self, fd, event):
@@ -1908,7 +1909,8 @@ class SpawnProcess(SubProcess):
                        for f in files.values():
                                f.close()
                        self.registered = False
-                       self._wait()
+                       self.scheduler.unregister(self._reg_id)
+                       self.wait()
                return self.registered
 
 class EbuildFetcher(SpawnProcess):
@@ -2045,11 +2047,11 @@ class EbuildBuild(CompositeTask):
                        prefetcher.cancel()
                elif prefetcher.poll() is None:
 
-                       waiting_msg = ("Fetching '%s' " + \
+                       waiting_msg = "Fetching files " + \
                                "in the background. " + \
                                "To view fetch progress, run `tail -f " + \
                                "/var/log/emerge-fetch.log` in another " + \
-                               "terminal.") % prefetcher.pkg_path
+                               "terminal."
                        msg_prefix = colorize("GOOD", " * ")
                        from textwrap import wrap
                        waiting_msg = "".join("%s%s\n" % (msg_prefix, line) \
@@ -2371,7 +2373,8 @@ class EbuildMetadataPhase(SubProcess):
                        for f in files.values():
                                f.close()
                        self.registered = False
-                       self._wait()
+                       self.scheduler.unregister(self._reg_id)
+                       self.wait()
 
                        if self.returncode == os.EX_OK:
                                metadata = izip(portage.auxdbkeys,
@@ -2519,7 +2522,8 @@ class EbuildPhase(SubProcess):
                        for f in files.values():
                                f.close()
                        self.registered = False
-                       self._wait()
+                       self.scheduler.unregister(self._reg_id)
+                       self.wait()
                return self.registered
 
        def _dummy_handler(self, fd, event):
@@ -2541,7 +2545,8 @@ class EbuildPhase(SubProcess):
                        for f in files.values():
                                f.close()
                        self.registered = False
-                       self._wait()
+                       self.scheduler.unregister(self._reg_id)
+                       self.wait()
                return self.registered
 
        def _set_returncode(self, wait_retval):
@@ -7720,9 +7725,11 @@ class SequentialTaskQueue(SlotObject):
 
        def add(self, task):
                self._task_queue.append(task)
+               self.schedule()
 
        def addFront(self, task):
                self._task_queue.appendleft(task)
+               self.schedule()
 
        def schedule(self):
 
@@ -7814,12 +7821,29 @@ class PollLoop(object):
 
                return True
 
+       def _poll_loop(self):
+
+               event_handlers = self._poll_event_handlers
+               poll = self._poll.poll
+               state_change = 0
+
+               while event_handlers:
+                       for f, event in poll():
+                               handler, reg_id = event_handlers[f]
+                               if not handler(f, event):
+                                       state_change += 1
+
+               if not state_change:
+                       raise AssertionError("tight loop")
+
        def _register(self, f, eventmask, handler):
                """
                @rtype: Integer
                @return: A unique registration id, for use in schedule() or
                        unregister() calls.
                """
+               if f in self._poll_event_handlers:
+                       raise AssertionError("fd %d is already registered" % f)
                self._event_handler_id += 1
                reg_id = self._event_handler_id
                self._poll_event_handler_ids[reg_id] = f
@@ -7832,7 +7856,6 @@ class PollLoop(object):
                self._poll.unregister(f)
                del self._poll_event_handlers[f]
                del self._poll_event_handler_ids[reg_id]
-               self._schedule_tasks()
 
        def _schedule(self, wait_id):
                """
@@ -7845,48 +7868,10 @@ class PollLoop(object):
                handler_ids = self._poll_event_handler_ids
                poll = self._poll.poll
 
-               self._schedule_tasks()
-
                while wait_id in handler_ids:
                        for f, event in poll():
                                handler, reg_id = event_handlers[f]
-                               if not handler(f, event):
-                                       self._unregister(reg_id)
-
-       def _schedule_tasks(self):
-               return False
-
-       def _schedule_main(self, wait=False):
-
-               event_handlers = self._poll_event_handlers
-               poll = self._poll.poll
-               max_jobs = self._max_jobs
-
-               state_change = 0
-
-               if self._schedule_tasks():
-                       state_change += 1
-
-               while event_handlers:
-                       jobs = self._jobs
-
-                       for f, event in poll():
-                               handler, reg_id = event_handlers[f]
-                               if not handler(f, event):
-                                       state_change += 1
-                                       self._unregister(reg_id)
-
-                       if jobs == self._jobs:
-                               continue
-
-                       if self._schedule_tasks():
-                               state_change += 1
-
-                       if not wait and self._jobs < max_jobs:
-                               break
-
-               if not state_change:
-                       raise AssertionError("tight loop")
+                               handler(f, event)
 
 class Scheduler(PollLoop):
 
@@ -7904,7 +7889,7 @@ class Scheduler(PollLoop):
        _fetch_log = "/var/log/emerge-fetch.log"
 
        class _iface_class(SlotObject):
-               __slots__ = ("fetch", "register", "schedule")
+               __slots__ = ("fetch", "register", "schedule", "unregister")
 
        class _fetch_iface_class(SlotObject):
                __slots__ = ("log_file", "schedule")
@@ -7966,7 +7951,7 @@ class Scheduler(PollLoop):
                        schedule=self._schedule_fetch)
                self._sched_iface = self._iface_class(
                        fetch=fetch_iface, register=self._register,
-                       schedule=self._schedule)
+                       schedule=self._schedule, unregister=self._unregister)
 
                self._task_queues = self._task_queues_class()
                for k in self._task_queues.allowed_keys:
@@ -8535,24 +8520,30 @@ class Scheduler(PollLoop):
                if self._is_restart_scheduled():
                        self._set_max_jobs(1)
 
-               pkg_queue = self._pkg_queue
-               failed_pkgs = self._failed_pkgs
+               while not self._failed_pkgs and \
+                       self._schedule_tasks():
+                       self._poll_loop()
+
+               while self._jobs:
+                       self._poll_loop()
+
+       def _schedule_tasks(self):
+               """
+               @rtype: bool
+               @returns: True if tasks remain to schedule, False otherwise.
+               """
+
                task_queues = self._task_queues
-               max_jobs = self._max_jobs
-               max_load = self._max_load
-               background = max_jobs > 1
+               background = self._max_jobs > 1
 
-               while pkg_queue and not failed_pkgs:
+               while self._can_add_job():
 
-                       if not self._can_add_job():
-                               self._schedule_main()
-                               continue
+                       if not self._pkg_queue:
+                               return False
 
                        pkg = self._choose_pkg()
-
                        if pkg is None:
-                               self._schedule_main()
-                               continue
+                               return True
 
                        if not pkg.installed:
                                self._pkg_count.curval += 1
@@ -8570,16 +8561,7 @@ class Scheduler(PollLoop):
                        else:
                                task.addExitListener(self._build_exit)
                                task_queues.jobs.add(task)
-
-               while self._jobs:
-                       self._schedule_main(wait=True)
-
-       def _schedule_tasks(self):
-               state_change = 0
-               for x in self._task_queues.values():
-                       if x.schedule():
-                               state_change += 1
-               return bool(state_change)
+               return True
 
        def _task(self, pkg, background):
 
@@ -8747,7 +8729,7 @@ class Scheduler(PollLoop):
 class MetadataRegen(PollLoop):
 
        class _sched_iface_class(SlotObject):
-               __slots__ = ("register", "schedule")
+               __slots__ = ("register", "schedule", "unregister")
 
        def __init__(self, portdb, max_jobs=None, max_load=None):
                PollLoop.__init__(self)
@@ -8756,14 +8738,15 @@ class MetadataRegen(PollLoop):
                if max_jobs is None:
                        max_jobs = 1
 
-               self._job_queue = SequentialTaskQueue(max_jobs=max_jobs)
                self._max_jobs = max_jobs
                self._max_load = max_load
                self._sched_iface = self._sched_iface_class(
                        register=self._register,
-                       schedule=self._schedule)
+                       schedule=self._schedule,
+                       unregister=self._unregister)
 
                self._valid_pkgs = set()
+               self._process_iter = self._iter_metadata_processes()
 
        def _iter_metadata_processes(self):
                portdb = self._portdb
@@ -8800,7 +8783,11 @@ class MetadataRegen(PollLoop):
                                dead_nodes = None
                                break
 
-               self._main_loop()
+               while self._schedule_tasks():
+                       self._poll_loop()
+
+               while self._jobs:
+                       self._poll_loop()
 
                if dead_nodes:
                        for y in self._valid_pkgs:
@@ -8816,31 +8803,23 @@ class MetadataRegen(PollLoop):
                                        except (KeyError, CacheError):
                                                pass
 
-       def _main_loop(self):
-
-               process_iter = self._iter_metadata_processes()
-
-               while True:
-
-                       if not self._can_add_job():
-                               self._schedule_main()
-                               continue
-
+       def _schedule_tasks(self):
+               """
+               @rtype: bool
+               @returns: True if there may be remaining tasks to schedule,
+                       False otherwise.
+               """
+               while self._can_add_job():
                        try:
-                               metadata_process = process_iter.next()
+                               metadata_process = self._process_iter.next()
                        except StopIteration:
-                               break
+                               return False
 
                        self._jobs += 1
                        metadata_process.scheduler = self._sched_iface
                        metadata_process.addExitListener(self._metadata_exit)
-                       self._job_queue.add(metadata_process)
-
-               while self._jobs:
-                       self._schedule_main(wait=True)
-
-       def _schedule_tasks(self):
-               return self._job_queue.schedule()
+                       metadata_process.start()
+               return True
 
        def _metadata_exit(self, metadata_process):
                self._jobs -= 1
@@ -8848,6 +8827,7 @@ class MetadataRegen(PollLoop):
                        self._valid_pkgs.discard(metadata_process.cpv)
                        portage.writemsg("Error processing %s, continuing...\n" % \
                                (metadata_process.cpv,))
+               self._schedule_tasks()
 
 class UninstallFailure(portage.exception.PortageException):
        """