* Unify the scheduler's "build" and "extract" queues into a single
authorZac Medico <zmedico@gentoo.org>
Wed, 9 Jul 2008 12:33:08 +0000 (12:33 -0000)
committerZac Medico <zmedico@gentoo.org>
Wed, 9 Jul 2008 12:33:08 +0000 (12:33 -0000)
   "jobs" queue.

 * Add support for logging fetches to /var/log/emerge-fetch.log when
   --jobs is enabled. Previously this log was only used for
   the parallel-fetch feature but now it's also used for --jobs.
   The scheduler's "prefetch" queue has been renamed to "fetch" since
   it's not exclusively used for parallel-fetch anymore.

 * Pass the "background" parameter from Binpkg in to the BinpkgFetcher
   instance, to send output to emerge-fetch.log instead of stdout.

svn path=/main/trunk/; revision=10999

pym/_emerge/__init__.py

index a696e5c2b23023bf8ce396eaf4eb87ab106e0f83..c74b5f12eb372fe81e1971495458a907d495b74b 100644 (file)
@@ -2541,7 +2541,8 @@ class Binpkg(CompositeTask):
                pkg = self.pkg
                pkg_count = self.pkg_count
                fetcher = BinpkgFetcher(background=self.background,
-                       pkg=self.pkg, scheduler=self.scheduler)
+                       logfile=self.scheduler.fetch.log_file, pkg=self.pkg,
+                       scheduler=self.scheduler)
                pkg_path = fetcher.pkg_path
                self._pkg_path = pkg_path
 
@@ -2553,7 +2554,12 @@ class Binpkg(CompositeTask):
                                (pkg_count.curval, pkg_count.maxval, pkg.cpv)
                        self.logger.log(msg, short_msg=short_msg)
 
-                       self._start_task(fetcher, self._fetcher_exit)
+                       if self.background:
+                               fetcher.addExitListener(self._fetcher_exit)
+                               self._current_task = fetcher
+                               self.scheduler.fetch.schedule(fetcher)
+                       else:
+                               self._start_task(fetcher, self._fetcher_exit)
                        return
 
                self._fetcher_exit(fetcher)
@@ -7530,6 +7536,9 @@ class SequentialTaskQueue(SlotObject):
        def add(self, task):
                self._task_queue.append(task)
 
+       def addFront(self, task):
+               self._task_queue.appendleft(task)
+
        def schedule(self):
 
                if not self:
@@ -7586,10 +7595,13 @@ class Scheduler(object):
        _fetch_log = "/var/log/emerge-fetch.log"
 
        class _iface_class(SlotObject):
-               __slots__ = ("register", "schedule")
+               __slots__ = ("fetch", "register", "schedule")
+
+       class _fetch_iface_class(SlotObject):
+               __slots__ = ("log_file", "schedule")
 
        _task_queues_class = slot_dict_class(
-               ("build", "extract", "merge", "prefetch",), prefix="")
+               ("merge", "jobs", "fetch",), prefix="")
 
        class _build_opts_class(SlotObject):
                __slots__ = ("buildpkg", "buildpkgonly",
@@ -7640,8 +7652,11 @@ class Scheduler(object):
                self.curval = 0
                self._logger = self._emerge_log_class(
                        xterm_titles=("notitles" not in settings.features))
+               fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
+                       schedule=self._schedule_fetch)
                self._sched_iface = self._iface_class(
-                       register=self._register, schedule=self._schedule)
+                       fetch=fetch_iface, register=self._register,
+                       schedule=self._schedule)
                self._poll_event_handlers = {}
                self._poll_event_handler_ids = {}
                # Increment id for each new handler.
@@ -7656,7 +7671,6 @@ class Scheduler(object):
                for k in self._task_queues.allowed_keys:
                        setattr(self._task_queues, k, SequentialTaskQueue())
 
-               self._add_task = self._task_queues.prefetch.add
                self._prefetchers = weakref.WeakValueDictionary()
                self._pkg_queue = []
                self._completed_tasks = set()
@@ -7672,6 +7686,7 @@ class Scheduler(object):
                if max_jobs is None:
                        max_jobs = 1
                self._set_max_jobs(max_jobs)
+               background = self._max_jobs > 1
 
                self._max_load = myopts.get("--load-average")
 
@@ -7695,6 +7710,7 @@ class Scheduler(object):
                        elif len(mergelist) > 1:
                                self._parallel_fetch = True
 
+               if background or self._parallel_fetch:
                                # clear out existing fetch log if it exists
                                try:
                                        open(self._fetch_log, 'w')
@@ -7703,7 +7719,7 @@ class Scheduler(object):
 
        def _set_max_jobs(self, max_jobs):
                self._max_jobs = max_jobs
-               self._task_queues.build.max_jobs = max_jobs
+               self._task_queues.jobs.max_jobs = max_jobs
 
        def _set_digraph(self, digraph):
                if self._max_jobs < 2:
@@ -7769,6 +7785,13 @@ class Scheduler(object):
                        if pargs:
                                self.status = pargs[0]
 
+       def _schedule_fetch(self, fetcher):
+               """
+               Schedule a fetcher on the fetch queue, in order to
+               serialize access to the fetch log.
+               """
+               self._task_queues.fetch.addFront(fetcher)
+
        def _find_blockers(self, new_pkg):
                """
                Returns a callable which should be called only when
@@ -7855,7 +7878,7 @@ class Scheduler(object):
                        for pkg in self._mergelist:
                                prefetcher = self._create_prefetcher(pkg)
                                if prefetcher is not None:
-                                       self._add_task(prefetcher)
+                                       self._task_queues.fetch.add(prefetcher)
                                        prefetchers[pkg] = prefetcher
 
        def _create_prefetcher(self, pkg):
@@ -8116,7 +8139,7 @@ class Scheduler(object):
                        del pkg_queue[:]
                        self._completed_tasks.clear()
                        self._digraph = None
-                       self._task_queues.prefetch.clear()
+                       self._task_queues.fetch.clear()
 
                        # discard any failures and return the
                        # exist status of the last one
@@ -8239,10 +8262,10 @@ class Scheduler(object):
                                task_queues.merge.add(merge)
                        elif pkg.built:
                                task.addExitListener(self._extract_exit)
-                               task_queues.extract.add(task)
+                               task_queues.jobs.add(task)
                        else:
                                task.addExitListener(self._build_exit)
-                               task_queues.build.add(task)
+                               task_queues.jobs.add(task)
 
                while self._jobs:
                        self._schedule_main(wait=True)