OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """A utility to run functions with timeouts and retries.""" | 5 """A utility to run functions with timeouts and retries.""" |
6 # pylint: disable=W0702 | 6 # pylint: disable=W0702 |
7 | 7 |
| 8 import logging |
8 import threading | 9 import threading |
| 10 import time |
| 11 import traceback |
9 | 12 |
10 from pylib.utils import reraiser_thread | 13 from pylib.utils import reraiser_thread |
11 from pylib.utils import watchdog_timer | 14 from pylib.utils import watchdog_timer |
12 | 15 |
13 | 16 |
class TimeoutRetryThread(reraiser_thread.ReraiserThread):
  """A ReraiserThread that carries a watchdog timer for its own time budget."""

  def __init__(self, func, timeout, name):
    """Creates the thread and the watchdog that tracks its time.

    Args:
      func: the function to run on this thread.
      timeout: the timeout handed to watchdog_timer.WatchdogTimer.
      name: the name given to the thread.
    """
    super(TimeoutRetryThread, self).__init__(func, name=name)
    self._watcher = watchdog_timer.WatchdogTimer(timeout)
    # Becomes True once GetRemainingTime() has raised a TimeoutError.
    self._expired = False

  def GetWatcher(self):
    """Returns the watchdog keeping track of this thread's time."""
    return self._watcher

  def GetElapsedTime(self):
    """Returns the elapsed time as reported by this thread's watchdog."""
    return self._watcher.GetElapsed()

  def GetRemainingTime(self, required=0, msg=None):
    """Get the remaining time before the thread times out.

    Useful to send as the |timeout| parameter of async IO operations.

    Args:
      required: minimum amount of time that will be required to complete, e.g.,
        some sleep or IO operation.
      msg: error message to show if timing out.

    Returns:
      The number of seconds remaining before the thread times out, or None
      if the thread never times out.

    Raises:
      reraiser_thread.TimeoutError if the remaining time is less than the
      required time.
    """
    remaining = self._watcher.GetRemaining()
    if remaining is not None and remaining < required:
      if msg is None:
        msg = 'Timeout expired'
      if remaining > 0:
        # There is still some time left, just not enough for the caller.
        msg += (', wait of %.1f secs required but only %.1f secs left'
                % (required, remaining))
      self._expired = True
      raise reraiser_thread.TimeoutError(msg)
    return remaining

  def LogTimeoutException(self):
    """Log the exception that terminated this thread.

    Nothing is logged unless GetRemainingTime() reported an expired timeout
    on this thread and exception info is available.
    """
    if not self._expired:
      return
    exc_info = self._exc_info
    if not exc_info:
      # NOTE(review): _exc_info is maintained by the base class, which is not
      # visible here; presumably it stays unset until the thread dies with an
      # exception. Bail out rather than fail while reporting another error.
      return
    logging.critical('*' * 80)
    logging.critical('%s on thread %r', exc_info[0].__name__, self.name)
    logging.critical('*' * 80)
    fmt_exc = ''.join(traceback.format_exception(*exc_info))
    for line in fmt_exc.splitlines():
      logging.critical(line.rstrip())
    logging.critical('*' * 80)
| 70 |
| 71 |
def CurrentTimeoutThread():
  """Return the calling thread when it is a TimeoutRetryThread.

  Returns:
    The thread object of the caller if it is an instance of
    TimeoutRetryThread; None for any other kind of thread.
  """
  this_thread = threading.current_thread()
  return this_thread if isinstance(this_thread, TimeoutRetryThread) else None
| 83 |
| 84 |
def WaitFor(condition, wait_period=5, max_tries=None):
  """Wait for a condition to become true.

  Repeatedly call the function condition(), with no arguments, until it
  returns a true value.

  If called within a TimeoutRetryThread, it cooperates nicely with it.

  Args:
    condition: function with the condition to check
    wait_period: number of seconds to wait before retrying to check the
      condition
    max_tries: maximum number of checks to make, the default tries forever
      or until the TimeoutRetryThread expires.

  Returns:
    The true value returned by the condition, or None if the condition was
    not met after max_tries.

  Raises:
    reraiser_thread.TimeoutError if the current thread is a TimeoutRetryThread
      and the timeout expires.
  """
  # functools.partial objects and callable instances carry no __name__, so
  # fall back to their repr for the log and error messages.
  condition_name = getattr(condition, '__name__', repr(condition))
  timeout_thread = CurrentTimeoutThread()
  while max_tries is None or max_tries > 0:
    result = condition()
    if max_tries is not None:
      max_tries -= 1
    msg = ['condition', repr(condition_name), 'met' if result else 'not met']
    if timeout_thread:
      msg.append('(%.1fs)' % timeout_thread.GetElapsedTime())
    logging.info(' '.join(msg))
    if result:
      return result
    if max_tries is not None and max_tries <= 0:
      # That was the last allowed check: no retry follows, so there is
      # nothing to wait for.
      break
    if timeout_thread:
      # Raises TimeoutError when there is not enough time left to sleep.
      timeout_thread.GetRemainingTime(wait_period,
          msg='Timed out waiting for %r' % condition_name)
    time.sleep(wait_period)
  return None
16 | 125 |
17 | 126 |
def Run(func, timeout, retries, args=None, kwargs=None):
  """Runs the passed function in a separate thread with timeouts and retries.

  Args:
    func: the function to be wrapped.
    timeout: the timeout in seconds for each try.
    retries: the number of retries.
    args: list of positional args to pass to |func|.
    kwargs: dictionary of keyword args to pass to |func|.

  Returns:
    The return value of func(*args, **kwargs).

  Raises:
    The exception that ended the last try, once all retries are used up.
  """
  if not args:
    args = []
  if not kwargs:
    kwargs = {}

  def MakeAttempt():
    """Returns a (callable, result holder) pair private to a single try."""
    # The return value uses a list because Python variables are references,
    # not values: rebinding a name inside the closure would not be visible
    # out here, whereas mutating the list is.
    #
    # Every try gets its own list so that a thread from an earlier try that
    # is still running (presumably possible after JoinAll gives up on a
    # timeout -- confirm) cannot overwrite the result of a later try.
    result = [None]
    def RunOnTimeoutThread():
      result[0] = func(*args, **kwargs)
    return RunOnTimeoutThread, result

  num_try = 1
  while True:
    run_on_thread, ret = MakeAttempt()
    child_thread = TimeoutRetryThread(
        run_on_thread, timeout,
        name='TimeoutThread-%d-for-%s' % (num_try,
                                          threading.current_thread().name))
    try:
      thread_group = reraiser_thread.ReraiserThreadGroup([child_thread])
      thread_group.StartAll()
      thread_group.JoinAll(child_thread.GetWatcher())
      return ret[0]
    except:
      child_thread.LogTimeoutException()
      # The first try is not a retry, so |retries| retries allow
      # |retries| + 1 tries in total.
      if num_try > retries:
        raise
      num_try += 1
OLD | NEW |