Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(229)

Side by Side Diff: utils/threading_utils.py

Issue 25478012: Make Progress support an arbitrary number of columns. (Closed) Base URL: https://chromium.googlesource.com/a/chromium/tools/swarm_client@1_progress
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The Chromium Authors. All rights reserved. 1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes and functions related to threading.""" 5 """Classes and functions related to threading."""
6 6
7 import functools 7 import functools
8 import inspect 8 import inspect
9 import logging 9 import logging
10 import os 10 import os
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after
365 raise 365 raise
366 channel.send_exception(e) 366 channel.send_exception(e)
367 except Exception as e: 367 except Exception as e:
368 if channel is None: 368 if channel is None:
369 raise 369 raise
370 channel.send_exception(e) 370 channel.send_exception(e)
371 371
372 372
373 class Progress(object): 373 class Progress(object):
374 """Prints progress and accepts updates thread-safely.""" 374 """Prints progress and accepts updates thread-safely."""
375 def __init__(self, size): 375 def __init__(self, initial_values):
376 # To be used in the primary thread 376 """Creates a Progress bar that will updates asynchronously from the worker
377 self.last_printed_line = '' 377 threads.
378 self.index = 0 378
379 self.start = time.time() 379 Arguments:
380 self.size = size 380 initial_values: list of int, defines both the number of columns and their
381 # Setting it to True forces a print on the first print_update() call. 381 initial values.
382 self.value_changed = True 382 """
383 assert all(isinstance(i, int) for i in initial_values)
384 # Members to be used exclusively in the primary thread.
383 self.use_cr_only = True 385 self.use_cr_only = True
384 self.unfinished_commands = set() 386 self.unfinished_commands = set()
387 self._start = time.time()
388 self._last_printed_line = ''
389 self._columns = initial_values[:]
390 # Setting it to True forces a print on the first print_update() call.
391 self._value_changed = True
385 392
386 # To be used in all threads. 393 # To be used in all threads.
387 self.queued_lines = Queue.Queue() 394 self._queued_updates = Queue.Queue()
388 395
389 def update_item(self, name, index=0, size=0, raw=False): 396 def update_item(self, name, raw=False, **kwargs):
390 """Queue information to print out. 397 """Queue information to print out.
391 398
392 Arguments: 399 Arguments:
393 index: increment to add to index. usually 0 or 1.
394 size: increment to add to size. usually 0 or 1.
395 raw: if True, prints the data without the header. 400 raw: if True, prints the data without the header.
401 colN: increments column N to add to index, 0 based, value is usually 0 or
402 1.
396 """ 403 """
397 assert isinstance(name, str) 404 assert isinstance(name, str)
398 assert isinstance(index, int)
399 assert isinstance(size, int)
400 assert isinstance(raw, bool) 405 assert isinstance(raw, bool)
401 self.queued_lines.put((name, index, size, raw)) 406 assert all(
407 k.startswith('col') and len(k) == 4 and k[3].isdigit() for k in kwargs)
408 assert all(isinstance(v, int) for v in kwargs.itervalues())
409 args = [(int(k[3]), v) for k, v in kwargs.iteritems()]
410 self._queued_updates.put((name, raw, args))
402 411
403 def print_update(self): 412 def print_update(self):
404 """Prints the current status.""" 413 """Prints the current status."""
405 # Flush all the logging output so it doesn't appear within this output. 414 # Flush all the logging output so it doesn't appear within this output.
406 for handler in logging.root.handlers: 415 for handler in logging.root.handlers:
407 handler.flush() 416 handler.flush()
408 417
409 got_one = False 418 got_one = False
410 while True: 419 while True:
411 try: 420 try:
412 name, index, size, raw = self.queued_lines.get_nowait() 421 name, raw, args = self._queued_updates.get_nowait()
413 except Queue.Empty: 422 except Queue.Empty:
414 break 423 break
415 424
416 self.size += size 425 for k, v in args:
417 self.index += index 426 self._columns[k] += v
418 self.value_changed = bool(size or index) 427 self._value_changed = bool(args)
419 if not name: 428 if not name:
420 # Even if raw=True, there's nothing to print. 429 # Even if raw=True, there's nothing to print.
421 continue 430 continue
422 431
423 got_one = True 432 got_one = True
424 if raw: 433 if raw:
425 # Prints the data as-is. 434 # Prints the data as-is.
426 self.last_printed_line = '' 435 self._last_printed_line = ''
427 sys.stdout.write('\n%s\n' % name.strip('\n')) 436 sys.stdout.write('\n%s\n' % name.strip('\n'))
428 else: 437 else:
429 line, self.last_printed_line = self.gen_line(name) 438 line, self._last_printed_line = self._gen_line(name)
430 sys.stdout.write(line) 439 sys.stdout.write(line)
431 440
432 if not got_one and self.value_changed: 441 if not got_one and self._value_changed:
433 # Make sure a line is printed in that case where statistics changes. 442 # Make sure a line is printed in that case where statistics changes.
434 line, self.last_printed_line = self.gen_line('') 443 line, self._last_printed_line = self._gen_line('')
435 sys.stdout.write(line) 444 sys.stdout.write(line)
436 got_one = True 445 got_one = True
437 self.value_changed = False 446 self._value_changed = False
438 if got_one: 447 if got_one:
439 # Ensure that all the output is flushed to prevent it from getting mixed 448 # Ensure that all the output is flushed to prevent it from getting mixed
440 # with other output streams (like the logging streams). 449 # with other output streams (like the logging streams).
441 sys.stdout.flush() 450 sys.stdout.flush()
442 451
443 if self.unfinished_commands: 452 if self.unfinished_commands:
444 logging.debug('Waiting for the following commands to finish:\n%s', 453 logging.debug('Waiting for the following commands to finish:\n%s',
445 '\n'.join(self.unfinished_commands)) 454 '\n'.join(self.unfinished_commands))
446 455
447 def gen_line(self, name): 456 def _gen_line(self, name):
448 """Generates the line to be printed.""" 457 """Generates the line to be printed."""
449 next_line = ('[%*d/%d] %6.2fs %s') % ( 458 next_line = ('[%s] %6.2fs %s') % (
450 len(str(self.size)), self.index, 459 self._render_columns(), time.time() - self._start, name)
451 self.size,
452 time.time() - self.start,
453 name)
454 # Fill it with whitespace only if self.use_cr_only is set. 460 # Fill it with whitespace only if self.use_cr_only is set.
455 prefix = '' 461 prefix = ''
456 if self.use_cr_only and self.last_printed_line: 462 if self.use_cr_only and self._last_printed_line:
457 prefix = '\r' 463 prefix = '\r'
458 if self.use_cr_only: 464 if self.use_cr_only:
459 suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) 465 suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line))
460 else: 466 else:
461 suffix = '\n' 467 suffix = '\n'
462 return '%s%s%s' % (prefix, next_line, suffix), next_line 468 return '%s%s%s' % (prefix, next_line, suffix), next_line
463 469
470 def _render_columns(self):
471 """Renders the columns."""
472 columns_as_str = map(str, self._columns)
473 max_len = max(map(len, columns_as_str))
474 return '/'.join(i.rjust(max_len) for i in columns_as_str)
475
464 476
465 class QueueWithProgress(Queue.PriorityQueue): 477 class QueueWithProgress(Queue.PriorityQueue):
466 """Implements progress support in join().""" 478 """Implements progress support in join()."""
467 def __init__(self, maxsize, *args, **kwargs): 479 def __init__(self, progress, *args, **kwargs):
468 Queue.PriorityQueue.__init__(self, *args, **kwargs) 480 Queue.PriorityQueue.__init__(self, *args, **kwargs)
469 self.progress = Progress(maxsize)
470
471 def set_progress(self, progress):
472 """Replace the current progress, mainly used when a progress should be
473 shared between queues."""
474 self.progress = progress 481 self.progress = progress
475 482
476 def task_done(self): 483 def task_done(self):
477 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task 484 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task
478 done. 485 done.
479 """ 486 """
480 with self.all_tasks_done: 487 with self.all_tasks_done:
481 try: 488 try:
482 unfinished = self.unfinished_tasks - 1 489 unfinished = self.unfinished_tasks - 1
483 if unfinished < 0: 490 if unfinished < 0:
(...skipping 23 matching lines...) Expand all
507 self.progress.print_update() 514 self.progress.print_update()
508 # Use a short wait timeout so updates are printed in a timely manner. 515 # Use a short wait timeout so updates are printed in a timely manner.
509 self.all_tasks_done.wait(0.1) 516 self.all_tasks_done.wait(0.1)
510 self.progress.print_update() 517 self.progress.print_update()
511 518
512 519
513 class ThreadPoolWithProgress(ThreadPool): 520 class ThreadPoolWithProgress(ThreadPool):
514 QUEUE_CLASS = QueueWithProgress 521 QUEUE_CLASS = QueueWithProgress
515 522
516 def __init__(self, progress, *args, **kwargs): 523 def __init__(self, progress, *args, **kwargs):
524 self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress)
517 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) 525 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)
518 self.tasks.set_progress(progress)
519 526
520 def _output_append(self, out): 527 def _output_append(self, out):
521 """Also wakes up the listener on new completed test_case.""" 528 """Also wakes up the listener on new completed test_case."""
522 super(ThreadPoolWithProgress, self)._output_append(out) 529 super(ThreadPoolWithProgress, self)._output_append(out)
523 self.tasks.wake_up() 530 self.tasks.wake_up()
524 531
525 532
526 class DeadlockDetector(object): 533 class DeadlockDetector(object):
527 """Context manager that can detect deadlocks. 534 """Context manager that can detect deadlocks.
528 535
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after
703 # Multiprocessing 710 # Multiprocessing
704 import multiprocessing 711 import multiprocessing
705 return multiprocessing.cpu_count() 712 return multiprocessing.cpu_count()
706 except: # pylint: disable=W0702 713 except: # pylint: disable=W0702
707 try: 714 try:
708 # Mac OS 10.6 715 # Mac OS 10.6
709 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 716 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101
710 except: 717 except:
711 # Some of the windows builders seem to get here. 718 # Some of the windows builders seem to get here.
712 return 4 719 return 4
OLDNEW
« tools/isolateserver_load_test.py ('K') | « tools/run_swarm_tests_on_swarm.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698