Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(330)

Unified Diff: utils/threading_utils.py

Issue 25478010: Add ThreadPool.abort() to stop processing early. (Closed) Base URL: https://chromium.googlesource.com/a/chromium/tools/swarm_client@master
Patch Set: Add TODO Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/run_swarm_tests_on_swarm.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/threading_utils.py
diff --git a/utils/threading_utils.py b/utils/threading_utils.py
index e0ef2160af86bcc3a2c2a74225325f21c196688e..12de3041b27b93e44630a8dec9552e1e8e68b710 100644
--- a/utils/threading_utils.py
+++ b/utils/threading_utils.py
@@ -265,6 +265,23 @@ class ThreadPool(object):
'Thread pool \'%s\' closed: spawned %d threads total',
self._prefix, len(self._workers))
+ def abort(self):
+ """Empties the queue.
+
+ To be used when the pool should stop early, like when Ctrl-C was detected.
+
+ Returns:
+ Number of tasks cancelled.
+ """
+ index = 0
+ while True:
+ try:
+ self.tasks.get_nowait()
+ self.tasks.task_done()
+ index += 1
+ except Queue.Empty:
+ return index
+
def __enter__(self):
"""Enables 'with' statement."""
return self
@@ -369,14 +386,18 @@ class Progress(object):
# To be used in all threads.
self.queued_lines = Queue.Queue()
- def update_item(self, name, index=False, size=False, raw=False):
+ def update_item(self, name, index=0, size=0, raw=False):
"""Queue information to print out.
Arguments:
- index: index should be incremented.
- size: total size should be incremented.
+ index: increment to add to index. usually 0 or 1.
+ size: increment to add to size. usually 0 or 1.
raw: if True, prints the data without the header.
"""
+ assert isinstance(name, str)
+ assert isinstance(index, int)
+ assert isinstance(size, int)
+ assert isinstance(raw, bool)
self.queued_lines.put((name, index, size, raw))
def print_update(self):
@@ -392,12 +413,9 @@ class Progress(object):
except Queue.Empty:
break
- if size:
- self.size += 1
- self.value_changed = True
- if index:
- self.index += 1
- self.value_changed = True
+ self.size += size
+ self.index += index
+ self.value_changed = bool(size or index)
if not name:
# Even if raw=True, there's nothing to print.
continue
@@ -487,7 +505,10 @@ class QueueWithProgress(Queue.PriorityQueue):
with self.all_tasks_done:
while self.unfinished_tasks:
self.progress.print_update()
- self.all_tasks_done.wait(60.)
+ # Use a short wait timeout so updates are printed in a timely manner.
+ # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
+ # share the same underlying event so no polling is necessary.
+ self.all_tasks_done.wait(0.1)
self.progress.print_update()
« no previous file with comments | « tools/run_swarm_tests_on_swarm.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698