author     Zac Medico <zmedico@gentoo.org>
           Sat, 12 Jul 2008 06:22:26 +0000 (06:22 -0000)
committer  Zac Medico <zmedico@gentoo.org>
           Sat, 12 Jul 2008 06:22:26 +0000 (06:22 -0000)

Create a test case for the poll loop which uses the loop to read data from a
pipe and assert that the data written to the pipe is identical to the data
read from the pipe. In order to implement this test, several useful classes
have been added (a usage sketch follows the list below):

 * PipeReader

     Reads output from one or more files and saves it in memory,
     for retrieval via the getvalue() method. This is driven by
     the scheduler's poll() loop, so it runs entirely within the
     current process.

 * QueueScheduler

     Add instances of SequentialTaskQueue and then call run().
     The run() method returns when no tasks remain.

 * TaskScheduler

     A simple way to handle scheduling of AsynchronousTask instances. Simply
     add tasks and call run(). The run() method returns when no tasks remain.
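
For reference, a minimal usage sketch tying the three classes together. It
closely mirrors the new test case in pym/portage/tests/process/test_poll.py
(added below); the echoed string and the max_jobs value are arbitrary
illustrative choices.

    import os
    from _emerge import PipeReader, SpawnProcess, TaskScheduler

    # A pipe: the child process writes to slave_fd, PipeReader drains master_fd.
    master_fd, slave_fd = os.pipe()
    master_file = os.fdopen(master_fd, 'r')

    task_scheduler = TaskScheduler(max_jobs=2)
    scheduler = task_scheduler.sched_iface

    # Producer: a child process that writes a known string into the pipe.
    producer = SpawnProcess(
        args=["bash", "-c", "echo -n 'hello poll loop'"],
        fd_pipes={1: slave_fd}, scheduler=scheduler)

    # Consumer: reads from the pipe inside the poll loop and buffers the data.
    consumer = PipeReader(
        input_files={"producer": master_file},
        scheduler=scheduler)

    task_scheduler.add(producer)
    task_scheduler.add(consumer)

    # Close the parent's copy of the write end once the producer has started,
    # so the reader sees EOF when the child exits.
    producer.addStartListener(lambda task: os.close(slave_fd))

    task_scheduler.run()
    assert consumer.getvalue() == 'hello poll loop'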

svn path=/main/trunk/; revision=11022

pym/_emerge/__init__.py
pym/portage/tests/process/__init__.py [new file with mode: 0644]
pym/portage/tests/process/__test__ [new file with mode: 0644]
pym/portage/tests/process/test_poll.py [new file with mode: 0644]

diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 38f485aae2e4711ca316799a16492e8e9b7c220c..6f975449abcca4f00a99448eb8612d9605960a6c 100644
@@ -1498,13 +1498,17 @@ class AsynchronousTask(SlotObject):
        """
 
        __slots__ = ("background", "cancelled", "returncode") + \
-               ("_exit_listeners",)
+               ("_exit_listeners", "_start_listeners")
 
        def start(self):
                """
                Start an asynchronous task and then return as soon as possible.
                """
-               pass
+               self._start()
+               self._start_hook()
+
+       def _start(self):
+               raise NotImplementedError(self)
 
        def isAlive(self):
                return self.returncode is None
@@ -1529,6 +1533,25 @@ class AsynchronousTask(SlotObject):
                self.cancelled = True
                self.wait()
 
+       def addStartListener(self, f):
+               """
+               The function will be called with one argument, a reference to self.
+               """
+               if self._start_listeners is None:
+                       self._start_listeners = []
+               self._start_listeners.append(f)
+
+       def removeStartListener(self, f):
+               self._start_listeners.remove(f)
+
+       def _start_hook(self):
+               if self._start_listeners is not None:
+                       start_listeners = self._start_listeners
+                       self._start_listeners = None
+
+                       for f in start_listeners:
+                               f(self)
+
        def addExitListener(self, f):
                """
                The function will be called with one argument, a reference to self.
@@ -1558,7 +1581,63 @@ class AsynchronousTask(SlotObject):
 
                        for f in exit_listeners:
                                f(self)
-                       self._exit_listeners = None
+
+class PipeReader(AsynchronousTask):
+
+       """
+       Reads output from one or more files and saves it in memory,
+       for retrieval via the getvalue() method. This is driven by
+       the scheduler's poll() loop, so it runs entirely within the
+       current process.
+       """
+
+       __slots__ = ("input_files", "scheduler",) + \
+               ("pid", "registered", "_reg_ids", "_read_data")
+
+       def _start(self):
+               self._reg_ids = set()
+               self._read_data = []
+               for k, f in self.input_files.iteritems():
+                       fcntl.fcntl(f.fileno(), fcntl.F_SETFL,
+                               fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
+                       self._reg_ids.add(self.scheduler.register(f.fileno(),
+                               PollConstants.POLLIN, self._output_handler))
+               self.registered = True
+
+       def isAlive(self):
+               return self.registered
+
+       def _wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               if self.registered:
+                       self.scheduler.schedule(self._reg_ids)
+               self.returncode = os.EX_OK
+               return self.returncode
+
+       def getvalue(self):
+               """Retrieve the entire contents"""
+               return "".join(self._read_data)
+
+       def close(self):
+               """Free the memory buffer."""
+               self._read_data = None
+
+       def _output_handler(self, fd, event):
+               files = self.input_files
+               for f in files.itervalues():
+                       if fd == f.fileno():
+                               break
+               self._read_data.append(f.read())
+               if not self._read_data[-1]:
+                       for f in files.values():
+                               f.close()
+                       self.registered = False
+                       for reg_id in self._reg_ids:
+                               self.scheduler.unregister(reg_id)
+                       self.wait()
+
+               return self.registered
 
 class CompositeTask(AsynchronousTask):
 
@@ -1689,7 +1768,7 @@ class TaskSequence(CompositeTask):
        def add(self, task):
                self._task_queue.append(task)
 
-       def start(self):
+       def _start(self):
                self._start_next_task()
 
        def cancel(self):
@@ -1802,7 +1881,7 @@ class SpawnProcess(SubProcess):
        _files_dict = slot_dict_class(_file_names, prefix="")
        _bufsize = 4096
 
-       def start(self):
+       def _start(self):
 
                if self.cancelled:
                        return
@@ -1930,7 +2009,7 @@ class EbuildFetcher(SpawnProcess):
 
        __slots__ = ("fetchonly", "pkg",)
 
-       def start(self):
+       def _start(self):
 
                root_config = self.pkg.root_config
                portdb = root_config.trees["porttree"].dbapi
@@ -1952,7 +2031,7 @@ class EbuildFetcher(SpawnProcess):
 
                self.args = fetch_args
                self.env = fetch_env
-               SpawnProcess.start(self)
+               SpawnProcess._start(self)
 
 class EbuildBuildDir(SlotObject):
 
@@ -2036,7 +2115,7 @@ class EbuildBuild(CompositeTask):
                "prefetcher", "settings", "world_atom") + \
                ("_build_dir", "_buildpkg", "_ebuild_path", "_tree")
 
-       def start(self):
+       def _start(self):
 
                logger = self.logger
                opts = self.opts
@@ -2266,7 +2345,7 @@ class EbuildExecuter(CompositeTask):
 
        _phases = ("setup", "unpack", "compile", "test", "install")
 
-       def start(self):
+       def _start(self):
                pkg = self.pkg
                scheduler = self.scheduler
                tree = "porttree"
@@ -2323,7 +2402,7 @@ class EbuildMetadataPhase(SubProcess):
        _bufsize = SpawnProcess._bufsize
        _metadata_fd = 9
 
-       def start(self):
+       def _start(self):
                settings = self.settings
                settings.reset()
                ebuild_path = self.ebuild_path
@@ -2408,7 +2487,7 @@ class EbuildPhase(SubProcess):
        _files_dict = slot_dict_class(_file_names, prefix="")
        _bufsize = 4096
 
-       def start(self):
+       def _start(self):
                root_config = self.pkg.root_config
                tree = self.tree
                mydbapi = root_config.trees[tree].dbapi
@@ -2588,7 +2667,7 @@ class EbuildBinpkg(EbuildPhase):
        """
        __slots__ = ("_binpkg_tmpfile",)
 
-       def start(self):
+       def _start(self):
                self.phase = "package"
                self.tree = "porttree"
                pkg = self.pkg
@@ -2607,7 +2686,7 @@ class EbuildBinpkg(EbuildPhase):
                settings.backup_changes("PORTAGE_BINPKG_TMPFILE")
 
                try:
-                       EbuildPhase.start(self)
+                       EbuildPhase._start(self)
                finally:
                        settings.pop("PORTAGE_BINPKG_TMPFILE", None)
 
@@ -2688,7 +2767,7 @@ class Binpkg(CompositeTask):
                ("_bintree", "_build_dir", "_ebuild_path", "_fetched_pkg",
                "_image_dir", "_infloc", "_pkg_path", "_tree", "_verify")
 
-       def start(self):
+       def _start(self):
 
                pkg = self.pkg
                settings = self.settings
@@ -2945,7 +3024,7 @@ class BinpkgFetcher(SpawnProcess):
                pkg = self.pkg
                self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
 
-       def start(self):
+       def _start(self):
 
                if self.cancelled:
                        return
@@ -3004,7 +3083,7 @@ class BinpkgFetcher(SpawnProcess):
 
                self.args = fetch_args
                self.env = fetch_env
-               SpawnProcess.start(self)
+               SpawnProcess._start(self)
 
        def _set_returncode(self, wait_retval):
                SpawnProcess._set_returncode(self, wait_retval)
@@ -3038,7 +3117,7 @@ class BinpkgFetcher(SpawnProcess):
 class BinpkgVerifier(AsynchronousTask):
        __slots__ = ("pkg",)
 
-       def start(self):
+       def _start(self):
                """
                Note: Unlike a normal AsynchronousTask.start() method,
                this one does all of its work synchronously. The returncode
@@ -3077,14 +3156,14 @@ class BinpkgExtractorAsync(SpawnProcess):
 
        _shell_binary = portage.const.BASH_BINARY
 
-       def start(self):
+       def _start(self):
                self.args = [self._shell_binary, "-c",
                        "bzip2 -dqc -- %s | tar -xp -C %s -f -" % \
                        (portage._shell_quote(self.pkg_path),
                        portage._shell_quote(self.image_dir))]
 
                self.env = self.pkg.root_config.settings.environ()
-               SpawnProcess.start(self)
+               SpawnProcess._start(self)
 
 class MergeListItem(CompositeTask):
 
@@ -3100,7 +3179,7 @@ class MergeListItem(CompositeTask):
                "settings", "world_atom") + \
                ("_install_task",)
 
-       def start(self):
+       def _start(self):
 
                pkg = self.pkg
                build_opts = self.build_opts
@@ -3213,7 +3292,7 @@ class PackageMerge(AsynchronousTask):
 
        __slots__ = ("merge",)
 
-       def start(self):
+       def _start(self):
                self.returncode = self.merge.merge()
                self.wait()
 
@@ -7729,7 +7808,8 @@ class PollSelectAdapter(PollConstants):
 
 class SequentialTaskQueue(SlotObject):
 
-       __slots__ = ("max_jobs", "running_tasks", "_task_queue", "_scheduling")
+       __slots__ = ("auto_schedule", "max_jobs", "running_tasks") + \
+               ("_task_queue", "_scheduling")
 
        def __init__(self, **kwargs):
                SlotObject.__init__(self, **kwargs)
@@ -7740,11 +7820,13 @@ class SequentialTaskQueue(SlotObject):
 
        def add(self, task):
                self._task_queue.append(task)
-               self.schedule()
+               if self.auto_schedule:
+                       self.schedule()
 
        def addFront(self, task):
                self._task_queue.appendleft(task)
-               self.schedule()
+               if self.auto_schedule:
+                       self.schedule()
 
        def schedule(self):
 
@@ -7767,16 +7849,15 @@ class SequentialTaskQueue(SlotObject):
                        if hasattr(task, "registered") and task.registered:
                                continue
                        if task.poll() is not None:
-                               running_tasks.remove(task)
                                state_changed = True
 
                while task_queue and (len(running_tasks) < max_jobs):
                        task = task_queue.popleft()
                        cancelled = getattr(task, "cancelled", None)
                        if not cancelled:
+                               running_tasks.add(task)
                                task.addExitListener(self._task_exit)
                                task.start()
-                               running_tasks.add(task)
                        state_changed = True
 
                self._scheduling = False
@@ -7784,7 +7865,9 @@ class SequentialTaskQueue(SlotObject):
                return state_changed
 
        def _task_exit(self, task):
-               self.schedule()
+               self.running_tasks.remove(task)
+               if self.auto_schedule:
+                       self.schedule()
 
        def clear(self):
                self._task_queue.clear()
@@ -7799,7 +7882,10 @@ class SequentialTaskQueue(SlotObject):
        def __len__(self):
                return len(self._task_queue) + len(self.running_tasks)
 
-class PollLoop(object):
+class PollScheduler(object):
+
+       class _sched_iface_class(SlotObject):
+               __slots__ = ("register", "schedule", "unregister")
 
        def __init__(self):
                self._max_jobs = 1
@@ -7814,15 +7900,18 @@ class PollLoop(object):
                except AttributeError:
                        self._poll = PollSelectAdapter()
 
+       def _running_job_count(self):
+               return self._jobs
+
        def _can_add_job(self):
-               jobs = self._jobs
                max_jobs = self._max_jobs
                max_load = self._max_load
 
-               if self._jobs >= self._max_jobs:
+               if self._running_job_count() >= self._max_jobs:
                        return False
 
-               if max_load is not None and max_jobs > 1 and self._jobs > 1:
+               if max_load is not None and max_jobs > 1 and \
+                       self._running_job_count() > 1:
                        try:
                                avg1, avg5, avg15 = os.getloadavg()
                        except OSError, e:
@@ -7872,7 +7961,7 @@ class PollLoop(object):
                del self._poll_event_handlers[f]
                del self._poll_event_handler_ids[reg_id]
 
-       def _schedule(self, wait_id):
+       def _schedule(self, wait_ids):
                """
                Schedule until the wait_ids are no longer registered
                for poll() events.
@@ -7883,12 +7972,110 @@ class PollLoop(object):
                handler_ids = self._poll_event_handler_ids
                poll = self._poll.poll
 
-               while wait_id in handler_ids:
+               if isinstance(wait_ids, int):
+                       wait_ids = frozenset([wait_ids])
+
+               while wait_ids.intersection(handler_ids):
                        for f, event in poll():
                                handler, reg_id = event_handlers[f]
                                handler(f, event)
 
-class Scheduler(PollLoop):
+class QueueScheduler(PollScheduler):
+
+       """
+       Add instances of SequentialTaskQueue and then call run(). The
+       run() method returns when no tasks remain.
+       """
+
+       def __init__(self, max_jobs=None, max_load=None):
+               PollScheduler.__init__(self)
+
+               if max_jobs is None:
+                       max_jobs = 1
+
+               self._max_jobs = max_jobs
+               self._max_load = max_load
+               self.sched_iface = self._sched_iface_class(
+                       register=self._register,
+                       schedule=self._schedule,
+                       unregister=self._unregister)
+
+               self._queues = []
+               self._schedule_listeners = []
+
+       def add(self, q):
+               self._queues.append(q)
+
+       def remove(self, q):
+               self._queues.remove(q)
+
+       def run(self):
+
+               while self._schedule_tasks():
+                       self._poll_loop()
+
+               while self._running_job_count():
+                       self._poll_loop()
+
+       def _schedule_tasks(self):
+               """
+               @rtype: bool
+               @returns: True if there may be remaining tasks to schedule,
+                       False otherwise.
+               """
+               while self._can_add_job():
+                       n = self._max_jobs - self._running_job_count()
+                       if n < 1:
+                               break
+
+                       if not self._start_next_job(n):
+                               return False
+
+               for q in self._queues:
+                       if q:
+                               return True
+               return False
+
+       def _running_job_count(self):
+               job_count = 0
+               for q in self._queues:
+                       job_count += len(q.running_tasks)
+               self._jobs = job_count
+               return job_count
+
+       def _start_next_job(self, n=1):
+               started_count = 0
+               for q in self._queues:
+                       initial_job_count = len(q.running_tasks)
+                       q.schedule()
+                       final_job_count = len(q.running_tasks)
+                       if final_job_count > initial_job_count:
+                               started_count += (final_job_count - initial_job_count)
+                       if started_count >= n:
+                               break
+               return started_count
+
+class TaskScheduler(object):
+
+       """
+       A simple way to handle scheduling of AsynchronousTask instances. Simply
+       add tasks and call run(). The run() method returns when no tasks remain.
+       """
+
+       def __init__(self, max_jobs=None, max_load=None):
+               self._queue = SequentialTaskQueue(max_jobs=max_jobs)
+               self._scheduler = QueueScheduler(max_jobs=max_jobs, max_load=max_load)
+               self.sched_iface = self._scheduler.sched_iface
+               self.run = self._scheduler.run
+               self._scheduler.add(self._queue)
+
+       def add(self, task):
+               self._queue.add(task)
+
+       def run(self):
+               self._scheduler.schedule()
+
+class Scheduler(PollScheduler):
 
        _opts_ignore_blockers = \
                frozenset(["--buildpkgonly",
@@ -7930,7 +8117,7 @@ class Scheduler(PollLoop):
 
        def __init__(self, settings, trees, mtimedb, myopts,
                spinner, mergelist, favorites, digraph):
-               PollLoop.__init__(self)
+               PollScheduler.__init__(self)
                self.settings = settings
                self.target_root = settings["ROOT"]
                self.trees = trees
@@ -7970,7 +8157,8 @@ class Scheduler(PollLoop):
 
                self._task_queues = self._task_queues_class()
                for k in self._task_queues.allowed_keys:
-                       setattr(self._task_queues, k, SequentialTaskQueue())
+                       setattr(self._task_queues, k,
+                               SequentialTaskQueue(auto_schedule=True))
 
                self._prefetchers = weakref.WeakValueDictionary()
                self._pkg_queue = []
@@ -8738,13 +8926,10 @@ class Scheduler(PollLoop):
 
                return pkg
 
-class MetadataRegen(PollLoop):
-
-       class _sched_iface_class(SlotObject):
-               __slots__ = ("register", "schedule", "unregister")
+class MetadataRegen(PollScheduler):
 
        def __init__(self, portdb, max_jobs=None, max_load=None):
-               PollLoop.__init__(self)
+               PollScheduler.__init__(self)
                self._portdb = portdb
 
                if max_jobs is None:
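
As a reminder of the new AsynchronousTask contract introduced above: start()
now calls the subclass's _start() and then fires any listeners registered via
addStartListener() (the new test uses such a listener to close its write end
of the pipe). A minimal sketch follows; the TouchFileTask class and its path
slot are hypothetical, invented only for illustration.

    import os, sys
    from _emerge import AsynchronousTask

    class TouchFileTask(AsynchronousTask):
        # Hypothetical subclass: performs a small piece of synchronous work.
        __slots__ = ("path",)

        def _start(self):
            open(self.path, 'w').close()
            # Record the result and notify exit listeners via wait().
            self.returncode = os.EX_OK
            self.wait()

    task = TouchFileTask(path="/tmp/touch-example")
    task.addStartListener(lambda t: sys.stdout.write("started: %s\n" % t.path))
    task.start()   # runs _start(), then the registered start listeners
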
diff --git a/pym/portage/tests/process/__init__.py b/pym/portage/tests/process/__init__.py
new file mode 100644
index 0000000..a4a87a4
--- /dev/null
@@ -0,0 +1,3 @@
+# Copyright 1998-2008 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+# $Id: __init__.py 6870 2007-06-19 07:22:18Z zmedico $
diff --git a/pym/portage/tests/process/__test__ b/pym/portage/tests/process/__test__
new file mode 100644
index 0000000..e69de29
diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
new file mode 100644
index 0000000..45c0ad0
--- /dev/null
@@ -0,0 +1,43 @@
+# Copyright 1998-2008 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+# $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $
+
+import errno, os, sys
+from portage.tests import TestCase
+from _emerge import PipeReader, SpawnProcess, TaskScheduler
+
+class PollTestCase(TestCase):
+
+       def testPipeReader(self):
+               """
+               Use a poll loop to read data from a pipe and assert that
+               the data written to the pipe is identical to the data
+               read from the pipe.
+               """
+
+               test_string = 2 * "blah blah blah\n"
+
+               master_fd, slave_fd = os.pipe()
+               master_file = os.fdopen(master_fd, 'r')
+
+               task_scheduler = TaskScheduler(max_jobs=2)
+               scheduler = task_scheduler.sched_iface
+
+               producer = SpawnProcess(
+                       args=["bash", "-c", "echo -n '%s'" % test_string],
+                       fd_pipes={1:slave_fd}, scheduler=scheduler)
+
+               consumer = PipeReader(
+                       input_files={"producer" : master_file},
+                       scheduler=scheduler)
+
+               task_scheduler.add(producer)
+               task_scheduler.add(consumer)
+
+               def producer_start_cb(task):
+                       os.close(slave_fd)
+
+               producer.addStartListener(producer_start_cb)
+               task_scheduler.run()
+
+               self.assertEqual(test_string, consumer.getvalue())