X-Git-Url: http://git.tremily.us/?a=blobdiff_plain;f=src%2Fengine%2FSCons%2FJob.py;h=08e37a5f6150f60403759d2c94a4eeabf3fef8dc;hb=704f6e2480ef60718f1aa42c266f04afc9c79580;hp=b28aaaffed2891bdc92e09633c2f84c966ae9d4c;hpb=b3a9ef543300fbf7fc2f83d46adec2cc841cecc8;p=scons.git
diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py
index b28aaaff..08e37a5f 100644
--- a/src/engine/SCons/Job.py
+++ b/src/engine/SCons/Job.py
@@ -33,6 +33,36 @@ __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
 
 import SCons.compat
 
+import os
+import signal
+
+import SCons.Errors
+
+# The default stack size (in kilobytes) of the threads used to execute
+# jobs in parallel.
+#
+# We use a stack size of 256 kilobytes. The default on some platforms
+# is too large and prevents us from creating enough threads to fully
+# parallelize the build. For example, the default stack size on Linux
+# is 8 MBytes.
+
+explicit_stack_size = None
+default_stack_size = 256
+
+interrupt_msg = 'Build interrupted.'
+
+
+class InterruptState:
+    def __init__(self):
+        self.interrupted = False
+
+    def set(self):
+        self.interrupted = True
+
+    def __call__(self):
+        return self.interrupted
+
+
 class Jobs:
     """An instance of this class initializes N jobs, and provides
     methods for starting, stopping, and waiting on all N jobs.
 
@@ -54,8 +84,12 @@ class Jobs:
         self.job = None
 
         if num > 1:
+            stack_size = explicit_stack_size
+            if stack_size is None:
+                stack_size = default_stack_size
+
             try:
-                self.job = Parallel(taskmaster, num)
+                self.job = Parallel(taskmaster, num, stack_size)
                 self.num_jobs = num
             except NameError:
                 pass
@@ -63,21 +97,71 @@ class Jobs:
             self.job = Serial(taskmaster)
             self.num_jobs = 1
 
-    def run(self):
-        """run the job"""
+    def run(self, postfunc=lambda: None):
+        """Run the jobs.
+
+        postfunc() will be invoked after the jobs have run. It will be
+        invoked even if the jobs are interrupted by a keyboard
+        interrupt (more precisely, by a signal such as SIGINT,
+        SIGTERM or SIGHUP). The execution of postfunc() is protected
+        against keyboard interrupts and is guaranteed to run to
+        completion."""
+        self._setup_sig_handler()
         try:
             self.job.start()
-        except KeyboardInterrupt:
-            # mask any further keyboard interrupts so that scons
-            # can shutdown cleanly:
-            # (this only masks the keyboard interrupt for Python,
-            # child processes can still get the keyboard interrupt)
-            import signal
-            signal.signal(signal.SIGINT, signal.SIG_IGN)
-            raise
-
-    def cleanup(self):
-        self.job.cleanup()
+        finally:
+            postfunc()
+            self._reset_sig_handler()
+
+    def were_interrupted(self):
+        """Returns whether the jobs were interrupted by a signal."""
+        return self.job.interrupted()
+
+    def _setup_sig_handler(self):
+        """Set up an interrupt handler so that SCons can shut down cleanly
+        in various conditions:
+
+          a) SIGINT: Keyboard interrupt
+          b) SIGTERM: kill or system shutdown
+          c) SIGHUP: Controlling shell exiting
+
+        We handle all of these cases by stopping the taskmaster. It
+        turns out that it is very difficult to stop the build process
+        by asynchronously throwing an exception such as
+        KeyboardInterrupt. For example, the Python condition
+        variables (threading.Condition) and queues do not seem to be
+        asynchronous-exception-safe. It would require adding a whole
+        bunch of try/finally blocks and except KeyboardInterrupt
+        clauses all over the place.
+
+        Note also that we have to be careful to handle the case when
+        SCons forks before executing another process. In that case, we
+        want the child to exit immediately.
+        """
+        def handler(signum, stack, self=self, parentpid=os.getpid()):
+            if os.getpid() == parentpid:
+                self.job.taskmaster.stop()
+                self.job.interrupted.set()
+            else:
+                os._exit(2)
+
+        self.old_sigint = signal.signal(signal.SIGINT, handler)
+        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
+        try:
+            self.old_sighup = signal.signal(signal.SIGHUP, handler)
+        except AttributeError:
+            pass
+
+    def _reset_sig_handler(self):
+        """Restore the signal handlers to their previous state (before the
+        call to _setup_sig_handler())."""
+
+        signal.signal(signal.SIGINT, self.old_sigint)
+        signal.signal(signal.SIGTERM, self.old_sigterm)
+        try:
+            signal.signal(signal.SIGHUP, self.old_sighup)
+        except AttributeError:
+            pass
 
 class Serial:
     """This class is used to execute tasks in series, and is more efficient
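The fork note at the end of the docstring above is the subtle part of this hunk: the handler evaluates os.getpid() once, at definition time, and keeps the result as a default argument. If the same handler later fires inside a child that SCons has forked but not yet exec'ed, the pid comparison fails and the child exits at once instead of trying to stop the parent's taskmaster. A minimal sketch of that pattern outside SCons (the taskmaster.stop()/InterruptState machinery is replaced by a plain module-level flag; install_handler and stop_requested are illustrative names, not SCons API):

import os
import signal
import time

stop_requested = False      # stands in for taskmaster.stop() + InterruptState

def install_handler():
    def handler(signum, frame, parentpid=os.getpid()):
        global stop_requested
        if os.getpid() == parentpid:
            # Parent process: record the request and let the main loop
            # wind down in an orderly way.
            stop_requested = True
        else:
            # Forked child that has not exec'ed yet: exit immediately so
            # it never runs the parent's shutdown logic.
            os._exit(2)

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

if __name__ == '__main__':
    install_handler()
    while not stop_requested:
        time.sleep(0.1)     # stand-in for dispatching build tasks
    print('interrupted, shutting down cleanly')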
@@ -97,6 +181,7 @@ class Serial:
         execute (e.g. execute() raised an exception)."""
 
         self.taskmaster = taskmaster
+        self.interrupted = InterruptState()
 
     def start(self):
         """Start the job. This will begin pulling tasks from the taskmaster
@@ -104,7 +189,7 @@ class Serial:
         fails to execute (i.e. execute() raises an exception), then the job will
         stop."""
 
-        while 1:
+        while True:
             task = self.taskmaster.next_task()
 
             if task is None:
@@ -112,11 +197,18 @@ class Serial:
 
             try:
                 task.prepare()
-                task.execute()
-            except KeyboardInterrupt:
-                raise
+                if task.needs_execute():
+                    task.execute()
             except:
-                task.exception_set()
+                if self.interrupted():
+                    try:
+                        raise SCons.Errors.BuildError(
+                            task.targets[0], errstr=interrupt_msg)
+                    except:
+                        task.exception_set()
+                else:
+                    task.exception_set()
+
                 # Let the failed() callback function arrange for the
                 # build to stop if that's appropriate.
                 task.failed()
@@ -124,15 +216,14 @@ class Serial:
                 task.executed()
 
             task.postprocess()
 
-    def cleanup(self):
-        pass
+        self.taskmaster.cleanup()
 
 # Trap import failure so that everything in the Job module but the
 # Parallel class (and its dependent classes) will work if the interpreter
 # doesn't support threads.
 try:
-    import Queue
+    import queue
     import threading
 except ImportError:
     pass
@@ -142,28 +233,29 @@ else:
         dequeues the task, executes it, and posts a tuple including the task
         and a boolean indicating whether the task executed successfully. """
 
-        def __init__(self, requestQueue, resultsQueue):
+        def __init__(self, requestQueue, resultsQueue, interrupted):
             threading.Thread.__init__(self)
             self.setDaemon(1)
             self.requestQueue = requestQueue
             self.resultsQueue = resultsQueue
+            self.interrupted = interrupted
             self.start()
 
         def run(self):
-            while 1:
+            while True:
                 task = self.requestQueue.get()
 
-                if not task:
+                if task is None:
                     # The "None" value is used as a sentinel by
                     # ThreadPool.cleanup().  This indicates that there
                     # are no more tasks, so we should quit.
                     break
 
                 try:
+                    if self.interrupted():
+                        raise SCons.Errors.BuildError(
+                            task.targets[0], errstr=interrupt_msg)
                     task.execute()
-                except KeyboardInterrupt:
-                    # be explicit here for test/interrupts.py
-                    ok = False
                 except:
                     task.exception_set()
                     ok = False
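The Worker change above keeps the pool's long-standing shape: every worker blocks on a shared request queue, a None entry is the shutdown sentinel that ThreadPool.cleanup() posts once per worker, and each finished task is reported on a results queue as a (task, ok) pair. Here is a self-contained sketch of just that mechanism, written for a current Python 3 interpreter rather than the Python 2 line this patch targets; Pool, work() and the callable tasks are illustrative names, not part of the SCons API:

import queue
import threading

class Pool:
    def __init__(self, num):
        self.requests = queue.Queue(0)
        self.results = queue.Queue(0)
        self.workers = [threading.Thread(target=self.work, daemon=True)
                        for _ in range(num)]
        for w in self.workers:
            w.start()

    def work(self):
        while True:
            task = self.requests.get()
            if task is None:           # sentinel: no more work, thread exits
                break
            try:
                result, ok = task(), True
            except Exception as exc:   # report failures instead of dying
                result, ok = exc, False
            self.results.put((task, result, ok))

    def cleanup(self):
        for _ in self.workers:         # one sentinel per worker
            self.requests.put(None)
        for w in self.workers:
            w.join()

pool = Pool(4)
for n in range(8):
    pool.requests.put(lambda n=n: n * n)
for _ in range(8):
    task, result, ok = pool.results.get()
    print(ok, result)
pool.cleanup()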
@@ -175,27 +267,49 @@ else:
     class ThreadPool:
         """This class is responsible for spawning and managing worker threads."""
 
-        def __init__(self, num):
-            """Create the request and reply queues, and 'num' worker threads."""
-            self.requestQueue = Queue.Queue(0)
-            self.resultsQueue = Queue.Queue(0)
+        def __init__(self, num, stack_size, interrupted):
+            """Create the request and reply queues, and 'num' worker threads.
+
+            One must specify the stack size of the worker threads. The
+            stack size is specified in kilobytes.
+            """
+            self.requestQueue = queue.Queue(0)
+            self.resultsQueue = queue.Queue(0)
+
+            try:
+                prev_size = threading.stack_size(stack_size*1024)
+            except AttributeError, e:
+                # Only print a warning if the stack size has been
+                # explicitly set.
+                if not explicit_stack_size is None:
+                    msg = "Setting stack size is unsupported by this version of Python:\n    " + \
+                        e.args[0]
+                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+            except ValueError, e:
+                msg = "Setting stack size failed:\n    " + str(e)
+                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
 
             # Create worker threads
             self.workers = []
             for _ in range(num):
-                worker = Worker(self.requestQueue, self.resultsQueue)
+                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
                 self.workers.append(worker)
 
-        def put(self, obj):
+            # Once we drop Python 1.5 we can change the following to:
+            #if 'prev_size' in locals():
+            if 'prev_size' in locals().keys():
+                threading.stack_size(prev_size)
+
+        def put(self, task):
             """Put task into request queue."""
-            self.requestQueue.put(obj)
+            self.requestQueue.put(task)
 
-        def get(self, block = True):
+        def get(self):
             """Remove and return a result tuple from the results queue."""
-            return self.resultsQueue.get(block)
+            return self.resultsQueue.get()
 
-        def preparation_failed(self, obj):
-            self.resultsQueue.put((obj, False))
+        def preparation_failed(self, task):
+            self.resultsQueue.put((task, False))
 
         def cleanup(self):
             """
@@ -233,7 +347,7 @@ else:
         This class is thread safe.
         """
 
-        def __init__(self, taskmaster, num):
+        def __init__(self, taskmaster, num, stack_size):
             """Create a new parallel job given a taskmaster.
 
             The taskmaster's next_task() method should return the next
@@ -249,7 +363,8 @@ else:
             multiple tasks simultaneously. """
 
             self.taskmaster = taskmaster
-            self.tp = ThreadPool(num)
+            self.interrupted = InterruptState()
+            self.tp = ThreadPool(num, stack_size, self.interrupted)
 
             self.maxjobs = num
 
@@ -261,7 +376,7 @@ else:
 
             jobs = 0
 
-            while 1:
+            while True:
                 # Start up as many available tasks as we're
                 # allowed to.
                 while jobs < self.maxjobs:
@@ -269,34 +384,42 @@ else:
                     if task is None:
                         break
 
-                    # prepare task for execution
                     try:
+                        # prepare task for execution
                         task.prepare()
-                    except KeyboardInterrupt:
-                        raise
                     except:
-                        # Let the failed() callback function arrange
-                        # for the build to stop if that's appropriate.
                         task.exception_set()
-                        self.tp.preparation_failed(task)
-                        jobs = jobs + 1
-                        continue
-
-                    # dispatch task
-                    self.tp.put(task)
-                    jobs = jobs + 1
+                        task.failed()
+                        task.postprocess()
+                    else:
+                        if task.needs_execute():
+                            # dispatch task
+                            self.tp.put(task)
+                            jobs = jobs + 1
+                        else:
+                            task.executed()
+                            task.postprocess()
 
                 if not task and not jobs:
                     break
 
                 # Let any/all completed tasks finish up before we go
                 # back and put the next batch of tasks on the queue.
-                while 1:
+                while True:
                     task, ok = self.tp.get()
-
                     jobs = jobs - 1
+
                     if ok:
                         task.executed()
                     else:
+                        if self.interrupted():
+                            try:
+                                raise SCons.Errors.BuildError(
+                                    task.targets[0], errstr=interrupt_msg)
+                            except:
+                                task.exception_set()
+
+                        # Let the failed() callback function arrange
+                        # for the build to stop if that's appropriate.
                         task.failed()
 
                     task.postprocess()
@@ -304,5 +427,11 @@ else:
                 if self.tp.resultsQueue.empty():
                     break
 
-        def cleanup(self):
             self.tp.cleanup()
+            self.taskmaster.cleanup()
+
+# Local Variables:
+# tab-width:4
+# indent-tabs-mode:nil
+# End:
+# vim: set expandtab tabstop=4 shiftwidth=4:
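One detail of ThreadPool.__init__() above that is easy to miss: threading.stack_size() changes a process-global default that applies to every thread started afterwards, which is why the patch sets it just before creating the workers and restores the previous value right after. A small sketch of that save/set/restore pattern on a current Python 3 interpreter; the 256 KB figure mirrors default_stack_size, and make_workers/kb are illustrative names (very old Pythons lacked threading.stack_size() entirely, which is why the patch also catches AttributeError):

import threading

def make_workers(num, target, kb=256):
    # stack_size() returns the previous value (0 means the platform
    # default) and may raise ValueError if the requested size is not
    # acceptable on this platform.
    try:
        prev = threading.stack_size(kb * 1024)
    except (AttributeError, ValueError):
        prev = None

    workers = [threading.Thread(target=target, daemon=True)
               for _ in range(num)]
    for w in workers:
        w.start()

    if prev is not None:
        threading.stack_size(prev)   # restore the default for later threads
    return workers

workers = make_workers(4, lambda: None)
for w in workers:
    w.join()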