From: Zac Medico
Date: Sat, 12 Jul 2008 06:22:26 +0000 (-0000)
Subject: Create a test case for the poll loop which uses the loop to read data from a
X-Git-Tag: v2.2_rc2~123
X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=31b18e72b3ed4077f31be583d9a83ad2de53df29;p=portage.git

Create a test case for the poll loop which uses the loop to read data from a
pipe and assert that the data written to the pipe is identical to the data
read from the pipe. In order to implement this test, several useful classes
have been added:

 * PipeReader
   Reads output from one or more files and saves it in memory, for retrieval
   via the getvalue() method. This is driven by the scheduler's poll() loop,
   so it runs entirely within the current process.

 * QueueScheduler
   Add instances of SequentialTaskQueue and then call run(). The run() method
   returns when no tasks remain.

 * TaskScheduler
   A simple way to handle scheduling of AsynchronousTask instances. Simply add
   tasks and call run(). The run() method returns when no tasks remain.

svn path=/main/trunk/; revision=11022
---

diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 38f485aae..6f975449a 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1498,13 +1498,17 @@ class AsynchronousTask(SlotObject):
 	"""
 	__slots__ = ("background", "cancelled", "returncode") + \
-		("_exit_listeners",)
+		("_exit_listeners", "_start_listeners")
 
 	def start(self):
 		"""
 		Start an asynchronous task and then return as soon as possible.
 		"""
-		pass
+		self._start()
+		self._start_hook()
+
+	def _start(self):
+		raise NotImplementedError(self)
 
 	def isAlive(self):
 		return self.returncode is None
@@ -1529,6 +1533,25 @@ class AsynchronousTask(SlotObject):
 		self.cancelled = True
 		self.wait()
 
+	def addStartListener(self, f):
+		"""
+		The function will be called with one argument, a reference to self.
+		"""
+		if self._start_listeners is None:
+			self._start_listeners = []
+		self._start_listeners.append(f)
+
+	def removeStartListener(self, f):
+		self._start_listeners.remove(f)
+
+	def _start_hook(self):
+		if self._start_listeners is not None:
+			start_listeners = self._start_listeners
+			self._start_listeners = None
+
+			for f in start_listeners:
+				f(self)
+
 	def addExitListener(self, f):
 		"""
 		The function will be called with one argument, a reference to self.
@@ -1558,7 +1581,63 @@ class AsynchronousTask(SlotObject):
 
 			for f in exit_listeners:
 				f(self)
-		self._exit_listeners = None
+
+class PipeReader(AsynchronousTask):
+
+	"""
+	Reads output from one or more files and saves it in memory,
+	for retrieval via the getvalue() method. This is driven by
+	the scheduler's poll() loop, so it runs entirely within the
+	current process.
+ """ + + __slots__ = ("input_files", "scheduler",) + \ + ("pid", "registered", "_reg_ids", "_read_data") + + def _start(self): + self._reg_ids = set() + self._read_data = [] + for k, f in self.input_files.iteritems(): + fcntl.fcntl(f.fileno(), fcntl.F_SETFL, + fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + self._reg_ids.add(self.scheduler.register(f.fileno(), + PollConstants.POLLIN, self._output_handler)) + self.registered = True + + def isAlive(self): + return self.registered + + def _wait(self): + if self.returncode is not None: + return self.returncode + if self.registered: + self.scheduler.schedule(self._reg_ids) + self.returncode = os.EX_OK + return self.returncode + + def getvalue(self): + """Retrieve the entire contents""" + return "".join(self._read_data) + + def close(self): + """Free the memory buffer.""" + self._read_data = None + + def _output_handler(self, fd, event): + files = self.input_files + for f in files.itervalues(): + if fd == f.fileno(): + break + self._read_data.append(f.read()) + if not self._read_data[-1]: + for f in files.values(): + f.close() + self.registered = False + for reg_id in self._reg_ids: + self.scheduler.unregister(reg_id) + self.wait() + + return self.registered class CompositeTask(AsynchronousTask): @@ -1689,7 +1768,7 @@ class TaskSequence(CompositeTask): def add(self, task): self._task_queue.append(task) - def start(self): + def _start(self): self._start_next_task() def cancel(self): @@ -1802,7 +1881,7 @@ class SpawnProcess(SubProcess): _files_dict = slot_dict_class(_file_names, prefix="") _bufsize = 4096 - def start(self): + def _start(self): if self.cancelled: return @@ -1930,7 +2009,7 @@ class EbuildFetcher(SpawnProcess): __slots__ = ("fetchonly", "pkg",) - def start(self): + def _start(self): root_config = self.pkg.root_config portdb = root_config.trees["porttree"].dbapi @@ -1952,7 +2031,7 @@ class EbuildFetcher(SpawnProcess): self.args = fetch_args self.env = fetch_env - SpawnProcess.start(self) + SpawnProcess._start(self) class EbuildBuildDir(SlotObject): @@ -2036,7 +2115,7 @@ class EbuildBuild(CompositeTask): "prefetcher", "settings", "world_atom") + \ ("_build_dir", "_buildpkg", "_ebuild_path", "_tree") - def start(self): + def _start(self): logger = self.logger opts = self.opts @@ -2266,7 +2345,7 @@ class EbuildExecuter(CompositeTask): _phases = ("setup", "unpack", "compile", "test", "install") - def start(self): + def _start(self): pkg = self.pkg scheduler = self.scheduler tree = "porttree" @@ -2323,7 +2402,7 @@ class EbuildMetadataPhase(SubProcess): _bufsize = SpawnProcess._bufsize _metadata_fd = 9 - def start(self): + def _start(self): settings = self.settings settings.reset() ebuild_path = self.ebuild_path @@ -2408,7 +2487,7 @@ class EbuildPhase(SubProcess): _files_dict = slot_dict_class(_file_names, prefix="") _bufsize = 4096 - def start(self): + def _start(self): root_config = self.pkg.root_config tree = self.tree mydbapi = root_config.trees[tree].dbapi @@ -2588,7 +2667,7 @@ class EbuildBinpkg(EbuildPhase): """ __slots__ = ("_binpkg_tmpfile",) - def start(self): + def _start(self): self.phase = "package" self.tree = "porttree" pkg = self.pkg @@ -2607,7 +2686,7 @@ class EbuildBinpkg(EbuildPhase): settings.backup_changes("PORTAGE_BINPKG_TMPFILE") try: - EbuildPhase.start(self) + EbuildPhase._start(self) finally: settings.pop("PORTAGE_BINPKG_TMPFILE", None) @@ -2688,7 +2767,7 @@ class Binpkg(CompositeTask): ("_bintree", "_build_dir", "_ebuild_path", "_fetched_pkg", "_image_dir", "_infloc", "_pkg_path", "_tree", "_verify") - 
-	def start(self):
+	def _start(self):
 		pkg = self.pkg
 		settings = self.settings
@@ -2945,7 +3024,7 @@ class BinpkgFetcher(SpawnProcess):
 		pkg = self.pkg
 		self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
 
-	def start(self):
+	def _start(self):
 		if self.cancelled:
 			return
@@ -3004,7 +3083,7 @@ class BinpkgFetcher(SpawnProcess):
 		self.args = fetch_args
 		self.env = fetch_env
-		SpawnProcess.start(self)
+		SpawnProcess._start(self)
 
 	def _set_returncode(self, wait_retval):
 		SpawnProcess._set_returncode(self, wait_retval)
@@ -3038,7 +3117,7 @@ class BinpkgFetcher(SpawnProcess):
 class BinpkgVerifier(AsynchronousTask):
 	__slots__ = ("pkg",)
 
-	def start(self):
+	def _start(self):
 		"""
 		Note: Unlike a normal AsynchronousTask.start() method,
 		this one does all work is synchronously. The returncode
@@ -3077,14 +3156,14 @@ class BinpkgExtractorAsync(SpawnProcess):
 
 	_shell_binary = portage.const.BASH_BINARY
 
-	def start(self):
+	def _start(self):
 		self.args = [self._shell_binary, "-c",
 			"bzip2 -dqc -- %s | tar -xp -C %s -f -" % \
 			(portage._shell_quote(self.pkg_path),
 			portage._shell_quote(self.image_dir))]
 
 		self.env = self.pkg.root_config.settings.environ()
-		SpawnProcess.start(self)
+		SpawnProcess._start(self)
 
 class MergeListItem(CompositeTask):
@@ -3100,7 +3179,7 @@ class MergeListItem(CompositeTask):
 		"settings", "world_atom") + \
 		("_install_task",)
 
-	def start(self):
+	def _start(self):
 		pkg = self.pkg
 		build_opts = self.build_opts
@@ -3213,7 +3292,7 @@ class PackageMerge(AsynchronousTask):
 
 	__slots__ = ("merge",)
 
-	def start(self):
+	def _start(self):
 		self.returncode = self.merge.merge()
 		self.wait()
@@ -7729,7 +7808,8 @@ class PollSelectAdapter(PollConstants):
 
 class SequentialTaskQueue(SlotObject):
 
-	__slots__ = ("max_jobs", "running_tasks", "_task_queue", "_scheduling")
+	__slots__ = ("auto_schedule", "max_jobs", "running_tasks") + \
+		("_task_queue", "_scheduling")
 
 	def __init__(self, **kwargs):
 		SlotObject.__init__(self, **kwargs)
@@ -7740,11 +7820,13 @@ class SequentialTaskQueue(SlotObject):
 
 	def add(self, task):
 		self._task_queue.append(task)
-		self.schedule()
+		if self.auto_schedule:
+			self.schedule()
 
 	def addFront(self, task):
 		self._task_queue.appendleft(task)
-		self.schedule()
+		if self.auto_schedule:
+			self.schedule()
 
 	def schedule(self):
@@ -7767,16 +7849,15 @@ class SequentialTaskQueue(SlotObject):
 			if hasattr(task, "registered") and task.registered:
 				continue
 			if task.poll() is not None:
-				running_tasks.remove(task)
 				state_changed = True
 
 		while task_queue and (len(running_tasks) < max_jobs):
 			task = task_queue.popleft()
 			cancelled = getattr(task, "cancelled", None)
 			if not cancelled:
+				running_tasks.add(task)
 				task.addExitListener(self._task_exit)
 				task.start()
-				running_tasks.add(task)
 				state_changed = True
 
 		self._scheduling = False
@@ -7784,7 +7865,9 @@ class SequentialTaskQueue(SlotObject):
 		return state_changed
 
 	def _task_exit(self, task):
-		self.schedule()
+		self.running_tasks.remove(task)
+		if self.auto_schedule:
+			self.schedule()
 
 	def clear(self):
 		self._task_queue.clear()
@@ -7799,7 +7882,10 @@ class SequentialTaskQueue(SlotObject):
 	def __len__(self):
 		return len(self._task_queue) + len(self.running_tasks)
 
-class PollLoop(object):
+class PollScheduler(object):
+
+	class _sched_iface_class(SlotObject):
+		__slots__ = ("register", "schedule", "unregister")
 
 	def __init__(self):
 		self._max_jobs = 1
@@ -7814,15 +7900,18 @@ class PollLoop(object):
 		except AttributeError:
 			self._poll = PollSelectAdapter()
 
+	def _running_job_count(self):
+		return self._jobs
+
 	def _can_add_job(self):
-		jobs = self._jobs
 		max_jobs = self._max_jobs
 		max_load = self._max_load
 
-		if self._jobs >= self._max_jobs:
+		if self._running_job_count() >= self._max_jobs:
 			return False
 
-		if max_load is not None and max_jobs > 1 and self._jobs > 1:
+		if max_load is not None and max_jobs > 1 and \
+			self._running_job_count() > 1:
 			try:
 				avg1, avg5, avg15 = os.getloadavg()
 			except OSError, e:
@@ -7872,7 +7961,7 @@ class PollLoop(object):
 		del self._poll_event_handlers[f]
 		del self._poll_event_handler_ids[reg_id]
 
-	def _schedule(self, wait_id):
+	def _schedule(self, wait_ids):
 		"""
 		Schedule until wait_id is not longer registered for
 		poll() events.
@@ -7883,12 +7972,110 @@ class PollLoop(object):
 		handler_ids = self._poll_event_handler_ids
 		poll = self._poll.poll
 
-		while wait_id in handler_ids:
+		if isinstance(wait_ids, int):
+			wait_ids = frozenset([wait_ids])
+
+		while wait_ids.intersection(handler_ids):
 			for f, event in poll():
 				handler, reg_id = event_handlers[f]
 				handler(f, event)
 
-class Scheduler(PollLoop):
+class QueueScheduler(PollScheduler):
+
+	"""
+	Add instances of SequentialTaskQueue and then call run(). The
+	run() method returns when no tasks remain.
+	"""
+
+	def __init__(self, max_jobs=None, max_load=None):
+		PollScheduler.__init__(self)
+
+		if max_jobs is None:
+			max_jobs = 1
+
+		self._max_jobs = max_jobs
+		self._max_load = max_load
+		self.sched_iface = self._sched_iface_class(
+			register=self._register,
+			schedule=self._schedule,
+			unregister=self._unregister)
+
+		self._queues = []
+		self._schedule_listeners = []
+
+	def add(self, q):
+		self._queues.append(q)
+
+	def remove(self, q):
+		self._queues.remove(q)
+
+	def run(self):
+
+		while self._schedule_tasks():
+			self._poll_loop()
+
+		while self._running_job_count():
+			self._poll_loop()
+
+	def _schedule_tasks(self):
+		"""
+		@rtype: bool
+		@returns: True if there may be remaining tasks to schedule,
+			False otherwise.
+		"""
+		while self._can_add_job():
+			n = self._max_jobs - self._running_job_count()
+			if n < 1:
+				break
+
+			if not self._start_next_job(n):
+				return False
+
+		for q in self._queues:
+			if q:
+				return True
+		return False
+
+	def _running_job_count(self):
+		job_count = 0
+		for q in self._queues:
+			job_count += len(q.running_tasks)
+		self._jobs = job_count
+		return job_count
+
+	def _start_next_job(self, n=1):
+		started_count = 0
+		for q in self._queues:
+			initial_job_count = len(q.running_tasks)
+			q.schedule()
+			final_job_count = len(q.running_tasks)
+			if final_job_count > initial_job_count:
+				started_count += (final_job_count - initial_job_count)
+			if started_count >= n:
+				break
+		return started_count
+
+class TaskScheduler(object):
+
+	"""
+	A simple way to handle scheduling of AsynchrousTask instances. Simply
+	add tasks and call run(). The run() method returns when no tasks remain.
+ """ + + def __init__(self, max_jobs=None, max_load=None): + self._queue = SequentialTaskQueue(max_jobs=max_jobs) + self._scheduler = QueueScheduler(max_jobs=max_jobs, max_load=max_load) + self.sched_iface = self._scheduler.sched_iface + self.run = self._scheduler.run + self._scheduler.add(self._queue) + + def add(self, task): + self._queue.add(task) + + def run(self): + self._scheduler.schedule() + +class Scheduler(PollScheduler): _opts_ignore_blockers = \ frozenset(["--buildpkgonly", @@ -7930,7 +8117,7 @@ class Scheduler(PollLoop): def __init__(self, settings, trees, mtimedb, myopts, spinner, mergelist, favorites, digraph): - PollLoop.__init__(self) + PollScheduler.__init__(self) self.settings = settings self.target_root = settings["ROOT"] self.trees = trees @@ -7970,7 +8157,8 @@ class Scheduler(PollLoop): self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: - setattr(self._task_queues, k, SequentialTaskQueue()) + setattr(self._task_queues, k, + SequentialTaskQueue(auto_schedule=True)) self._prefetchers = weakref.WeakValueDictionary() self._pkg_queue = [] @@ -8738,13 +8926,10 @@ class Scheduler(PollLoop): return pkg -class MetadataRegen(PollLoop): - - class _sched_iface_class(SlotObject): - __slots__ = ("register", "schedule", "unregister") +class MetadataRegen(PollScheduler): def __init__(self, portdb, max_jobs=None, max_load=None): - PollLoop.__init__(self) + PollScheduler.__init__(self) self._portdb = portdb if max_jobs is None: diff --git a/pym/portage/tests/process/__init__.py b/pym/portage/tests/process/__init__.py new file mode 100644 index 000000000..a4a87a461 --- /dev/null +++ b/pym/portage/tests/process/__init__.py @@ -0,0 +1,3 @@ +# Copyright 1998-2008 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 +# $Id: __init__.py 6870 2007-06-19 07:22:18Z zmedico $ diff --git a/pym/portage/tests/process/__test__ b/pym/portage/tests/process/__test__ new file mode 100644 index 000000000..e69de29bb diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py new file mode 100644 index 000000000..45c0ad03e --- /dev/null +++ b/pym/portage/tests/process/test_poll.py @@ -0,0 +1,43 @@ +# Copyright 1998-2008 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 +# $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $ + +import errno, os, sys +from portage.tests import TestCase +from _emerge import PipeReader, SpawnProcess, TaskScheduler + +class PollTestCase(TestCase): + + def testPipeReader(self): + """ + Use a poll loop to read data from a pipe and assert that + the data written to the pipe is identical to the data + read from the pipe. + """ + + test_string = 2 * "blah blah blah\n" + + master_fd, slave_fd = os.pipe() + master_file = os.fdopen(master_fd, 'r') + + task_scheduler = TaskScheduler(max_jobs=2) + scheduler = task_scheduler.sched_iface + + producer = SpawnProcess( + args=["bash", "-c", "echo -n '%s'" % test_string], + fd_pipes={1:slave_fd}, scheduler=scheduler) + + consumer = PipeReader( + input_files={"producer" : master_file}, + scheduler=scheduler) + + task_scheduler.add(producer) + task_scheduler.add(consumer) + + def producer_start_cb(task): + os.close(slave_fd) + + producer.addStartListener(producer_start_cb) + task_scheduler.run() + + self.assertEqual(test_string, consumer.getvalue())