Use PollScheduler iteration method.
[portage.git] / pym / _emerge / AsynchronousLock.py
1 # Copyright 2010-2012 Gentoo Foundation
2 # Distributed under the terms of the GNU General Public License v2
3
4 import dummy_threading
5 import fcntl
6 import errno
7 import logging
8 import sys
9
10 try:
11         import threading
12 except ImportError:
13         threading = dummy_threading
14
15 import portage
16 from portage import os
17 from portage.exception import TryAgain
18 from portage.localization import _
19 from portage.locks import lockfile, unlockfile
20 from portage.util import writemsg_level
21 from _emerge.AbstractPollTask import AbstractPollTask
22 from _emerge.AsynchronousTask import AsynchronousTask
23 from _emerge.PollConstants import PollConstants
24 from _emerge.SpawnProcess import SpawnProcess
25
26 class AsynchronousLock(AsynchronousTask):
27         """
28         This uses the portage.locks module to acquire a lock asynchronously,
29         using either a thread (if available) or a subprocess.
30
31         The default behavior is to use a process instead of a thread, since
32         there is currently no way to interrupt a thread that is waiting for
33         a lock (notably, SIGINT doesn't work because python delivers all
34         signals to the main thread).
35         """
36
37         __slots__ = ('path', 'scheduler',) + \
38                 ('_imp', '_force_async', '_force_dummy', '_force_process', \
39                 '_force_thread')
40
41         _use_process_by_default = True
42
43         def _start(self):
44
45                 if not self._force_async:
46                         try:
47                                 self._imp = lockfile(self.path,
48                                         wantnewlockfile=True, flags=os.O_NONBLOCK)
49                         except TryAgain:
50                                 pass
51                         else:
52                                 self.returncode = os.EX_OK
53                                 self.wait()
54                                 return
55
56                 if self._force_process or \
57                         (not self._force_thread and \
58                         (self._use_process_by_default or threading is dummy_threading)):
59                         self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
60                 else:
61                         self._imp = _LockThread(path=self.path,
62                                 scheduler=self.scheduler,
63                                 _force_dummy=self._force_dummy)
64
65                 self._imp.addExitListener(self._imp_exit)
66                 self._imp.start()
67
68         def _imp_exit(self, imp):
69                 # call exit listeners
70                 self.wait()
71
72         def _cancel(self):
73                 if isinstance(self._imp, AsynchronousTask):
74                         self._imp.cancel()
75
76         def _poll(self):
77                 if isinstance(self._imp, AsynchronousTask):
78                         self._imp.poll()
79                 return self.returncode
80
81         def _wait(self):
82                 if self.returncode is not None:
83                         return self.returncode
84                 self.returncode = self._imp.wait()
85                 return self.returncode
86
87         def unlock(self):
88                 if self._imp is None:
89                         raise AssertionError('not locked')
90                 if isinstance(self._imp, (_LockProcess, _LockThread)):
91                         self._imp.unlock()
92                 else:
93                         unlockfile(self._imp)
94                 self._imp = None
95
96 class _LockThread(AbstractPollTask):
97         """
98         This uses the portage.locks module to acquire a lock asynchronously,
99         using a background thread. After the lock is acquired, the thread
100         writes to a pipe in order to notify a poll loop running in the main
101         thread.
102
103         If the threading module is unavailable then the dummy_threading
104         module will be used, and the lock will be acquired synchronously
105         (before the start() method returns).
106         """
107
108         __slots__ = ('path',) + \
109                 ('_files', '_force_dummy', '_lock_obj',
110                 '_thread', '_reg_id',)
111
112         def _start(self):
113                 pr, pw = os.pipe()
114                 self._files = {}
115                 self._files['pipe_read'] = pr
116                 self._files['pipe_write'] = pw
117                 for f in self._files.values():
118                         fcntl.fcntl(f, fcntl.F_SETFL,
119                                 fcntl.fcntl(f, fcntl.F_GETFL) | os.O_NONBLOCK)
120                 self._reg_id = self.scheduler.register(self._files['pipe_read'],
121                         PollConstants.POLLIN, self._output_handler)
122                 self._registered = True
123                 threading_mod = threading
124                 if self._force_dummy:
125                         threading_mod = dummy_threading
126                 self._thread = threading_mod.Thread(target=self._run_lock)
127                 self._thread.start()
128
129         def _run_lock(self):
130                 self._lock_obj = lockfile(self.path, wantnewlockfile=True)
131                 os.write(self._files['pipe_write'], b'\0')
132
133         def _output_handler(self, f, event):
134                 buf = None
135                 if event & PollConstants.POLLIN:
136                         try:
137                                 buf = os.read(self._files['pipe_read'], self._bufsize)
138                         except OSError as e:
139                                 if e.errno not in (errno.EAGAIN,):
140                                         raise
141                 if buf:
142                         self._unregister()
143                         self.returncode = os.EX_OK
144                         self.wait()
145
146                 return True
147
148         def _cancel(self):
149                 # There's currently no way to force thread termination.
150                 pass
151
152         def _wait(self):
153                 if self.returncode is not None:
154                         return self.returncode
155                 while self._registered:
156                         self.scheduler.iteration()
157                 return self.returncode
158
159         def unlock(self):
160                 if self._lock_obj is None:
161                         raise AssertionError('not locked')
162                 if self.returncode is None:
163                         raise AssertionError('lock not acquired yet')
164                 unlockfile(self._lock_obj)
165                 self._lock_obj = None
166
167         def _unregister(self):
168                 self._registered = False
169
170                 if self._thread is not None:
171                         self._thread.join()
172                         self._thread = None
173
174                 if self._reg_id is not None:
175                         self.scheduler.unregister(self._reg_id)
176                         self._reg_id = None
177
178                 if self._files is not None:
179                         for f in self._files.values():
180                                 os.close(f)
181                         self._files = None
182
183 class _LockProcess(AbstractPollTask):
184         """
185         This uses the portage.locks module to acquire a lock asynchronously,
186         using a subprocess. After the lock is acquired, the process
187         writes to a pipe in order to notify a poll loop running in the main
188         process. The unlock() method notifies the subprocess to release the
189         lock and exit.
190         """
191
192         __slots__ = ('path',) + \
193                 ('_acquired', '_kill_test', '_proc', '_files', '_reg_id', '_unlocked')
194
195         def _start(self):
196                 in_pr, in_pw = os.pipe()
197                 out_pr, out_pw = os.pipe()
198                 self._files = {}
199                 self._files['pipe_in'] = in_pr
200                 self._files['pipe_out'] = out_pw
201                 fcntl.fcntl(in_pr, fcntl.F_SETFL,
202                         fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
203                 self._reg_id = self.scheduler.register(in_pr,
204                         PollConstants.POLLIN, self._output_handler)
205                 self._registered = True
206                 self._proc = SpawnProcess(
207                         args=[portage._python_interpreter,
208                                 os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
209                                 env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
210                                 fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
211                                 scheduler=self.scheduler)
212                 self._proc.addExitListener(self._proc_exit)
213                 self._proc.start()
214                 os.close(out_pr)
215                 os.close(in_pw)
216
217         def _proc_exit(self, proc):
218
219                 if self._files is not None:
220                         # Close pipe_out if it's still open, since it's useless
221                         # after the process has exited. This helps to avoid
222                         # "ResourceWarning: unclosed file" since Python 3.2.
223                         try:
224                                 pipe_out = self._files.pop('pipe_out')
225                         except KeyError:
226                                 pass
227                         else:
228                                 os.close(pipe_out)
229
230                 if proc.returncode != os.EX_OK:
231                         # Typically, this will happen due to the
232                         # process being killed by a signal.
233
234                         if not self._acquired:
235                                 # If the lock hasn't been aquired yet, the
236                                 # caller can check the returncode and handle
237                                 # this failure appropriately.
238                                 if not (self.cancelled or self._kill_test):
239                                         writemsg_level("_LockProcess: %s\n" % \
240                                                 _("failed to acquire lock on '%s'") % (self.path,),
241                                                 level=logging.ERROR, noiselevel=-1)
242                                 self._unregister()
243                                 self.returncode = proc.returncode
244                                 self.wait()
245                                 return
246
247                         if not self.cancelled and \
248                                 not self._unlocked:
249                                 # We don't want lost locks going unnoticed, so it's
250                                 # only safe to ignore if either the cancel() or
251                                 # unlock() methods have been previously called.
252                                 raise AssertionError("lock process failed with returncode %s" \
253                                         % (proc.returncode,))
254
255         def _cancel(self):
256                 if self._proc is not None:
257                         self._proc.cancel()
258
259         def _poll(self):
260                 if self._proc is not None:
261                         self._proc.poll()
262                 return self.returncode
263
264         def _wait(self):
265                 if self.returncode is not None:
266                         return self.returncode
267                 while self._registered:
268                         self.scheduler.iteration()
269                 return self.returncode
270
271         def _output_handler(self, f, event):
272                 buf = None
273                 if event & PollConstants.POLLIN:
274                         try:
275                                 buf = os.read(self._files['pipe_in'], self._bufsize)
276                         except OSError as e:
277                                 if e.errno not in (errno.EAGAIN,):
278                                         raise
279                 if buf:
280                         self._acquired = True
281                         self._unregister()
282                         self.returncode = os.EX_OK
283                         self.wait()
284
285                 return True
286
287         def _unregister(self):
288                 self._registered = False
289
290                 if self._reg_id is not None:
291                         self.scheduler.unregister(self._reg_id)
292                         self._reg_id = None
293
294                 if self._files is not None:
295                         try:
296                                 pipe_in = self._files.pop('pipe_in')
297                         except KeyError:
298                                 pass
299                         else:
300                                 os.close(pipe_in)
301
302         def unlock(self):
303                 if self._proc is None:
304                         raise AssertionError('not locked')
305                 if self.returncode is None:
306                         raise AssertionError('lock not acquired yet')
307                 if self.returncode != os.EX_OK:
308                         raise AssertionError("lock process failed with returncode %s" \
309                                 % (self.returncode,))
310                 self._unlocked = True
311                 os.write(self._files['pipe_out'], b'\0')
312                 os.close(self._files['pipe_out'])
313                 self._files = None
314                 self._proc.wait()
315                 self._proc = None