From: stevenknight Date: Wed, 22 Oct 2003 03:15:44 +0000 (+0000) Subject: Really handle lack of the threading.py module when run by non-threaded Pythons. X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=f7d2ebdf6ea4a1edd0abd1df5f75136aa1708dab;p=scons.git Really handle lack of the threading.py module when run by non-threaded Pythons. git-svn-id: http://scons.tigris.org/svn/scons/trunk@827 fdb21ef1-2011-0410-befe-b5e4ea1792b1 --- diff --git a/src/CHANGES.txt b/src/CHANGES.txt index bb32ad8b..d7a4aecb 100644 --- a/src/CHANGES.txt +++ b/src/CHANGES.txt @@ -132,6 +132,9 @@ RELEASE X.XX - XXX the one being checked for. (Prototype code contributed by Gerard Patel and Niall Douglas). + - Supply a warning when -j is used and threading isn't built in to + the current version of Python. + From Clark McGrew: - Generalize the action for .tex files so that it will decide whether diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py index d26b73ca..ee84e344 100644 --- a/src/engine/SCons/Job.py +++ b/src/engine/SCons/Job.py @@ -43,12 +43,23 @@ class Jobs: If 'num' is 1 or less, then a serial job will be used, otherwise a parallel job with 'num' worker threads will be used. + + The 'num_jobs' attribute will be set to the actual number of jobs + allocated. If more than one job is requested but the Parallel + class can't do it, it gets reset to 1. Wrapping interfaces that + care should check the value of 'num_jobs' after initialization. """ + self.job = None if num > 1: - self.job = Parallel(taskmaster, num) - else: + try: + self.job = Parallel(taskmaster, num) + self.num_jobs = num + except NameError: + pass + if self.job is None: self.job = Serial(taskmaster) + self.num_jobs = 1 def run(self): """run the job""" @@ -116,127 +127,127 @@ try: import threading except ImportError: pass +else: + class Worker(threading.Thread): + """A worker thread waits on a task to be posted to its request queue, + dequeues the task, executes it, and posts a tuple including the task + and a boolean indicating whether the task executed successfully. """ + + def __init__(self, requestQueue, resultsQueue): + threading.Thread.__init__(self) + self.setDaemon(1) + self.requestQueue = requestQueue + self.resultsQueue = resultsQueue + self.start() + + def run(self): + while 1: + task = self.requestQueue.get() -class Worker(threading.Thread): - """A worker thread waits on a task to be posted to its request queue, - dequeues the task, executes it, and posts a tuple including the task - and a boolean indicating whether the task executed successfully. """ - - def __init__(self, requestQueue, resultsQueue): - threading.Thread.__init__(self) - self.setDaemon(1) - self.requestQueue = requestQueue - self.resultsQueue = resultsQueue - self.start() - - def run(self): - while 1: - task = self.requestQueue.get() - - try: - task.execute() - except KeyboardInterrupt: - # be explicit here for test/interrupts.py - ok = False - except: - ok = 0 - else: - ok = 1 - - self.resultsQueue.put((task, ok)) - -class ThreadPool: - """This class is responsible for spawning and managing worker threads.""" + try: + task.execute() + except KeyboardInterrupt: + # be explicit here for test/interrupts.py + ok = False + except: + ok = 0 + else: + ok = 1 - def __init__(self, num): - """Create the request and reply queues, and 'num' worker threads.""" - self.requestQueue = Queue.Queue(0) - self.resultsQueue = Queue.Queue(0) + self.resultsQueue.put((task, ok)) - # Create worker threads - for i in range(num): - worker = Worker(self.requestQueue, self.resultsQueue) + class ThreadPool: + """This class is responsible for spawning and managing worker threads.""" - def put(self, obj): - """Put task into request queue.""" - self.requestQueue.put(obj) + def __init__(self, num): + """Create the request and reply queues, and 'num' worker threads.""" + self.requestQueue = Queue.Queue(0) + self.resultsQueue = Queue.Queue(0) - def get(self, block = 1): - """Remove and return a result tuple from the results queue.""" - return self.resultsQueue.get(block) - - def get_nowait(self): - """Remove and result a result tuple from the results queue - without blocking.""" - return self.get(0) + # Create worker threads + for i in range(num): + worker = Worker(self.requestQueue, self.resultsQueue) -class Parallel: - """This class is used to execute tasks in parallel, and is somewhat - less efficient than Serial, but is appropriate for parallel builds. + def put(self, obj): + """Put task into request queue.""" + self.requestQueue.put(obj) - This class is thread safe. - """ + def get(self, block = 1): + """Remove and return a result tuple from the results queue.""" + return self.resultsQueue.get(block) + + def get_nowait(self): + """Remove and result a result tuple from the results queue + without blocking.""" + return self.get(0) - def __init__(self, taskmaster, num): - """Create a new parallel job given a taskmaster. + class Parallel: + """This class is used to execute tasks in parallel, and is somewhat + less efficient than Serial, but is appropriate for parallel builds. - The taskmaster's next_task() method should return the next task - that needs to be executed, or None if there are no more tasks. The - taskmaster's executed() method will be called for each task when it - is successfully executed or failed() will be called if the task - failed to execute (i.e. execute() raised an exception). The - taskmaster's is_blocked() method should return true iff there are - more tasks, but they can't be executed until one or more other - tasks have been executed. next_task() will be called iff - is_blocked() returned false. - - Note: calls to taskmaster are serialized, but calls to execute() on - distinct tasks are not serialized, because that is the whole point - of parallel jobs: they can execute multiple tasks - simultaneously. """ + This class is thread safe. + """ - self.taskmaster = taskmaster - self.tp = ThreadPool(num) + def __init__(self, taskmaster, num): + """Create a new parallel job given a taskmaster. - self.jobs = 0 - self.maxjobs = num + The taskmaster's next_task() method should return the next task + that needs to be executed, or None if there are no more tasks. The + taskmaster's executed() method will be called for each task when it + is successfully executed or failed() will be called if the task + failed to execute (i.e. execute() raised an exception). The + taskmaster's is_blocked() method should return true iff there are + more tasks, but they can't be executed until one or more other + tasks have been executed. next_task() will be called iff + is_blocked() returned false. - def start(self): - """Start the job. This will begin pulling tasks from the - taskmaster and executing them, and return when there are no - more tasks. If a task fails to execute (i.e. execute() raises - an exception), then the job will stop.""" + Note: calls to taskmaster are serialized, but calls to execute() on + distinct tasks are not serialized, because that is the whole point + of parallel jobs: they can execute multiple tasks + simultaneously. """ - while 1: - if self.jobs < self.maxjobs: - task = self.taskmaster.next_task() - if task is None: - break + self.taskmaster = taskmaster + self.tp = ThreadPool(num) - # prepare task for execution - try: - task.prepare() - except KeyboardInterrupt: - raise - except: - # Let the failed() callback function arrange for the - # build to stop if that's appropriate. - task.failed() + self.jobs = 0 + self.maxjobs = num - # dispatch task - self.tp.put(task) - self.jobs = self.jobs + 1 + def start(self): + """Start the job. This will begin pulling tasks from the + taskmaster and executing them, and return when there are no + more tasks. If a task fails to execute (i.e. execute() raises + an exception), then the job will stop.""" while 1: - try: - task, ok = self.tp.get_nowait() - except Queue.Empty: - if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()): + if self.jobs < self.maxjobs: + task = self.taskmaster.next_task() + if task is None: break - task, ok = self.tp.get() - self.jobs = self.jobs - 1 - if ok: - task.executed() - else: - task.failed() + # prepare task for execution + try: + task.prepare() + except KeyboardInterrupt: + raise + except: + # Let the failed() callback function arrange for the + # build to stop if that's appropriate. + task.failed() + + # dispatch task + self.tp.put(task) + self.jobs = self.jobs + 1 + + while 1: + try: + task, ok = self.tp.get_nowait() + except Queue.Empty: + if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()): + break + task, ok = self.tp.get() + + self.jobs = self.jobs - 1 + if ok: + task.executed() + else: + task.failed() diff --git a/src/engine/SCons/JobTests.py b/src/engine/SCons/JobTests.py index 1b09fd64..1b0128b3 100644 --- a/src/engine/SCons/JobTests.py +++ b/src/engine/SCons/JobTests.py @@ -190,7 +190,7 @@ class Taskmaster: class ParallelTestCase(unittest.TestCase): def runTest(self): "test parallel jobs" - + try: import threading except: @@ -226,6 +226,31 @@ class SerialTestCase(unittest.TestCase): self.failIf(taskmaster.num_failed, "some task(s) failed to execute") +class NoParallelTestCase(unittest.TestCase): + def runTest(self): + "test handling lack of parallel support" + def NoParallel(tm, num): + raise NameError + save_Parallel = SCons.Job.Parallel + SCons.Job.Parallel = NoParallel + try: + taskmaster = Taskmaster(num_tasks, self, Task) + jobs = SCons.Job.Jobs(2, taskmaster) + self.failUnless(jobs.num_jobs == 1, + "unexpected number of jobs %d" % jobs.num_jobs) + jobs.run() + self.failUnless(taskmaster.tasks_were_serial(), + "the tasks were not executed in series") + self.failUnless(taskmaster.all_tasks_are_executed(), + "all the tests were not executed") + self.failUnless(taskmaster.all_tasks_are_iterated(), + "all the tests were not iterated over") + self.failIf(taskmaster.num_failed, + "some task(s) failed to execute") + finally: + SCons.Job.Parallel = save_Parallel + + class SerialExceptionTestCase(unittest.TestCase): def runTest(self): "test a serial job with tasks that raise exceptions" @@ -261,6 +286,7 @@ def suite(): suite = unittest.TestSuite() suite.addTest(ParallelTestCase()) suite.addTest(SerialTestCase()) + suite.addTest(NoParallelTestCase()) suite.addTest(SerialExceptionTestCase()) suite.addTest(ParallelExceptionTestCase()) return suite diff --git a/src/engine/SCons/Script/__init__.py b/src/engine/SCons/Script/__init__.py index 03cb6d8a..a533c29d 100644 --- a/src/engine/SCons/Script/__init__.py +++ b/src/engine/SCons/Script/__init__.py @@ -703,6 +703,7 @@ def _main(args, parser): SCons.Warnings._warningOut = _scons_internal_warning SCons.Warnings.enableWarningClass(SCons.Warnings.DeprecatedWarning) SCons.Warnings.enableWarningClass(SCons.Warnings.CorruptSConsignWarning) + SCons.Warnings.enableWarningClass(SCons.Warnings.NoParallelSupportWarning) global ssoptions ssoptions = SConscriptSettableOptions(options) @@ -964,7 +965,12 @@ def _main(args, parser): progress_display("scons: " + opening_message) taskmaster = SCons.Taskmaster.Taskmaster(nodes, task_class, calc, order) - jobs = SCons.Job.Jobs(ssoptions.get('num_jobs'), taskmaster) + nj = ssoptions.get('num_jobs') + jobs = SCons.Job.Jobs(nj, taskmaster) + if nj > 1 and jobs.num_jobs == 1: + msg = "parallel builds are unsupported by this version of Python;\n" + \ + "\tignoring -j or num_jobs option.\n" + SCons.Warnings.warn(SCons.Warnings.NoParallelSupportWarning, msg) try: jobs.run() diff --git a/src/engine/SCons/Warnings.py b/src/engine/SCons/Warnings.py index 436e6633..cb78205a 100644 --- a/src/engine/SCons/Warnings.py +++ b/src/engine/SCons/Warnings.py @@ -49,6 +49,9 @@ class ReservedVariableWarning(Warning): class CacheWriteErrorWarning(Warning): pass +class NoParallelSupportWarning(Warning): + pass + _warningAsException = 0 # The below is a list of 2-tuples. The first element is a class object. diff --git a/test/option-j.py b/test/option-j.py index cb1a88ac..fdf7b839 100644 --- a/test/option-j.py +++ b/test/option-j.py @@ -121,7 +121,41 @@ test.fail_test(start2 < finish1) test.run(arguments='-j 2 out') -# Test that a failed build with -j works properly. +# Test that we fall back and warn properly if there's no threading.py +# module (simulated), which is the case if this version of Python wasn't +# built with threading support. + +test.subdir('pythonlib') + +test.write(['pythonlib', 'threading.py'], """\ +raise ImportError +""") + +save_pythonpath = os.environ.get('PYTHONPATH', '') +os.environ['PYTHONPATH'] = test.workpath('pythonlib') + +#start2, finish1 = RunTest('-j 2 f1, f2', "fifth") + +test.write('f1.in', 'f1.in pythonlib\n') +test.write('f2.in', 'f2.in pythonlib\n') + +test.run(arguments = "-j 2 f1 f2", stderr=None) + +warn = \ +"""scons: warning: parallel builds are unsupported by this version of Python; +\tignoring -j or num_jobs option. +""" +test.fail_test(string.find(test.stderr(), warn) == -1) + +str = test.read("f1") +start1,finish1 = map(float, string.split(str, "\n")) + +str = test.read("f2") +start2,finish2 = map(float, string.split(str, "\n")) + +test.fail_test(start2 < finish1) + +os.environ['PYTHONPATH'] = save_pythonpath # Test SetJobs() with no -j: