Add a test case for intentionally short timeout with QueueScheduler.run().
authorZac Medico <zmedico@gentoo.org>
Fri, 3 Sep 2010 22:39:15 +0000 (15:39 -0700)
committerZac Medico <zmedico@gentoo.org>
Fri, 3 Sep 2010 22:39:15 +0000 (15:39 -0700)
pym/portage/tests/ebuild/test_ipc_daemon.py

index dee61a2d2ab173c0d9a217f3c8c16ce1bab4b919..3298040a8063e4c9e219fd174e665f2a301b0b8d 100644 (file)
@@ -41,8 +41,8 @@ class IpcDaemonTestCase(TestCase):
                        output_fifo = os.path.join(tmpdir, '.ipc_out')
                        os.mkfifo(input_fifo)
                        os.mkfifo(output_fifo)
+                       task_scheduler = TaskScheduler(max_jobs=2)
                        for exitcode in (0, 1, 2):
-                               task_scheduler = TaskScheduler(max_jobs=2)
                                exit_command = ExitCommand()
                                commands = {'exit' : exit_command}
                                daemon = EbuildIpcDaemon(commands=commands,
@@ -71,5 +71,39 @@ class IpcDaemonTestCase(TestCase):
                                        "command not received after %d seconds" % \
                                        (time.time() - start_time,))
                                self.assertEqual(exit_command.exitcode, exitcode)
+
+                       # Intentionally short timeout test for QueueScheduler.run()
+                       sleep_time_s = 10      # 10.000 seconds
+                       short_timeout_ms = 10  #  0.010 seconds
+
+                       for i in range(3):
+                               exit_command = ExitCommand()
+                               commands = {'exit' : exit_command}
+                               daemon = EbuildIpcDaemon(commands=commands,
+                                       input_fifo=input_fifo,
+                                       output_fifo=output_fifo,
+                                       scheduler=task_scheduler.sched_iface)
+                               proc = SpawnProcess(
+                                       args=[BASH_BINARY, "-c", 'exec sleep %d' % sleep_time_s],
+                                       env=env, scheduler=task_scheduler.sched_iface)
+
+                               self.received_command = False
+                               def exit_command_callback():
+                                       self.received_command = True
+                                       proc.cancel()
+                                       daemon.cancel()
+
+                               exit_command.reply_hook = exit_command_callback
+                               task_scheduler.add(daemon)
+                               task_scheduler.add(proc)
+                               start_time = time.time()
+                               task_scheduler.run(timeout=short_timeout_ms)
+                               task_scheduler.clear()
+
+                               self.assertEqual(self.received_command, False,
+                                       "command received after %d seconds" % \
+                                       (time.time() - start_time,))
+                               self.assertEqual(proc.returncode == os.EX_OK, False)
+
                finally:
                        shutil.rmtree(tmpdir)