1135df744a6c3d3b5f92914428a8a04319e7a1fd
[portage.git] / pym / _emerge / AsynchronousLock.py
1 # Copyright 2010-2011 Gentoo Foundation
2 # Distributed under the terms of the GNU General Public License v2
3
4 import dummy_threading
5 import fcntl
6 import logging
7 import sys
8
9 try:
10         import threading
11 except ImportError:
12         import dummy_threading as threading
13
14 import portage
15 from portage import os
16 from portage.exception import TryAgain
17 from portage.localization import _
18 from portage.locks import lockfile, unlockfile
19 from portage.util import writemsg_level
20 from _emerge.AbstractPollTask import AbstractPollTask
21 from _emerge.AsynchronousTask import AsynchronousTask
22 from _emerge.PollConstants import PollConstants
23 from _emerge.SpawnProcess import SpawnProcess
24
class AsynchronousLock(AsynchronousTask):
        """
        This uses the portage.locks module to acquire a lock asynchronously,
        using either a thread (if available) or a subprocess.

        The default behavior is to use a process instead of a thread, since
        there is currently no way to interrupt a thread that is waiting for
        a lock (notably, SIGINT doesn't work because python delivers all
        signals to the main thread).

        Unless _force_async is set, a non-blocking acquisition is attempted
        first, and a helper task is only started when that attempt raises
        TryAgain. Consequently _imp holds either the object returned by
        lockfile() or a _LockProcess / _LockThread instance.
        """

        __slots__ = ('path', 'scheduler',) + \
                ('_imp', '_force_async', '_force_dummy', '_force_process', \
                '_force_thread', '_waiting')

        _use_process_by_default = True

        def _start(self):
                """
                Try to take the lock without blocking, and fall back to a
                helper process or thread if the lock is currently unavailable.
                """

                if not self._force_async:
                        try:
                                self._imp = lockfile(self.path,
                                        wantnewlockfile=True, flags=os.O_NONBLOCK)
                        except TryAgain:
                                # Lock is busy; acquire it asynchronously below.
                                pass
                        else:
                                # Acquired immediately, so there is nothing to wait for.
                                self.returncode = os.EX_OK
                                self.wait()
                                return

                # A process is used when explicitly forced, or when a thread is
                # not forced and either processes are the default or only the
                # dummy threading implementation is available.
                if self._force_process or \
                        (not self._force_thread and \
                        (self._use_process_by_default or threading is dummy_threading)):
                        self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
                else:
                        self._imp = _LockThread(path=self.path,
                                scheduler=self.scheduler,
                                _force_dummy=self._force_dummy)

                self._imp.addExitListener(self._imp_exit)
                self._imp.start()

        def _imp_exit(self, imp):
                """Exit listener for the helper task; forwards completion."""
                # call exit listeners, unless _wait() is already in progress
                # (it stores the returncode itself when _imp.wait() returns)
                if not self._waiting:
                        self.wait()

        def _cancel(self):
                """Cancel the helper task, if one is in use."""
                # After a synchronous acquisition in _start(), _imp is the
                # object returned by lockfile() rather than a task, so it must
                # not receive a cancel() call. This mirrors the type check
                # performed by unlock().
                if isinstance(self._imp, (_LockProcess, _LockThread)):
                        self._imp.cancel()

        def _wait(self):
                """
                Wait for the helper task and return its returncode. Returns
                immediately if the returncode is already known.
                """
                if self.returncode is not None:
                        return self.returncode
                self._waiting = True
                try:
                        self.returncode = self._imp.wait()
                finally:
                        # Always reset the flag, so that an exception raised while
                        # waiting does not permanently disable _imp_exit().
                        self._waiting = False
                return self.returncode

        def unlock(self):
                """
                Release the lock.

                Raises AssertionError if no lock object or helper task is
                currently held.
                """
                if self._imp is None:
                        raise AssertionError('not locked')
                if isinstance(self._imp, (_LockProcess, _LockThread)):
                        # The helper task owns the lock and knows how to release it.
                        self._imp.unlock()
                else:
                        # Lock object from the synchronous lockfile() call.
                        unlockfile(self._imp)
                self._imp = None
92
class _LockThread(AbstractPollTask):
        """
        Acquire a lock through the portage.locks module from a background
        thread. Once the thread holds the lock, it writes a byte to a pipe,
        which notifies the poll loop running in the main thread.

        When the threading module is unavailable, dummy_threading is used
        instead and the lock is acquired synchronously (i.e. before the
        start() method returns).
        """

        __slots__ = (
                'path',
                '_files', '_force_dummy', '_lock_obj',
                '_thread', '_reg_id',
        )

        def _start(self):
                """
                Create the notification pipe, register its read end with the
                scheduler, and launch the thread that acquires the lock.
                """
                read_fd, write_fd = os.pipe()
                files = self._files = {}
                files['pipe_read'] = os.fdopen(read_fd, 'rb', 0)
                files['pipe_write'] = os.fdopen(write_fd, 'wb', 0)

                # Put both ends of the pipe into non-blocking mode.
                for pipe_file in files.values():
                        fd = pipe_file.fileno()
                        current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                        fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

                self._reg_id = self.scheduler.register(
                        files['pipe_read'].fileno(),
                        PollConstants.POLLIN,
                        self._output_handler)
                self._registered = True

                if self._force_dummy:
                        thread_class = dummy_threading.Thread
                else:
                        thread_class = threading.Thread
                self._thread = thread_class(target=self._run_lock)
                self._thread.start()

        def _run_lock(self):
                """Thread target: take the lock, then signal via the pipe."""
                self._lock_obj = lockfile(self.path, wantnewlockfile=True)
                self._files['pipe_write'].write(b'\0')

        def _output_handler(self, f, event):
                """Poll callback for the read end of the notification pipe."""
                data = self._read_buf(self._files['pipe_read'], event)
                if not data:
                        return
                # Any data on the pipe means that the thread holds the lock.
                self._unregister()
                self.returncode = os.EX_OK
                self.wait()

        def _cancel(self):
                # There's currently no way to force thread termination.
                pass

        def _wait(self):
                """
                Run the scheduler for the registered pipe if the result is
                still pending, then return the returncode.
                """
                if self.returncode is None and self._registered:
                        self.scheduler.schedule(self._reg_id)
                return self.returncode

        def unlock(self):
                """
                Release the lock that was acquired by the thread.

                Raises AssertionError when no lock object is held, or when the
                task has not completed yet.
                """
                lock_obj = self._lock_obj
                if lock_obj is None:
                        raise AssertionError('not locked')
                if self.returncode is None:
                        raise AssertionError('lock not acquired yet')
                unlockfile(lock_obj)
                self._lock_obj = None

        def _unregister(self):
                """
                Join the thread, drop the scheduler registration and close
                both ends of the pipe.
                """
                self._registered = False

                worker = self._thread
                if worker is not None:
                        worker.join()
                        self._thread = None

                reg_id = self._reg_id
                if reg_id is not None:
                        self.scheduler.unregister(reg_id)
                        self._reg_id = None

                files = self._files
                if files is not None:
                        for pipe_file in files.values():
                                pipe_file.close()
                        self._files = None
171
class _LockProcess(AbstractPollTask):
        """
        This uses the portage.locks module to acquire a lock asynchronously,
        using a subprocess. After the lock is acquired, the process
        writes to a pipe in order to notify a poll loop running in the main
        process. The unlock() method notifies the subprocess to release the
        lock and exit.
        """

        __slots__ = ('path', 'scheduler',) + \
                ('_acquired', '_proc', '_files', '_reg_id', '_unlocked')

        def _start(self):
                """
                Spawn lock-helper.py for self.path, connected through two pipes,
                and register the notification pipe with the scheduler.
                """
                # in_*:  helper stdout -> this process (acquisition notice)
                # out_*: this process -> helper stdin (unlock request)
                in_pr, in_pw = os.pipe()
                out_pr, out_pw = os.pipe()
                self._files = {}
                self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0)
                self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0)
                fcntl.fcntl(in_pr, fcntl.F_SETFL,
                        fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
                self._reg_id = self.scheduler.register(in_pr,
                        PollConstants.POLLIN, self._output_handler)
                self._registered = True
                self._proc = SpawnProcess(
                        args=[portage._python_interpreter,
                                os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
                                env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
                                fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
                                scheduler=self.scheduler)
                self._proc.addExitListener(self._proc_exit)
                self._proc.start()
                # The helper's ends of the pipes are not needed in this process.
                os.close(out_pr)
                os.close(in_pw)

        def _proc_exit(self, proc):
                """
                Exit listener for the helper process.

                Raises AssertionError if the process failed after the lock was
                acquired, and neither cancel() nor unlock() has been called.
                """
                if proc.returncode != os.EX_OK:
                        # Typically, this will happen due to the
                        # process being killed by a signal.
                        if not self._acquired:
                                # If the lock hasn't been acquired yet, the
                                # caller can check the returncode and handle
                                # this failure appropriately.
                                if not self.cancelled:
                                        writemsg_level("_LockProcess: %s\n" % \
                                                _("failed to acquire lock on '%s'") % (self.path,),
                                                level=logging.ERROR, noiselevel=-1)
                                self._unregister()
                                # The helper is gone and the lock was never held,
                                # so unlock() has nothing to do; close pipe_out
                                # here, since otherwise it would stay open.
                                self._close_pipe_out()
                                self.returncode = proc.returncode
                                self.wait()
                                return

                        if not self.cancelled and \
                                not self._unlocked:
                                # We don't want lost locks going unnoticed, so it's
                                # only safe to ignore if either the cancel() or
                                # unlock() methods have been previously called.
                                raise AssertionError("lock process failed with returncode %s" \
                                        % (proc.returncode,))

        def _cancel(self):
                """Cancel the helper process, if it has been created."""
                if self._proc is not None:
                        self._proc.cancel()

        def _wait(self):
                """
                Run the scheduler for the registered pipe if the result is
                still pending, then return the returncode.
                """
                if self.returncode is not None:
                        return self.returncode
                if self._registered:
                        self.scheduler.schedule(self._reg_id)
                return self.returncode

        def _output_handler(self, f, event):
                """Poll callback for the pipe connected to the helper's stdout."""
                buf = self._read_buf(self._files['pipe_in'], event)
                if buf:
                        # Any output from the helper means that it holds the lock.
                        self._acquired = True
                        self._unregister()
                        self.returncode = os.EX_OK
                        self.wait()

        def _unregister(self):
                """Drop the scheduler registration and close pipe_in."""
                self._registered = False

                if self._reg_id is not None:
                        self.scheduler.unregister(self._reg_id)
                        self._reg_id = None

                if self._files is not None:
                        try:
                                pipe_in = self._files.pop('pipe_in')
                        except KeyError:
                                pass
                        else:
                                pipe_in.close()

        def _close_pipe_out(self):
                """Close pipe_out if it is still open."""
                if self._files is not None:
                        try:
                                pipe_out = self._files.pop('pipe_out')
                        except KeyError:
                                pass
                        else:
                                pipe_out.close()

        def unlock(self):
                """
                Ask the helper process to release the lock, and wait for it
                to exit.

                Raises AssertionError if the process was never started, if the
                lock has not been acquired yet, or if the task finished with a
                failing returncode.
                """
                if self._proc is None:
                        raise AssertionError('not locked')
                if self.returncode is None:
                        raise AssertionError('lock not acquired yet')
                if self.returncode != os.EX_OK:
                        # The lock was never acquired, so there is no helper
                        # waiting for an unlock request.
                        raise AssertionError("lock process failed with returncode %s" \
                                % (self.returncode,))
                self._unlocked = True
                # A byte on the helper's stdin is the unlock request.
                self._files['pipe_out'].write(b'\0')
                self._files['pipe_out'].close()
                self._files = None
                self._proc.wait()
                self._proc = None