test_asynchronous_lock: test wait and cancel
authorZac Medico <zmedico@gentoo.org>
Tue, 17 May 2011 21:02:12 +0000 (14:02 -0700)
committerZac Medico <zmedico@gentoo.org>
Tue, 17 May 2011 21:02:12 +0000 (14:02 -0700)
pym/portage/tests/locks/test_asynchronous_lock.py

index e72adf684693ec7eb45c3116fc95f65f5b2ed2f3..dc4619dfdbe00c474faec0b3050e290c4670a2db 100644 (file)
@@ -62,3 +62,25 @@ class AsynchronousLockTestCase(TestCase):
                        lock2.unlock()
                finally:
                        shutil.rmtree(tempdir)
+
+       def testAsynchronousLockWaitCancel(self):
+               scheduler = PollScheduler().sched_iface
+               tempdir = tempfile.mkdtemp()
+               try:
+                       path = os.path.join(tempdir, 'lock_me')
+                       lock1 = AsynchronousLock(path=path, scheduler=scheduler)
+                       lock1.start()
+                       self.assertEqual(lock1.wait(), os.EX_OK)
+                       lock2 = AsynchronousLock(path=path, scheduler=scheduler,
+                               _force_async=True, _force_process=True)
+                       lock2.start()
+                       # lock2 should we waiting for lock1 to release
+                       self.assertEqual(lock2.returncode, None)
+
+                       # Cancel lock2 and then check wait() and returncode results.
+                       lock2.cancel()
+                       self.assertEqual(lock2.wait() == os.EX_OK, False)
+                       self.assertEqual(lock2.returncode == os.EX_OK, False)
+                       lock1.unlock()
+               finally:
+                       shutil.rmtree(tempdir)