Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: utils/threading_utils.py

Issue 25478010: Add ThreadPool.abort() to stop processing early. (Closed) Base URL: https://chromium.googlesource.com/a/chromium/tools/swarm_client@master
Patch Set: s/slow/short/ Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The Chromium Authors. All rights reserved. 1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes and functions related to threading.""" 5 """Classes and functions related to threading."""
6 6
7 import functools 7 import functools
8 import inspect 8 import inspect
9 import logging 9 import logging
10 import os 10 import os
(...skipping 247 matching lines...) Expand 10 before | Expand all | Expand 10 after
258 self._is_closed = True 258 self._is_closed = True
259 for _ in range(len(self._workers)): 259 for _ in range(len(self._workers)):
260 # Enqueueing None causes the worker to stop. 260 # Enqueueing None causes the worker to stop.
261 self.tasks.put(None) 261 self.tasks.put(None)
262 for t in self._workers: 262 for t in self._workers:
263 t.join() 263 t.join()
264 logging.debug( 264 logging.debug(
265 'Thread pool \'%s\' closed: spawned %d threads total', 265 'Thread pool \'%s\' closed: spawned %d threads total',
266 self._prefix, len(self._workers)) 266 self._prefix, len(self._workers))
267 267
268 def abort(self):
269 """Empties the queue.
270
271 To be used when the pool should stop early, like when Ctrl-C was detected.
272
273 Returns:
274 Number of tasks cancelled.
275 """
276 index = 0
277 while True:
278 try:
279 self.tasks.get_nowait()
280 self.tasks.task_done()
281 index += 1
282 except Queue.Empty:
283 return index
284
268 def __enter__(self): 285 def __enter__(self):
269 """Enables 'with' statement.""" 286 """Enables 'with' statement."""
270 return self 287 return self
271 288
272 def __exit__(self, _exc_type, _exc_value, _traceback): 289 def __exit__(self, _exc_type, _exc_value, _traceback):
273 """Enables 'with' statement.""" 290 """Enables 'with' statement."""
274 self.close() 291 self.close()
275 292
276 293
277 class AutoRetryThreadPool(ThreadPool): 294 class AutoRetryThreadPool(ThreadPool):
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
362 self.start = time.time() 379 self.start = time.time()
363 self.size = size 380 self.size = size
364 # Setting it to True forces a print on the first print_update() call. 381 # Setting it to True forces a print on the first print_update() call.
365 self.value_changed = True 382 self.value_changed = True
366 self.use_cr_only = True 383 self.use_cr_only = True
367 self.unfinished_commands = set() 384 self.unfinished_commands = set()
368 385
369 # To be used in all threads. 386 # To be used in all threads.
370 self.queued_lines = Queue.Queue() 387 self.queued_lines = Queue.Queue()
371 388
372 def update_item(self, name, index=False, size=False, raw=False): 389 def update_item(self, name, index=0, size=0, raw=False):
Marc-Antoine Ruel (Google) 2013/10/02 14:08:31 Eventually I want it to support an arbitrary numbe
373 """Queue information to print out. 390 """Queue information to print out.
374 391
375 Arguments: 392 Arguments:
376 index: index should be incremented. 393 index: increment to add to index. usually 0 or 1.
377 size: total size should be incremented. 394 size: increment to add to size. usually 0 or 1.
378 raw: if True, prints the data without the header. 395 raw: if True, prints the data without the header.
379 """ 396 """
397 assert isinstance(name, str)
398 assert isinstance(index, int)
399 assert isinstance(size, int)
400 assert isinstance(raw, bool)
380 self.queued_lines.put((name, index, size, raw)) 401 self.queued_lines.put((name, index, size, raw))
381 402
382 def print_update(self): 403 def print_update(self):
383 """Prints the current status.""" 404 """Prints the current status."""
384 # Flush all the logging output so it doesn't appear within this output. 405 # Flush all the logging output so it doesn't appear within this output.
385 for handler in logging.root.handlers: 406 for handler in logging.root.handlers:
386 handler.flush() 407 handler.flush()
387 408
388 got_one = False 409 got_one = False
389 while True: 410 while True:
390 try: 411 try:
391 name, index, size, raw = self.queued_lines.get_nowait() 412 name, index, size, raw = self.queued_lines.get_nowait()
392 except Queue.Empty: 413 except Queue.Empty:
393 break 414 break
394 415
395 if size: 416 self.size += size
396 self.size += 1 417 self.index += index
397 self.value_changed = True 418 self.value_changed = bool(size or index)
398 if index:
399 self.index += 1
400 self.value_changed = True
401 if not name: 419 if not name:
402 # Even if raw=True, there's nothing to print. 420 # Even if raw=True, there's nothing to print.
403 continue 421 continue
404 422
405 got_one = True 423 got_one = True
406 if raw: 424 if raw:
407 # Prints the data as-is. 425 # Prints the data as-is.
408 self.last_printed_line = '' 426 self.last_printed_line = ''
409 sys.stdout.write('\n%s\n' % name.strip('\n')) 427 sys.stdout.write('\n%s\n' % name.strip('\n'))
410 else: 428 else:
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
480 # between the number of output and the number of input task. 498 # between the number of output and the number of input task.
481 with self.all_tasks_done: 499 with self.all_tasks_done:
482 self.all_tasks_done.notify_all() 500 self.all_tasks_done.notify_all()
483 501
484 def join(self): 502 def join(self):
485 """Calls print_update() whenever possible.""" 503 """Calls print_update() whenever possible."""
486 self.progress.print_update() 504 self.progress.print_update()
487 with self.all_tasks_done: 505 with self.all_tasks_done:
488 while self.unfinished_tasks: 506 while self.unfinished_tasks:
489 self.progress.print_update() 507 self.progress.print_update()
490 self.all_tasks_done.wait(60.) 508 # Use a short wait timeout so updates are printed in a timely manner.
509 self.all_tasks_done.wait(0.1)
Vadim Sh. 2013/10/02 18:19:48 I'm unhappy about polling. I think print_update ca
Marc-Antoine Ruel (Google) 2013/10/02 19:41:26 I agree it's far from awesome but I prefer the Pro
491 self.progress.print_update() 510 self.progress.print_update()
492 511
493 512
494 class ThreadPoolWithProgress(ThreadPool): 513 class ThreadPoolWithProgress(ThreadPool):
495 QUEUE_CLASS = QueueWithProgress 514 QUEUE_CLASS = QueueWithProgress
496 515
497 def __init__(self, progress, *args, **kwargs): 516 def __init__(self, progress, *args, **kwargs):
498 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) 517 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)
499 self.tasks.set_progress(progress) 518 self.tasks.set_progress(progress)
500 519
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
684 # Multiprocessing 703 # Multiprocessing
685 import multiprocessing 704 import multiprocessing
686 return multiprocessing.cpu_count() 705 return multiprocessing.cpu_count()
687 except: # pylint: disable=W0702 706 except: # pylint: disable=W0702
688 try: 707 try:
689 # Mac OS 10.6 708 # Mac OS 10.6
690 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 709 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101
691 except: 710 except:
692 # Some of the windows builders seem to get here. 711 # Some of the windows builders seem to get here.
693 return 4 712 return 4
OLDNEW
« googletest/run_test_cases.py ('K') | « tools/run_swarm_tests_on_swarm.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698