OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Thread and ThreadGroup that reraise exceptions on the main thread.""" | 5 """Thread and ThreadGroup that reraise exceptions on the main thread.""" |
6 | 6 |
| 7 import logging |
7 import sys | 8 import sys |
8 import threading | 9 import threading |
| 10 import time |
| 11 import traceback |
| 12 |
| 13 import watchdog_timer |
| 14 |
| 15 |
| 16 class TimeoutError(Exception): |
| 17 """Module-specific timeout exception.""" |
| 18 pass |
9 | 19 |
10 | 20 |
11 class ReraiserThread(threading.Thread): | 21 class ReraiserThread(threading.Thread): |
12 """Thread class that can reraise exceptions.""" | 22 """Thread class that can reraise exceptions.""" |
| 23 |
13 def __init__(self, func, args=[], kwargs={}): | 24 def __init__(self, func, args=[], kwargs={}): |
14 super(ReraiserThread, self).__init__() | 25 super(ReraiserThread, self).__init__() |
15 self.daemon = True | 26 self.daemon = True |
16 self._func = func | 27 self._func = func |
17 self._args = args | 28 self._args = args |
18 self._kwargs = kwargs | 29 self._kwargs = kwargs |
19 self._exc_info = None | 30 self._exc_info = None |
20 | 31 |
21 def ReraiseIfException(self): | 32 def ReraiseIfException(self): |
22 """Reraise exception if an exception was raised in the thread.""" | 33 """Reraise exception if an exception was raised in the thread.""" |
23 if self._exc_info: | 34 if self._exc_info: |
24 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] | 35 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] |
25 | 36 |
26 #override | 37 #override |
27 def run(self): | 38 def run(self): |
28 """Overrides Thread.run() to add support for reraising exceptions.""" | 39 """Overrides Thread.run() to add support for reraising exceptions.""" |
29 try: | 40 try: |
30 self._func(*self._args, **self._kwargs) | 41 self._func(*self._args, **self._kwargs) |
31 except: | 42 except: |
32 self._exc_info = sys.exc_info() | 43 self._exc_info = sys.exc_info() |
33 raise | 44 raise |
34 | 45 |
35 | 46 |
36 class ReraiserThreadGroup(object): | 47 class ReraiserThreadGroup(object): |
37 """A group of ReraiserThread objects.""" | 48 """A group of ReraiserThread objects.""" |
| 49 |
38 def __init__(self, threads=[]): | 50 def __init__(self, threads=[]): |
39 """Initialize thread group. | 51 """Initialize thread group. |
40 | 52 |
41 Args: | 53 Args: |
42 threads: a list of ReraiserThread objects; defaults to empty. | 54 threads: a list of ReraiserThread objects; defaults to empty. |
43 """ | 55 """ |
44 self._threads = threads | 56 self._threads = threads |
45 | 57 |
46 def Add(self, thread): | 58 def Add(self, thread): |
47 """Add a thread to the group. | 59 """Add a thread to the group. |
48 | 60 |
49 Args: | 61 Args: |
50 thread: a ReraiserThread object. | 62 thread: a ReraiserThread object. |
51 """ | 63 """ |
52 self._threads.append(thread) | 64 self._threads.append(thread) |
53 | 65 |
54 def StartAll(self): | 66 def StartAll(self): |
55 """Start all threads.""" | 67 """Start all threads.""" |
56 for thread in self._threads: | 68 for thread in self._threads: |
57 thread.start() | 69 thread.start() |
58 | 70 |
59 def JoinAll(self): | 71 def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): |
60 """Join all threads. | 72 """Join all threads without stack dumps. |
61 | 73 |
62 Reraises exceptions raised by the child threads and supports | 74 Reraises exceptions raised by the child threads and supports breaking |
63 breaking immediately on exceptions raised on the main thread. | 75 immediately on exceptions raised on the main thread. |
| 76 |
| 77 Args: |
| 78 watcher: Watchdog object providing timeout, by default waits forever. |
64 """ | 79 """ |
65 alive_threads = self._threads[:] | 80 alive_threads = self._threads[:] |
66 while alive_threads: | 81 while alive_threads: |
67 for thread in alive_threads[:]: | 82 for thread in alive_threads[:]: |
| 83 if watcher.IsTimedOut(): |
| 84 raise TimeoutError('Timed out waiting for %d of %d threads.' % |
| 85 (len(alive_threads), len(self._threads))) |
68 # Allow the main thread to periodically check for interrupts. | 86 # Allow the main thread to periodically check for interrupts. |
69 thread.join(0.1) | 87 thread.join(0.1) |
70 if not thread.isAlive(): | 88 if not thread.isAlive(): |
71 alive_threads.remove(thread) | 89 alive_threads.remove(thread) |
72 # All threads are allowed to complete before reraising exceptions. | 90 # All threads are allowed to complete before reraising exceptions. |
73 for thread in self._threads: | 91 for thread in self._threads: |
74 thread.ReraiseIfException() | 92 thread.ReraiseIfException() |
| 93 |
| 94 def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): |
| 95 """Join all threads. |
| 96 |
| 97 Reraises exceptions raised by the child threads and supports breaking |
| 98 immediately on exceptions raised on the main thread. Unfinished threads' |
| 99 stacks will be logged on watchdog timeout. |
| 100 |
| 101 Args: |
| 102 watcher: Watchdog object providing timeout, by default waits forever. |
| 103 """ |
| 104 try: |
| 105 self._JoinAll(watcher) |
| 106 except TimeoutError: |
| 107 alive_thread_ids = (t.ident for t in self._threads if t.isAlive()) |
| 108 for thread_id in alive_thread_ids: |
| 109 stack = sys._current_frames()[thread_id] |
| 110 logging.critical('*' * 80) |
| 111 logging.critical('Stack dump for timed out ThreadId = %s', thread_id) |
| 112 logging.critical('*' * 80) |
| 113 for filename, lineno, name, line in traceback.extract_stack(stack): |
| 114 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) |
| 115 if line: |
| 116 logging.critical(' %s', line.strip()) |
| 117 logging.critical('*' * 80) |
| 118 raise |
OLD | NEW |