TaskScheduler: inherit AsyncScheduler
authorZac Medico <zmedico@gentoo.org>
Fri, 5 Oct 2012 20:48:53 +0000 (13:48 -0700)
committerZac Medico <zmedico@gentoo.org>
Fri, 5 Oct 2012 20:48:53 +0000 (13:48 -0700)
This allows the QueueScheduler class to be eliminated.

pym/_emerge/FifoIpcDaemon.py
pym/_emerge/QueueScheduler.py [deleted file]
pym/_emerge/TaskScheduler.py [deleted file]
pym/portage/tests/ebuild/test_ipc_daemon.py
pym/portage/tests/process/test_poll.py
pym/portage/util/_async/TaskScheduler.py [new file with mode: 0644]

index fcc4ab4b9a9ce65bfc8ded96f00a1e1e18439870..de9dc67b16442418e124dcf47750815540e8b958 100644 (file)
@@ -47,6 +47,8 @@ class FifoIpcDaemon(AbstractPollTask):
                if self.returncode is None:
                        self.returncode = 1
                self._unregister()
+               # notify exit listeners
+               self.wait()
 
        def _wait(self):
                if self.returncode is not None:
diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py
deleted file mode 100644 (file)
index 206087c..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 1999-2012 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-from _emerge.PollScheduler import PollScheduler
-
-class QueueScheduler(PollScheduler):
-
-       """
-       Add instances of SequentialTaskQueue and then call run(). The
-       run() method returns when no tasks remain.
-       """
-
-       def __init__(self, main=True, max_jobs=None, max_load=None):
-               PollScheduler.__init__(self, main=main)
-
-               if max_jobs is None:
-                       max_jobs = 1
-
-               self._max_jobs = max_jobs
-               self._max_load = max_load
-
-               self._queues = []
-               self._schedule_listeners = []
-
-       def add(self, q):
-               self._queues.append(q)
-
-       def remove(self, q):
-               self._queues.remove(q)
-
-       def clear(self):
-               for q in self._queues:
-                       q.clear()
-
-       def run(self, timeout=None):
-
-               timeout_callback = None
-               if timeout is not None:
-                       def timeout_callback():
-                               timeout_callback.timed_out = True
-                               return False
-                       timeout_callback.timed_out = False
-                       timeout_callback.timeout_id = self.sched_iface.timeout_add(
-                               timeout, timeout_callback)
-
-               term_check_id = self.sched_iface.idle_add(self._termination_check)
-               try:
-                       while not (timeout_callback is not None and
-                               timeout_callback.timed_out):
-                               # We don't have any callbacks to trigger _schedule(),
-                               # so we have to call it explicitly here.
-                               self._schedule()
-                               if self._keep_scheduling():
-                                       self.sched_iface.iteration()
-                               else:
-                                       break
-
-                       while self._is_work_scheduled() and \
-                               not (timeout_callback is not None and
-                               timeout_callback.timed_out):
-                               self.sched_iface.iteration()
-               finally:
-                       self.sched_iface.source_remove(term_check_id)
-                       if timeout_callback is not None:
-                               self.sched_iface.unregister(timeout_callback.timeout_id)
-
-       def _schedule_tasks(self):
-               """
-               @rtype: bool
-               @return: True if there may be remaining tasks to schedule,
-                       False otherwise.
-               """
-               if self._terminated_tasks:
-                       return
-
-               while self._can_add_job():
-                       n = self._max_jobs - self._running_job_count()
-                       if n < 1:
-                               break
-
-                       if not self._start_next_job(n):
-                               return
-
-       def _keep_scheduling(self):
-               return not self._terminated_tasks and any(self._queues)
-
-       def _running_job_count(self):
-               job_count = 0
-               for q in self._queues:
-                       job_count += len(q.running_tasks)
-               self._jobs = job_count
-               return job_count
-
-       def _start_next_job(self, n=1):
-               started_count = 0
-               for q in self._queues:
-                       initial_job_count = len(q.running_tasks)
-                       q.schedule()
-                       final_job_count = len(q.running_tasks)
-                       if final_job_count > initial_job_count:
-                               started_count += (final_job_count - initial_job_count)
-                       if started_count >= n:
-                               break
-               return started_count
-
diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py
deleted file mode 100644 (file)
index 583bfe3..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-# Copyright 1999-2012 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-from _emerge.QueueScheduler import QueueScheduler
-from _emerge.SequentialTaskQueue import SequentialTaskQueue
-
-class TaskScheduler(object):
-
-       """
-       A simple way to handle scheduling of AsynchrousTask instances. Simply
-       add tasks and call run(). The run() method returns when no tasks remain.
-       """
-
-       def __init__(self, main=True, max_jobs=None, max_load=None):
-               self._queue = SequentialTaskQueue(max_jobs=max_jobs)
-               self._scheduler = QueueScheduler(main=main,
-                       max_jobs=max_jobs, max_load=max_load)
-               self.sched_iface = self._scheduler.sched_iface
-               self.run = self._scheduler.run
-               self.clear = self._scheduler.clear
-               self.wait = self._queue.wait
-               self._scheduler.add(self._queue)
-
-       def add(self, task):
-               self._queue.add(task)
-
index 77277fe8ec1881896bd1574cc1e4c41dcf125cec..d4328a1d6ce06fa51f354a5700a351a83ce5fd75 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Gentoo Foundation
+# Copyright 2010-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import tempfile
@@ -14,10 +14,12 @@ from portage.locks import hardlock_cleanup
 from portage.package.ebuild._ipc.ExitCommand import ExitCommand
 from portage.util import ensure_dirs
 from portage.util._async.ForkProcess import ForkProcess
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from _emerge.PollScheduler import PollScheduler
 from _emerge.SpawnProcess import SpawnProcess
 from _emerge.EbuildBuildDir import EbuildBuildDir
 from _emerge.EbuildIpcDaemon import EbuildIpcDaemon
-from _emerge.TaskScheduler import TaskScheduler
 
 class SleepProcess(ForkProcess):
        """
@@ -33,6 +35,7 @@ class IpcDaemonTestCase(TestCase):
        _SCHEDULE_TIMEOUT = 40000 # 40 seconds
 
        def testIpcDaemon(self):
+               event_loop = global_event_loop()
                tmpdir = tempfile.mkdtemp()
                build_dir = None
                try:
@@ -54,9 +57,8 @@ class IpcDaemonTestCase(TestCase):
                                env["__PORTAGE_TEST_HARDLINK_LOCKS"] = \
                                        os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"]
 
-                       task_scheduler = TaskScheduler(max_jobs=2)
                        build_dir = EbuildBuildDir(
-                               scheduler=task_scheduler.sched_iface,
+                               scheduler=PollScheduler(event_loop=event_loop).sched_iface,
                                settings=env)
                        build_dir.lock()
                        ensure_dirs(env['PORTAGE_BUILDDIR'])
@@ -71,26 +73,23 @@ class IpcDaemonTestCase(TestCase):
                                commands = {'exit' : exit_command}
                                daemon = EbuildIpcDaemon(commands=commands,
                                        input_fifo=input_fifo,
-                                       output_fifo=output_fifo,
-                                       scheduler=task_scheduler.sched_iface)
+                                       output_fifo=output_fifo)
                                proc = SpawnProcess(
                                        args=[BASH_BINARY, "-c",
                                        '"$PORTAGE_BIN_PATH"/ebuild-ipc exit %d' % exitcode],
-                                       env=env, scheduler=task_scheduler.sched_iface)
+                                       env=env)
+                               task_scheduler = TaskScheduler(iter([daemon, proc]),
+                                       max_jobs=2, event_loop=event_loop)
 
                                self.received_command = False
                                def exit_command_callback():
                                        self.received_command = True
-                                       task_scheduler.clear()
-                                       task_scheduler.wait()
+                                       task_scheduler.cancel()
 
                                exit_command.reply_hook = exit_command_callback
                                start_time = time.time()
-                               task_scheduler.add(daemon)
-                               task_scheduler.add(proc)
-                               task_scheduler.run(timeout=self._SCHEDULE_TIMEOUT)
-                               task_scheduler.clear()
-                               task_scheduler.wait()
+                               self._run(event_loop, task_scheduler, self._SCHEDULE_TIMEOUT)
+
                                hardlock_cleanup(env['PORTAGE_BUILDDIR'],
                                        remove_all_locks=True)
 
@@ -101,7 +100,7 @@ class IpcDaemonTestCase(TestCase):
                                self.assertEqual(daemon.isAlive(), False)
                                self.assertEqual(exit_command.exitcode, exitcode)
 
-                       # Intentionally short timeout test for QueueScheduler.run()
+                       # Intentionally short timeout test for EventLoop/AsyncScheduler.
                        # Use a ridiculously long sleep_time_s in case the user's
                        # system is heavily loaded (see bug #436334).
                        sleep_time_s = 600     #600.000 seconds
@@ -116,20 +115,18 @@ class IpcDaemonTestCase(TestCase):
                                        scheduler=task_scheduler.sched_iface)
                                proc = SleepProcess(seconds=sleep_time_s,
                                        scheduler=task_scheduler.sched_iface)
+                               task_scheduler = TaskScheduler(iter([daemon, proc]),
+                                       max_jobs=2, event_loop=event_loop)
 
                                self.received_command = False
                                def exit_command_callback():
                                        self.received_command = True
-                                       task_scheduler.clear()
-                                       task_scheduler.wait()
+                                       task_scheduler.cancel()
 
                                exit_command.reply_hook = exit_command_callback
                                start_time = time.time()
-                               task_scheduler.add(daemon)
-                               task_scheduler.add(proc)
-                               task_scheduler.run(timeout=short_timeout_ms)
-                               task_scheduler.clear()
-                               task_scheduler.wait()
+                               self._run(event_loop, task_scheduler, short_timeout_ms)
+
                                hardlock_cleanup(env['PORTAGE_BUILDDIR'],
                                        remove_all_locks=True)
 
@@ -144,3 +141,20 @@ class IpcDaemonTestCase(TestCase):
                        if build_dir is not None:
                                build_dir.unlock()
                        shutil.rmtree(tmpdir)
+
+       def _timeout_callback(self):
+               self._timed_out = True
+
+       def _run(self, event_loop, task_scheduler, timeout):
+               self._timed_out = False
+               timeout_id = event_loop.timeout_add(timeout, self._timeout_callback)
+
+               try:
+                       task_scheduler.start()
+                       while not self._timed_out and task_scheduler.poll() is None:
+                               event_loop.iteration()
+                       if self._timed_out:
+                               task_scheduler.cancel()
+                       task_scheduler.wait()
+               finally:
+                       event_loop.source_remove(timeout_id)
index d6667b4e089ec6edc141cd11c3fde80c3deefd85..3772d797fc507c004fc3828ab033a5b35ae5d032 100644 (file)
@@ -4,7 +4,7 @@
 from portage import os
 from portage.tests import TestCase
 from portage.util._pty import _create_pty_or_pipe
-from _emerge.TaskScheduler import TaskScheduler
+from portage.util._async.TaskScheduler import TaskScheduler
 from _emerge.PipeReader import PipeReader
 from _emerge.SpawnProcess import SpawnProcess
 
@@ -37,25 +37,24 @@ class PipeReaderTestCase(TestCase):
                # in order to avoid issue 5380 with python3.
                master_file = os.fdopen(master_fd, 'rb', 0)
                slave_file = os.fdopen(slave_fd, 'wb', 0)
-               task_scheduler = TaskScheduler(max_jobs=2)
                producer = SpawnProcess(
                        args=["bash", "-c", self._echo_cmd % test_string],
-                       env=os.environ, fd_pipes={1:slave_fd},
-                       scheduler=task_scheduler.sched_iface)
-               task_scheduler.add(producer)
-               slave_file.close()
+                       env=os.environ, fd_pipes={1:slave_fd})
 
                consumer = PipeReader(
                        input_files={"producer" : master_file},
-                       scheduler=task_scheduler.sched_iface, _use_array=self._use_array)
+                       _use_array=self._use_array)
 
-               task_scheduler.add(consumer)
+               task_scheduler = TaskScheduler(iter([producer, consumer]), max_jobs=2)
 
                # This will ensure that both tasks have exited, which
                # is necessary to avoid "ResourceWarning: unclosed file"
                # warnings since Python 3.2 (and also ensures that we
                # don't leave any zombie child processes).
-               task_scheduler.run()
+               task_scheduler.start()
+               slave_file.close()
+               task_scheduler.wait()
+
                self.assertEqual(producer.returncode, os.EX_OK)
                self.assertEqual(consumer.returncode, os.EX_OK)
 
diff --git a/pym/portage/util/_async/TaskScheduler.py b/pym/portage/util/_async/TaskScheduler.py
new file mode 100644 (file)
index 0000000..b0ec7af
--- /dev/null
@@ -0,0 +1,22 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from .AsyncScheduler import AsyncScheduler
+
+class TaskScheduler(AsyncScheduler):
+
+       __slots__ = ('_task_iter',)
+
+       """
+       A simple way to handle scheduling of AbstractPollTask instances. Simply
+       pass a task iterator into the constructor and call start(). Use the
+       poll, wait, or addExitListener methods to be notified when all of the
+       tasks have completed.
+       """
+
+       def __init__(self, task_iter, **kwargs):
+               AsyncScheduler.__init__(self, **kwargs)
+               self._task_iter = task_iter
+
+       def _next_task(self):
+               return next(self._task_iter)