http://scons.tigris.org/issues/show_bug.cgi?id=2329
[scons.git] / src / engine / SCons / Job.py
index 7b514092269b4d4dc57d4317f03ecf26c21175e4..08e37a5f6150f60403759d2c94a4eeabf3fef8dc 100644 (file)
@@ -33,6 +33,10 @@ __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
 
 import SCons.compat
 
+import os
+import signal
+
+import SCons.Errors
 
 # The default stack size (in kilobytes) of the threads used to execute
 # jobs in parallel.
@@ -42,8 +46,22 @@ import SCons.compat
 # parallelized the build. For example, the default stack size on linux
 # is 8 MBytes.
 
+explicit_stack_size = None
 default_stack_size = 256
 
+interrupt_msg = 'Build interrupted.'
+
+
+class InterruptState:
+   def __init__(self):
+       self.interrupted = False
+
+   def set(self):
+       self.interrupted = True
+
+   def __call__(self):
+       return self.interrupted
+
 
 class Jobs:
     """An instance of this class initializes N jobs, and provides
@@ -66,9 +84,8 @@ class Jobs:
 
         self.job = None
         if num > 1:
-            try:
-                stack_size = SCons.Job.stack_size
-            except AttributeError:
+            stack_size = explicit_stack_size
+            if stack_size is None:
                 stack_size = default_stack_size
                 
             try:
@@ -80,21 +97,71 @@ class Jobs:
             self.job = Serial(taskmaster)
             self.num_jobs = 1
 
-    def run(self):
-        """run the job"""
+    def run(self, postfunc=lambda: None):
+        """Run the jobs.
+
+        postfunc() will be invoked after the jobs has run. It will be
+        invoked even if the jobs are interrupted by a keyboard
+        interrupt (well, in fact by a signal such as either SIGINT,
+        SIGTERM or SIGHUP). The execution of postfunc() is protected
+        against keyboard interrupts and is guaranteed to run to
+        completion."""
+        self._setup_sig_handler()
         try:
             self.job.start()
-        except KeyboardInterrupt:
-            # mask any further keyboard interrupts so that scons
-            # can shutdown cleanly:
-            # (this only masks the keyboard interrupt for Python,
-            #  child processes can still get the keyboard interrupt)
-            import signal
-            signal.signal(signal.SIGINT, signal.SIG_IGN)
-            raise
-
-    def cleanup(self):
-        self.job.cleanup()
+        finally:
+            postfunc()
+            self._reset_sig_handler()
+
+    def were_interrupted(self):
+        """Returns whether the jobs were interrupted by a signal."""
+        return self.job.interrupted()
+
+    def _setup_sig_handler(self):
+        """Setup an interrupt handler so that SCons can shutdown cleanly in
+        various conditions:
+
+          a) SIGINT: Keyboard interrupt
+          b) SIGTERM: kill or system shutdown
+          c) SIGHUP: Controlling shell exiting
+
+        We handle all of these cases by stopping the taskmaster. It
+        turns out that it very difficult to stop the build process
+        by throwing asynchronously an exception such as
+        KeyboardInterrupt. For example, the python Condition
+        variables (threading.Condition) and queue's do not seem to
+        asynchronous-exception-safe. It would require adding a whole
+        bunch of try/finally block and except KeyboardInterrupt all
+        over the place.
+
+        Note also that we have to be careful to handle the case when
+        SCons forks before executing another process. In that case, we
+        want the child to exit immediately.
+        """
+        def handler(signum, stack, self=self, parentpid=os.getpid()):
+            if os.getpid() == parentpid:
+                self.job.taskmaster.stop()
+                self.job.interrupted.set()
+            else:
+                os._exit(2)
+
+        self.old_sigint  = signal.signal(signal.SIGINT, handler)
+        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
+        try:
+            self.old_sighup = signal.signal(signal.SIGHUP, handler)
+        except AttributeError:
+            pass
+
+    def _reset_sig_handler(self):
+        """Restore the signal handlers to their previous state (before the
+         call to _setup_sig_handler()."""
+
+        signal.signal(signal.SIGINT, self.old_sigint)
+        signal.signal(signal.SIGTERM, self.old_sigterm)
+        try:
+            signal.signal(signal.SIGHUP, self.old_sighup)
+        except AttributeError:
+            pass
 
 class Serial:
     """This class is used to execute tasks in series, and is more efficient
@@ -114,6 +181,7 @@ class Serial:
         execute (e.g. execute() raised an exception)."""
         
         self.taskmaster = taskmaster
+        self.interrupted = InterruptState()
 
     def start(self):
         """Start the job. This will begin pulling tasks from the taskmaster
@@ -121,7 +189,7 @@ class Serial:
         fails to execute (i.e. execute() raises an exception), then the job will
         stop."""
         
-        while 1:
+        while True:
             task = self.taskmaster.next_task()
 
             if task is None:
@@ -129,11 +197,18 @@ class Serial:
 
             try:
                 task.prepare()
-                task.execute()
-            except KeyboardInterrupt:
-                raise
+                if task.needs_execute():
+                    task.execute()
             except:
-                task.exception_set()
+                if self.interrupted():
+                    try:
+                        raise SCons.Errors.BuildError(
+                            task.targets[0], errstr=interrupt_msg)
+                    except:
+                        task.exception_set()
+                else:
+                    task.exception_set()
+
                 # Let the failed() callback function arrange for the
                 # build to stop if that's appropriate.
                 task.failed()
@@ -141,15 +216,14 @@ class Serial:
                 task.executed()
 
             task.postprocess()
+        self.taskmaster.cleanup()
 
-    def cleanup(self):
-        pass
 
 # Trap import failure so that everything in the Job module but the
 # Parallel class (and its dependent classes) will work if the interpreter
 # doesn't support threads.
 try:
-    import Queue
+    import queue
     import threading
 except ImportError:
     pass
@@ -159,28 +233,29 @@ else:
         dequeues the task, executes it, and posts a tuple including the task
         and a boolean indicating whether the task executed successfully. """
 
-        def __init__(self, requestQueue, resultsQueue):
+        def __init__(self, requestQueue, resultsQueue, interrupted):
             threading.Thread.__init__(self)
             self.setDaemon(1)
             self.requestQueue = requestQueue
             self.resultsQueue = resultsQueue
+            self.interrupted = interrupted
             self.start()
 
         def run(self):
-            while 1:
+            while True:
                 task = self.requestQueue.get()
 
-                if not task:
+                if task is None:
                     # The "None" value is used as a sentinel by
                     # ThreadPool.cleanup().  This indicates that there
                     # are no more tasks, so we should quit.
                     break
 
                 try:
+                    if self.interrupted():
+                        raise SCons.Errors.BuildError(
+                            task.targets[0], errstr=interrupt_msg)
                     task.execute()
-                except KeyboardInterrupt:
-                    # be explicit here for test/interrupts.py
-                    ok = False
                 except:
                     task.exception_set()
                     ok = False
@@ -192,33 +267,32 @@ else:
     class ThreadPool:
         """This class is responsible for spawning and managing worker threads."""
 
-        def __init__(self, num, stack_size):
+        def __init__(self, num, stack_size, interrupted):
             """Create the request and reply queues, and 'num' worker threads.
             
             One must specify the stack size of the worker threads. The
             stack size is specified in kilobytes.
             """
-            self.requestQueue = Queue.Queue(0)
-            self.resultsQueue = Queue.Queue(0)
+            self.requestQueue = queue.Queue(0)
+            self.resultsQueue = queue.Queue(0)
 
             try:
                 prev_size = threading.stack_size(stack_size*1024) 
             except AttributeError, e:
                 # Only print a warning if the stack size has been
-                # explicitely set.
-                if hasattr(SCons.Job, 'stack_size'):
+                # explicitly set.
+                if not explicit_stack_size is None:
                     msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                         e.args[0]
                     SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
             except ValueError, e:
-                msg = "Setting stack size failed:\n    " + \
-                    e.message
+                msg = "Setting stack size failed:\n    " + str(e)
                 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
 
             # Create worker threads
             self.workers = []
             for _ in range(num):
-                worker = Worker(self.requestQueue, self.resultsQueue)
+                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
                 self.workers.append(worker)
 
             # Once we drop Python 1.5 we can change the following to:
@@ -226,16 +300,16 @@ else:
             if 'prev_size' in locals().keys():
                 threading.stack_size(prev_size)
 
-        def put(self, obj):
+        def put(self, task):
             """Put task into request queue."""
-            self.requestQueue.put(obj)
+            self.requestQueue.put(task)
 
-        def get(self, block = True):
+        def get(self):
             """Remove and return a result tuple from the results queue."""
-            return self.resultsQueue.get(block)
+            return self.resultsQueue.get()
 
-        def preparation_failed(self, obj):
-            self.resultsQueue.put((obj, False))
+        def preparation_failed(self, task):
+            self.resultsQueue.put((task, False))
 
         def cleanup(self):
             """
@@ -289,7 +363,8 @@ else:
             multiple tasks simultaneously. """
 
             self.taskmaster = taskmaster
-            self.tp = ThreadPool(num, stack_size)
+            self.interrupted = InterruptState()
+            self.tp = ThreadPool(num, stack_size, self.interrupted)
 
             self.maxjobs = num
 
@@ -301,7 +376,7 @@ else:
 
             jobs = 0
             
-            while 1:
+            while True:
                 # Start up as many available tasks as we're
                 # allowed to.
                 while jobs < self.maxjobs:
@@ -309,34 +384,42 @@ else:
                     if task is None:
                         break
 
-                    # prepare task for execution
                     try:
+                        # prepare task for execution
                         task.prepare()
-                    except KeyboardInterrupt:
-                        raise
                     except:
-                        # Let the failed() callback function arrange
-                        # for the build to stop if that's appropriate.
                         task.exception_set()
-                        self.tp.preparation_failed(task)
-                        jobs = jobs + 1
-                        continue
-
-                    # dispatch task
-                    self.tp.put(task)
-                    jobs = jobs + 1
+                        task.failed()
+                        task.postprocess()
+                    else:
+                        if task.needs_execute():
+                            # dispatch task
+                            self.tp.put(task)
+                            jobs = jobs + 1
+                        else:
+                            task.executed()
+                            task.postprocess()
 
                 if not task and not jobs: break
 
                 # Let any/all completed tasks finish up before we go
                 # back and put the next batch of tasks on the queue.
-                while 1:
+                while True:
                     task, ok = self.tp.get()
-
                     jobs = jobs - 1
+
                     if ok:
                         task.executed()
                     else:
+                        if self.interrupted():
+                            try:
+                                raise SCons.Errors.BuildError(
+                                    task.targets[0], errstr=interrupt_msg)
+                            except:
+                                task.exception_set()
+
+                        # Let the failed() callback function arrange
+                        # for the build to stop if that's appropriate.
                         task.failed()
 
                     task.postprocess()
@@ -344,5 +427,11 @@ else:
                     if self.tp.resultsQueue.empty():
                         break
 
-        def cleanup(self):
             self.tp.cleanup()
+            self.taskmaster.cleanup()
+
+# Local Variables:
+# tab-width:4
+# indent-tabs-mode:nil
+# End:
+# vim: set expandtab tabstop=4 shiftwidth=4: