lock2.unlock()
finally:
shutil.rmtree(tempdir)
+
+ def testAsynchronousLockWaitCancel(self):
+ scheduler = PollScheduler().sched_iface
+ tempdir = tempfile.mkdtemp()
+ try:
+ path = os.path.join(tempdir, 'lock_me')
+ lock1 = AsynchronousLock(path=path, scheduler=scheduler)
+ lock1.start()
+ self.assertEqual(lock1.wait(), os.EX_OK)
+ lock2 = AsynchronousLock(path=path, scheduler=scheduler,
+ _force_async=True, _force_process=True)
+ lock2.start()
+ # lock2 should we waiting for lock1 to release
+ self.assertEqual(lock2.returncode, None)
+
+ # Cancel lock2 and then check wait() and returncode results.
+ lock2.cancel()
+ self.assertEqual(lock2.wait() == os.EX_OK, False)
+ self.assertEqual(lock2.returncode == os.EX_OK, False)
+ lock1.unlock()
+ finally:
+ shutil.rmtree(tempdir)