1 # Copyright 2010-2012 Gentoo Foundation
2 # Distributed under the terms of the GNU General Public License v2
13 threading = dummy_threading
16 from portage import os
17 from portage.exception import TryAgain
18 from portage.localization import _
19 from portage.locks import lockfile, unlockfile
20 from portage.util import writemsg_level
21 from _emerge.AbstractPollTask import AbstractPollTask
22 from _emerge.AsynchronousTask import AsynchronousTask
23 from _emerge.PollConstants import PollConstants
24 from _emerge.SpawnProcess import SpawnProcess
26 class AsynchronousLock(AsynchronousTask):
28 This uses the portage.locks module to acquire a lock asynchronously,
29 using either a thread (if available) or a subprocess.
31 The default behavior is to use a process instead of a thread, since
32 there is currently no way to interrupt a thread that is waiting for
33 a lock (notably, SIGINT doesn't work because python delivers all
34 signals to the main thread).
37 __slots__ = ('path', 'scheduler',) + \
38 ('_imp', '_force_async', '_force_dummy', '_force_process', \
41 _use_process_by_default = True
45 if not self._force_async:
47 self._imp = lockfile(self.path,
48 wantnewlockfile=True, flags=os.O_NONBLOCK)
52 self.returncode = os.EX_OK
56 if self._force_process or \
57 (not self._force_thread and \
58 (self._use_process_by_default or threading is dummy_threading)):
59 self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
61 self._imp = _LockThread(path=self.path,
62 scheduler=self.scheduler,
63 _force_dummy=self._force_dummy)
65 self._imp.addExitListener(self._imp_exit)
68 def _imp_exit(self, imp):
73 if isinstance(self._imp, AsynchronousTask):
77 if isinstance(self._imp, AsynchronousTask):
79 return self.returncode
82 if self.returncode is not None:
83 return self.returncode
84 self.returncode = self._imp.wait()
85 return self.returncode
89 raise AssertionError('not locked')
90 if isinstance(self._imp, (_LockProcess, _LockThread)):
96 class _LockThread(AbstractPollTask):
98 This uses the portage.locks module to acquire a lock asynchronously,
99 using a background thread. After the lock is acquired, the thread
100 writes to a pipe in order to notify a poll loop running in the main
103 If the threading module is unavailable then the dummy_threading
104 module will be used, and the lock will be acquired synchronously
105 (before the start() method returns).
108 __slots__ = ('path',) + \
109 ('_files', '_force_dummy', '_lock_obj',
110 '_thread', '_reg_id',)
115 self._files['pipe_read'] = pr
116 self._files['pipe_write'] = pw
117 for f in self._files.values():
118 fcntl.fcntl(f, fcntl.F_SETFL,
119 fcntl.fcntl(f, fcntl.F_GETFL) | os.O_NONBLOCK)
120 self._reg_id = self.scheduler.register(self._files['pipe_read'],
121 PollConstants.POLLIN, self._output_handler)
122 self._registered = True
123 threading_mod = threading
124 if self._force_dummy:
125 threading_mod = dummy_threading
126 self._thread = threading_mod.Thread(target=self._run_lock)
130 self._lock_obj = lockfile(self.path, wantnewlockfile=True)
131 os.write(self._files['pipe_write'], b'\0')
133 def _output_handler(self, f, event):
135 if event & PollConstants.POLLIN:
137 buf = os.read(self._files['pipe_read'], self._bufsize)
139 if e.errno not in (errno.EAGAIN,):
143 self.returncode = os.EX_OK
149 # There's currently no way to force thread termination.
153 if self.returncode is not None:
154 return self.returncode
155 while self._registered:
156 self.scheduler.iteration()
157 return self.returncode
160 if self._lock_obj is None:
161 raise AssertionError('not locked')
162 if self.returncode is None:
163 raise AssertionError('lock not acquired yet')
164 unlockfile(self._lock_obj)
165 self._lock_obj = None
167 def _unregister(self):
168 self._registered = False
170 if self._thread is not None:
174 if self._reg_id is not None:
175 self.scheduler.unregister(self._reg_id)
178 if self._files is not None:
179 for f in self._files.values():
183 class _LockProcess(AbstractPollTask):
185 This uses the portage.locks module to acquire a lock asynchronously,
186 using a subprocess. After the lock is acquired, the process
187 writes to a pipe in order to notify a poll loop running in the main
188 process. The unlock() method notifies the subprocess to release the
192 __slots__ = ('path',) + \
193 ('_acquired', '_kill_test', '_proc', '_files', '_reg_id', '_unlocked')
196 in_pr, in_pw = os.pipe()
197 out_pr, out_pw = os.pipe()
199 self._files['pipe_in'] = in_pr
200 self._files['pipe_out'] = out_pw
201 fcntl.fcntl(in_pr, fcntl.F_SETFL,
202 fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
203 self._reg_id = self.scheduler.register(in_pr,
204 PollConstants.POLLIN, self._output_handler)
205 self._registered = True
206 self._proc = SpawnProcess(
207 args=[portage._python_interpreter,
208 os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
209 env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
210 fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
211 scheduler=self.scheduler)
212 self._proc.addExitListener(self._proc_exit)
217 def _proc_exit(self, proc):
219 if self._files is not None:
220 # Close pipe_out if it's still open, since it's useless
221 # after the process has exited. This helps to avoid
222 # "ResourceWarning: unclosed file" since Python 3.2.
224 pipe_out = self._files.pop('pipe_out')
230 if proc.returncode != os.EX_OK:
231 # Typically, this will happen due to the
232 # process being killed by a signal.
234 if not self._acquired:
235 # If the lock hasn't been acquired yet, the
236 # caller can check the returncode and handle
237 # this failure appropriately.
238 if not (self.cancelled or self._kill_test):
239 writemsg_level("_LockProcess: %s\n" % \
240 _("failed to acquire lock on '%s'") % (self.path,),
241 level=logging.ERROR, noiselevel=-1)
243 self.returncode = proc.returncode
247 if not self.cancelled and \
249 # We don't want lost locks going unnoticed, so it's
250 # only safe to ignore if either the cancel() or
251 # unlock() methods have been previously called.
252 raise AssertionError("lock process failed with returncode %s" \
253 % (proc.returncode,))
256 if self._proc is not None:
260 if self._proc is not None:
262 return self.returncode
265 if self.returncode is not None:
266 return self.returncode
267 while self._registered:
268 self.scheduler.iteration()
269 return self.returncode
271 def _output_handler(self, f, event):
273 if event & PollConstants.POLLIN:
275 buf = os.read(self._files['pipe_in'], self._bufsize)
277 if e.errno not in (errno.EAGAIN,):
280 self._acquired = True
282 self.returncode = os.EX_OK
287 def _unregister(self):
288 self._registered = False
290 if self._reg_id is not None:
291 self.scheduler.unregister(self._reg_id)
294 if self._files is not None:
296 pipe_in = self._files.pop('pipe_in')
303 if self._proc is None:
304 raise AssertionError('not locked')
305 if self.returncode is None:
306 raise AssertionError('lock not acquired yet')
307 if self.returncode != os.EX_OK:
308 raise AssertionError("lock process failed with returncode %s" \
309 % (self.returncode,))
310 self._unlocked = True
311 os.write(self._files['pipe_out'], b'\0')
312 os.close(self._files['pipe_out'])