Index: swarm_client/utils/threading_utils.py |
=================================================================== |
--- swarm_client/utils/threading_utils.py (revision 235167) |
+++ swarm_client/utils/threading_utils.py (working copy) |
@@ -1,747 +0,0 @@ |
-# Copyright 2013 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Classes and functions related to threading.""" |
- |
-import functools |
-import inspect |
-import logging |
-import os |
-import Queue |
-import sys |
-import threading |
-import time |
-import traceback |
- |
- |
-class LockWithAssert(object): |
- """Wrapper around (non recursive) Lock that tracks its owner.""" |
- |
- def __init__(self): |
- self._lock = threading.Lock() |
- self._owner = None |
- |
- def __enter__(self): |
- self._lock.acquire() |
- assert self._owner is None |
- self._owner = threading.current_thread() |
- |
- def __exit__(self, _exc_type, _exec_value, _traceback): |
- self.assert_locked('Releasing unowned lock') |
- self._owner = None |
- self._lock.release() |
- return False |
- |
- def assert_locked(self, msg=None): |
- """Asserts the lock is owned by running thread.""" |
- assert self._owner == threading.current_thread(), msg |
- |
- |
-class ThreadPoolError(Exception): |
- """Base class for exceptions raised by ThreadPool.""" |
- pass |
- |
- |
-class ThreadPoolEmpty(ThreadPoolError): |
- """Trying to get task result from a thread pool with no pending tasks.""" |
- pass |
- |
- |
-class ThreadPoolClosed(ThreadPoolError): |
- """Trying to do something with a closed thread pool.""" |
- pass |
- |
- |
-class ThreadPool(object): |
- """Multithreaded worker pool with priority support. |
- |
- When the priority of tasks match, it works in strict FIFO mode. |
- """ |
- QUEUE_CLASS = Queue.PriorityQueue |
- |
- def __init__(self, initial_threads, max_threads, queue_size, prefix=None): |
- """Immediately starts |initial_threads| threads. |
- |
- Arguments: |
- initial_threads: Number of threads to start immediately. Can be 0 if it is |
- uncertain that threads will be needed. |
- max_threads: Maximum number of threads that will be started when all the |
- threads are busy working. Often the number of CPU cores. |
- queue_size: Maximum number of tasks to buffer in the queue. 0 for |
- unlimited queue. A non-zero value may make add_task() |
- blocking. |
- prefix: Prefix to use for thread names. Pool's threads will be |
- named '<prefix>-<thread index>'. |
- """ |
- prefix = prefix or 'tp-0x%0x' % id(self) |
- logging.debug( |
- 'New ThreadPool(%d, %d, %d): %s', initial_threads, max_threads, |
- queue_size, prefix) |
- assert initial_threads <= max_threads |
- # Update this check once 256 cores CPU are common. |
- assert max_threads <= 256 |
- |
- self.tasks = self.QUEUE_CLASS(queue_size) |
- self._max_threads = max_threads |
- self._prefix = prefix |
- |
- # Used to assign indexes to tasks. |
- self._num_of_added_tasks_lock = threading.Lock() |
- self._num_of_added_tasks = 0 |
- |
- # Lock that protected everything below (including conditional variable). |
- self._lock = threading.Lock() |
- |
- # Condition 'bool(_outputs) or bool(_exceptions) or _pending_count == 0'. |
- self._outputs_exceptions_cond = threading.Condition(self._lock) |
- self._outputs = [] |
- self._exceptions = [] |
- |
- # Number of pending tasks (queued or being processed now). |
- self._pending_count = 0 |
- |
- # List of threads. |
- self._workers = [] |
- # Number of threads that are waiting for new tasks. |
- self._ready = 0 |
- # Number of threads already added to _workers, but not yet running the loop. |
- self._starting = 0 |
- # True if close was called. Forbids adding new tasks. |
- self._is_closed = False |
- |
- for _ in range(initial_threads): |
- self._add_worker() |
- |
- def _add_worker(self): |
- """Adds one worker thread if there isn't too many. Thread-safe.""" |
- with self._lock: |
- if len(self._workers) >= self._max_threads or self._is_closed: |
- return False |
- worker = threading.Thread( |
- name='%s-%d' % (self._prefix, len(self._workers)), target=self._run) |
- self._workers.append(worker) |
- self._starting += 1 |
- logging.debug('Starting worker thread %s', worker.name) |
- worker.daemon = True |
- worker.start() |
- return True |
- |
- def add_task(self, priority, func, *args, **kwargs): |
- """Adds a task, a function to be executed by a worker. |
- |
- Arguments: |
- - priority: priority of the task versus others. Lower priority takes |
- precedence. |
- - func: function to run. Can either return a return value to be added to the |
- output list or be a generator which can emit multiple values. |
- - args and kwargs: arguments to |func|. Note that if func mutates |args| or |
- |kwargs| and that the task is retried, see |
- AutoRetryThreadPool, the retry will use the mutated |
- values. |
- |
- Returns: |
- Index of the item added, e.g. the total number of enqueued items up to |
- now. |
- """ |
- assert isinstance(priority, int) |
- assert callable(func) |
- with self._lock: |
- if self._is_closed: |
- raise ThreadPoolClosed('Can not add a task to a closed ThreadPool') |
- start_new_worker = ( |
- # Pending task count plus new task > number of available workers. |
- self.tasks.qsize() + 1 > self._ready + self._starting and |
- # Enough slots. |
- len(self._workers) < self._max_threads |
- ) |
- self._pending_count += 1 |
- with self._num_of_added_tasks_lock: |
- self._num_of_added_tasks += 1 |
- index = self._num_of_added_tasks |
- self.tasks.put((priority, index, func, args, kwargs)) |
- if start_new_worker: |
- self._add_worker() |
- return index |
- |
- def _run(self): |
- """Worker thread loop. Runs until a None task is queued.""" |
- # Thread has started, adjust counters. |
- with self._lock: |
- self._starting -= 1 |
- self._ready += 1 |
- while True: |
- try: |
- task = self.tasks.get() |
- finally: |
- with self._lock: |
- self._ready -= 1 |
- try: |
- if task is None: |
- # We're done. |
- return |
- _priority, _index, func, args, kwargs = task |
- if inspect.isgeneratorfunction(func): |
- for out in func(*args, **kwargs): |
- self._output_append(out) |
- else: |
- out = func(*args, **kwargs) |
- self._output_append(out) |
- except Exception as e: |
- logging.warning('Caught exception: %s', e) |
- exc_info = sys.exc_info() |
- logging.info(''.join(traceback.format_tb(exc_info[2]))) |
- with self._outputs_exceptions_cond: |
- self._exceptions.append(exc_info) |
- self._outputs_exceptions_cond.notifyAll() |
- finally: |
- try: |
- # Mark thread as ready again, mark task as processed. Do it before |
- # waking up threads waiting on self.tasks.join(). Otherwise they might |
- # find ThreadPool still 'busy' and perform unnecessary wait on CV. |
- with self._outputs_exceptions_cond: |
- self._ready += 1 |
- self._pending_count -= 1 |
- if self._pending_count == 0: |
- self._outputs_exceptions_cond.notifyAll() |
- self.tasks.task_done() |
- except Exception as e: |
- # We need to catch and log this error here because this is the root |
- # function for the thread, nothing higher will catch the error. |
- logging.exception('Caught exception while marking task as done: %s', |
- e) |
- |
- def _output_append(self, out): |
- if out is not None: |
- with self._outputs_exceptions_cond: |
- self._outputs.append(out) |
- self._outputs_exceptions_cond.notifyAll() |
- |
- def join(self): |
- """Extracts all the results from each threads unordered. |
- |
- Call repeatedly to extract all the exceptions if desired. |
- |
- Note: will wait for all work items to be done before returning an exception. |
- To get an exception early, use get_one_result(). |
- """ |
- # TODO(maruel): Stop waiting as soon as an exception is caught. |
- self.tasks.join() |
- with self._outputs_exceptions_cond: |
- if self._exceptions: |
- e = self._exceptions.pop(0) |
- raise e[0], e[1], e[2] |
- out = self._outputs |
- self._outputs = [] |
- return out |
- |
- def get_one_result(self): |
- """Returns the next item that was generated or raises an exception if one |
- occurred. |
- |
- Raises: |
- ThreadPoolEmpty - no results available. |
- """ |
- # Get first available result. |
- for result in self.iter_results(): |
- return result |
- # No results -> tasks queue is empty. |
- raise ThreadPoolEmpty('Task queue is empty') |
- |
- def iter_results(self): |
- """Yields results as they appear until all tasks are processed.""" |
- while True: |
- # Check for pending results. |
- result = None |
- with self._outputs_exceptions_cond: |
- if self._exceptions: |
- e = self._exceptions.pop(0) |
- raise e[0], e[1], e[2] |
- if self._outputs: |
- # Remember the result to yield it outside of the lock. |
- result = self._outputs.pop(0) |
- else: |
- # No pending tasks -> all tasks are done. |
- if not self._pending_count: |
- return |
- # Some task is queued, wait for its result to appear. |
- # Use non-None timeout so that process reacts to Ctrl+C and other |
- # signals, see http://bugs.python.org/issue8844. |
- self._outputs_exceptions_cond.wait(timeout=5) |
- continue |
- yield result |
- |
- def close(self): |
- """Closes all the threads.""" |
- # Ensure no new threads can be started, self._workers is effectively |
- # a constant after that and can be accessed outside the lock. |
- with self._lock: |
- if self._is_closed: |
- raise ThreadPoolClosed('Can not close already closed ThreadPool') |
- self._is_closed = True |
- for _ in range(len(self._workers)): |
- # Enqueueing None causes the worker to stop. |
- self.tasks.put(None) |
- for t in self._workers: |
- t.join() |
- logging.debug( |
- 'Thread pool \'%s\' closed: spawned %d threads total', |
- self._prefix, len(self._workers)) |
- |
- def abort(self): |
- """Empties the queue. |
- |
- To be used when the pool should stop early, like when Ctrl-C was detected. |
- |
- Returns: |
- Number of tasks cancelled. |
- """ |
- index = 0 |
- while True: |
- try: |
- self.tasks.get_nowait() |
- self.tasks.task_done() |
- index += 1 |
- except Queue.Empty: |
- return index |
- |
- def __enter__(self): |
- """Enables 'with' statement.""" |
- return self |
- |
- def __exit__(self, _exc_type, _exc_value, _traceback): |
- """Enables 'with' statement.""" |
- self.close() |
- |
- |
-class AutoRetryThreadPool(ThreadPool): |
- """Automatically retries enqueued operations on exception.""" |
- INTERNAL_PRIORITY_BITS = (1<<8) - 1 |
- HIGH, MED, LOW = (1<<8, 2<<8, 3<<8) |
- |
- def __init__(self, exceptions, retries, *args, **kwargs): |
- """ |
- Arguments: |
- exceptions: list of exception classes that can be retried on. |
- retries: maximum number of retries to do. |
- """ |
- assert exceptions and all(issubclass(e, Exception) for e in exceptions), ( |
- exceptions) |
- assert 1 <= retries <= self.INTERNAL_PRIORITY_BITS |
- super(AutoRetryThreadPool, self).__init__(*args, **kwargs) |
- self._swallowed_exceptions = tuple(exceptions) |
- self._retries = retries |
- |
- def add_task(self, priority, func, *args, **kwargs): |
- """Tasks added must not use the lower priority bits since they are reserved |
- for retries. |
- """ |
- assert (priority & self.INTERNAL_PRIORITY_BITS) == 0 |
- return super(AutoRetryThreadPool, self).add_task( |
- priority, |
- self._task_executer, |
- priority, |
- None, |
- func, |
- *args, |
- **kwargs) |
- |
- def add_task_with_channel(self, channel, priority, func, *args, **kwargs): |
- """Tasks added must not use the lower priority bits since they are reserved |
- for retries. |
- """ |
- assert (priority & self.INTERNAL_PRIORITY_BITS) == 0 |
- return super(AutoRetryThreadPool, self).add_task( |
- priority, |
- self._task_executer, |
- priority, |
- channel, |
- func, |
- *args, |
- **kwargs) |
- |
- def _task_executer(self, priority, channel, func, *args, **kwargs): |
- """Wraps the function and automatically retry on exceptions.""" |
- try: |
- result = func(*args, **kwargs) |
- if channel is None: |
- return result |
- channel.send_result(result) |
- except self._swallowed_exceptions as e: |
- # Retry a few times, lowering the priority. |
- actual_retries = priority & self.INTERNAL_PRIORITY_BITS |
- if actual_retries < self._retries: |
- priority += 1 |
- logging.debug( |
- 'Swallowed exception \'%s\'. Retrying at lower priority %X', |
- e, priority) |
- super(AutoRetryThreadPool, self).add_task( |
- priority, |
- self._task_executer, |
- priority, |
- channel, |
- func, |
- *args, |
- **kwargs) |
- return |
- if channel is None: |
- raise |
- channel.send_exception(e) |
- except Exception as e: |
- if channel is None: |
- raise |
- channel.send_exception(e) |
- |
- |
-class Progress(object): |
- """Prints progress and accepts updates thread-safely.""" |
- def __init__(self, columns): |
- """Creates a Progress bar that will updates asynchronously from the worker |
- threads. |
- |
- Arguments: |
- columns: list of tuple(name, initialvalue), defines both the number of |
- columns and their initial values. |
- """ |
- assert all( |
- len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int) |
- for c in columns), columns |
- # Members to be used exclusively in the primary thread. |
- self.use_cr_only = True |
- self.unfinished_commands = set() |
- self.start = time.time() |
- self._last_printed_line = '' |
- self._columns = [c[1] for c in columns] |
- self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns)) |
- # Setting it to True forces a print on the first print_update() call. |
- self._value_changed = True |
- |
- # To be used in all threads. |
- self._queued_updates = Queue.Queue() |
- |
- def update_item(self, name, raw=False, **kwargs): |
- """Queue information to print out. |
- |
- Arguments: |
- name: string to print out to describe something that was completed. |
- raw: if True, prints the data without the header. |
- raw: if True, prints the data without the header. |
- <kwargs>: argument name is a name of a column. it's value is the increment |
- to the column, value is usually 0 or 1. |
- """ |
- assert isinstance(name, str) |
- assert isinstance(raw, bool) |
- assert all(isinstance(v, int) for v in kwargs.itervalues()) |
- args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v] |
- self._queued_updates.put((name, raw, args)) |
- |
- def print_update(self): |
- """Prints the current status.""" |
- # Flush all the logging output so it doesn't appear within this output. |
- for handler in logging.root.handlers: |
- handler.flush() |
- |
- got_one = False |
- while True: |
- try: |
- name, raw, args = self._queued_updates.get_nowait() |
- except Queue.Empty: |
- break |
- |
- for k, v in args: |
- self._columns[k] += v |
- self._value_changed = bool(args) |
- if not name: |
- # Even if raw=True, there's nothing to print. |
- continue |
- |
- got_one = True |
- if raw: |
- # Prints the data as-is. |
- self._last_printed_line = '' |
- sys.stdout.write('\n%s\n' % name.strip('\n')) |
- else: |
- line, self._last_printed_line = self._gen_line(name) |
- sys.stdout.write(line) |
- |
- if not got_one and self._value_changed: |
- # Make sure a line is printed in that case where statistics changes. |
- line, self._last_printed_line = self._gen_line('') |
- sys.stdout.write(line) |
- got_one = True |
- self._value_changed = False |
- if got_one: |
- # Ensure that all the output is flushed to prevent it from getting mixed |
- # with other output streams (like the logging streams). |
- sys.stdout.flush() |
- |
- if self.unfinished_commands: |
- logging.debug('Waiting for the following commands to finish:\n%s', |
- '\n'.join(self.unfinished_commands)) |
- |
- def _gen_line(self, name): |
- """Generates the line to be printed.""" |
- next_line = ('[%s] %6.2fs %s') % ( |
- self._render_columns(), time.time() - self.start, name) |
- # Fill it with whitespace only if self.use_cr_only is set. |
- prefix = '' |
- if self.use_cr_only and self._last_printed_line: |
- prefix = '\r' |
- if self.use_cr_only: |
- suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
- else: |
- suffix = '\n' |
- return '%s%s%s' % (prefix, next_line, suffix), next_line |
- |
- def _render_columns(self): |
- """Renders the columns.""" |
- columns_as_str = map(str, self._columns) |
- max_len = max(map(len, columns_as_str)) |
- return '/'.join(i.rjust(max_len) for i in columns_as_str) |
- |
- |
-class QueueWithProgress(Queue.PriorityQueue): |
- """Implements progress support in join().""" |
- def __init__(self, progress, *args, **kwargs): |
- Queue.PriorityQueue.__init__(self, *args, **kwargs) |
- self.progress = progress |
- |
- def task_done(self): |
- """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task |
- done. |
- """ |
- with self.all_tasks_done: |
- try: |
- unfinished = self.unfinished_tasks - 1 |
- if unfinished < 0: |
- raise ValueError('task_done() called too many times') |
- self.unfinished_tasks = unfinished |
- # This is less efficient, because we want the Progress to be updated. |
- self.all_tasks_done.notify_all() |
- except Exception as e: |
- logging.exception('task_done threw an exception.\n%s', e) |
- |
- def wake_up(self): |
- """Wakes up all_tasks_done. |
- |
- Unlike task_done(), do not substract one from self.unfinished_tasks. |
- """ |
- # TODO(maruel): This is highly inefficient, since the listener is awaken |
- # twice; once per output, once per task. There should be no relationship |
- # between the number of output and the number of input task. |
- with self.all_tasks_done: |
- self.all_tasks_done.notify_all() |
- |
- def join(self): |
- """Calls print_update() whenever possible.""" |
- self.progress.print_update() |
- with self.all_tasks_done: |
- while self.unfinished_tasks: |
- self.progress.print_update() |
- # Use a short wait timeout so updates are printed in a timely manner. |
- # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done |
- # share the same underlying event so no polling is necessary. |
- self.all_tasks_done.wait(0.1) |
- self.progress.print_update() |
- |
- |
-class ThreadPoolWithProgress(ThreadPool): |
- QUEUE_CLASS = QueueWithProgress |
- |
- def __init__(self, progress, *args, **kwargs): |
- self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) |
- super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
- |
- def _output_append(self, out): |
- """Also wakes up the listener on new completed test_case.""" |
- super(ThreadPoolWithProgress, self)._output_append(out) |
- self.tasks.wake_up() |
- |
- |
-class DeadlockDetector(object): |
- """Context manager that can detect deadlocks. |
- |
- It will dump stack frames of all running threads if its 'ping' method isn't |
- called in time. |
- |
- Usage: |
- with DeadlockDetector(timeout=60) as detector: |
- for item in some_work(): |
- ... |
- detector.ping() |
- ... |
- |
- Arguments: |
- timeout - maximum allowed time between calls to 'ping'. |
- """ |
- |
- def __init__(self, timeout): |
- self.timeout = timeout |
- self._thread = None |
- # Thread stop condition. Also lock for shared variables below. |
- self._stop_cv = threading.Condition() |
- self._stop_flag = False |
- # Time when 'ping' was called last time. |
- self._last_ping = None |
- # True if pings are coming on time. |
- self._alive = True |
- |
- def __enter__(self): |
- """Starts internal watcher thread.""" |
- assert self._thread is None |
- self.ping() |
- self._thread = threading.Thread(name='deadlock-detector', target=self._run) |
- self._thread.daemon = True |
- self._thread.start() |
- return self |
- |
- def __exit__(self, *_args): |
- """Stops internal watcher thread.""" |
- assert self._thread is not None |
- with self._stop_cv: |
- self._stop_flag = True |
- self._stop_cv.notify() |
- self._thread.join() |
- self._thread = None |
- self._stop_flag = False |
- |
- def ping(self): |
- """Notify detector that main thread is still running. |
- |
- Should be called periodically to inform the detector that everything is |
- running as it should. |
- """ |
- with self._stop_cv: |
- self._last_ping = time.time() |
- self._alive = True |
- |
- def _run(self): |
- """Loop that watches for pings and dumps threads state if ping is late.""" |
- with self._stop_cv: |
- while not self._stop_flag: |
- # Skipped deadline? Dump threads and switch to 'not alive' state. |
- if self._alive and time.time() > self._last_ping + self.timeout: |
- self.dump_threads(time.time() - self._last_ping, True) |
- self._alive = False |
- |
- # Pings are on time? |
- if self._alive: |
- # Wait until the moment we need to dump stack traces. |
- # Most probably some other thread will call 'ping' to move deadline |
- # further in time. We don't bother to wake up after each 'ping', |
- # only right before initial expected deadline. |
- self._stop_cv.wait(self._last_ping + self.timeout - time.time()) |
- else: |
- # Skipped some pings previously. Just periodically silently check |
- # for new pings with some arbitrary frequency. |
- self._stop_cv.wait(self.timeout * 0.1) |
- |
- @staticmethod |
- def dump_threads(timeout=None, skip_current_thread=False): |
- """Dumps stack frames of all running threads.""" |
- all_threads = threading.enumerate() |
- current_thread_id = threading.current_thread().ident |
- |
- # Collect tracebacks: thread name -> traceback string. |
- tracebacks = {} |
- |
- # pylint: disable=W0212 |
- for thread_id, frame in sys._current_frames().iteritems(): |
- # Don't dump deadlock detector's own thread, it's boring. |
- if thread_id == current_thread_id and skip_current_thread: |
- continue |
- |
- # Try to get more informative symbolic thread name. |
- name = 'untitled' |
- for thread in all_threads: |
- if thread.ident == thread_id: |
- name = thread.name |
- break |
- name += ' #%d' % (thread_id,) |
- tracebacks[name] = ''.join(traceback.format_stack(frame)) |
- |
- # Function to print a message. Makes it easier to change output destination. |
- def output(msg): |
- logging.warning(msg.rstrip()) |
- |
- # Print tracebacks, sorting them by thread name. That way a thread pool's |
- # threads will be printed as one group. |
- output('=============== Potential deadlock detected ===============') |
- if timeout is not None: |
- output('No pings in last %d sec.' % (timeout,)) |
- output('Dumping stack frames for all threads:') |
- for name in sorted(tracebacks): |
- output('Traceback for \'%s\':\n%s' % (name, tracebacks[name])) |
- output('===========================================================') |
- |
- |
-class Bit(object): |
- """Thread safe setable bit.""" |
- |
- def __init__(self): |
- self._lock = threading.Lock() |
- self._value = False |
- |
- def get(self): |
- with self._lock: |
- return self._value |
- |
- def set(self): |
- with self._lock: |
- self._value = True |
- |
- |
-class TaskChannel(object): |
- """Queue of results of async task execution.""" |
- |
- _ITEM_RESULT = 0 |
- _ITEM_EXCEPTION = 1 |
- |
- def __init__(self): |
- self._queue = Queue.Queue() |
- |
- def send_result(self, result): |
- """Enqueues a result of task execution.""" |
- self._queue.put((self._ITEM_RESULT, result)) |
- |
- def send_exception(self, exc): |
- """Enqueue an exception raised by a task.""" |
- assert isinstance(exc, Exception) |
- self._queue.put((self._ITEM_EXCEPTION, exc)) |
- |
- def pull(self): |
- """Dequeues available result or exception.""" |
- item_type, value = self._queue.get() |
- if item_type == self._ITEM_RESULT: |
- return value |
- if item_type == self._ITEM_EXCEPTION: |
- raise value |
- assert False, 'Impossible queue item type: %r' % item_type |
- |
- def wrap_task(self, task): |
- """Decorator that makes a function push results into this channel.""" |
- @functools.wraps(task) |
- def wrapped(*args, **kwargs): |
- try: |
- self.send_result(task(*args, **kwargs)) |
- except Exception as exc: |
- self.send_exception(exc) |
- return wrapped |
- |
- |
-def num_processors(): |
- """Returns the number of processors. |
- |
- Python on OSX 10.6 raises a NotImplementedError exception. |
- """ |
- try: |
- # Multiprocessing |
- import multiprocessing |
- return multiprocessing.cpu_count() |
- except: # pylint: disable=W0702 |
- try: |
- # Mac OS 10.6 |
- return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 |
- except: |
- # Some of the windows builders seem to get here. |
- return 4 |