1 # Copyright 2010-2011 Gentoo Foundation
2 # Distributed under the terms of the GNU General Public License v2
12 import dummy_threading as threading
15 from portage import os
16 from portage.exception import TryAgain
17 from portage.localization import _
18 from portage.locks import lockfile, unlockfile
19 from portage.util import writemsg_level
20 from _emerge.AbstractPollTask import AbstractPollTask
21 from _emerge.AsynchronousTask import AsynchronousTask
22 from _emerge.PollConstants import PollConstants
23 from _emerge.SpawnProcess import SpawnProcess
25 class AsynchronousLock(AsynchronousTask):
27 This uses the portage.locks module to acquire a lock asynchronously,
28 using either a thread (if available) or a subprocess.
30 The default behavior is to use a process instead of a thread, since
31 there is currently no way to interrupt a thread that is waiting for
32 a lock (notably, SIGINT doesn't work because python delivers all
33 signals to the main thread).
36 __slots__ = ('path', 'scheduler',) + \
37 ('_imp', '_force_async', '_force_dummy', '_force_process', \
38 '_force_thread', '_waiting')
40 _use_process_by_default = True
44 if not self._force_async:
46 self._imp = lockfile(self.path,
47 wantnewlockfile=True, flags=os.O_NONBLOCK)
51 self.returncode = os.EX_OK
55 if self._force_process or \
56 (not self._force_thread and \
57 (self._use_process_by_default or threading is dummy_threading)):
58 self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
60 self._imp = _LockThread(path=self.path,
61 scheduler=self.scheduler,
62 _force_dummy=self._force_dummy)
64 self._imp.addExitListener(self._imp_exit)
67 def _imp_exit(self, imp):
73 if self._imp is not None:
77 if self.returncode is not None:
78 return self.returncode
80 self.returncode = self._imp.wait()
82 return self.returncode
86 raise AssertionError('not locked')
87 if isinstance(self._imp, (_LockProcess, _LockThread)):
93 class _LockThread(AbstractPollTask):
95 This uses the portage.locks module to acquire a lock asynchronously,
96 using a background thread. After the lock is acquired, the thread
97 writes to a pipe in order to notify a poll loop running in the main
100 If the threading module is unavailable then the dummy_threading
101 module will be used, and the lock will be acquired synchronously
102 (before the start() method returns).
105 __slots__ = ('path',) + \
106 ('_files', '_force_dummy', '_lock_obj',
107 '_thread', '_reg_id',)
112 self._files['pipe_read'] = os.fdopen(pr, 'rb', 0)
113 self._files['pipe_write'] = os.fdopen(pw, 'wb', 0)
114 for k, f in self._files.items():
115 fcntl.fcntl(f.fileno(), fcntl.F_SETFL,
116 fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
117 self._reg_id = self.scheduler.register(self._files['pipe_read'].fileno(),
118 PollConstants.POLLIN, self._output_handler)
119 self._registered = True
120 threading_mod = threading
121 if self._force_dummy:
122 threading_mod = dummy_threading
123 self._thread = threading_mod.Thread(target=self._run_lock)
127 self._lock_obj = lockfile(self.path, wantnewlockfile=True)
128 self._files['pipe_write'].write(b'\0')
130 def _output_handler(self, f, event):
131 buf = self._read_buf(self._files['pipe_read'], event)
134 self.returncode = os.EX_OK
138 # There's currently no way to force thread termination.
142 if self.returncode is not None:
143 return self.returncode
145 self.scheduler.schedule(self._reg_id)
146 return self.returncode
149 if self._lock_obj is None:
150 raise AssertionError('not locked')
151 if self.returncode is None:
152 raise AssertionError('lock not acquired yet')
153 unlockfile(self._lock_obj)
154 self._lock_obj = None
156 def _unregister(self):
157 self._registered = False
159 if self._thread is not None:
163 if self._reg_id is not None:
164 self.scheduler.unregister(self._reg_id)
167 if self._files is not None:
168 for f in self._files.values():
172 class _LockProcess(AbstractPollTask):
174 This uses the portage.locks module to acquire a lock asynchronously,
175 using a subprocess. After the lock is acquired, the process
176 writes to a pipe in order to notify a poll loop running in the main
177 process. The unlock() method notifies the subprocess to release the
181 __slots__ = ('path', 'scheduler',) + \
182 ('_acquired', '_proc', '_files', '_reg_id', '_unlocked')
185 in_pr, in_pw = os.pipe()
186 out_pr, out_pw = os.pipe()
188 self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0)
189 self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0)
190 fcntl.fcntl(in_pr, fcntl.F_SETFL,
191 fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
192 self._reg_id = self.scheduler.register(in_pr,
193 PollConstants.POLLIN, self._output_handler)
194 self._registered = True
195 self._proc = SpawnProcess(
196 args=[portage._python_interpreter,
197 os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
198 env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
199 fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
200 scheduler=self.scheduler)
201 self._proc.addExitListener(self._proc_exit)
206 def _proc_exit(self, proc):
207 if proc.returncode != os.EX_OK:
208 # Typically, this will happen due to the
209 # process being killed by a signal.
210 if not self._acquired:
211 # If the lock hasn't been aquired yet, the
212 # caller can check the returncode and handle
213 # this failure appropriately.
214 if not self.cancelled:
215 writemsg_level("_LockProcess: %s\n" % \
216 _("failed to acquire lock on '%s'") % (self.path,),
217 level=logging.ERROR, noiselevel=-1)
219 self.returncode = proc.returncode
223 if not self.cancelled and \
225 # We don't want lost locks going unnoticed, so it's
226 # only safe to ignore if either the cancel() or
227 # unlock() methods have been previously called.
228 raise AssertionError("lock process failed with returncode %s" \
229 % (proc.returncode,))
232 if self._proc is not None:
236 if self.returncode is not None:
237 return self.returncode
239 self.scheduler.schedule(self._reg_id)
240 return self.returncode
242 def _output_handler(self, f, event):
243 buf = self._read_buf(self._files['pipe_in'], event)
245 self._acquired = True
247 self.returncode = os.EX_OK
250 def _unregister(self):
251 self._registered = False
253 if self._reg_id is not None:
254 self.scheduler.unregister(self._reg_id)
257 if self._files is not None:
259 pipe_in = self._files.pop('pipe_in')
266 if self._proc is None:
267 raise AssertionError('not locked')
268 if self.returncode is None:
269 raise AssertionError('lock not acquired yet')
270 self._unlocked = True
271 self._files['pipe_out'].write(b'\0')
272 self._files['pipe_out'].close()