Index: utils/threading_utils.py |
diff --git a/utils/threading_utils.py b/utils/threading_utils.py |
index 12de3041b27b93e44630a8dec9552e1e8e68b710..cb3ecf63b8562e12a38eeb56a4d4131f0756a490 100644 |
--- a/utils/threading_utils.py |
+++ b/utils/threading_utils.py |
@@ -372,33 +372,43 @@ class AutoRetryThreadPool(ThreadPool): |
class Progress(object): |
"""Prints progress and accepts updates thread-safely.""" |
- def __init__(self, size): |
- # To be used in the primary thread |
- self.last_printed_line = '' |
- self.index = 0 |
- self.start = time.time() |
- self.size = size |
- # Setting it to True forces a print on the first print_update() call. |
- self.value_changed = True |
+ def __init__(self, columns): |
+ """Creates a Progress bar that will updates asynchronously from the worker |
+ threads. |
+ |
+ Arguments: |
+ columns: list of tuple(name, initialvalue), defines both the number of |
+ columns and their initial values. |
+ """ |
+ assert all( |
+ len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int) |
+ for c in columns), columns |
+ # Members to be used exclusively in the primary thread. |
self.use_cr_only = True |
self.unfinished_commands = set() |
+ self.start = time.time() |
+ self._last_printed_line = '' |
+ self._columns = [c[1] for c in columns] |
+ self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns)) |
+ # Setting it to True forces a print on the first print_update() call. |
+ self._value_changed = True |
# To be used in all threads. |
- self.queued_lines = Queue.Queue() |
+ self._queued_updates = Queue.Queue() |
- def update_item(self, name, index=0, size=0, raw=False): |
+ def update_item(self, name, raw=False, **kwargs): |
"""Queue information to print out. |
Arguments: |
- index: increment to add to index. usually 0 or 1. |
- size: increment to add to size. usually 0 or 1. |
raw: if True, prints the data without the header. |
+ <name>: argument name is a name of a column. it's value is the increment |
+ to the column, value is usually 0 or 1. |
""" |
assert isinstance(name, str) |
- assert isinstance(index, int) |
- assert isinstance(size, int) |
assert isinstance(raw, bool) |
- self.queued_lines.put((name, index, size, raw)) |
+ assert all(isinstance(v, int) for v in kwargs.itervalues()) |
+ args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v] |
+ self._queued_updates.put((name, raw, args)) |
def print_update(self): |
"""Prints the current status.""" |
@@ -409,13 +419,13 @@ class Progress(object): |
got_one = False |
while True: |
try: |
- name, index, size, raw = self.queued_lines.get_nowait() |
+ name, raw, args = self._queued_updates.get_nowait() |
except Queue.Empty: |
break |
- self.size += size |
- self.index += index |
- self.value_changed = bool(size or index) |
+ for k, v in args: |
+ self._columns[k] += v |
+ self._value_changed = bool(args) |
if not name: |
# Even if raw=True, there's nothing to print. |
continue |
@@ -423,18 +433,18 @@ class Progress(object): |
got_one = True |
if raw: |
# Prints the data as-is. |
- self.last_printed_line = '' |
+ self._last_printed_line = '' |
sys.stdout.write('\n%s\n' % name.strip('\n')) |
else: |
- line, self.last_printed_line = self.gen_line(name) |
+ line, self._last_printed_line = self._gen_line(name) |
sys.stdout.write(line) |
- if not got_one and self.value_changed: |
+ if not got_one and self._value_changed: |
# Make sure a line is printed in that case where statistics changes. |
- line, self.last_printed_line = self.gen_line('') |
+ line, self._last_printed_line = self._gen_line('') |
sys.stdout.write(line) |
got_one = True |
- self.value_changed = False |
+ self._value_changed = False |
if got_one: |
# Ensure that all the output is flushed to prevent it from getting mixed |
# with other output streams (like the logging streams). |
@@ -444,33 +454,31 @@ class Progress(object): |
logging.debug('Waiting for the following commands to finish:\n%s', |
'\n'.join(self.unfinished_commands)) |
- def gen_line(self, name): |
+ def _gen_line(self, name): |
"""Generates the line to be printed.""" |
- next_line = ('[%*d/%d] %6.2fs %s') % ( |
- len(str(self.size)), self.index, |
- self.size, |
- time.time() - self.start, |
- name) |
+ next_line = ('[%s] %6.2fs %s') % ( |
+ self._render_columns(), time.time() - self.start, name) |
# Fill it with whitespace only if self.use_cr_only is set. |
prefix = '' |
- if self.use_cr_only and self.last_printed_line: |
+ if self.use_cr_only and self._last_printed_line: |
prefix = '\r' |
if self.use_cr_only: |
- suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) |
+ suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
else: |
suffix = '\n' |
return '%s%s%s' % (prefix, next_line, suffix), next_line |
+ def _render_columns(self): |
+ """Renders the columns.""" |
+ columns_as_str = map(str, self._columns) |
+ max_len = max(map(len, columns_as_str)) |
+ return '/'.join(i.rjust(max_len) for i in columns_as_str) |
+ |
class QueueWithProgress(Queue.PriorityQueue): |
"""Implements progress support in join().""" |
- def __init__(self, maxsize, *args, **kwargs): |
+ def __init__(self, progress, *args, **kwargs): |
Queue.PriorityQueue.__init__(self, *args, **kwargs) |
- self.progress = Progress(maxsize) |
- |
- def set_progress(self, progress): |
- """Replace the current progress, mainly used when a progress should be |
- shared between queues.""" |
self.progress = progress |
def task_done(self): |
@@ -516,8 +524,8 @@ class ThreadPoolWithProgress(ThreadPool): |
QUEUE_CLASS = QueueWithProgress |
def __init__(self, progress, *args, **kwargs): |
+ self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) |
super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
- self.tasks.set_progress(progress) |
def _output_append(self, out): |
"""Also wakes up the listener on new completed test_case.""" |