--- /dev/null
+"""scons.Job
+
+This module defines the Serial and Parallel classes that execute tasks to
+complete a build.
+
+"""
+
+__revision__ = "Job.py __REVISION__ __DATE__ __DEVELOPER__"
+
+class Serial:
+ """This class is used to execute tasks in series, and is more efficient
+ than Parallel, but is only appropriate for non-parallel builds. Only
+ one instance of this class should be in existence at a time.
+
+ This class is not thread safe.
+ """
+
+ def __init__(self, taskmaster):
+ """Create a new serial job given a taskmaster.
+
+ The taskmaster's next_task() method should return the next task that
+ needs to be executed, or None if there are no more tasks. The
+ taskmaster's executed() method will be called for each task when it is
+ finished being executed. The taskmaster's is_blocked() method will not
+ be called.
+ """
+
+ self.taskmaster = taskmaster
+
+ def start(self):
+
+ """Start the job. This will begin pulling tasks from the taskmaster
+ and executing them, and return when there are no more tasks. """
+
+ while 1:
+ task = self.taskmaster.next_task()
+
+ if task is None:
+ break
+
+ task.execute()
+ self.taskmaster.executed(task)
+
+ def stop(self):
+ """Serial jobs are always finished when start() returns, so there
+ is nothing to do here"""
+
+ pass
+
+ def wait(self):
+ """Serial jobs are always finished when start() returns, so there
+ is nothing to do here"""
+ pass
+
+
+# The will hold a condition variable once the first parallel task
+# is created.
+cv = None
+
+class Parallel:
+ """This class is used to execute tasks in parallel, and is less
+ efficient than Serial, but is appropriate for parallel builds. Create
+ an instance of this class for each job or thread you want.
+
+ This class is thread safe.
+ """
+
+
+ def __init__(self, taskmaster):
+
+ """Create a new parallel job given a taskmaster. Multiple jobs will
+ be using the taskmaster in parallel, but all method calls to taskmaster
+ methods are serialized by the jobs themselves.
+
+ The taskmaster's next_task() method should return the next task
+ that needs to be executed, or None if there are no more tasks. The
+ taskmaster's executed() method will be called for each task when it
+ is finished being executed. The taskmaster's is_blocked() method
+ should return true iff there are more tasks, but they can't be
+ executed until one or more other tasks have been
+ executed. next_task() will be called iff is_blocked() returned
+ false.
+
+ Note: calls to taskmaster are serialized, but calls to execute() on
+ distinct tasks are not serialized, because that is the whole point
+ of parallel jobs: they can execute multiple tasks
+ simultaneously. """
+
+ global cv
+
+ # import threading here so that everything in the Job module
+ # but the Parallel class will work if the interpreter doesn't
+ # support threads
+ import threading
+
+ self.taskmaster = taskmaster
+ self.thread = threading.Thread(None, self.__run)
+ self.stop_running = 0
+
+ if cv is None:
+ cv = threading.Condition()
+
+ def start(self):
+ """Start the job. This will spawn a thread that will begin pulling
+ tasks from the task master and executing them. This method returns
+ immediately and doesn't wait for the jobs to be executed.
+
+ To stop the job, call stop().
+ To wait for the job to finish, call wait().
+ """
+ self.thread.start()
+
+ def stop(self):
+ """Stop the job. This will cause the job to finish after the
+ currently executing task is done. A job that has been stopped can
+ not be restarted.
+
+ To wait for the job to finish, call wait().
+ """
+ self.stop_running = 1
+
+ def wait(self):
+ """Wait for the job to finish. A job is finished when either there
+ are no more tasks or the job has been stopped and it is no longer
+ executing a task.
+
+ This method should only be called after start() has been called.
+
+ To stop the job, call stop().
+ """
+ self.thread.join()
+
+ def __run(self):
+ """private method that actually executes the tasks"""
+
+ cv.acquire()
+
+ try:
+
+ while 1:
+ while self.taskmaster.is_blocked():
+ cv.wait(None)
+
+ task = self.taskmaster.next_task()
+
+ if task == None or self.stop_running:
+ break
+
+ cv.release()
+ task.execute()
+ cv.acquire()
+
+ self.taskmaster.executed(task)
+
+ if not self.taskmaster.is_blocked():
+ cv.notifyAll()
+
+ finally:
+ cv.release()
+
+
+
+
--- /dev/null
+__revision__ = "JobTests.py __REVISION__ __DATE__ __DEVELOPER__"
+
+import unittest
+import random
+import math
+import scons.Job
+import sys
+
+# a large number
+num_sines = 10000
+
+# how many parallel jobs to perform for the test
+num_jobs = 11
+
+# how many tasks to perform for the test
+num_tasks = num_jobs*5
+
+class DummyLock:
+ "fake lock class to use if threads are not supported"
+ def acquire(self):
+ pass
+
+ def release(self):
+ pass
+
+class NoThreadsException:
+ "raised by the ParallelTestCase if threads are not supported"
+
+ def __str__(self):
+ return "the interpreter doesn't support threads"
+
+class Task:
+ """A dummy task class for testing purposes."""
+
+ def __init__(self, i, taskmaster):
+ self.i = i
+ self.taskmaster = taskmaster
+ self.was_executed = 0
+
+ def execute(self):
+ self.taskmaster.guard.acquire()
+ self.taskmaster.begin_list.append(self.i)
+ self.taskmaster.guard.release()
+
+ # do something that will take some random amount of time:
+ for i in range(random.randrange(0, num_sines, 1)):
+ x = math.sin(i)
+
+ self.was_executed = 1
+
+ self.taskmaster.guard.acquire()
+ self.taskmaster.end_list.append(self.i)
+ self.taskmaster.guard.release()
+
+class Taskmaster:
+ """A dummy taskmaster class for testing the job classes."""
+
+ def __init__(self, n, test_case):
+ """n is the number of dummy tasks to perform."""
+
+ self.test_case = test_case
+ self.num_tasks = n
+ self.num_iterated = 0
+ self.num_executed = 0
+ # 'guard' guards 'task_begin_list' and 'task_end_list'
+ try:
+ import threading
+ self.guard = threading.Lock()
+ except:
+ self.guard = DummyLock()
+
+ # keep track of the order tasks are begun in
+ self.begin_list = []
+
+ # keep track of the order tasks are completed in
+ self.end_list = []
+
+
+ def next_task(self):
+ if self.all_tasks_are_iterated():
+ return None
+ else:
+ self.num_iterated = self.num_iterated + 1
+ return Task(self.num_iterated, self)
+
+ def all_tasks_are_executed(self):
+ return self.num_executed == self.num_tasks
+
+ def all_tasks_are_iterated(self):
+ return self.num_iterated == self.num_tasks
+
+ def executed(self, task):
+ self.num_executed = self.num_executed + 1
+
+ self.test_case.failUnless(task.was_executed,
+ "the task wasn't really executed")
+ self.test_case.failUnless(task.__class__ is Task,
+ "the task wasn't really a Task instance")
+
+
+ def is_blocked(self):
+ # simulate blocking tasks
+ return self.num_iterated - self.num_executed >= max(num_jobs/2, 2)
+
+ def tasks_where_serial(self):
+ "analyze the task order to see if they were serial"
+ serial = 1 # assume the tasks where serial
+ for i in range(num_tasks):
+ serial = serial and (self.begin_list[i]
+ == self.end_list[i]
+ == (i + 1))
+ return serial
+
+class ParallelTestCase(unittest.TestCase):
+ def runTest(self):
+ "test parallel jobs"
+
+ try:
+ import threading
+ except:
+ raise NoThreadsException()
+
+ taskmaster = Taskmaster(num_tasks, self)
+ jobs = []
+ for i in range(num_jobs):
+ jobs.append(scons.Job.Parallel(taskmaster))
+
+ for job in jobs:
+ job.start()
+
+ for job in jobs:
+ job.wait()
+
+ self.failUnless(not taskmaster.tasks_where_serial(),
+ "the tasks where not executed in parallel")
+ self.failUnless(taskmaster.all_tasks_are_executed(),
+ "all the tests where not executed")
+ self.failUnless(taskmaster.all_tasks_are_iterated(),
+ "all the tests where not iterated over")
+
+class SerialTestCase(unittest.TestCase):
+ def runTest(self):
+ "test a serial job"
+
+ taskmaster = Taskmaster(num_tasks, self)
+ job = scons.Job.Serial(taskmaster)
+ job.start()
+ self.failUnless(taskmaster.tasks_where_serial(),
+ "the tasks where not executed in series")
+ self.failUnless(taskmaster.all_tasks_are_executed(),
+ "all the tests where not executed")
+ self.failUnless(taskmaster.all_tasks_are_iterated(),
+ "all the tests where not iterated over")
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(ParallelTestCase())
+ suite.addTest(SerialTestCase())
+ return suite
+
+if __name__ == "__main__":
+ runner = unittest.TextTestRunner()
+ result = runner.run(suite())
+ if (len(result.failures) == 0
+ and len(result.errors) == 1
+ and type(result.errors[0][0]) == SerialTestCase
+ and type(result.errors[0][1][0]) == NoThreadsException):
+ sys.exit(2)
+ elif not result.wasSuccessful():
+ sys.exit(1)
+
+
+
+
+
+
+
+
+
+