Index: utils/threading_utils.py |
diff --git a/utils/threading_utils.py b/utils/threading_utils.py |
index f288525a8471fb1d1f0707a62e57c16dba978582..cf4db117931eba15caf13bdead3b9e6037080548 100644 |
--- a/utils/threading_utils.py |
+++ b/utils/threading_utils.py |
@@ -372,33 +372,42 @@ class AutoRetryThreadPool(ThreadPool): |
class Progress(object): |
"""Prints progress and accepts updates thread-safely.""" |
- def __init__(self, size): |
- # To be used in the primary thread |
- self.last_printed_line = '' |
- self.index = 0 |
- self.start = time.time() |
- self.size = size |
- # Setting it to True forces a print on the first print_update() call. |
- self.value_changed = True |
+ def __init__(self, initial_values): |
+ """Creates a Progress bar that will updates asynchronously from the worker |
+ threads. |
+ |
+ Arguments: |
+ initial_values: list of int, defines both the number of columns and their |
+ initial values. |
+ """ |
+ assert all(isinstance(i, int) for i in initial_values) |
+ # Members to be used exclusively in the primary thread. |
self.use_cr_only = True |
self.unfinished_commands = set() |
+ self._start = time.time() |
+ self._last_printed_line = '' |
+ self._columns = initial_values[:] |
+ # Setting it to True forces a print on the first print_update() call. |
+ self._value_changed = True |
# To be used in all threads. |
- self.queued_lines = Queue.Queue() |
+ self._queued_updates = Queue.Queue() |
- def update_item(self, name, index=0, size=0, raw=False): |
+ def update_item(self, name, raw=False, **kwargs): |
"""Queue information to print out. |
Arguments: |
- index: increment to add to index. usually 0 or 1. |
- size: increment to add to size. usually 0 or 1. |
raw: if True, prints the data without the header. |
+ colN: increments column N to add to index, 0 based, value is usually 0 or |
+ 1. |
""" |
assert isinstance(name, str) |
- assert isinstance(index, int) |
- assert isinstance(size, int) |
assert isinstance(raw, bool) |
- self.queued_lines.put((name, index, size, raw)) |
+ assert all( |
+ k.startswith('col') and len(k) == 4 and k[3].isdigit() for k in kwargs) |
+ assert all(isinstance(v, int) for v in kwargs.itervalues()) |
+ args = [(int(k[3]), v) for k, v in kwargs.iteritems()] |
+ self._queued_updates.put((name, raw, args)) |
def print_update(self): |
"""Prints the current status.""" |
@@ -409,13 +418,13 @@ class Progress(object): |
got_one = False |
while True: |
try: |
- name, index, size, raw = self.queued_lines.get_nowait() |
+ name, raw, args = self._queued_updates.get_nowait() |
except Queue.Empty: |
break |
- self.size += size |
- self.index += index |
- self.value_changed = bool(size or index) |
+ for k, v in args: |
+ self._columns[k] += v |
+ self._value_changed = bool(args) |
if not name: |
# Even if raw=True, there's nothing to print. |
continue |
@@ -423,18 +432,18 @@ class Progress(object): |
got_one = True |
if raw: |
# Prints the data as-is. |
- self.last_printed_line = '' |
+ self._last_printed_line = '' |
sys.stdout.write('\n%s\n' % name.strip('\n')) |
else: |
- line, self.last_printed_line = self.gen_line(name) |
+ line, self._last_printed_line = self._gen_line(name) |
sys.stdout.write(line) |
- if not got_one and self.value_changed: |
+ if not got_one and self._value_changed: |
# Make sure a line is printed in that case where statistics changes. |
- line, self.last_printed_line = self.gen_line('') |
+ line, self._last_printed_line = self._gen_line('') |
sys.stdout.write(line) |
got_one = True |
- self.value_changed = False |
+ self._value_changed = False |
if got_one: |
# Ensure that all the output is flushed to prevent it from getting mixed |
# with other output streams (like the logging streams). |
@@ -444,33 +453,31 @@ class Progress(object): |
logging.debug('Waiting for the following commands to finish:\n%s', |
'\n'.join(self.unfinished_commands)) |
- def gen_line(self, name): |
+ def _gen_line(self, name): |
"""Generates the line to be printed.""" |
- next_line = ('[%*d/%d] %6.2fs %s') % ( |
- len(str(self.size)), self.index, |
- self.size, |
- time.time() - self.start, |
- name) |
+ next_line = ('[%s] %6.2fs %s') % ( |
+ self._render_columns(), time.time() - self._start, name) |
# Fill it with whitespace only if self.use_cr_only is set. |
prefix = '' |
- if self.use_cr_only and self.last_printed_line: |
+ if self.use_cr_only and self._last_printed_line: |
prefix = '\r' |
if self.use_cr_only: |
- suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) |
+ suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
else: |
suffix = '\n' |
return '%s%s%s' % (prefix, next_line, suffix), next_line |
+ def _render_columns(self): |
+ """Renders the columns.""" |
+ columns_as_str = map(str, self._columns) |
+ max_len = max(map(len, columns_as_str)) |
+ return '/'.join(i.rjust(max_len) for i in columns_as_str) |
+ |
class QueueWithProgress(Queue.PriorityQueue): |
"""Implements progress support in join().""" |
- def __init__(self, maxsize, *args, **kwargs): |
+ def __init__(self, progress, *args, **kwargs): |
Queue.PriorityQueue.__init__(self, *args, **kwargs) |
- self.progress = Progress(maxsize) |
- |
- def set_progress(self, progress): |
- """Replace the current progress, mainly used when a progress should be |
- shared between queues.""" |
self.progress = progress |
def task_done(self): |
@@ -514,8 +521,8 @@ class ThreadPoolWithProgress(ThreadPool): |
QUEUE_CLASS = QueueWithProgress |
def __init__(self, progress, *args, **kwargs): |
+ self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) |
super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
- self.tasks.set_progress(progress) |
def _output_append(self, out): |
"""Also wakes up the listener on new completed test_case.""" |