Index: utils/threading_utils.py |
diff --git a/utils/threading_utils.py b/utils/threading_utils.py |
index e0ef2160af86bcc3a2c2a74225325f21c196688e..12de3041b27b93e44630a8dec9552e1e8e68b710 100644 |
--- a/utils/threading_utils.py |
+++ b/utils/threading_utils.py |
@@ -265,6 +265,23 @@ class ThreadPool(object): |
'Thread pool \'%s\' closed: spawned %d threads total', |
self._prefix, len(self._workers)) |
+ def abort(self): |
+ """Empties the queue. |
+ |
+ To be used when the pool should stop early, like when Ctrl-C was detected. |
+ |
+ Returns: |
+ Number of tasks cancelled. |
+ """ |
+ index = 0 |
+ while True: |
+ try: |
+ self.tasks.get_nowait() |
+ self.tasks.task_done() |
+ index += 1 |
+ except Queue.Empty: |
+ return index |
+ |
def __enter__(self): |
"""Enables 'with' statement.""" |
return self |
@@ -369,14 +386,18 @@ class Progress(object): |
# To be used in all threads. |
self.queued_lines = Queue.Queue() |
- def update_item(self, name, index=False, size=False, raw=False): |
+ def update_item(self, name, index=0, size=0, raw=False): |
"""Queue information to print out. |
Arguments: |
- index: index should be incremented. |
- size: total size should be incremented. |
+ index: increment to add to index. usually 0 or 1. |
+ size: increment to add to size. usually 0 or 1. |
raw: if True, prints the data without the header. |
""" |
+ assert isinstance(name, str) |
+ assert isinstance(index, int) |
+ assert isinstance(size, int) |
+ assert isinstance(raw, bool) |
self.queued_lines.put((name, index, size, raw)) |
def print_update(self): |
@@ -392,12 +413,9 @@ class Progress(object): |
except Queue.Empty: |
break |
- if size: |
- self.size += 1 |
- self.value_changed = True |
- if index: |
- self.index += 1 |
- self.value_changed = True |
+ self.size += size |
+ self.index += index |
+ self.value_changed = bool(size or index) |
if not name: |
# Even if raw=True, there's nothing to print. |
continue |
@@ -487,7 +505,10 @@ class QueueWithProgress(Queue.PriorityQueue): |
with self.all_tasks_done: |
while self.unfinished_tasks: |
self.progress.print_update() |
- self.all_tasks_done.wait(60.) |
+ # Use a short wait timeout so updates are printed in a timely manner. |
+ # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done |
+ # share the same underlying event so no polling is necessary. |
+ self.all_tasks_done.wait(0.1) |
self.progress.print_update() |