OLD | NEW |
| (Empty) |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """A utility to run functions with timeouts and retries.""" | |
6 # pylint: disable=W0702 | |
7 | |
8 import logging | |
9 import threading | |
10 import time | |
11 import traceback | |
12 | |
13 from devil.utils import reraiser_thread | |
14 from devil.utils import watchdog_timer | |
15 | |
16 | |
17 | |
18 class TimeoutRetryThreadGroup(reraiser_thread.ReraiserThreadGroup): | |
19 def __init__(self, timeout, threads=None): | |
20 super(TimeoutRetryThreadGroup, self).__init__(threads) | |
21 self._watcher = watchdog_timer.WatchdogTimer(timeout) | |
22 | |
23 def GetWatcher(self): | |
24 """Returns the watchdog keeping track of this thread's time.""" | |
25 return self._watcher | |
26 | |
27 def GetElapsedTime(self): | |
28 return self._watcher.GetElapsed() | |
29 | |
30 def GetRemainingTime(self, required=0, msg=None): | |
31 """Get the remaining time before the thread times out. | |
32 | |
33 Useful to send as the |timeout| parameter of async IO operations. | |
34 | |
35 Args: | |
36 required: minimum amount of time that will be required to complete, e.g., | |
37 some sleep or IO operation. | |
38 msg: error message to show if timing out. | |
39 | |
40 Returns: | |
41 The number of seconds remaining before the thread times out, or None | |
42 if the thread never times out. | |
43 | |
44 Raises: | |
45 reraiser_thread.TimeoutError if the remaining time is less than the | |
46 required time. | |
47 """ | |
48 remaining = self._watcher.GetRemaining() | |
49 if remaining is not None and remaining < required: | |
50 if msg is None: | |
51 msg = 'Timeout expired' | |
52 if remaining > 0: | |
53 msg += (', wait of %.1f secs required but only %.1f secs left' | |
54 % (required, remaining)) | |
55 raise reraiser_thread.TimeoutError(msg) | |
56 return remaining | |
57 | |
58 | |
59 def CurrentTimeoutThreadGroup(): | |
60 """Returns the thread group that owns or is blocked on the active thread. | |
61 | |
62 Returns: | |
63 Returns None if no TimeoutRetryThreadGroup is tracking the current thread. | |
64 """ | |
65 thread_group = reraiser_thread.CurrentThreadGroup() | |
66 while thread_group: | |
67 if isinstance(thread_group, TimeoutRetryThreadGroup): | |
68 return thread_group | |
69 thread_group = thread_group.blocked_parent_thread_group | |
70 return None | |
71 | |
72 | |
73 def WaitFor(condition, wait_period=5, max_tries=None): | |
74 """Wait for a condition to become true. | |
75 | |
76 Repeatedly call the function condition(), with no arguments, until it returns | |
77 a true value. | |
78 | |
79 If called within a TimeoutRetryThreadGroup, it cooperates nicely with it. | |
80 | |
81 Args: | |
82 condition: function with the condition to check | |
83 wait_period: number of seconds to wait before retrying to check the | |
84 condition | |
85 max_tries: maximum number of checks to make, the default tries forever | |
86 or until the TimeoutRetryThreadGroup expires. | |
87 | |
88 Returns: | |
89 The true value returned by the condition, or None if the condition was | |
90 not met after max_tries. | |
91 | |
92 Raises: | |
93 reraiser_thread.TimeoutError: if the current thread is a | |
94 TimeoutRetryThreadGroup and the timeout expires. | |
95 """ | |
96 condition_name = condition.__name__ | |
97 timeout_thread_group = CurrentTimeoutThreadGroup() | |
98 while max_tries is None or max_tries > 0: | |
99 result = condition() | |
100 if max_tries is not None: | |
101 max_tries -= 1 | |
102 msg = ['condition', repr(condition_name), 'met' if result else 'not met'] | |
103 if timeout_thread_group: | |
104 # pylint: disable=no-member | |
105 msg.append('(%.1fs)' % timeout_thread_group.GetElapsedTime()) | |
106 logging.info(' '.join(msg)) | |
107 if result: | |
108 return result | |
109 if timeout_thread_group: | |
110 # pylint: disable=no-member | |
111 timeout_thread_group.GetRemainingTime(wait_period, | |
112 msg='Timed out waiting for %r' % condition_name) | |
113 time.sleep(wait_period) | |
114 return None | |
115 | |
116 | |
117 def _LogLastException(thread_name, attempt, max_attempts, log_func): | |
118 log_func('*' * 80) | |
119 log_func('Exception on thread %s (attempt %d of %d)', thread_name, | |
120 attempt, max_attempts) | |
121 log_func('*' * 80) | |
122 fmt_exc = ''.join(traceback.format_exc()) | |
123 for line in fmt_exc.splitlines(): | |
124 log_func(line.rstrip()) | |
125 log_func('*' * 80) | |
126 | |
127 | |
128 def AlwaysRetry(_exception): | |
129 return True | |
130 | |
131 | |
132 def Run(func, timeout, retries, args=None, kwargs=None, desc=None, | |
133 error_log_func=logging.critical, retry_if_func=AlwaysRetry): | |
134 """Runs the passed function in a separate thread with timeouts and retries. | |
135 | |
136 Args: | |
137 func: the function to be wrapped. | |
138 timeout: the timeout in seconds for each try. | |
139 retries: the number of retries. | |
140 args: list of positional args to pass to |func|. | |
141 kwargs: dictionary of keyword args to pass to |func|. | |
142 desc: An optional description of |func| used in logging. If omitted, | |
143 |func.__name__| will be used. | |
144 error_log_func: Logging function when logging errors. | |
145 retry_if_func: Unary callable that takes an exception and returns | |
146 whether |func| should be retried. Defaults to always retrying. | |
147 | |
148 Returns: | |
149 The return value of func(*args, **kwargs). | |
150 """ | |
151 if not args: | |
152 args = [] | |
153 if not kwargs: | |
154 kwargs = {} | |
155 | |
156 num_try = 1 | |
157 while True: | |
158 thread_name = 'TimeoutThread-%d-for-%s' % (num_try, | |
159 threading.current_thread().name) | |
160 child_thread = reraiser_thread.ReraiserThread(lambda: func(*args, **kwargs), | |
161 name=thread_name) | |
162 try: | |
163 thread_group = TimeoutRetryThreadGroup(timeout, threads=[child_thread]) | |
164 thread_group.StartAll(will_block=True) | |
165 while True: | |
166 thread_group.JoinAll(watcher=thread_group.GetWatcher(), timeout=60, | |
167 error_log_func=error_log_func) | |
168 if thread_group.IsAlive(): | |
169 logging.info('Still working on %s', desc if desc else func.__name__) | |
170 else: | |
171 return thread_group.GetAllReturnValues()[0] | |
172 except reraiser_thread.TimeoutError as e: | |
173 # Timeouts already get their stacks logged. | |
174 if num_try > retries or not retry_if_func(e): | |
175 raise | |
176 # Do not catch KeyboardInterrupt. | |
177 except Exception as e: # pylint: disable=broad-except | |
178 if num_try > retries or not retry_if_func(e): | |
179 raise | |
180 _LogLastException(thread_name, num_try, retries + 1, error_log_func) | |
181 num_try += 1 | |
OLD | NEW |