finally:
shutil.rmtree(tempdir)
+
+ def testAsynchronousLockWait(self):
+ scheduler = PollScheduler().sched_iface
+ tempdir = tempfile.mkdtemp()
+ try:
+ path = os.path.join(tempdir, 'lock_me')
+ lock1 = AsynchronousLock(path=path, scheduler=scheduler)
+ lock1.start()
+ self.assertEqual(lock1.wait(), os.EX_OK)
+
+ # lock2 requires _force_async=True since the portage.locks
+ # module is not designed to work as intended here if the
+ # same process tries to lock the same file more than
+ # one time concurrently.
+ lock2 = AsynchronousLock(path=path, scheduler=scheduler,
+ _force_async=True, _force_process=True)
+ lock2.start()
+ # lock2 should we waiting for lock1 to release
+ self.assertEqual(lock2.returncode, None)
+
+ lock1.unlock()
+ self.assertEqual(lock2.wait(), os.EX_OK)
+ lock2.unlock()
+ finally:
+ shutil.rmtree(tempdir)