AsynchronousLock: use subprocess if no threads
authorZac Medico <zmedico@gentoo.org>
Fri, 22 Oct 2010 01:13:33 +0000 (18:13 -0700)
committerZac Medico <zmedico@gentoo.org>
Wed, 24 Nov 2010 01:37:10 +0000 (17:37 -0800)
bin/lock-helper.py [new file with mode: 0755]
pym/_emerge/AsynchronousLock.py
pym/_emerge/BinpkgFetcher.py
pym/_emerge/EbuildBuildDir.py
pym/portage/__init__.py
pym/portage/dbapi/vartree.py
pym/portage/tests/locks/test_asynchronous_lock.py

diff --git a/bin/lock-helper.py b/bin/lock-helper.py
new file mode 100755 (executable)
index 0000000..5c332b2
--- /dev/null
@@ -0,0 +1,23 @@
+#!/usr/bin/python
+# Copyright 2010 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import sys
+import portage
+
+def main(args):
+
+       if args and sys.hexversion < 0x3000000 and not isinstance(args[0], unicode):
+               for i, x in enumerate(args):
+                       args[i] = portage._unicode_decode(x, errors='strict')
+
+       lock_obj = portage.locks.lockfile(args[0], wantnewlockfile=True)
+       sys.stdout.write('\0')
+       sys.stdout.flush()
+       sys.stdin.read(1)
+       portage.locks.unlockfile(lock_obj)
+       return portage.os.EX_OK
+
+if __name__ == "__main__":
+       rval = main(sys.argv[1:])
+       sys.exit(rval)
index 6eb90b4e4396de434ad7433995352f85338eaa7b..bce81ed5c674fc461f7a4508e23bb6c7b95c521a 100644 (file)
@@ -3,18 +3,76 @@
 
 import dummy_threading
 import fcntl
+import sys
+
 try:
        import threading
 except ImportError:
        import dummy_threading as threading
 
+import portage
 from portage import os
 from portage.exception import TryAgain
 from portage.locks import lockfile, unlockfile
 from _emerge.AbstractPollTask import AbstractPollTask
+from _emerge.AsynchronousTask import AsynchronousTask
 from _emerge.PollConstants import PollConstants
+from _emerge.SpawnProcess import SpawnProcess
+
+class AsynchronousLock(AsynchronousTask):
+       """
+       This uses the portage.locks module to acquire a lock asynchronously,
+       using either a thread (if available) or a subprocess.
+       """
+
+       __slots__ = ('path', 'scheduler',) + \
+               ('_imp', '_force_async', '_force_dummy', '_force_process', \
+               '_force_thread')
+
+       def _start(self):
+
+               if not self._force_async:
+                       try:
+                               self._imp = lockfile(self.path,
+                                       wantnewlockfile=True, flags=os.O_NONBLOCK)
+                       except TryAgain:
+                               pass
+                       else:
+                               self.returncode = os.EX_OK
+                               self.wait()
+                               return
+
+               if self._force_process or \
+                       (not self._force_thread and threading is dummy_threading):
+                       self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
+               else:
+                       self._imp = _LockThread(path=self.path,
+                               scheduler=self.scheduler,
+                               _force_dummy=self._force_dummy)
+
+               self._imp.addExitListener(self._imp_exit)
+               self._imp.start()
+
+       def _imp_exit(self, imp):
+               # call exit listeners
+               self.wait()
+
+       def _wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               self.returncode = self._imp._wait()
+               return self.returncode
+
+       def unlock(self):
+               if self._imp is None:
+                       raise AssertionError('not locked')
+               if isinstance(self._imp, (_LockProcess, _LockThread)):
+                       self._imp.unlock()
+               else:
+                       unlockfile(self._imp)
+               self._imp = None
 
-class AsynchronousLock(AbstractPollTask):
+class _LockThread(AbstractPollTask):
        """
        This uses the portage.locks module to acquire a lock asynchronously,
        using a background thread. After the lock is acquired, the thread
@@ -27,25 +85,10 @@ class AsynchronousLock(AbstractPollTask):
        """
 
        __slots__ = ('lock_obj', 'path',) + \
-               ('_files', '_force_thread', '_force_dummy',
+               ('_files', '_force_dummy',
                '_thread', '_reg_id',)
 
        def _start(self):
-
-               if self._force_thread:
-                       self._start_thread()
-                       return
-
-               try:
-                       self.lock_obj = lockfile(self.path,
-                               wantnewlockfile=True, flags=os.O_NONBLOCK)
-               except TryAgain:
-                       self._start_thread()
-               else:
-                       self.returncode = os.EX_OK
-                       self.wait()
-
-       def _start_thread(self):
                pr, pw = os.pipe()
                self._files = {}
                self._files['pipe_read'] = os.fdopen(pr, 'rb', 0)
@@ -101,3 +144,81 @@ class AsynchronousLock(AbstractPollTask):
                        for f in self._files.values():
                                f.close()
                        self._files = None
+
+class _LockProcess(AbstractPollTask):
+       """
+       This uses the portage.locks module to acquire a lock asynchronously,
+       using a subprocess. After the lock is acquired, the process
+       writes to a pipe in order to notify a poll loop running in the main
+       process. The unlock() method notifies the subprocess to release the
+       lock and exit.
+       """
+
+       __slots__ = ('path', 'scheduler',) + \
+               ('_proc', '_files', '_reg_id')
+
+       def _start(self):
+               in_pr, in_pw = os.pipe()
+               out_pr, out_pw = os.pipe()
+               self._files = {}
+               self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0)
+               self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0)
+               fcntl.fcntl(in_pr, fcntl.F_SETFL,
+                       fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
+               self._reg_id = self.scheduler.register(in_pr,
+                       PollConstants.POLLIN, self._output_handler)
+               self._registered = True
+               self._proc = SpawnProcess(
+                       args=[portage._python_interpreter,
+                               os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
+                               env=os.environ,
+                               fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
+                               scheduler=self.scheduler)
+               self._proc.addExitListener(self._proc_exit)
+               self._proc.start()
+               os.close(out_pr)
+               os.close(in_pw)
+
+       def _proc_exit(self, proc):
+               if proc.returncode != os.EX_OK:
+                       # There's no good reason for locks to fail.
+                       raise AssertionError('lock process failed with returncode %s' \
+                               % (proc.returncode,))
+
+       def _wait(self):
+               if self.returncode is not None:
+                       return self.returncode
+               if self._registered:
+                       self.scheduler.schedule(self._reg_id)
+               return self.returncode
+
+       def _output_handler(self, f, event):
+               buf = self._read_buf(self._files['pipe_in'], event)
+               if buf:
+                       self._unregister()
+                       self.returncode = os.EX_OK
+                       self.wait()
+
+       def _unregister(self):
+               self._registered = False
+
+               if self._reg_id is not None:
+                       self.scheduler.unregister(self._reg_id)
+                       self._reg_id = None
+
+               if self._files is not None:
+                       try:
+                               pipe_in = self._files.pop('pipe_in')
+                       except KeyError:
+                               pass
+                       else:
+                               pipe_in.close()
+
+       def unlock(self):
+               if self._proc is None:
+                       raise AssertionError('not locked')
+               self._files['pipe_out'].write(b'\0')
+               self._files['pipe_out'].close()
+               self._files = None
+               self._proc.wait()
+               self._proc = None
index 6fbce97dc40e6dbbf848d1b419a7c753467427ef..9876cf444ae4811f0d21e2b52877d403ff65d410 100644 (file)
@@ -144,7 +144,7 @@ class BinpkgFetcher(SpawnProcess):
                        scheduler=self.scheduler)
                async_lock.start()
                async_lock.wait()
-               self._lock_obj = async_lock.lock_obj
+               self._lock_obj = async_lock
                self.locked = True
 
        class AlreadyLocked(portage.exception.PortageException):
@@ -153,7 +153,7 @@ class BinpkgFetcher(SpawnProcess):
        def unlock(self):
                if self._lock_obj is None:
                        return
-               portage.locks.unlockfile(self._lock_obj)
+               self._lock_obj.unlock()
                self._lock_obj = None
                self.locked = False
 
index c901fe667b1416435b4e1a24e5780aa773a12985..1da3c93ae2c755aa3b6fff3b4a74a1c9b77c15ab 100644 (file)
@@ -55,7 +55,7 @@ class EbuildBuildDir(SlotObject):
                                scheduler=self.scheduler)
                        builddir_lock.start()
                        builddir_lock.wait()
-                       self._lock_obj = builddir_lock.lock_obj
+                       self._lock_obj = builddir_lock
                        self.settings['PORTAGE_BUILDIR_LOCKED'] = '1'
                finally:
                        self.locked = self._lock_obj is not None
@@ -79,7 +79,7 @@ class EbuildBuildDir(SlotObject):
                if self._lock_obj is None:
                        return
 
-               portage.locks.unlockdir(self._lock_obj)
+               self._lock_obj.unlock()
                self._lock_obj = None
                self.locked = False
                self.settings.pop('PORTAGE_BUILDIR_LOCKED', None)
index 4d400999abd8ac78fe8c40d308b48afe0dd425f4..d302b52427f58060887bb7445fdde4438c7a880e 100644 (file)
@@ -327,6 +327,7 @@ except (ImportError, OSError) as e:
 # ===========================================================================
 
 _python_interpreter = os.path.realpath(sys.executable)
+_bin_path = PORTAGE_BIN_PATH
 
 def _ensure_default_encoding():
 
index f756b70e09e76b339c90249fe6665fe0fa8b4696..d14d06b7a30c3cdce2f6be4b5e92be9112ada43d 100644 (file)
@@ -1299,11 +1299,14 @@ class dblink(object):
                                scheduler=self._scheduler)
                        async_lock.start()
                        async_lock.wait()
-                       self._lock_vdb = async_lock.lock_obj
+                       self._lock_vdb = async_lock
 
        def unlockdb(self):
-               if self._lock_vdb:
-                       unlockdir(self._lock_vdb)
+               if self._lock_vdb is not None:
+                       if isinstance(self._lock_vdb, AsynchronousLock):
+                               self._lock_vdb.unlock()
+                       else:
+                               unlockdir(self._lock_vdb)
                        self._lock_vdb = None
 
        def getpath(self):
@@ -3824,6 +3827,7 @@ class dblink(object):
                                settings.backup_changes(var_name)
                                shutil.copytree(var_orig, var_new, symlinks=True)
                                os.chmod(var_new, dir_perms)
+                       portage._bin_path = settings['PORTAGE_BIN_PATH']
                        os.chmod(base_path_tmp, dir_perms)
                        # This serves so pre-load the modules.
                        _preload_elog_modules(self.settings)
index ac38462ed38b3bb9e4cbebf1502b8c8fd81e8563..7e9fdfec9f432fcb3d4fc8d5f48a9d2a0dbb003f 100644 (file)
@@ -16,14 +16,24 @@ class AsynchronousLockTestCase(TestCase):
                tempdir = tempfile.mkdtemp()
                try:
                        path = os.path.join(tempdir, 'lock_me')
-                       for force_thread in (True, False):
+                       for force_async in (True, False):
                                for force_dummy in (True, False):
                                        async_lock = AsynchronousLock(path=path,
-                                               scheduler=scheduler, _force_dummy=force_dummy,
-                                               _force_thread=force_thread)
+                                               scheduler=scheduler, _force_async=force_async,
+                                               _force_thread=True,
+                                               _force_dummy=force_dummy)
                                        async_lock.start()
                                        async_lock.wait()
                                        async_lock.unlock()
                                        self.assertEqual(async_lock.returncode, os.EX_OK)
+
+                               async_lock = AsynchronousLock(path=path,
+                                       scheduler=scheduler, _force_async=force_async,
+                                       _force_process=True)
+                               async_lock.start()
+                               async_lock.wait()
+                               async_lock.unlock()
+                               self.assertEqual(async_lock.returncode, os.EX_OK)
+
                finally:
                        shutil.rmtree(tempdir)