From 3acec46d1999346af877e0577bbe20401a4da21b Mon Sep 17 00:00:00 2001 From: stevenknight Date: Thu, 12 Jul 2001 11:49:19 +0000 Subject: [PATCH] create the Job class git-svn-id: http://scons.tigris.org/svn/scons/trunk@7 fdb21ef1-2011-0410-befe-b5e4ea1792b1 --- runtest.sh | 3 + src/MANIFEST | 1 + src/scons/Job.py | 163 ++++++++++++++++++++++++++++++++++++++ src/scons/JobTests.py | 180 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 347 insertions(+) create mode 100644 runtest.sh create mode 100644 src/scons/Job.py create mode 100644 src/scons/JobTests.py diff --git a/runtest.sh b/runtest.sh new file mode 100644 index 00000000..34bf0040 --- /dev/null +++ b/runtest.sh @@ -0,0 +1,3 @@ +# This script makes it possible to run a test without building first +export PYTHONPATH=`pwd`/src +python $1 diff --git a/src/MANIFEST b/src/MANIFEST index 508f1984..35c3b3cf 100644 --- a/src/MANIFEST +++ b/src/MANIFEST @@ -3,6 +3,7 @@ scons/__init__.py scons/Builder.py scons/Defaults.py scons/Environment.py +scons/Job.py scons/Node/__init__.py scons/Node/FS.py scons/Sig/__init__.py diff --git a/src/scons/Job.py b/src/scons/Job.py new file mode 100644 index 00000000..6da7e406 --- /dev/null +++ b/src/scons/Job.py @@ -0,0 +1,163 @@ +"""scons.Job + +This module defines the Serial and Parallel classes that execute tasks to +complete a build. + +""" + +__revision__ = "Job.py __REVISION__ __DATE__ __DEVELOPER__" + +class Serial: + """This class is used to execute tasks in series, and is more efficient + than Parallel, but is only appropriate for non-parallel builds. Only + one instance of this class should be in existence at a time. + + This class is not thread safe. + """ + + def __init__(self, taskmaster): + """Create a new serial job given a taskmaster. + + The taskmaster's next_task() method should return the next task that + needs to be executed, or None if there are no more tasks. The + taskmaster's executed() method will be called for each task when it is + finished being executed. The taskmaster's is_blocked() method will not + be called. + """ + + self.taskmaster = taskmaster + + def start(self): + + """Start the job. This will begin pulling tasks from the taskmaster + and executing them, and return when there are no more tasks. """ + + while 1: + task = self.taskmaster.next_task() + + if task is None: + break + + task.execute() + self.taskmaster.executed(task) + + def stop(self): + """Serial jobs are always finished when start() returns, so there + is nothing to do here""" + + pass + + def wait(self): + """Serial jobs are always finished when start() returns, so there + is nothing to do here""" + pass + + +# The will hold a condition variable once the first parallel task +# is created. +cv = None + +class Parallel: + """This class is used to execute tasks in parallel, and is less + efficient than Serial, but is appropriate for parallel builds. Create + an instance of this class for each job or thread you want. + + This class is thread safe. + """ + + + def __init__(self, taskmaster): + + """Create a new parallel job given a taskmaster. Multiple jobs will + be using the taskmaster in parallel, but all method calls to taskmaster + methods are serialized by the jobs themselves. + + The taskmaster's next_task() method should return the next task + that needs to be executed, or None if there are no more tasks. The + taskmaster's executed() method will be called for each task when it + is finished being executed. The taskmaster's is_blocked() method + should return true iff there are more tasks, but they can't be + executed until one or more other tasks have been + executed. next_task() will be called iff is_blocked() returned + false. + + Note: calls to taskmaster are serialized, but calls to execute() on + distinct tasks are not serialized, because that is the whole point + of parallel jobs: they can execute multiple tasks + simultaneously. """ + + global cv + + # import threading here so that everything in the Job module + # but the Parallel class will work if the interpreter doesn't + # support threads + import threading + + self.taskmaster = taskmaster + self.thread = threading.Thread(None, self.__run) + self.stop_running = 0 + + if cv is None: + cv = threading.Condition() + + def start(self): + """Start the job. This will spawn a thread that will begin pulling + tasks from the task master and executing them. This method returns + immediately and doesn't wait for the jobs to be executed. + + To stop the job, call stop(). + To wait for the job to finish, call wait(). + """ + self.thread.start() + + def stop(self): + """Stop the job. This will cause the job to finish after the + currently executing task is done. A job that has been stopped can + not be restarted. + + To wait for the job to finish, call wait(). + """ + self.stop_running = 1 + + def wait(self): + """Wait for the job to finish. A job is finished when either there + are no more tasks or the job has been stopped and it is no longer + executing a task. + + This method should only be called after start() has been called. + + To stop the job, call stop(). + """ + self.thread.join() + + def __run(self): + """private method that actually executes the tasks""" + + cv.acquire() + + try: + + while 1: + while self.taskmaster.is_blocked(): + cv.wait(None) + + task = self.taskmaster.next_task() + + if task == None or self.stop_running: + break + + cv.release() + task.execute() + cv.acquire() + + self.taskmaster.executed(task) + + if not self.taskmaster.is_blocked(): + cv.notifyAll() + + finally: + cv.release() + + + + diff --git a/src/scons/JobTests.py b/src/scons/JobTests.py new file mode 100644 index 00000000..2bd2bf04 --- /dev/null +++ b/src/scons/JobTests.py @@ -0,0 +1,180 @@ +__revision__ = "JobTests.py __REVISION__ __DATE__ __DEVELOPER__" + +import unittest +import random +import math +import scons.Job +import sys + +# a large number +num_sines = 10000 + +# how many parallel jobs to perform for the test +num_jobs = 11 + +# how many tasks to perform for the test +num_tasks = num_jobs*5 + +class DummyLock: + "fake lock class to use if threads are not supported" + def acquire(self): + pass + + def release(self): + pass + +class NoThreadsException: + "raised by the ParallelTestCase if threads are not supported" + + def __str__(self): + return "the interpreter doesn't support threads" + +class Task: + """A dummy task class for testing purposes.""" + + def __init__(self, i, taskmaster): + self.i = i + self.taskmaster = taskmaster + self.was_executed = 0 + + def execute(self): + self.taskmaster.guard.acquire() + self.taskmaster.begin_list.append(self.i) + self.taskmaster.guard.release() + + # do something that will take some random amount of time: + for i in range(random.randrange(0, num_sines, 1)): + x = math.sin(i) + + self.was_executed = 1 + + self.taskmaster.guard.acquire() + self.taskmaster.end_list.append(self.i) + self.taskmaster.guard.release() + +class Taskmaster: + """A dummy taskmaster class for testing the job classes.""" + + def __init__(self, n, test_case): + """n is the number of dummy tasks to perform.""" + + self.test_case = test_case + self.num_tasks = n + self.num_iterated = 0 + self.num_executed = 0 + # 'guard' guards 'task_begin_list' and 'task_end_list' + try: + import threading + self.guard = threading.Lock() + except: + self.guard = DummyLock() + + # keep track of the order tasks are begun in + self.begin_list = [] + + # keep track of the order tasks are completed in + self.end_list = [] + + + def next_task(self): + if self.all_tasks_are_iterated(): + return None + else: + self.num_iterated = self.num_iterated + 1 + return Task(self.num_iterated, self) + + def all_tasks_are_executed(self): + return self.num_executed == self.num_tasks + + def all_tasks_are_iterated(self): + return self.num_iterated == self.num_tasks + + def executed(self, task): + self.num_executed = self.num_executed + 1 + + self.test_case.failUnless(task.was_executed, + "the task wasn't really executed") + self.test_case.failUnless(task.__class__ is Task, + "the task wasn't really a Task instance") + + + def is_blocked(self): + # simulate blocking tasks + return self.num_iterated - self.num_executed >= max(num_jobs/2, 2) + + def tasks_where_serial(self): + "analyze the task order to see if they were serial" + serial = 1 # assume the tasks where serial + for i in range(num_tasks): + serial = serial and (self.begin_list[i] + == self.end_list[i] + == (i + 1)) + return serial + +class ParallelTestCase(unittest.TestCase): + def runTest(self): + "test parallel jobs" + + try: + import threading + except: + raise NoThreadsException() + + taskmaster = Taskmaster(num_tasks, self) + jobs = [] + for i in range(num_jobs): + jobs.append(scons.Job.Parallel(taskmaster)) + + for job in jobs: + job.start() + + for job in jobs: + job.wait() + + self.failUnless(not taskmaster.tasks_where_serial(), + "the tasks where not executed in parallel") + self.failUnless(taskmaster.all_tasks_are_executed(), + "all the tests where not executed") + self.failUnless(taskmaster.all_tasks_are_iterated(), + "all the tests where not iterated over") + +class SerialTestCase(unittest.TestCase): + def runTest(self): + "test a serial job" + + taskmaster = Taskmaster(num_tasks, self) + job = scons.Job.Serial(taskmaster) + job.start() + self.failUnless(taskmaster.tasks_where_serial(), + "the tasks where not executed in series") + self.failUnless(taskmaster.all_tasks_are_executed(), + "all the tests where not executed") + self.failUnless(taskmaster.all_tasks_are_iterated(), + "all the tests where not iterated over") + +def suite(): + suite = unittest.TestSuite() + suite.addTest(ParallelTestCase()) + suite.addTest(SerialTestCase()) + return suite + +if __name__ == "__main__": + runner = unittest.TextTestRunner() + result = runner.run(suite()) + if (len(result.failures) == 0 + and len(result.errors) == 1 + and type(result.errors[0][0]) == SerialTestCase + and type(result.errors[0][1][0]) == NoThreadsException): + sys.exit(2) + elif not result.wasSuccessful(): + sys.exit(1) + + + + + + + + + + -- 2.26.2