# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Classes and functions related to threading."""

import functools
import inspect
import logging
import os
import Queue
import sys
import threading
import time
import traceback


class LockWithAssert(object):
  """Wrapper around a (non-recursive) Lock that tracks its owner."""

  def __init__(self):
    self._lock = threading.Lock()
    self._owner = None

  def __enter__(self):
    self._lock.acquire()
    assert self._owner is None
    self._owner = threading.current_thread()

  def __exit__(self, _exc_type, _exc_value, _traceback):
    self.assert_locked('Releasing unowned lock')
    self._owner = None
    self._lock.release()
    return False

  def assert_locked(self, msg=None):
    """Asserts the lock is owned by the running thread."""
    assert self._owner == threading.current_thread(), msg


class ThreadPoolError(Exception):
  """Base class for exceptions raised by ThreadPool."""
  pass


class ThreadPoolEmpty(ThreadPoolError):
  """Trying to get a task result from a thread pool with no pending tasks."""
  pass


class ThreadPoolClosed(ThreadPoolError):
  """Trying to do something with a closed thread pool."""
  pass


class ThreadPool(object):
  """Multithreaded worker pool with priority support.

  When the priorities of tasks match, it works in strict FIFO mode.
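
  Usage sketch (illustrative only; 'work' stands for any callable you would
  queue, it is not part of this module):

    with ThreadPool(initial_threads=0, max_threads=4, queue_size=0) as pool:
      for i in range(10):
        pool.add_task(0, work, i)
      results = pool.join()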
60 """ | |
61 QUEUE_CLASS = Queue.PriorityQueue | |
62 | |
63 def __init__(self, initial_threads, max_threads, queue_size, prefix=None): | |
64 """Immediately starts |initial_threads| threads. | |
65 | |
66 Arguments: | |
67 initial_threads: Number of threads to start immediately. Can be 0 if it is | |
68 uncertain that threads will be needed. | |
69 max_threads: Maximum number of threads that will be started when all the | |
70 threads are busy working. Often the number of CPU cores. | |
71 queue_size: Maximum number of tasks to buffer in the queue. 0 for | |
72 unlimited queue. A non-zero value may make add_task() | |
73 blocking. | |
74 prefix: Prefix to use for thread names. Pool's threads will be | |
75 named '<prefix>-<thread index>'. | |
76 """ | |
77 prefix = prefix or 'tp-0x%0x' % id(self) | |
78 logging.debug( | |
79 'New ThreadPool(%d, %d, %d): %s', initial_threads, max_threads, | |
80 queue_size, prefix) | |
81 assert initial_threads <= max_threads | |
82 # Update this check once 256 cores CPU are common. | |
83 assert max_threads <= 256 | |
84 | |
85 self.tasks = self.QUEUE_CLASS(queue_size) | |
86 self._max_threads = max_threads | |
87 self._prefix = prefix | |
88 | |
89 # Used to assign indexes to tasks. | |
90 self._num_of_added_tasks_lock = threading.Lock() | |
91 self._num_of_added_tasks = 0 | |
92 | |
93 # Lock that protected everything below (including conditional variable). | |
94 self._lock = threading.Lock() | |
95 | |
96 # Condition 'bool(_outputs) or bool(_exceptions) or _pending_count == 0'. | |
97 self._outputs_exceptions_cond = threading.Condition(self._lock) | |
98 self._outputs = [] | |
99 self._exceptions = [] | |
100 | |
101 # Number of pending tasks (queued or being processed now). | |
102 self._pending_count = 0 | |
103 | |
104 # List of threads. | |
105 self._workers = [] | |
106 # Number of threads that are waiting for new tasks. | |
107 self._ready = 0 | |
108 # Number of threads already added to _workers, but not yet running the loop. | |
109 self._starting = 0 | |
110 # True if close was called. Forbids adding new tasks. | |
111 self._is_closed = False | |
112 | |
113 for _ in range(initial_threads): | |
114 self._add_worker() | |

  def _add_worker(self):
    """Adds one worker thread if there aren't too many. Thread-safe."""
    with self._lock:
      if len(self._workers) >= self._max_threads or self._is_closed:
        return False
      worker = threading.Thread(
          name='%s-%d' % (self._prefix, len(self._workers)), target=self._run)
      self._workers.append(worker)
      self._starting += 1
    logging.debug('Starting worker thread %s', worker.name)
    worker.daemon = True
    worker.start()
    return True

  def add_task(self, priority, func, *args, **kwargs):
    """Adds a task, a function to be executed by a worker.

    Arguments:
    - priority: priority of the task versus others. Lower priority takes
      precedence.
    - func: function to run. Can either return a value to be added to the
      output list or be a generator which can emit multiple values.
    - args and kwargs: arguments to |func|. Note that if func mutates |args|
      or |kwargs| and the task is retried (see AutoRetryThreadPool), the retry
      will use the mutated values.

    Returns:
      Index of the item added, i.e. the total number of enqueued items up to
      now.
    """
    assert isinstance(priority, int)
    assert callable(func)
    with self._lock:
      if self._is_closed:
        raise ThreadPoolClosed('Can not add a task to a closed ThreadPool')
      start_new_worker = (
        # Pending task count plus new task > number of available workers.
        self.tasks.qsize() + 1 > self._ready + self._starting and
        # Enough slots.
        len(self._workers) < self._max_threads
      )
      self._pending_count += 1
    with self._num_of_added_tasks_lock:
      self._num_of_added_tasks += 1
      index = self._num_of_added_tasks
    self.tasks.put((priority, index, func, args, kwargs))
    if start_new_worker:
      self._add_worker()
    return index

  def _run(self):
    """Worker thread loop. Runs until a None task is queued."""
    # The thread has started; adjust counters.
    with self._lock:
      self._starting -= 1
      self._ready += 1
    while True:
      try:
        task = self.tasks.get()
      finally:
        with self._lock:
          self._ready -= 1
      try:
        if task is None:
          # We're done.
          return
        _priority, _index, func, args, kwargs = task
        if inspect.isgeneratorfunction(func):
          for out in func(*args, **kwargs):
            self._output_append(out)
        else:
          out = func(*args, **kwargs)
          self._output_append(out)
      except Exception as e:
        logging.warning('Caught exception: %s', e)
        exc_info = sys.exc_info()
        logging.info(''.join(traceback.format_tb(exc_info[2])))
        with self._outputs_exceptions_cond:
          self._exceptions.append(exc_info)
          self._outputs_exceptions_cond.notifyAll()
      finally:
        try:
          # Mark the thread as ready again and the task as processed. Do it
          # before waking up threads waiting on self.tasks.join(). Otherwise
          # they might find the ThreadPool still 'busy' and perform an
          # unnecessary wait on the condition variable.
          with self._outputs_exceptions_cond:
            self._ready += 1
            self._pending_count -= 1
            if self._pending_count == 0:
              self._outputs_exceptions_cond.notifyAll()
          self.tasks.task_done()
        except Exception as e:
          # We need to catch and log this error here because this is the root
          # function for the thread, nothing higher will catch the error.
          logging.exception(
              'Caught exception while marking task as done: %s', e)

  def _output_append(self, out):
    if out is not None:
      with self._outputs_exceptions_cond:
        self._outputs.append(out)
        self._outputs_exceptions_cond.notifyAll()

  def join(self):
    """Extracts all the results from the threads, unordered.

    Call repeatedly to extract all the exceptions if desired.

    Note: will wait for all work items to be done before returning an
    exception. To get an exception early, use get_one_result().
    """
    # TODO(maruel): Stop waiting as soon as an exception is caught.
    self.tasks.join()
    with self._outputs_exceptions_cond:
      if self._exceptions:
        e = self._exceptions.pop(0)
        raise e[0], e[1], e[2]
      out = self._outputs
      self._outputs = []
    return out

  def get_one_result(self):
    """Returns the next item that was generated or raises an exception if one
    occurred.

    Raises:
      ThreadPoolEmpty - no results available.
    """
    # Get first available result.
    for result in self.iter_results():
      return result
    # No results -> tasks queue is empty.
    raise ThreadPoolEmpty('Task queue is empty')

  def iter_results(self):
    """Yields results as they appear until all tasks are processed."""
    while True:
      # Check for pending results.
      result = None
      with self._outputs_exceptions_cond:
        if self._exceptions:
          e = self._exceptions.pop(0)
          raise e[0], e[1], e[2]
        if self._outputs:
          # Remember the result to yield it outside of the lock.
          result = self._outputs.pop(0)
        else:
          # No pending tasks -> all tasks are done.
          if not self._pending_count:
            return
          # Some task is queued, wait for its result to appear.
          # Use non-None timeout so that process reacts to Ctrl+C and other
          # signals, see http://bugs.python.org/issue8844.
          self._outputs_exceptions_cond.wait(timeout=5)
          continue
      yield result

  def close(self):
    """Closes all the threads."""
    # Ensure no new threads can be started, self._workers is effectively
    # a constant after that and can be accessed outside the lock.
    with self._lock:
      if self._is_closed:
        raise ThreadPoolClosed('Can not close already closed ThreadPool')
      self._is_closed = True
      for _ in range(len(self._workers)):
        # Enqueueing None causes the worker to stop.
        self.tasks.put(None)
    for t in self._workers:
      t.join()
    logging.debug(
        'Thread pool \'%s\' closed: spawned %d threads total',
        self._prefix, len(self._workers))

  def abort(self):
    """Empties the queue.

    To be used when the pool should stop early, like when Ctrl-C was detected.

    Returns:
      Number of tasks cancelled.
    """
    index = 0
    while True:
      try:
        self.tasks.get_nowait()
        self.tasks.task_done()
        index += 1
      except Queue.Empty:
        return index

  def __enter__(self):
    """Enables 'with' statement."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Enables 'with' statement."""
    self.close()


class AutoRetryThreadPool(ThreadPool):
  """Automatically retries enqueued operations on exception."""
  INTERNAL_PRIORITY_BITS = (1<<8) - 1
  HIGH, MED, LOW = (1<<8, 2<<8, 3<<8)

  def __init__(self, exceptions, retries, *args, **kwargs):
    """
    Arguments:
      exceptions: list of exception classes that can be retried on.
      retries: maximum number of retries to do.
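
    Usage sketch (illustrative only; 'fetch' stands for any callable that may
    raise IOError, it is not part of this module):

      pool = AutoRetryThreadPool(
          [IOError], retries=2,
          initial_threads=0, max_threads=4, queue_size=0)
      pool.add_task(AutoRetryThreadPool.MED, fetch, 'item')
      results = pool.join()
      pool.close()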
327 """ | |
328 assert exceptions and all(issubclass(e, Exception) for e in exceptions), ( | |
329 exceptions) | |
330 assert 1 <= retries <= self.INTERNAL_PRIORITY_BITS | |
331 super(AutoRetryThreadPool, self).__init__(*args, **kwargs) | |
332 self._swallowed_exceptions = tuple(exceptions) | |
333 self._retries = retries | |

  def add_task(self, priority, func, *args, **kwargs):
    """Tasks added must not use the lower priority bits since they are
    reserved for retries.
    """
    assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
    return super(AutoRetryThreadPool, self).add_task(
        priority,
        self._task_executer,
        priority,
        None,
        func,
        *args,
        **kwargs)

  def add_task_with_channel(self, channel, priority, func, *args, **kwargs):
    """Tasks added must not use the lower priority bits since they are
    reserved for retries.
    """
    assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
    return super(AutoRetryThreadPool, self).add_task(
        priority,
        self._task_executer,
        priority,
        channel,
        func,
        *args,
        **kwargs)

  def _task_executer(self, priority, channel, func, *args, **kwargs):
    """Wraps the function and automatically retries on exceptions."""
    try:
      result = func(*args, **kwargs)
      if channel is None:
        return result
      channel.send_result(result)
    except self._swallowed_exceptions as e:
      # Retry a few times, lowering the priority.
      actual_retries = priority & self.INTERNAL_PRIORITY_BITS
      if actual_retries < self._retries:
        priority += 1
        logging.debug(
            'Swallowed exception \'%s\'. Retrying at lower priority %X',
            e, priority)
        super(AutoRetryThreadPool, self).add_task(
            priority,
            self._task_executer,
            priority,
            channel,
            func,
            *args,
            **kwargs)
        return
      if channel is None:
        raise
      channel.send_exception(e)
    except Exception as e:
      if channel is None:
        raise
      channel.send_exception(e)


class Progress(object):
  """Prints progress and accepts updates thread-safely."""
  def __init__(self, columns):
    """Creates a Progress bar that is updated asynchronously from the worker
    threads.

    Arguments:
      columns: list of tuple(name, initial value); defines both the number of
               columns and their initial values.
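
    Usage sketch (illustrative only):

      progress = Progress([('done', 0), ('total', 0)])
      progress.update_item('', total=10)
      for i in range(10):
        progress.update_item('item %d' % i, done=1)
        progress.print_update()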
405 """ | |
406 assert all( | |
407 len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int) | |
408 for c in columns), columns | |
409 # Members to be used exclusively in the primary thread. | |
410 self.use_cr_only = True | |
411 self.unfinished_commands = set() | |
412 self.start = time.time() | |
413 self._last_printed_line = '' | |
414 self._columns = [c[1] for c in columns] | |
415 self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns)) | |
416 # Setting it to True forces a print on the first print_update() call. | |
417 self._value_changed = True | |
418 | |
419 # To be used in all threads. | |
420 self._queued_updates = Queue.Queue() | |

  def update_item(self, name, raw=False, **kwargs):
    """Queues information to print out.

    Arguments:
      name: string to print out to describe something that was completed.
      raw: if True, prints the data without the header.
      <kwargs>: an argument name is the name of a column; its value is the
                increment to apply to that column, usually 0 or 1.
    """
    assert isinstance(name, str)
    assert isinstance(raw, bool)
    assert all(isinstance(v, int) for v in kwargs.itervalues())
    args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v]
    self._queued_updates.put((name, raw, args))

  def print_update(self):
    """Prints the current status."""
    # Flush all the logging output so it doesn't appear within this output.
    for handler in logging.root.handlers:
      handler.flush()

    got_one = False
    while True:
      try:
        name, raw, args = self._queued_updates.get_nowait()
      except Queue.Empty:
        break

      for k, v in args:
        self._columns[k] += v
      self._value_changed = bool(args)
      if not name:
        # Even if raw=True, there's nothing to print.
        continue

      got_one = True
      if raw:
        # Prints the data as-is.
        self._last_printed_line = ''
        sys.stdout.write('\n%s\n' % name.strip('\n'))
      else:
        line, self._last_printed_line = self._gen_line(name)
        sys.stdout.write(line)

    if not got_one and self._value_changed:
      # Make sure a line is printed in the case where the statistics changed.
      line, self._last_printed_line = self._gen_line('')
      sys.stdout.write(line)
      got_one = True
      self._value_changed = False
    if got_one:
      # Ensure that all the output is flushed to prevent it from getting mixed
      # with other output streams (like the logging streams).
      sys.stdout.flush()

    if self.unfinished_commands:
      logging.debug('Waiting for the following commands to finish:\n%s',
                    '\n'.join(self.unfinished_commands))

  def _gen_line(self, name):
    """Generates the line to be printed."""
    next_line = ('[%s] %6.2fs %s') % (
        self._render_columns(), time.time() - self.start, name)
    # Fill it with whitespace only if self.use_cr_only is set.
    prefix = ''
    if self.use_cr_only and self._last_printed_line:
      prefix = '\r'
    if self.use_cr_only:
      suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line))
    else:
      suffix = '\n'
    return '%s%s%s' % (prefix, next_line, suffix), next_line

  def _render_columns(self):
    """Renders the columns."""
    columns_as_str = map(str, self._columns)
    max_len = max(map(len, columns_as_str))
    return '/'.join(i.rjust(max_len) for i in columns_as_str)


class QueueWithProgress(Queue.PriorityQueue):
  """Implements progress support in join()."""
  def __init__(self, progress, *args, **kwargs):
    Queue.PriorityQueue.__init__(self, *args, **kwargs)
    self.progress = progress

  def task_done(self):
    """Contrary to Queue.task_done(), it wakes self.all_tasks_done on each
    completed task.
    """
    with self.all_tasks_done:
      try:
        unfinished = self.unfinished_tasks - 1
        if unfinished < 0:
          raise ValueError('task_done() called too many times')
        self.unfinished_tasks = unfinished
        # This is less efficient, because we want the Progress to be updated.
        self.all_tasks_done.notify_all()
      except Exception as e:
        logging.exception('task_done threw an exception.\n%s', e)

  def wake_up(self):
    """Wakes up all_tasks_done.

    Unlike task_done(), does not subtract one from self.unfinished_tasks.
    """
    # TODO(maruel): This is highly inefficient, since the listener is awakened
    # twice; once per output, once per task. There should be no relationship
    # between the number of outputs and the number of input tasks.
    with self.all_tasks_done:
      self.all_tasks_done.notify_all()

  def join(self):
    """Calls print_update() whenever possible."""
    self.progress.print_update()
    with self.all_tasks_done:
      while self.unfinished_tasks:
        self.progress.print_update()
        # Use a short wait timeout so updates are printed in a timely manner.
        # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
        # share the same underlying event so no polling is necessary.
        self.all_tasks_done.wait(0.1)
      self.progress.print_update()


class ThreadPoolWithProgress(ThreadPool):
  QUEUE_CLASS = QueueWithProgress

  def __init__(self, progress, *args, **kwargs):
    self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress)
    super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)

  def _output_append(self, out):
    """Also wakes up the listener on each newly completed test_case."""
    super(ThreadPoolWithProgress, self)._output_append(out)
    self.tasks.wake_up()


class DeadlockDetector(object):
  """Context manager that can detect deadlocks.

  It will dump stack frames of all running threads if its 'ping' method isn't
  called in time.

  Usage:
    with DeadlockDetector(timeout=60) as detector:
      for item in some_work():
        ...
        detector.ping()
        ...

  Arguments:
    timeout - maximum allowed time between calls to 'ping'.
  """

  def __init__(self, timeout):
    self.timeout = timeout
    self._thread = None
    # Thread stop condition. Also a lock for the shared variables below.
    self._stop_cv = threading.Condition()
    self._stop_flag = False
    # Time when 'ping' was called last time.
    self._last_ping = None
    # True if pings are coming on time.
    self._alive = True

  def __enter__(self):
    """Starts the internal watcher thread."""
    assert self._thread is None
    self.ping()
    self._thread = threading.Thread(name='deadlock-detector', target=self._run)
    self._thread.daemon = True
    self._thread.start()
    return self

  def __exit__(self, *_args):
    """Stops the internal watcher thread."""
    assert self._thread is not None
    with self._stop_cv:
      self._stop_flag = True
      self._stop_cv.notify()
    self._thread.join()
    self._thread = None
    self._stop_flag = False

  def ping(self):
    """Notifies the detector that the main thread is still running.

    Should be called periodically to inform the detector that everything is
    running as it should.
    """
    with self._stop_cv:
      self._last_ping = time.time()
      self._alive = True

  def _run(self):
    """Watches for pings and dumps all thread stacks if a ping is late."""
    with self._stop_cv:
      while not self._stop_flag:
        # Skipped the deadline? Dump threads and switch to 'not alive' state.
        if self._alive and time.time() > self._last_ping + self.timeout:
          self.dump_threads(time.time() - self._last_ping, True)
          self._alive = False

        # Pings are on time?
        if self._alive:
          # Wait until the moment we need to dump stack traces.
          # Most probably some other thread will call 'ping' to move the
          # deadline further in time. We don't bother to wake up after each
          # 'ping', only right before the initially expected deadline.
          self._stop_cv.wait(self._last_ping + self.timeout - time.time())
        else:
          # Skipped some pings previously. Just periodically and silently
          # check for new pings with some arbitrary frequency.
          self._stop_cv.wait(self.timeout * 0.1)

  @staticmethod
  def dump_threads(timeout=None, skip_current_thread=False):
    """Dumps stack frames of all running threads."""
    all_threads = threading.enumerate()
    current_thread_id = threading.current_thread().ident

    # Collect tracebacks: thread name -> traceback string.
    tracebacks = {}

    # pylint: disable=W0212
    for thread_id, frame in sys._current_frames().iteritems():
      # Don't dump the deadlock detector's own thread, it's boring.
      if thread_id == current_thread_id and skip_current_thread:
        continue

      # Try to get a more informative symbolic thread name.
      name = 'untitled'
      for thread in all_threads:
        if thread.ident == thread_id:
          name = thread.name
          break
      name += ' #%d' % (thread_id,)
      tracebacks[name] = ''.join(traceback.format_stack(frame))

    # Function to print a message. Makes it easier to change the output
    # destination.
    def output(msg):
      logging.warning(msg.rstrip())

    # Print tracebacks, sorting them by thread name. That way a thread pool's
    # threads will be printed as one group.
    output('=============== Potential deadlock detected ===============')
    if timeout is not None:
      output('No pings in last %d sec.' % (timeout,))
    output('Dumping stack frames for all threads:')
    for name in sorted(tracebacks):
      output('Traceback for \'%s\':\n%s' % (name, tracebacks[name]))
    output('===========================================================')


class Bit(object):
  """Thread-safe settable bit."""

  def __init__(self):
    self._lock = threading.Lock()
    self._value = False

  def get(self):
    with self._lock:
      return self._value

  def set(self):
    with self._lock:
      self._value = True


class TaskChannel(object):
695 """Queue of results of async task execution.""" | |
696 | |
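  # Usage sketch (illustrative only; 'pool' is a ThreadPool instance and
  # 'compute' is any callable, neither is defined by this class):
  #   channel = TaskChannel()
  #   pool.add_task(0, channel.wrap_task(compute), 42)
  #   result = channel.pull()  # Re-raises the exception if the task failed.
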
  _ITEM_RESULT = 0
  _ITEM_EXCEPTION = 1

  def __init__(self):
    self._queue = Queue.Queue()

  def send_result(self, result):
    """Enqueues a result of task execution."""
    self._queue.put((self._ITEM_RESULT, result))

  def send_exception(self, exc):
    """Enqueues an exception raised by a task."""
    assert isinstance(exc, Exception)
    self._queue.put((self._ITEM_EXCEPTION, exc))

  def pull(self):
    """Dequeues an available result or exception."""
    item_type, value = self._queue.get()
    if item_type == self._ITEM_RESULT:
      return value
    if item_type == self._ITEM_EXCEPTION:
      raise value
    assert False, 'Impossible queue item type: %r' % item_type

  def wrap_task(self, task):
    """Decorator that makes a function push results into this channel."""
    @functools.wraps(task)
    def wrapped(*args, **kwargs):
      try:
        self.send_result(task(*args, **kwargs))
      except Exception as exc:
        self.send_exception(exc)
    return wrapped


def num_processors():
  """Returns the number of processors.

  Python on OSX 10.6 raises a NotImplementedError exception.
  """
  try:
    # Multiprocessing
    import multiprocessing
    return multiprocessing.cpu_count()
  except:  # pylint: disable=W0702
    try:
      # Mac OS 10.6
      return int(os.sysconf('SC_NPROCESSORS_ONLN'))  # pylint: disable=E1101
    except:
      # Some of the Windows builders seem to get here.
      return 4