import SCons.compat
+import os
+import signal
+
+import SCons.Errors
# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
# parallelized the build. For example, the default stack size on linux
# is 8 MBytes.
+explicit_stack_size = None
default_stack_size = 256
+interrupt_msg = 'Build interrupted.'
+
+
+class InterruptState:
+ def __init__(self):
+ self.interrupted = False
+
+ def set(self):
+ self.interrupted = True
+
+ def __call__(self):
+ return self.interrupted
+
class Jobs:
"""An instance of this class initializes N jobs, and provides
self.job = None
if num > 1:
- try:
- stack_size = SCons.Job.stack_size
- except AttributeError:
+ stack_size = explicit_stack_size
+ if stack_size is None:
stack_size = default_stack_size
try:
self.job = Serial(taskmaster)
self.num_jobs = 1
- def run(self):
- """run the job"""
+ def run(self, postfunc=lambda: None):
+ """Run the jobs.
+
+ postfunc() will be invoked after the jobs has run. It will be
+ invoked even if the jobs are interrupted by a keyboard
+ interrupt (well, in fact by a signal such as either SIGINT,
+ SIGTERM or SIGHUP). The execution of postfunc() is protected
+ against keyboard interrupts and is guaranteed to run to
+ completion."""
+ self._setup_sig_handler()
try:
self.job.start()
- except KeyboardInterrupt:
- # mask any further keyboard interrupts so that scons
- # can shutdown cleanly:
- # (this only masks the keyboard interrupt for Python,
- # child processes can still get the keyboard interrupt)
- import signal
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- raise
-
- def cleanup(self):
- self.job.cleanup()
+ finally:
+ postfunc()
+ self._reset_sig_handler()
+
+ def were_interrupted(self):
+ """Returns whether the jobs were interrupted by a signal."""
+ return self.job.interrupted()
+
+ def _setup_sig_handler(self):
+ """Setup an interrupt handler so that SCons can shutdown cleanly in
+ various conditions:
+
+ a) SIGINT: Keyboard interrupt
+ b) SIGTERM: kill or system shutdown
+ c) SIGHUP: Controlling shell exiting
+
+ We handle all of these cases by stopping the taskmaster. It
+ turns out that it very difficult to stop the build process
+ by throwing asynchronously an exception such as
+ KeyboardInterrupt. For example, the python Condition
+ variables (threading.Condition) and queue's do not seem to
+ asynchronous-exception-safe. It would require adding a whole
+ bunch of try/finally block and except KeyboardInterrupt all
+ over the place.
+
+ Note also that we have to be careful to handle the case when
+ SCons forks before executing another process. In that case, we
+ want the child to exit immediately.
+ """
+ def handler(signum, stack, self=self, parentpid=os.getpid()):
+ if os.getpid() == parentpid:
+ self.job.taskmaster.stop()
+ self.job.interrupted.set()
+ else:
+ os._exit(2)
+
+ self.old_sigint = signal.signal(signal.SIGINT, handler)
+ self.old_sigterm = signal.signal(signal.SIGTERM, handler)
+ try:
+ self.old_sighup = signal.signal(signal.SIGHUP, handler)
+ except AttributeError:
+ pass
+
+ def _reset_sig_handler(self):
+ """Restore the signal handlers to their previous state (before the
+ call to _setup_sig_handler()."""
+
+ signal.signal(signal.SIGINT, self.old_sigint)
+ signal.signal(signal.SIGTERM, self.old_sigterm)
+ try:
+ signal.signal(signal.SIGHUP, self.old_sighup)
+ except AttributeError:
+ pass
class Serial:
"""This class is used to execute tasks in series, and is more efficient
execute (e.g. execute() raised an exception)."""
self.taskmaster = taskmaster
+ self.interrupted = InterruptState()
def start(self):
"""Start the job. This will begin pulling tasks from the taskmaster
fails to execute (i.e. execute() raises an exception), then the job will
stop."""
- while 1:
+ while True:
task = self.taskmaster.next_task()
if task is None:
try:
task.prepare()
- task.execute()
- except KeyboardInterrupt:
- raise
+ if task.needs_execute():
+ task.execute()
except:
- task.exception_set()
+ if self.interrupted():
+ try:
+ raise SCons.Errors.BuildError(
+ task.targets[0], errstr=interrupt_msg)
+ except:
+ task.exception_set()
+ else:
+ task.exception_set()
+
# Let the failed() callback function arrange for the
# build to stop if that's appropriate.
task.failed()
task.executed()
task.postprocess()
+ self.taskmaster.cleanup()
- def cleanup(self):
- pass
# Trap import failure so that everything in the Job module but the
# Parallel class (and its dependent classes) will work if the interpreter
# doesn't support threads.
try:
- import Queue
+ import queue
import threading
except ImportError:
pass
dequeues the task, executes it, and posts a tuple including the task
and a boolean indicating whether the task executed successfully. """
- def __init__(self, requestQueue, resultsQueue):
+ def __init__(self, requestQueue, resultsQueue, interrupted):
threading.Thread.__init__(self)
self.setDaemon(1)
self.requestQueue = requestQueue
self.resultsQueue = resultsQueue
+ self.interrupted = interrupted
self.start()
def run(self):
- while 1:
+ while True:
task = self.requestQueue.get()
- if not task:
+ if task is None:
# The "None" value is used as a sentinel by
# ThreadPool.cleanup(). This indicates that there
# are no more tasks, so we should quit.
break
try:
+ if self.interrupted():
+ raise SCons.Errors.BuildError(
+ task.targets[0], errstr=interrupt_msg)
task.execute()
- except KeyboardInterrupt:
- # be explicit here for test/interrupts.py
- ok = False
except:
task.exception_set()
ok = False
class ThreadPool:
"""This class is responsible for spawning and managing worker threads."""
- def __init__(self, num, stack_size):
+ def __init__(self, num, stack_size, interrupted):
"""Create the request and reply queues, and 'num' worker threads.
One must specify the stack size of the worker threads. The
stack size is specified in kilobytes.
"""
- self.requestQueue = Queue.Queue(0)
- self.resultsQueue = Queue.Queue(0)
+ self.requestQueue = queue.Queue(0)
+ self.resultsQueue = queue.Queue(0)
try:
prev_size = threading.stack_size(stack_size*1024)
except AttributeError, e:
# Only print a warning if the stack size has been
- # explicitely set.
- if hasattr(SCons.Job, 'stack_size'):
+ # explicitly set.
+ if not explicit_stack_size is None:
msg = "Setting stack size is unsupported by this version of Python:\n " + \
e.args[0]
SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
except ValueError, e:
- msg = "Setting stack size failed:\n " + \
- e.message
+ msg = "Setting stack size failed:\n " + str(e)
SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
# Create worker threads
self.workers = []
for _ in range(num):
- worker = Worker(self.requestQueue, self.resultsQueue)
+ worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
self.workers.append(worker)
# Once we drop Python 1.5 we can change the following to:
if 'prev_size' in locals().keys():
threading.stack_size(prev_size)
- def put(self, obj):
+ def put(self, task):
"""Put task into request queue."""
- self.requestQueue.put(obj)
+ self.requestQueue.put(task)
- def get(self, block = True):
+ def get(self):
"""Remove and return a result tuple from the results queue."""
- return self.resultsQueue.get(block)
+ return self.resultsQueue.get()
- def preparation_failed(self, obj):
- self.resultsQueue.put((obj, False))
+ def preparation_failed(self, task):
+ self.resultsQueue.put((task, False))
def cleanup(self):
"""
multiple tasks simultaneously. """
self.taskmaster = taskmaster
- self.tp = ThreadPool(num, stack_size)
+ self.interrupted = InterruptState()
+ self.tp = ThreadPool(num, stack_size, self.interrupted)
self.maxjobs = num
jobs = 0
- while 1:
+ while True:
# Start up as many available tasks as we're
# allowed to.
while jobs < self.maxjobs:
if task is None:
break
- # prepare task for execution
try:
+ # prepare task for execution
task.prepare()
- except KeyboardInterrupt:
- raise
except:
- # Let the failed() callback function arrange
- # for the build to stop if that's appropriate.
task.exception_set()
- self.tp.preparation_failed(task)
- jobs = jobs + 1
- continue
-
- # dispatch task
- self.tp.put(task)
- jobs = jobs + 1
+ task.failed()
+ task.postprocess()
+ else:
+ if task.needs_execute():
+ # dispatch task
+ self.tp.put(task)
+ jobs = jobs + 1
+ else:
+ task.executed()
+ task.postprocess()
if not task and not jobs: break
# Let any/all completed tasks finish up before we go
# back and put the next batch of tasks on the queue.
- while 1:
+ while True:
task, ok = self.tp.get()
-
jobs = jobs - 1
+
if ok:
task.executed()
else:
+ if self.interrupted():
+ try:
+ raise SCons.Errors.BuildError(
+ task.targets[0], errstr=interrupt_msg)
+ except:
+ task.exception_set()
+
+ # Let the failed() callback function arrange
+ # for the build to stop if that's appropriate.
task.failed()
task.postprocess()
if self.tp.resultsQueue.empty():
break
- def cleanup(self):
self.tp.cleanup()
+ self.taskmaster.cleanup()
+
+# Local Variables:
+# tab-width:4
+# indent-tabs-mode:nil
+# End:
+# vim: set expandtab tabstop=4 shiftwidth=4: