OLD | NEW |
---|---|
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Thread and ThreadGroup that reraise exceptions on the main thread.""" | 5 """Thread and ThreadGroup that reraise exceptions on the main thread.""" |
6 | 6 |
7 import logging | |
7 import sys | 8 import sys |
8 import threading | 9 import threading |
10 import time | |
11 import traceback | |
12 | |
13 import watchdog_timer | |
14 | |
15 | |
16 class TimeoutError(Exception): | |
17 """Module-specific timeout exception.""" | |
18 pass | |
9 | 19 |
10 | 20 |
11 class ReraiserThread(threading.Thread): | 21 class ReraiserThread(threading.Thread): |
12 """Thread class that can reraise exceptions.""" | 22 """Thread class that can reraise exceptions.""" |
23 | |
13 def __init__(self, func, args=[], kwargs={}): | 24 def __init__(self, func, args=[], kwargs={}): |
14 super(ReraiserThread, self).__init__() | 25 super(ReraiserThread, self).__init__() |
15 self.daemon = True | 26 self.daemon = True |
16 self._func = func | 27 self._func = func |
17 self._args = args | 28 self._args = args |
18 self._kwargs = kwargs | 29 self._kwargs = kwargs |
19 self._exc_info = None | 30 self._exc_info = None |
20 | 31 |
21 def ReraiseIfException(self): | 32 def ReraiseIfException(self): |
22 """Reraise exception if an exception was raised in the thread.""" | 33 """Reraise exception if an exception was raised in the thread.""" |
23 if self._exc_info: | 34 if self._exc_info: |
24 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] | 35 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] |
25 | 36 |
26 #override | 37 #override |
27 def run(self): | 38 def run(self): |
28 """Overrides Thread.run() to add support for reraising exceptions.""" | 39 """Overrides Thread.run() to add support for reraising exceptions.""" |
29 try: | 40 try: |
30 self._func(*self._args, **self._kwargs) | 41 self._func(*self._args, **self._kwargs) |
31 except: | 42 except: |
32 self._exc_info = sys.exc_info() | 43 self._exc_info = sys.exc_info() |
33 raise | 44 raise |
34 | 45 |
35 | 46 |
36 class ReraiserThreadGroup(object): | 47 class ReraiserThreadGroup(object): |
37 """A group of ReraiserThread objects.""" | 48 """A group of ReraiserThread objects.""" |
49 | |
38 def __init__(self, threads=[]): | 50 def __init__(self, threads=[]): |
39 """Initialize thread group. | 51 """Initialize thread group. |
40 | 52 |
41 Args: | 53 Args: |
42 threads: a list of ReraiserThread objects; defaults to empty. | 54 threads: a list of ReraiserThread objects; defaults to empty. |
43 """ | 55 """ |
44 self._threads = threads | 56 self._threads = threads |
45 | 57 |
46 def Add(self, thread): | 58 def Add(self, thread): |
47 """Add a thread to the group. | 59 """Add a thread to the group. |
48 | 60 |
49 Args: | 61 Args: |
50 thread: a ReraiserThread object. | 62 thread: a ReraiserThread object. |
51 """ | 63 """ |
52 self._threads.append(thread) | 64 self._threads.append(thread) |
53 | 65 |
54 def StartAll(self): | 66 def StartAll(self): |
55 """Start all threads.""" | 67 """Start all threads.""" |
56 for thread in self._threads: | 68 for thread in self._threads: |
57 thread.start() | 69 thread.start() |
58 | 70 |
59 def JoinAll(self): | 71 def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): |
60 """Join all threads. | 72 """Join all threads without stack dumps. |
61 | 73 |
62 Reraises exceptions raised by the child threads and supports | 74 Reraises exceptions raised by the child threads and supports breaking |
63 breaking immediately on exceptions raised on the main thread. | 75 immediately on exceptions raised on the main thread. |
76 | |
77 Args: | |
78 watcher: Watchdog object providing timeout, by default waits forever. | |
64 """ | 79 """ |
65 alive_threads = self._threads[:] | 80 alive_threads = self._threads[:] |
66 while alive_threads: | 81 while alive_threads: |
67 for thread in alive_threads[:]: | 82 for thread in alive_threads[:]: |
83 if watcher.IsTimedOut(): | |
84 raise TimeoutError('Timed out waiting for %d of %d threads.' % | |
85 (len(alive_threads), len(self._threads))) | |
68 # Allow the main thread to periodically check for interrupts. | 86 # Allow the main thread to periodically check for interrupts. |
69 thread.join(0.1) | 87 thread.join(0.1) |
70 if not thread.isAlive(): | 88 if not thread.isAlive(): |
71 alive_threads.remove(thread) | 89 alive_threads.remove(thread) |
72 # All threads are allowed to complete before reraising exceptions. | 90 # All threads are allowed to complete before reraising exceptions. |
73 for thread in self._threads: | 91 for thread in self._threads: |
74 thread.ReraiseIfException() | 92 thread.ReraiseIfException() |
93 | |
94 def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): | |
95 """Join all threads. | |
96 | |
97 Reraises exceptions raised by the child threads and supports breaking | |
98 immediately on exceptions raised on the main thread. Unfinished threads' | |
99 stacks will be logged on watchdog timeout. | |
100 | |
101 Args: | |
102 watcher: Watchdog object providing timeout, by default waits forever. | |
103 """ | |
104 try: | |
105 self._JoinAll(watcher) | |
106 except TimeoutError: | |
107 alive_thread_ids = (t.ident for t in self._threads if t.isAlive()) | |
108 for thread_id in alive_thread_ids: | |
109 stack = sys._current_frames()[thread_id] | |
110 logging.critical('STACK DUMP FOR TIMED OUT THREAD ID = %s', thread_id) | |
frankf
2013/04/03 23:17:48
'*' * 80
Blah
'*' * 80
| |
111 for filename, lineno, name, line in traceback.extract_stack(stack): | |
112 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) | |
113 if line: | |
114 logging.critical(' %s', line.strip()) | |
115 logging.critical('') | |
116 raise | |
OLD | NEW |