OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Thread and ThreadGroup that reraise exceptions on the main thread.""" | 5 # pylint: disable=unused-wildcard-import |
6 # pylint: disable=W0212 | 6 # pylint: disable=wildcard-import |
7 | 7 |
8 import logging | 8 from devil.utils.reraiser_thread import * |
9 import sys | |
10 import threading | |
11 import traceback | |
12 | |
13 from pylib.utils import watchdog_timer | |
14 | |
15 | |
16 class TimeoutError(Exception): | |
17 """Module-specific timeout exception.""" | |
18 pass | |
19 | |
20 | |
21 def LogThreadStack(thread): | |
22 """Log the stack for the given thread. | |
23 | |
24 Args: | |
25 thread: a threading.Thread instance. | |
26 """ | |
27 stack = sys._current_frames()[thread.ident] | |
28 logging.critical('*' * 80) | |
29 logging.critical('Stack dump for thread %r', thread.name) | |
30 logging.critical('*' * 80) | |
31 for filename, lineno, name, line in traceback.extract_stack(stack): | |
32 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) | |
33 if line: | |
34 logging.critical(' %s', line.strip()) | |
35 logging.critical('*' * 80) | |
36 | |
37 | |
38 class ReraiserThread(threading.Thread): | |
39 """Thread class that can reraise exceptions.""" | |
40 | |
41 def __init__(self, func, args=None, kwargs=None, name=None): | |
42 """Initialize thread. | |
43 | |
44 Args: | |
45 func: callable to call on a new thread. | |
46 args: list of positional arguments for callable, defaults to empty. | |
47 kwargs: dictionary of keyword arguments for callable, defaults to empty. | |
48 name: thread name, defaults to Thread-N. | |
49 """ | |
50 super(ReraiserThread, self).__init__(name=name) | |
51 if not args: | |
52 args = [] | |
53 if not kwargs: | |
54 kwargs = {} | |
55 self.daemon = True | |
56 self._func = func | |
57 self._args = args | |
58 self._kwargs = kwargs | |
59 self._ret = None | |
60 self._exc_info = None | |
61 | |
62 def ReraiseIfException(self): | |
63 """Reraise exception if an exception was raised in the thread.""" | |
64 if self._exc_info: | |
65 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] | |
66 | |
67 def GetReturnValue(self): | |
68 """Reraise exception if present, otherwise get the return value.""" | |
69 self.ReraiseIfException() | |
70 return self._ret | |
71 | |
72 #override | |
73 def run(self): | |
74 """Overrides Thread.run() to add support for reraising exceptions.""" | |
75 try: | |
76 self._ret = self._func(*self._args, **self._kwargs) | |
77 except: # pylint: disable=W0702 | |
78 self._exc_info = sys.exc_info() | |
79 | |
80 | |
81 class ReraiserThreadGroup(object): | |
82 """A group of ReraiserThread objects.""" | |
83 | |
84 def __init__(self, threads=None): | |
85 """Initialize thread group. | |
86 | |
87 Args: | |
88 threads: a list of ReraiserThread objects; defaults to empty. | |
89 """ | |
90 if not threads: | |
91 threads = [] | |
92 self._threads = threads | |
93 | |
94 def Add(self, thread): | |
95 """Add a thread to the group. | |
96 | |
97 Args: | |
98 thread: a ReraiserThread object. | |
99 """ | |
100 self._threads.append(thread) | |
101 | |
102 def StartAll(self): | |
103 """Start all threads.""" | |
104 for thread in self._threads: | |
105 thread.start() | |
106 | |
107 def _JoinAll(self, watcher=None): | |
108 """Join all threads without stack dumps. | |
109 | |
110 Reraises exceptions raised by the child threads and supports breaking | |
111 immediately on exceptions raised on the main thread. | |
112 | |
113 Args: | |
114 watcher: Watchdog object providing timeout, by default waits forever. | |
115 """ | |
116 if watcher is None: | |
117 watcher = watchdog_timer.WatchdogTimer(None) | |
118 alive_threads = self._threads[:] | |
119 while alive_threads: | |
120 for thread in alive_threads[:]: | |
121 if watcher.IsTimedOut(): | |
122 raise TimeoutError('Timed out waiting for %d of %d threads.' % | |
123 (len(alive_threads), len(self._threads))) | |
124 # Allow the main thread to periodically check for interrupts. | |
125 thread.join(0.1) | |
126 if not thread.isAlive(): | |
127 alive_threads.remove(thread) | |
128 # All threads are allowed to complete before reraising exceptions. | |
129 for thread in self._threads: | |
130 thread.ReraiseIfException() | |
131 | |
132 def JoinAll(self, watcher=None): | |
133 """Join all threads. | |
134 | |
135 Reraises exceptions raised by the child threads and supports breaking | |
136 immediately on exceptions raised on the main thread. Unfinished threads' | |
137 stacks will be logged on watchdog timeout. | |
138 | |
139 Args: | |
140 watcher: Watchdog object providing timeout, by default waits forever. | |
141 """ | |
142 try: | |
143 self._JoinAll(watcher) | |
144 except TimeoutError: | |
145 for thread in (t for t in self._threads if t.isAlive()): | |
146 LogThreadStack(thread) | |
147 raise | |
148 | |
149 def GetAllReturnValues(self, watcher=None): | |
150 """Get all return values, joining all threads if necessary. | |
151 | |
152 Args: | |
153 watcher: same as in |JoinAll|. Only used if threads are alive. | |
154 """ | |
155 if any([t.isAlive() for t in self._threads]): | |
156 self.JoinAll(watcher) | |
157 return [t.GetReturnValue() for t in self._threads] | |
158 | |
OLD | NEW |