| Index: utils/threading_utils.py
|
| diff --git a/utils/threading_utils.py b/utils/threading_utils.py
|
| index f288525a8471fb1d1f0707a62e57c16dba978582..cf4db117931eba15caf13bdead3b9e6037080548 100644
|
| --- a/utils/threading_utils.py
|
| +++ b/utils/threading_utils.py
|
| @@ -372,33 +372,42 @@ class AutoRetryThreadPool(ThreadPool):
|
|
|
| class Progress(object):
|
| """Prints progress and accepts updates thread-safely."""
|
| - def __init__(self, size):
|
| - # To be used in the primary thread
|
| - self.last_printed_line = ''
|
| - self.index = 0
|
| - self.start = time.time()
|
| - self.size = size
|
| - # Setting it to True forces a print on the first print_update() call.
|
| - self.value_changed = True
|
| + def __init__(self, initial_values):
|
| + """Creates a Progress bar that will updates asynchronously from the worker
|
| + threads.
|
| +
|
| + Arguments:
|
| + initial_values: list of int, defines both the number of columns and their
|
| + initial values.
|
| + """
|
| + assert all(isinstance(i, int) for i in initial_values)
|
| + # Members to be used exclusively in the primary thread.
|
| self.use_cr_only = True
|
| self.unfinished_commands = set()
|
| + self._start = time.time()
|
| + self._last_printed_line = ''
|
| + self._columns = initial_values[:]
|
| + # Setting it to True forces a print on the first print_update() call.
|
| + self._value_changed = True
|
|
|
| # To be used in all threads.
|
| - self.queued_lines = Queue.Queue()
|
| + self._queued_updates = Queue.Queue()
|
|
|
| - def update_item(self, name, index=0, size=0, raw=False):
|
| + def update_item(self, name, raw=False, **kwargs):
|
| """Queue information to print out.
|
|
|
| Arguments:
|
| - index: increment to add to index. usually 0 or 1.
|
| - size: increment to add to size. usually 0 or 1.
|
| raw: if True, prints the data without the header.
|
| + colN: increments column N to add to index, 0 based, value is usually 0 or
|
| + 1.
|
| """
|
| assert isinstance(name, str)
|
| - assert isinstance(index, int)
|
| - assert isinstance(size, int)
|
| assert isinstance(raw, bool)
|
| - self.queued_lines.put((name, index, size, raw))
|
| + assert all(
|
| + k.startswith('col') and len(k) == 4 and k[3].isdigit() for k in kwargs)
|
| + assert all(isinstance(v, int) for v in kwargs.itervalues())
|
| + args = [(int(k[3]), v) for k, v in kwargs.iteritems()]
|
| + self._queued_updates.put((name, raw, args))
|
|
|
| def print_update(self):
|
| """Prints the current status."""
|
| @@ -409,13 +418,13 @@ class Progress(object):
|
| got_one = False
|
| while True:
|
| try:
|
| - name, index, size, raw = self.queued_lines.get_nowait()
|
| + name, raw, args = self._queued_updates.get_nowait()
|
| except Queue.Empty:
|
| break
|
|
|
| - self.size += size
|
| - self.index += index
|
| - self.value_changed = bool(size or index)
|
| + for k, v in args:
|
| + self._columns[k] += v
|
| + self._value_changed = bool(args)
|
| if not name:
|
| # Even if raw=True, there's nothing to print.
|
| continue
|
| @@ -423,18 +432,18 @@ class Progress(object):
|
| got_one = True
|
| if raw:
|
| # Prints the data as-is.
|
| - self.last_printed_line = ''
|
| + self._last_printed_line = ''
|
| sys.stdout.write('\n%s\n' % name.strip('\n'))
|
| else:
|
| - line, self.last_printed_line = self.gen_line(name)
|
| + line, self._last_printed_line = self._gen_line(name)
|
| sys.stdout.write(line)
|
|
|
| - if not got_one and self.value_changed:
|
| + if not got_one and self._value_changed:
|
| # Make sure a line is printed in that case where statistics changes.
|
| - line, self.last_printed_line = self.gen_line('')
|
| + line, self._last_printed_line = self._gen_line('')
|
| sys.stdout.write(line)
|
| got_one = True
|
| - self.value_changed = False
|
| + self._value_changed = False
|
| if got_one:
|
| # Ensure that all the output is flushed to prevent it from getting mixed
|
| # with other output streams (like the logging streams).
|
| @@ -444,33 +453,31 @@ class Progress(object):
|
| logging.debug('Waiting for the following commands to finish:\n%s',
|
| '\n'.join(self.unfinished_commands))
|
|
|
| - def gen_line(self, name):
|
| + def _gen_line(self, name):
|
| """Generates the line to be printed."""
|
| - next_line = ('[%*d/%d] %6.2fs %s') % (
|
| - len(str(self.size)), self.index,
|
| - self.size,
|
| - time.time() - self.start,
|
| - name)
|
| + next_line = ('[%s] %6.2fs %s') % (
|
| + self._render_columns(), time.time() - self._start, name)
|
| # Fill it with whitespace only if self.use_cr_only is set.
|
| prefix = ''
|
| - if self.use_cr_only and self.last_printed_line:
|
| + if self.use_cr_only and self._last_printed_line:
|
| prefix = '\r'
|
| if self.use_cr_only:
|
| - suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line))
|
| + suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line))
|
| else:
|
| suffix = '\n'
|
| return '%s%s%s' % (prefix, next_line, suffix), next_line
|
|
|
| + def _render_columns(self):
|
| + """Renders the columns."""
|
| + columns_as_str = map(str, self._columns)
|
| + max_len = max(map(len, columns_as_str))
|
| + return '/'.join(i.rjust(max_len) for i in columns_as_str)
|
| +
|
|
|
| class QueueWithProgress(Queue.PriorityQueue):
|
| """Implements progress support in join()."""
|
| - def __init__(self, maxsize, *args, **kwargs):
|
| + def __init__(self, progress, *args, **kwargs):
|
| Queue.PriorityQueue.__init__(self, *args, **kwargs)
|
| - self.progress = Progress(maxsize)
|
| -
|
| - def set_progress(self, progress):
|
| - """Replace the current progress, mainly used when a progress should be
|
| - shared between queues."""
|
| self.progress = progress
|
|
|
| def task_done(self):
|
| @@ -514,8 +521,8 @@ class ThreadPoolWithProgress(ThreadPool):
|
| QUEUE_CLASS = QueueWithProgress
|
|
|
| def __init__(self, progress, *args, **kwargs):
|
| + self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress)
|
| super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)
|
| - self.tasks.set_progress(progress)
|
|
|
| def _output_append(self, out):
|
| """Also wakes up the listener on new completed test_case."""
|
|
|