7e9fdfec9f432fcb3d4fc8d5f48a9d2a0dbb003f
[portage.git] / pym / portage / tests / locks / test_asynchronous_lock.py
1 # Copyright 2010 Gentoo Foundation
2 # Distributed under the terms of the GNU General Public License v2
3
4 import shutil
5 import tempfile
6
7 from portage import os
8 from portage.tests import TestCase
9 from _emerge.AsynchronousLock import AsynchronousLock
10 from _emerge.PollScheduler import PollScheduler
11
class AsynchronousLockTestCase(TestCase):
        """
        Acquire and release AsynchronousLock on a scratch file, once for
        every combination of the private _force_* constructor switches
        exercised below.
        """

        def _assertLockCycle(self, **kwargs):
                """
                Build an AsynchronousLock from kwargs, then start it, wait
                for it, verify that it succeeded and release it.

                The return code is checked before unlock() is called, so a
                failed acquisition is reported as an assertion failure on
                the return code rather than as a side effect of unlocking
                something that was never locked. It is checked again after
                unlock(), which is where the check originally lived.
                """
                async_lock = AsynchronousLock(**kwargs)
                async_lock.start()
                async_lock.wait()
                self.assertEqual(async_lock.returncode, os.EX_OK)
                async_lock.unlock()
                self.assertEqual(async_lock.returncode, os.EX_OK)

        def testAsynchronousLock(self):
                scheduler = PollScheduler().sched_iface
                tempdir = tempfile.mkdtemp()
                try:
                        path = os.path.join(tempdir, 'lock_me')
                        for force_async in (True, False):
                                # Thread-forced variant, with and without
                                # _force_dummy.
                                for force_dummy in (True, False):
                                        self._assertLockCycle(path=path,
                                                scheduler=scheduler,
                                                _force_async=force_async,
                                                _force_thread=True,
                                                _force_dummy=force_dummy)

                                # Process-forced variant.
                                self._assertLockCycle(path=path,
                                        scheduler=scheduler,
                                        _force_async=force_async,
                                        _force_process=True)
                finally:
                        # Remove the scratch directory even when an
                        # assertion above fails.
                        shutil.rmtree(tempdir)