7 import threading as _threading
9 from . import error as _error
12 class TimeLimitedFunction (_threading.Thread):
13 """Run `function` with a time limit of `timeout` seconds.
16 >>> def sleeping_return(sleep, x):
19 >>> TimeLimitedFunction(0.5, sleeping_return)(0.1, 'x')
21 >>> TimeLimitedFunction(0.5, sleeping_return)(10, 'y')
22 Traceback (most recent call last):
24 rss2email.error.TimeoutError: 0.5 second timeout exceeded
25 >>> TimeLimitedFunction(0.5, time.sleep)('x')
26 Traceback (most recent call last):
28 rss2email.error.TimeoutError: error while running time limited function: a float is required
30 def __init__(self, timeout, target, **kwargs):
31 super(TimeLimitedFunction, self).__init__(target=target, **kwargs)
32 self.setDaemon(True) # daemon kwarg only added in Python 3.3.
33 self.timeout = timeout
38 """Based on Thread.run().
40 We add handling for self.result and self.error.
44 self.result = self._target(*self._args, **self._kwargs)
46 self.error = _sys.exc_info()
48 # Avoid a refcycle if the thread is running a function with
49 # an argument that has a member that points to the thread.
50 del self._target, self._args, self._kwargs
52 def __call__(self, *args, **kwargs):
56 self.join(self.timeout)
58 raise _error.TimeoutError(
59 time_limited_function=self) from self.error[1]
61 raise _error.TimeoutError(time_limited_function=self)