Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(527)

Side by Side Diff: build/android/devil/utils/reraiser_thread.py

Issue 1376473006: Associate threads in reraiser_thread.RunAsync with the calling ThreadGroup (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase due to patch failure on win bot Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The Chromium Authors. All rights reserved. 1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Thread and ThreadGroup that reraise exceptions on the main thread.""" 5 """Thread and ThreadGroup that reraise exceptions on the main thread."""
6 # pylint: disable=W0212 6 # pylint: disable=W0212
7 7
8 import logging 8 import logging
9 import sys 9 import sys
10 import threading 10 import threading
11 import time 11 import time
12 import traceback 12 import traceback
13 13
14 from devil.utils import watchdog_timer 14 from devil.utils import watchdog_timer
15 15
16 16
17 class TimeoutError(Exception): 17 class TimeoutError(Exception):
18 """Module-specific timeout exception.""" 18 """Module-specific timeout exception."""
19 pass 19 pass
20 20
21 21
22 def LogThreadStack(thread): 22 def LogThreadStack(thread, error_log_func=logging.critical):
23 """Log the stack for the given thread. 23 """Log the stack for the given thread.
24 24
25 Args: 25 Args:
26 thread: a threading.Thread instance. 26 thread: a threading.Thread instance.
27 error_log_func: Logging function when logging errors.
27 """ 28 """
28 stack = sys._current_frames()[thread.ident] 29 stack = sys._current_frames()[thread.ident]
29 logging.critical('*' * 80) 30 error_log_func('*' * 80)
30 logging.critical('Stack dump for thread %r', thread.name) 31 error_log_func('Stack dump for thread %r', thread.name)
31 logging.critical('*' * 80) 32 error_log_func('*' * 80)
32 for filename, lineno, name, line in traceback.extract_stack(stack): 33 for filename, lineno, name, line in traceback.extract_stack(stack):
33 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) 34 error_log_func('File: "%s", line %d, in %s', filename, lineno, name)
34 if line: 35 if line:
35 logging.critical(' %s', line.strip()) 36 error_log_func(' %s', line.strip())
36 logging.critical('*' * 80) 37 error_log_func('*' * 80)
37 38
38 39
39 class ReraiserThread(threading.Thread): 40 class ReraiserThread(threading.Thread):
40 """Thread class that can reraise exceptions.""" 41 """Thread class that can reraise exceptions."""
41 42
42 def __init__(self, func, args=None, kwargs=None, name=None): 43 def __init__(self, func, args=None, kwargs=None, name=None):
43 """Initialize thread. 44 """Initialize thread.
44 45
45 Args: 46 Args:
46 func: callable to call on a new thread. 47 func: callable to call on a new thread.
47 args: list of positional arguments for callable, defaults to empty. 48 args: list of positional arguments for callable, defaults to empty.
48 kwargs: dictionary of keyword arguments for callable, defaults to empty. 49 kwargs: dictionary of keyword arguments for callable, defaults to empty.
49 name: thread name, defaults to Thread-N. 50 name: thread name, defaults to Thread-N.
50 """ 51 """
51 super(ReraiserThread, self).__init__(name=name) 52 super(ReraiserThread, self).__init__(name=name)
52 if not args: 53 if not args:
53 args = [] 54 args = []
54 if not kwargs: 55 if not kwargs:
55 kwargs = {} 56 kwargs = {}
56 self.daemon = True 57 self.daemon = True
57 self._func = func 58 self._func = func
58 self._args = args 59 self._args = args
59 self._kwargs = kwargs 60 self._kwargs = kwargs
60 self._ret = None 61 self._ret = None
61 self._exc_info = None 62 self._exc_info = None
63 self._thread_group = None
62 64
63 def ReraiseIfException(self): 65 def ReraiseIfException(self):
64 """Reraise exception if an exception was raised in the thread.""" 66 """Reraise exception if an exception was raised in the thread."""
65 if self._exc_info: 67 if self._exc_info:
66 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] 68 raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
67 69
68 def GetReturnValue(self): 70 def GetReturnValue(self):
69 """Reraise exception if present, otherwise get the return value.""" 71 """Reraise exception if present, otherwise get the return value."""
70 self.ReraiseIfException() 72 self.ReraiseIfException()
71 return self._ret 73 return self._ret
72 74
73 #override 75 #override
74 def run(self): 76 def run(self):
75 """Overrides Thread.run() to add support for reraising exceptions.""" 77 """Overrides Thread.run() to add support for reraising exceptions."""
76 try: 78 try:
77 self._ret = self._func(*self._args, **self._kwargs) 79 self._ret = self._func(*self._args, **self._kwargs)
78 except: # pylint: disable=W0702 80 except: # pylint: disable=W0702
79 self._exc_info = sys.exc_info() 81 self._exc_info = sys.exc_info()
80 82
81 83
82 class ReraiserThreadGroup(object): 84 class ReraiserThreadGroup(object):
83 """A group of ReraiserThread objects.""" 85 """A group of ReraiserThread objects."""
84 86
85 def __init__(self, threads=None): 87 def __init__(self, threads=None):
86 """Initialize thread group. 88 """Initialize thread group.
87 89
88 Args: 90 Args:
89 threads: a list of ReraiserThread objects; defaults to empty. 91 threads: a list of ReraiserThread objects; defaults to empty.
90 """ 92 """
91 if not threads: 93 self._threads = []
92 threads = [] 94 # Set when a thread from one group has called JoinAll on another. It is used
93 self._threads = list(threads) 95 # to detect when a there is a TimeoutRetryThread active that links to the
96 # current thread.
97 self.blocked_parent_thread_group = None
98 if threads:
99 for thread in threads:
100 self.Add(thread)
94 101
95 def Add(self, thread): 102 def Add(self, thread):
96 """Add a thread to the group. 103 """Add a thread to the group.
97 104
98 Args: 105 Args:
99 thread: a ReraiserThread object. 106 thread: a ReraiserThread object.
100 """ 107 """
108 assert thread._thread_group is None
109 thread._thread_group = self
101 self._threads.append(thread) 110 self._threads.append(thread)
102 111
103 def StartAll(self): 112 def StartAll(self, will_block=False):
104 """Start all threads.""" 113 """Start all threads.
114
115 Args:
116 will_block: Whether the calling thread will subsequently block on this
117 thread group. Causes the active ReraiserThreadGroup (if there is one)
118 to be marked as blocking on this thread group.
119 """
120 if will_block:
121 # Multiple threads blocking on the same outer thread should not happen in
122 # practice.
123 assert not self.blocked_parent_thread_group
124 self.blocked_parent_thread_group = CurrentThreadGroup()
105 for thread in self._threads: 125 for thread in self._threads:
106 thread.start() 126 thread.start()
107 127
108 def _JoinAll(self, watcher=None, timeout=None): 128 def _JoinAll(self, watcher=None, timeout=None):
109 """Join all threads without stack dumps. 129 """Join all threads without stack dumps.
110 130
111 Reraises exceptions raised by the child threads and supports breaking 131 Reraises exceptions raised by the child threads and supports breaking
112 immediately on exceptions raised on the main thread. 132 immediately on exceptions raised on the main thread.
113 133
114 Args: 134 Args:
115 watcher: Watchdog object providing the thread timeout. If none is 135 watcher: Watchdog object providing the thread timeout. If none is
116 provided, the thread will never be timed out. 136 provided, the thread will never be timed out.
117 timeout: An optional number of seconds to wait before timing out the join 137 timeout: An optional number of seconds to wait before timing out the join
118 operation. This will not time out the threads. 138 operation. This will not time out the threads.
119 """ 139 """
120 if watcher is None: 140 if watcher is None:
121 watcher = watchdog_timer.WatchdogTimer(None) 141 watcher = watchdog_timer.WatchdogTimer(None)
122 alive_threads = self._threads[:] 142 alive_threads = self._threads[:]
123 end_time = (time.time() + timeout) if timeout else None 143 end_time = (time.time() + timeout) if timeout else None
124 while alive_threads and (end_time is None or end_time > time.time()): 144 try:
125 for thread in alive_threads[:]: 145 while alive_threads and (end_time is None or end_time > time.time()):
126 if watcher.IsTimedOut(): 146 for thread in alive_threads[:]:
127 raise TimeoutError('Timed out waiting for %d of %d threads.' % 147 if watcher.IsTimedOut():
128 (len(alive_threads), len(self._threads))) 148 raise TimeoutError('Timed out waiting for %d of %d threads.' %
129 # Allow the main thread to periodically check for interrupts. 149 (len(alive_threads), len(self._threads)))
130 thread.join(0.1) 150 # Allow the main thread to periodically check for interrupts.
131 if not thread.isAlive(): 151 thread.join(0.1)
132 alive_threads.remove(thread) 152 if not thread.isAlive():
133 # All threads are allowed to complete before reraising exceptions. 153 alive_threads.remove(thread)
134 for thread in self._threads: 154 # All threads are allowed to complete before reraising exceptions.
135 thread.ReraiseIfException() 155 for thread in self._threads:
156 thread.ReraiseIfException()
157 finally:
158 self.blocked_parent_thread_group = None
136 159
137 def IsAlive(self): 160 def IsAlive(self):
138 """Check whether any of the threads are still alive. 161 """Check whether any of the threads are still alive.
139 162
140 Returns: 163 Returns:
141 Whether any of the threads are still alive. 164 Whether any of the threads are still alive.
142 """ 165 """
143 return any(t.isAlive() for t in self._threads) 166 return any(t.isAlive() for t in self._threads)
144 167
145 def JoinAll(self, watcher=None, timeout=None): 168 def JoinAll(self, watcher=None, timeout=None,
169 error_log_func=logging.critical):
146 """Join all threads. 170 """Join all threads.
147 171
148 Reraises exceptions raised by the child threads and supports breaking 172 Reraises exceptions raised by the child threads and supports breaking
149 immediately on exceptions raised on the main thread. Unfinished threads' 173 immediately on exceptions raised on the main thread. Unfinished threads'
150 stacks will be logged on watchdog timeout. 174 stacks will be logged on watchdog timeout.
151 175
152 Args: 176 Args:
153 watcher: Watchdog object providing the thread timeout. If none is 177 watcher: Watchdog object providing the thread timeout. If none is
154 provided, the thread will never be timed out. 178 provided, the thread will never be timed out.
155 timeout: An optional number of seconds to wait before timing out the join 179 timeout: An optional number of seconds to wait before timing out the join
156 operation. This will not time out the threads. 180 operation. This will not time out the threads.
181 error_log_func: Logging function when logging errors.
157 """ 182 """
158 try: 183 try:
159 self._JoinAll(watcher, timeout) 184 self._JoinAll(watcher, timeout)
160 except TimeoutError: 185 except TimeoutError:
161 logging.critical('Timed out. Dumping threads.') 186 error_log_func('Timed out. Dumping threads.')
162 for thread in (t for t in self._threads if t.isAlive()): 187 for thread in (t for t in self._threads if t.isAlive()):
163 LogThreadStack(thread) 188 LogThreadStack(thread, error_log_func=error_log_func)
164 raise 189 raise
165 190
166 def GetAllReturnValues(self, watcher=None): 191 def GetAllReturnValues(self, watcher=None):
167 """Get all return values, joining all threads if necessary. 192 """Get all return values, joining all threads if necessary.
168 193
169 Args: 194 Args:
170 watcher: same as in |JoinAll|. Only used if threads are alive. 195 watcher: same as in |JoinAll|. Only used if threads are alive.
171 """ 196 """
172 if any([t.isAlive() for t in self._threads]): 197 if any([t.isAlive() for t in self._threads]):
173 self.JoinAll(watcher) 198 self.JoinAll(watcher)
174 return [t.GetReturnValue() for t in self._threads] 199 return [t.GetReturnValue() for t in self._threads]
175 200
176 201
202 def CurrentThreadGroup():
203 """Returns the ReraiserThreadGroup that owns the running thread.
204
205 Returns:
206 The current thread group, otherwise None.
207 """
208 current_thread = threading.current_thread()
209 if isinstance(current_thread, ReraiserThread):
210 return current_thread._thread_group # pylint: disable=no-member
211 return None
212
213
177 def RunAsync(funcs, watcher=None): 214 def RunAsync(funcs, watcher=None):
178 """Executes the given functions in parallel and returns their results. 215 """Executes the given functions in parallel and returns their results.
179 216
180 Args: 217 Args:
181 funcs: List of functions to perform on their own threads. 218 funcs: List of functions to perform on their own threads.
182 watcher: Watchdog object providing timeout, by default waits forever. 219 watcher: Watchdog object providing timeout, by default waits forever.
183 220
184 Returns: 221 Returns:
185 A list of return values in the order of the given functions. 222 A list of return values in the order of the given functions.
186 """ 223 """
187 thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs) 224 thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs)
188 thread_group.StartAll() 225 thread_group.StartAll(will_block=True)
189 return thread_group.GetAllReturnValues(watcher=watcher) 226 return thread_group.GetAllReturnValues(watcher=watcher)
OLDNEW
« no previous file with comments | « build/android/devil/android/sdk/adb_wrapper.py ('k') | build/android/devil/utils/timeout_retry.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698