Add a new AsynchronousLock class that uses the portage.locks module to
authorZac Medico <zmedico@gentoo.org>
Tue, 14 Sep 2010 02:25:58 +0000 (19:25 -0700)
committerZac Medico <zmedico@gentoo.org>
Tue, 14 Sep 2010 02:25:58 +0000 (19:25 -0700)
acquire a lock asynchronously, using a background thread.

pym/_emerge/AsynchronousLock.py [new file with mode: 0644]
pym/portage/tests/locks/__init__.py [new file with mode: 0644]
pym/portage/tests/locks/__test__ [new file with mode: 0644]
pym/portage/tests/locks/test_asynchronous_lock.py [new file with mode: 0644]

diff --git a/pym/_emerge/AsynchronousLock.py b/pym/_emerge/AsynchronousLock.py
new file mode 100644 (file)
index 0000000..6eb90b4
--- /dev/null
@@ -0,0 +1,103 @@
+# Copyright 2010 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import dummy_threading
+import fcntl
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
+from portage import os
+from portage.exception import TryAgain
+from portage.locks import lockfile, unlockfile
+from _emerge.AbstractPollTask import AbstractPollTask
+from _emerge.PollConstants import PollConstants
+
+class AsynchronousLock(AbstractPollTask):
+       """
+       This uses the portage.locks module to acquire a lock asynchronously,
+       using a background thread. After the lock is acquired, the thread
+       writes to a pipe in order to notify a poll loop running in the main
+       thread.
+
+       If the threading module is unavailable then the dummy_threading
+       module will be used, and the lock will be acquired synchronously
+       (before the start() method returns).
+       """
+
+       __slots__ = ('lock_obj', 'path',) + \
+               ('_files', '_force_thread', '_force_dummy',
+               '_thread', '_reg_id',)
+
+       def _start(self):
+
+               if self._force_thread:
+                       self._start_thread()
+                       return
+
+               try:
+                       self.lock_obj = lockfile(self.path,
+                               wantnewlockfile=True, flags=os.O_NONBLOCK)
+               except TryAgain:
+                       self._start_thread()
+               else:
+                       self.returncode = os.EX_OK
+                       self.wait()
+
+       def _start_thread(self):
+               pr, pw = os.pipe()
+               self._files = {}
+               self._files['pipe_read'] = os.fdopen(pr, 'rb', 0)
+               self._files['pipe_write'] = os.fdopen(pw, 'wb', 0)
+               for k, f in self._files.items():
+                       fcntl.fcntl(f.fileno(), fcntl.F_SETFL,
+                               fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
+               self._reg_id = self.scheduler.register(self._files['pipe_read'].fileno(),
+                       PollConstants.POLLIN, self._output_handler)
+               self._registered = True
+               threading_mod = threading
+               if self._force_dummy:
+                       threading_mod = dummy_threading
+               self._thread = threading_mod.Thread(target=self._run_lock)
+               self._thread.start()
+
+       def _run_lock(self):
+               self.lock_obj = lockfile(self.path, wantnewlockfile=True)
+               self._files['pipe_write'].write(b'\0')
+
+       def _output_handler(self, f, event):
+               buf = self._read_buf(self._files['pipe_read'], event)
+               if buf:
+                       self._unregister()
+                       self.returncode = os.EX_OK
+                       self.wait()
+
+       def _wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               if self._registered:
+                       self.scheduler.schedule(self._reg_id)
+               return self.returncode
+
+       def unlock(self):
+               if self.lock_obj is None:
+                       raise AssertionError('not locked')
+               unlockfile(self.lock_obj)
+               self.lock_obj = None
+
+       def _unregister(self):
+               self._registered = False
+
+               if self._thread is not None:
+                       self._thread.join()
+                       self._thread = None
+
+               if self._reg_id is not None:
+                       self.scheduler.unregister(self._reg_id)
+                       self._reg_id = None
+
+               if self._files is not None:
+                       for f in self._files.values():
+                               f.close()
+                       self._files = None
diff --git a/pym/portage/tests/locks/__init__.py b/pym/portage/tests/locks/__init__.py
new file mode 100644 (file)
index 0000000..21a391a
--- /dev/null
@@ -0,0 +1,2 @@
+# Copyright 2010 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
diff --git a/pym/portage/tests/locks/__test__ b/pym/portage/tests/locks/__test__
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/pym/portage/tests/locks/test_asynchronous_lock.py b/pym/portage/tests/locks/test_asynchronous_lock.py
new file mode 100644 (file)
index 0000000..ac38462
--- /dev/null
@@ -0,0 +1,29 @@
+# Copyright 2010 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import shutil
+import tempfile
+
+from portage import os
+from portage.tests import TestCase
+from _emerge.AsynchronousLock import AsynchronousLock
+from _emerge.PollScheduler import PollScheduler
+
+class AsynchronousLockTestCase(TestCase):
+
+       def testAsynchronousLock(self):
+               scheduler = PollScheduler().sched_iface
+               tempdir = tempfile.mkdtemp()
+               try:
+                       path = os.path.join(tempdir, 'lock_me')
+                       for force_thread in (True, False):
+                               for force_dummy in (True, False):
+                                       async_lock = AsynchronousLock(path=path,
+                                               scheduler=scheduler, _force_dummy=force_dummy,
+                                               _force_thread=force_thread)
+                                       async_lock.start()
+                                       async_lock.wait()
+                                       async_lock.unlock()
+                                       self.assertEqual(async_lock.returncode, os.EX_OK)
+               finally:
+                       shutil.rmtree(tempdir)