| Index: swarm_client/utils/threading_utils.py
|
| ===================================================================
|
| --- swarm_client/utils/threading_utils.py (revision 235167)
|
| +++ swarm_client/utils/threading_utils.py (working copy)
|
| @@ -1,747 +0,0 @@
|
| -# Copyright 2013 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Classes and functions related to threading."""
|
| -
|
| -import functools
|
| -import inspect
|
| -import logging
|
| -import os
|
| -import Queue
|
| -import sys
|
| -import threading
|
| -import time
|
| -import traceback
|
| -
|
| -
|
| -class LockWithAssert(object):
|
| - """Wrapper around (non recursive) Lock that tracks its owner."""
|
| -
|
| - def __init__(self):
|
| - self._lock = threading.Lock()
|
| - self._owner = None
|
| -
|
| - def __enter__(self):
|
| - self._lock.acquire()
|
| - assert self._owner is None
|
| - self._owner = threading.current_thread()
|
| -
|
| - def __exit__(self, _exc_type, _exec_value, _traceback):
|
| - self.assert_locked('Releasing unowned lock')
|
| - self._owner = None
|
| - self._lock.release()
|
| - return False
|
| -
|
| - def assert_locked(self, msg=None):
|
| - """Asserts the lock is owned by running thread."""
|
| - assert self._owner == threading.current_thread(), msg
|
| -
|
| -
|
| -class ThreadPoolError(Exception):
|
| - """Base class for exceptions raised by ThreadPool."""
|
| - pass
|
| -
|
| -
|
| -class ThreadPoolEmpty(ThreadPoolError):
|
| - """Trying to get task result from a thread pool with no pending tasks."""
|
| - pass
|
| -
|
| -
|
| -class ThreadPoolClosed(ThreadPoolError):
|
| - """Trying to do something with a closed thread pool."""
|
| - pass
|
| -
|
| -
|
| -class ThreadPool(object):
|
| - """Multithreaded worker pool with priority support.
|
| -
|
| - When the priority of tasks match, it works in strict FIFO mode.
|
| - """
|
| - QUEUE_CLASS = Queue.PriorityQueue
|
| -
|
| - def __init__(self, initial_threads, max_threads, queue_size, prefix=None):
|
| - """Immediately starts |initial_threads| threads.
|
| -
|
| - Arguments:
|
| - initial_threads: Number of threads to start immediately. Can be 0 if it is
|
| - uncertain that threads will be needed.
|
| - max_threads: Maximum number of threads that will be started when all the
|
| - threads are busy working. Often the number of CPU cores.
|
| - queue_size: Maximum number of tasks to buffer in the queue. 0 for
|
| - unlimited queue. A non-zero value may make add_task()
|
| - blocking.
|
| - prefix: Prefix to use for thread names. Pool's threads will be
|
| - named '<prefix>-<thread index>'.
|
| - """
|
| - prefix = prefix or 'tp-0x%0x' % id(self)
|
| - logging.debug(
|
| - 'New ThreadPool(%d, %d, %d): %s', initial_threads, max_threads,
|
| - queue_size, prefix)
|
| - assert initial_threads <= max_threads
|
| - # Update this check once 256 cores CPU are common.
|
| - assert max_threads <= 256
|
| -
|
| - self.tasks = self.QUEUE_CLASS(queue_size)
|
| - self._max_threads = max_threads
|
| - self._prefix = prefix
|
| -
|
| - # Used to assign indexes to tasks.
|
| - self._num_of_added_tasks_lock = threading.Lock()
|
| - self._num_of_added_tasks = 0
|
| -
|
| - # Lock that protected everything below (including conditional variable).
|
| - self._lock = threading.Lock()
|
| -
|
| - # Condition 'bool(_outputs) or bool(_exceptions) or _pending_count == 0'.
|
| - self._outputs_exceptions_cond = threading.Condition(self._lock)
|
| - self._outputs = []
|
| - self._exceptions = []
|
| -
|
| - # Number of pending tasks (queued or being processed now).
|
| - self._pending_count = 0
|
| -
|
| - # List of threads.
|
| - self._workers = []
|
| - # Number of threads that are waiting for new tasks.
|
| - self._ready = 0
|
| - # Number of threads already added to _workers, but not yet running the loop.
|
| - self._starting = 0
|
| - # True if close was called. Forbids adding new tasks.
|
| - self._is_closed = False
|
| -
|
| - for _ in range(initial_threads):
|
| - self._add_worker()
|
| -
|
| - def _add_worker(self):
|
| - """Adds one worker thread if there isn't too many. Thread-safe."""
|
| - with self._lock:
|
| - if len(self._workers) >= self._max_threads or self._is_closed:
|
| - return False
|
| - worker = threading.Thread(
|
| - name='%s-%d' % (self._prefix, len(self._workers)), target=self._run)
|
| - self._workers.append(worker)
|
| - self._starting += 1
|
| - logging.debug('Starting worker thread %s', worker.name)
|
| - worker.daemon = True
|
| - worker.start()
|
| - return True
|
| -
|
| - def add_task(self, priority, func, *args, **kwargs):
|
| - """Adds a task, a function to be executed by a worker.
|
| -
|
| - Arguments:
|
| - - priority: priority of the task versus others. Lower priority takes
|
| - precedence.
|
| - - func: function to run. Can either return a return value to be added to the
|
| - output list or be a generator which can emit multiple values.
|
| - - args and kwargs: arguments to |func|. Note that if func mutates |args| or
|
| - |kwargs| and that the task is retried, see
|
| - AutoRetryThreadPool, the retry will use the mutated
|
| - values.
|
| -
|
| - Returns:
|
| - Index of the item added, e.g. the total number of enqueued items up to
|
| - now.
|
| - """
|
| - assert isinstance(priority, int)
|
| - assert callable(func)
|
| - with self._lock:
|
| - if self._is_closed:
|
| - raise ThreadPoolClosed('Can not add a task to a closed ThreadPool')
|
| - start_new_worker = (
|
| - # Pending task count plus new task > number of available workers.
|
| - self.tasks.qsize() + 1 > self._ready + self._starting and
|
| - # Enough slots.
|
| - len(self._workers) < self._max_threads
|
| - )
|
| - self._pending_count += 1
|
| - with self._num_of_added_tasks_lock:
|
| - self._num_of_added_tasks += 1
|
| - index = self._num_of_added_tasks
|
| - self.tasks.put((priority, index, func, args, kwargs))
|
| - if start_new_worker:
|
| - self._add_worker()
|
| - return index
|
| -
|
| - def _run(self):
|
| - """Worker thread loop. Runs until a None task is queued."""
|
| - # Thread has started, adjust counters.
|
| - with self._lock:
|
| - self._starting -= 1
|
| - self._ready += 1
|
| - while True:
|
| - try:
|
| - task = self.tasks.get()
|
| - finally:
|
| - with self._lock:
|
| - self._ready -= 1
|
| - try:
|
| - if task is None:
|
| - # We're done.
|
| - return
|
| - _priority, _index, func, args, kwargs = task
|
| - if inspect.isgeneratorfunction(func):
|
| - for out in func(*args, **kwargs):
|
| - self._output_append(out)
|
| - else:
|
| - out = func(*args, **kwargs)
|
| - self._output_append(out)
|
| - except Exception as e:
|
| - logging.warning('Caught exception: %s', e)
|
| - exc_info = sys.exc_info()
|
| - logging.info(''.join(traceback.format_tb(exc_info[2])))
|
| - with self._outputs_exceptions_cond:
|
| - self._exceptions.append(exc_info)
|
| - self._outputs_exceptions_cond.notifyAll()
|
| - finally:
|
| - try:
|
| - # Mark thread as ready again, mark task as processed. Do it before
|
| - # waking up threads waiting on self.tasks.join(). Otherwise they might
|
| - # find ThreadPool still 'busy' and perform unnecessary wait on CV.
|
| - with self._outputs_exceptions_cond:
|
| - self._ready += 1
|
| - self._pending_count -= 1
|
| - if self._pending_count == 0:
|
| - self._outputs_exceptions_cond.notifyAll()
|
| - self.tasks.task_done()
|
| - except Exception as e:
|
| - # We need to catch and log this error here because this is the root
|
| - # function for the thread, nothing higher will catch the error.
|
| - logging.exception('Caught exception while marking task as done: %s',
|
| - e)
|
| -
|
| - def _output_append(self, out):
|
| - if out is not None:
|
| - with self._outputs_exceptions_cond:
|
| - self._outputs.append(out)
|
| - self._outputs_exceptions_cond.notifyAll()
|
| -
|
| - def join(self):
|
| - """Extracts all the results from each threads unordered.
|
| -
|
| - Call repeatedly to extract all the exceptions if desired.
|
| -
|
| - Note: will wait for all work items to be done before returning an exception.
|
| - To get an exception early, use get_one_result().
|
| - """
|
| - # TODO(maruel): Stop waiting as soon as an exception is caught.
|
| - self.tasks.join()
|
| - with self._outputs_exceptions_cond:
|
| - if self._exceptions:
|
| - e = self._exceptions.pop(0)
|
| - raise e[0], e[1], e[2]
|
| - out = self._outputs
|
| - self._outputs = []
|
| - return out
|
| -
|
| - def get_one_result(self):
|
| - """Returns the next item that was generated or raises an exception if one
|
| - occurred.
|
| -
|
| - Raises:
|
| - ThreadPoolEmpty - no results available.
|
| - """
|
| - # Get first available result.
|
| - for result in self.iter_results():
|
| - return result
|
| - # No results -> tasks queue is empty.
|
| - raise ThreadPoolEmpty('Task queue is empty')
|
| -
|
| - def iter_results(self):
|
| - """Yields results as they appear until all tasks are processed."""
|
| - while True:
|
| - # Check for pending results.
|
| - result = None
|
| - with self._outputs_exceptions_cond:
|
| - if self._exceptions:
|
| - e = self._exceptions.pop(0)
|
| - raise e[0], e[1], e[2]
|
| - if self._outputs:
|
| - # Remember the result to yield it outside of the lock.
|
| - result = self._outputs.pop(0)
|
| - else:
|
| - # No pending tasks -> all tasks are done.
|
| - if not self._pending_count:
|
| - return
|
| - # Some task is queued, wait for its result to appear.
|
| - # Use non-None timeout so that process reacts to Ctrl+C and other
|
| - # signals, see http://bugs.python.org/issue8844.
|
| - self._outputs_exceptions_cond.wait(timeout=5)
|
| - continue
|
| - yield result
|
| -
|
| - def close(self):
|
| - """Closes all the threads."""
|
| - # Ensure no new threads can be started, self._workers is effectively
|
| - # a constant after that and can be accessed outside the lock.
|
| - with self._lock:
|
| - if self._is_closed:
|
| - raise ThreadPoolClosed('Can not close already closed ThreadPool')
|
| - self._is_closed = True
|
| - for _ in range(len(self._workers)):
|
| - # Enqueueing None causes the worker to stop.
|
| - self.tasks.put(None)
|
| - for t in self._workers:
|
| - t.join()
|
| - logging.debug(
|
| - 'Thread pool \'%s\' closed: spawned %d threads total',
|
| - self._prefix, len(self._workers))
|
| -
|
| - def abort(self):
|
| - """Empties the queue.
|
| -
|
| - To be used when the pool should stop early, like when Ctrl-C was detected.
|
| -
|
| - Returns:
|
| - Number of tasks cancelled.
|
| - """
|
| - index = 0
|
| - while True:
|
| - try:
|
| - self.tasks.get_nowait()
|
| - self.tasks.task_done()
|
| - index += 1
|
| - except Queue.Empty:
|
| - return index
|
| -
|
| - def __enter__(self):
|
| - """Enables 'with' statement."""
|
| - return self
|
| -
|
| - def __exit__(self, _exc_type, _exc_value, _traceback):
|
| - """Enables 'with' statement."""
|
| - self.close()
|
| -
|
| -
|
| -class AutoRetryThreadPool(ThreadPool):
|
| - """Automatically retries enqueued operations on exception."""
|
| - INTERNAL_PRIORITY_BITS = (1<<8) - 1
|
| - HIGH, MED, LOW = (1<<8, 2<<8, 3<<8)
|
| -
|
| - def __init__(self, exceptions, retries, *args, **kwargs):
|
| - """
|
| - Arguments:
|
| - exceptions: list of exception classes that can be retried on.
|
| - retries: maximum number of retries to do.
|
| - """
|
| - assert exceptions and all(issubclass(e, Exception) for e in exceptions), (
|
| - exceptions)
|
| - assert 1 <= retries <= self.INTERNAL_PRIORITY_BITS
|
| - super(AutoRetryThreadPool, self).__init__(*args, **kwargs)
|
| - self._swallowed_exceptions = tuple(exceptions)
|
| - self._retries = retries
|
| -
|
| - def add_task(self, priority, func, *args, **kwargs):
|
| - """Tasks added must not use the lower priority bits since they are reserved
|
| - for retries.
|
| - """
|
| - assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
|
| - return super(AutoRetryThreadPool, self).add_task(
|
| - priority,
|
| - self._task_executer,
|
| - priority,
|
| - None,
|
| - func,
|
| - *args,
|
| - **kwargs)
|
| -
|
| - def add_task_with_channel(self, channel, priority, func, *args, **kwargs):
|
| - """Tasks added must not use the lower priority bits since they are reserved
|
| - for retries.
|
| - """
|
| - assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
|
| - return super(AutoRetryThreadPool, self).add_task(
|
| - priority,
|
| - self._task_executer,
|
| - priority,
|
| - channel,
|
| - func,
|
| - *args,
|
| - **kwargs)
|
| -
|
| - def _task_executer(self, priority, channel, func, *args, **kwargs):
|
| - """Wraps the function and automatically retry on exceptions."""
|
| - try:
|
| - result = func(*args, **kwargs)
|
| - if channel is None:
|
| - return result
|
| - channel.send_result(result)
|
| - except self._swallowed_exceptions as e:
|
| - # Retry a few times, lowering the priority.
|
| - actual_retries = priority & self.INTERNAL_PRIORITY_BITS
|
| - if actual_retries < self._retries:
|
| - priority += 1
|
| - logging.debug(
|
| - 'Swallowed exception \'%s\'. Retrying at lower priority %X',
|
| - e, priority)
|
| - super(AutoRetryThreadPool, self).add_task(
|
| - priority,
|
| - self._task_executer,
|
| - priority,
|
| - channel,
|
| - func,
|
| - *args,
|
| - **kwargs)
|
| - return
|
| - if channel is None:
|
| - raise
|
| - channel.send_exception(e)
|
| - except Exception as e:
|
| - if channel is None:
|
| - raise
|
| - channel.send_exception(e)
|
| -
|
| -
|
| -class Progress(object):
|
| - """Prints progress and accepts updates thread-safely."""
|
| - def __init__(self, columns):
|
| - """Creates a Progress bar that will updates asynchronously from the worker
|
| - threads.
|
| -
|
| - Arguments:
|
| - columns: list of tuple(name, initialvalue), defines both the number of
|
| - columns and their initial values.
|
| - """
|
| - assert all(
|
| - len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int)
|
| - for c in columns), columns
|
| - # Members to be used exclusively in the primary thread.
|
| - self.use_cr_only = True
|
| - self.unfinished_commands = set()
|
| - self.start = time.time()
|
| - self._last_printed_line = ''
|
| - self._columns = [c[1] for c in columns]
|
| - self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns))
|
| - # Setting it to True forces a print on the first print_update() call.
|
| - self._value_changed = True
|
| -
|
| - # To be used in all threads.
|
| - self._queued_updates = Queue.Queue()
|
| -
|
| - def update_item(self, name, raw=False, **kwargs):
|
| - """Queue information to print out.
|
| -
|
| - Arguments:
|
| - name: string to print out to describe something that was completed.
|
| - raw: if True, prints the data without the header.
|
| - raw: if True, prints the data without the header.
|
| - <kwargs>: argument name is a name of a column. it's value is the increment
|
| - to the column, value is usually 0 or 1.
|
| - """
|
| - assert isinstance(name, str)
|
| - assert isinstance(raw, bool)
|
| - assert all(isinstance(v, int) for v in kwargs.itervalues())
|
| - args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v]
|
| - self._queued_updates.put((name, raw, args))
|
| -
|
| - def print_update(self):
|
| - """Prints the current status."""
|
| - # Flush all the logging output so it doesn't appear within this output.
|
| - for handler in logging.root.handlers:
|
| - handler.flush()
|
| -
|
| - got_one = False
|
| - while True:
|
| - try:
|
| - name, raw, args = self._queued_updates.get_nowait()
|
| - except Queue.Empty:
|
| - break
|
| -
|
| - for k, v in args:
|
| - self._columns[k] += v
|
| - self._value_changed = bool(args)
|
| - if not name:
|
| - # Even if raw=True, there's nothing to print.
|
| - continue
|
| -
|
| - got_one = True
|
| - if raw:
|
| - # Prints the data as-is.
|
| - self._last_printed_line = ''
|
| - sys.stdout.write('\n%s\n' % name.strip('\n'))
|
| - else:
|
| - line, self._last_printed_line = self._gen_line(name)
|
| - sys.stdout.write(line)
|
| -
|
| - if not got_one and self._value_changed:
|
| - # Make sure a line is printed in that case where statistics changes.
|
| - line, self._last_printed_line = self._gen_line('')
|
| - sys.stdout.write(line)
|
| - got_one = True
|
| - self._value_changed = False
|
| - if got_one:
|
| - # Ensure that all the output is flushed to prevent it from getting mixed
|
| - # with other output streams (like the logging streams).
|
| - sys.stdout.flush()
|
| -
|
| - if self.unfinished_commands:
|
| - logging.debug('Waiting for the following commands to finish:\n%s',
|
| - '\n'.join(self.unfinished_commands))
|
| -
|
| - def _gen_line(self, name):
|
| - """Generates the line to be printed."""
|
| - next_line = ('[%s] %6.2fs %s') % (
|
| - self._render_columns(), time.time() - self.start, name)
|
| - # Fill it with whitespace only if self.use_cr_only is set.
|
| - prefix = ''
|
| - if self.use_cr_only and self._last_printed_line:
|
| - prefix = '\r'
|
| - if self.use_cr_only:
|
| - suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line))
|
| - else:
|
| - suffix = '\n'
|
| - return '%s%s%s' % (prefix, next_line, suffix), next_line
|
| -
|
| - def _render_columns(self):
|
| - """Renders the columns."""
|
| - columns_as_str = map(str, self._columns)
|
| - max_len = max(map(len, columns_as_str))
|
| - return '/'.join(i.rjust(max_len) for i in columns_as_str)
|
| -
|
| -
|
| -class QueueWithProgress(Queue.PriorityQueue):
|
| - """Implements progress support in join()."""
|
| - def __init__(self, progress, *args, **kwargs):
|
| - Queue.PriorityQueue.__init__(self, *args, **kwargs)
|
| - self.progress = progress
|
| -
|
| - def task_done(self):
|
| - """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task
|
| - done.
|
| - """
|
| - with self.all_tasks_done:
|
| - try:
|
| - unfinished = self.unfinished_tasks - 1
|
| - if unfinished < 0:
|
| - raise ValueError('task_done() called too many times')
|
| - self.unfinished_tasks = unfinished
|
| - # This is less efficient, because we want the Progress to be updated.
|
| - self.all_tasks_done.notify_all()
|
| - except Exception as e:
|
| - logging.exception('task_done threw an exception.\n%s', e)
|
| -
|
| - def wake_up(self):
|
| - """Wakes up all_tasks_done.
|
| -
|
| - Unlike task_done(), do not substract one from self.unfinished_tasks.
|
| - """
|
| - # TODO(maruel): This is highly inefficient, since the listener is awaken
|
| - # twice; once per output, once per task. There should be no relationship
|
| - # between the number of output and the number of input task.
|
| - with self.all_tasks_done:
|
| - self.all_tasks_done.notify_all()
|
| -
|
| - def join(self):
|
| - """Calls print_update() whenever possible."""
|
| - self.progress.print_update()
|
| - with self.all_tasks_done:
|
| - while self.unfinished_tasks:
|
| - self.progress.print_update()
|
| - # Use a short wait timeout so updates are printed in a timely manner.
|
| - # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
|
| - # share the same underlying event so no polling is necessary.
|
| - self.all_tasks_done.wait(0.1)
|
| - self.progress.print_update()
|
| -
|
| -
|
| -class ThreadPoolWithProgress(ThreadPool):
|
| - QUEUE_CLASS = QueueWithProgress
|
| -
|
| - def __init__(self, progress, *args, **kwargs):
|
| - self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress)
|
| - super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)
|
| -
|
| - def _output_append(self, out):
|
| - """Also wakes up the listener on new completed test_case."""
|
| - super(ThreadPoolWithProgress, self)._output_append(out)
|
| - self.tasks.wake_up()
|
| -
|
| -
|
| -class DeadlockDetector(object):
|
| - """Context manager that can detect deadlocks.
|
| -
|
| - It will dump stack frames of all running threads if its 'ping' method isn't
|
| - called in time.
|
| -
|
| - Usage:
|
| - with DeadlockDetector(timeout=60) as detector:
|
| - for item in some_work():
|
| - ...
|
| - detector.ping()
|
| - ...
|
| -
|
| - Arguments:
|
| - timeout - maximum allowed time between calls to 'ping'.
|
| - """
|
| -
|
| - def __init__(self, timeout):
|
| - self.timeout = timeout
|
| - self._thread = None
|
| - # Thread stop condition. Also lock for shared variables below.
|
| - self._stop_cv = threading.Condition()
|
| - self._stop_flag = False
|
| - # Time when 'ping' was called last time.
|
| - self._last_ping = None
|
| - # True if pings are coming on time.
|
| - self._alive = True
|
| -
|
| - def __enter__(self):
|
| - """Starts internal watcher thread."""
|
| - assert self._thread is None
|
| - self.ping()
|
| - self._thread = threading.Thread(name='deadlock-detector', target=self._run)
|
| - self._thread.daemon = True
|
| - self._thread.start()
|
| - return self
|
| -
|
| - def __exit__(self, *_args):
|
| - """Stops internal watcher thread."""
|
| - assert self._thread is not None
|
| - with self._stop_cv:
|
| - self._stop_flag = True
|
| - self._stop_cv.notify()
|
| - self._thread.join()
|
| - self._thread = None
|
| - self._stop_flag = False
|
| -
|
| - def ping(self):
|
| - """Notify detector that main thread is still running.
|
| -
|
| - Should be called periodically to inform the detector that everything is
|
| - running as it should.
|
| - """
|
| - with self._stop_cv:
|
| - self._last_ping = time.time()
|
| - self._alive = True
|
| -
|
| - def _run(self):
|
| - """Loop that watches for pings and dumps threads state if ping is late."""
|
| - with self._stop_cv:
|
| - while not self._stop_flag:
|
| - # Skipped deadline? Dump threads and switch to 'not alive' state.
|
| - if self._alive and time.time() > self._last_ping + self.timeout:
|
| - self.dump_threads(time.time() - self._last_ping, True)
|
| - self._alive = False
|
| -
|
| - # Pings are on time?
|
| - if self._alive:
|
| - # Wait until the moment we need to dump stack traces.
|
| - # Most probably some other thread will call 'ping' to move deadline
|
| - # further in time. We don't bother to wake up after each 'ping',
|
| - # only right before initial expected deadline.
|
| - self._stop_cv.wait(self._last_ping + self.timeout - time.time())
|
| - else:
|
| - # Skipped some pings previously. Just periodically silently check
|
| - # for new pings with some arbitrary frequency.
|
| - self._stop_cv.wait(self.timeout * 0.1)
|
| -
|
| - @staticmethod
|
| - def dump_threads(timeout=None, skip_current_thread=False):
|
| - """Dumps stack frames of all running threads."""
|
| - all_threads = threading.enumerate()
|
| - current_thread_id = threading.current_thread().ident
|
| -
|
| - # Collect tracebacks: thread name -> traceback string.
|
| - tracebacks = {}
|
| -
|
| - # pylint: disable=W0212
|
| - for thread_id, frame in sys._current_frames().iteritems():
|
| - # Don't dump deadlock detector's own thread, it's boring.
|
| - if thread_id == current_thread_id and skip_current_thread:
|
| - continue
|
| -
|
| - # Try to get more informative symbolic thread name.
|
| - name = 'untitled'
|
| - for thread in all_threads:
|
| - if thread.ident == thread_id:
|
| - name = thread.name
|
| - break
|
| - name += ' #%d' % (thread_id,)
|
| - tracebacks[name] = ''.join(traceback.format_stack(frame))
|
| -
|
| - # Function to print a message. Makes it easier to change output destination.
|
| - def output(msg):
|
| - logging.warning(msg.rstrip())
|
| -
|
| - # Print tracebacks, sorting them by thread name. That way a thread pool's
|
| - # threads will be printed as one group.
|
| - output('=============== Potential deadlock detected ===============')
|
| - if timeout is not None:
|
| - output('No pings in last %d sec.' % (timeout,))
|
| - output('Dumping stack frames for all threads:')
|
| - for name in sorted(tracebacks):
|
| - output('Traceback for \'%s\':\n%s' % (name, tracebacks[name]))
|
| - output('===========================================================')
|
| -
|
| -
|
| -class Bit(object):
|
| - """Thread safe setable bit."""
|
| -
|
| - def __init__(self):
|
| - self._lock = threading.Lock()
|
| - self._value = False
|
| -
|
| - def get(self):
|
| - with self._lock:
|
| - return self._value
|
| -
|
| - def set(self):
|
| - with self._lock:
|
| - self._value = True
|
| -
|
| -
|
| -class TaskChannel(object):
|
| - """Queue of results of async task execution."""
|
| -
|
| - _ITEM_RESULT = 0
|
| - _ITEM_EXCEPTION = 1
|
| -
|
| - def __init__(self):
|
| - self._queue = Queue.Queue()
|
| -
|
| - def send_result(self, result):
|
| - """Enqueues a result of task execution."""
|
| - self._queue.put((self._ITEM_RESULT, result))
|
| -
|
| - def send_exception(self, exc):
|
| - """Enqueue an exception raised by a task."""
|
| - assert isinstance(exc, Exception)
|
| - self._queue.put((self._ITEM_EXCEPTION, exc))
|
| -
|
| - def pull(self):
|
| - """Dequeues available result or exception."""
|
| - item_type, value = self._queue.get()
|
| - if item_type == self._ITEM_RESULT:
|
| - return value
|
| - if item_type == self._ITEM_EXCEPTION:
|
| - raise value
|
| - assert False, 'Impossible queue item type: %r' % item_type
|
| -
|
| - def wrap_task(self, task):
|
| - """Decorator that makes a function push results into this channel."""
|
| - @functools.wraps(task)
|
| - def wrapped(*args, **kwargs):
|
| - try:
|
| - self.send_result(task(*args, **kwargs))
|
| - except Exception as exc:
|
| - self.send_exception(exc)
|
| - return wrapped
|
| -
|
| -
|
| -def num_processors():
|
| - """Returns the number of processors.
|
| -
|
| - Python on OSX 10.6 raises a NotImplementedError exception.
|
| - """
|
| - try:
|
| - # Multiprocessing
|
| - import multiprocessing
|
| - return multiprocessing.cpu_count()
|
| - except: # pylint: disable=W0702
|
| - try:
|
| - # Mac OS 10.6
|
| - return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101
|
| - except:
|
| - # Some of the windows builders seem to get here.
|
| - return 4
|
|
|