OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Classes and functions related to threading.""" | 5 """Classes and functions related to threading.""" |
6 | 6 |
7 import functools | 7 import functools |
8 import inspect | 8 import inspect |
9 import logging | 9 import logging |
10 import os | 10 import os |
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
365 raise | 365 raise |
366 channel.send_exception(e) | 366 channel.send_exception(e) |
367 except Exception as e: | 367 except Exception as e: |
368 if channel is None: | 368 if channel is None: |
369 raise | 369 raise |
370 channel.send_exception(e) | 370 channel.send_exception(e) |
371 | 371 |
372 | 372 |
373 class Progress(object): | 373 class Progress(object): |
374 """Prints progress and accepts updates thread-safely.""" | 374 """Prints progress and accepts updates thread-safely.""" |
375 def __init__(self, size): | 375 def __init__(self, initial_values): |
376 # To be used in the primary thread | 376 """Creates a Progress bar that will updates asynchronously from the worker |
377 self.last_printed_line = '' | 377 threads. |
378 self.index = 0 | 378 |
379 self.start = time.time() | 379 Arguments: |
380 self.size = size | 380 initial_values: list of int, defines both the number of columns and their |
381 # Setting it to True forces a print on the first print_update() call. | 381 initial values. |
382 self.value_changed = True | 382 """ |
| 383 assert all(isinstance(i, int) for i in initial_values) |
| 384 # Members to be used exclusively in the primary thread. |
383 self.use_cr_only = True | 385 self.use_cr_only = True |
384 self.unfinished_commands = set() | 386 self.unfinished_commands = set() |
| 387 self._start = time.time() |
| 388 self._last_printed_line = '' |
| 389 self._columns = initial_values[:] |
| 390 # Setting it to True forces a print on the first print_update() call. |
| 391 self._value_changed = True |
385 | 392 |
386 # To be used in all threads. | 393 # To be used in all threads. |
387 self.queued_lines = Queue.Queue() | 394 self._queued_updates = Queue.Queue() |
388 | 395 |
389 def update_item(self, name, index=0, size=0, raw=False): | 396 def update_item(self, name, raw=False, **kwargs): |
390 """Queue information to print out. | 397 """Queue information to print out. |
391 | 398 |
392 Arguments: | 399 Arguments: |
393 index: increment to add to index. usually 0 or 1. | |
394 size: increment to add to size. usually 0 or 1. | |
395 raw: if True, prints the data without the header. | 400 raw: if True, prints the data without the header. |
| 401 colN: increments column N to add to index, 0 based, value is usually 0 or |
| 402 1. |
396 """ | 403 """ |
397 assert isinstance(name, str) | 404 assert isinstance(name, str) |
398 assert isinstance(index, int) | |
399 assert isinstance(size, int) | |
400 assert isinstance(raw, bool) | 405 assert isinstance(raw, bool) |
401 self.queued_lines.put((name, index, size, raw)) | 406 assert all( |
| 407 k.startswith('col') and len(k) == 4 and k[3].isdigit() for k in kwargs) |
| 408 assert all(isinstance(v, int) for v in kwargs.itervalues()) |
| 409 args = [(int(k[3]), v) for k, v in kwargs.iteritems()] |
| 410 self._queued_updates.put((name, raw, args)) |
402 | 411 |
403 def print_update(self): | 412 def print_update(self): |
404 """Prints the current status.""" | 413 """Prints the current status.""" |
405 # Flush all the logging output so it doesn't appear within this output. | 414 # Flush all the logging output so it doesn't appear within this output. |
406 for handler in logging.root.handlers: | 415 for handler in logging.root.handlers: |
407 handler.flush() | 416 handler.flush() |
408 | 417 |
409 got_one = False | 418 got_one = False |
410 while True: | 419 while True: |
411 try: | 420 try: |
412 name, index, size, raw = self.queued_lines.get_nowait() | 421 name, raw, args = self._queued_updates.get_nowait() |
413 except Queue.Empty: | 422 except Queue.Empty: |
414 break | 423 break |
415 | 424 |
416 self.size += size | 425 for k, v in args: |
417 self.index += index | 426 self._columns[k] += v |
418 self.value_changed = bool(size or index) | 427 self._value_changed = bool(args) |
419 if not name: | 428 if not name: |
420 # Even if raw=True, there's nothing to print. | 429 # Even if raw=True, there's nothing to print. |
421 continue | 430 continue |
422 | 431 |
423 got_one = True | 432 got_one = True |
424 if raw: | 433 if raw: |
425 # Prints the data as-is. | 434 # Prints the data as-is. |
426 self.last_printed_line = '' | 435 self._last_printed_line = '' |
427 sys.stdout.write('\n%s\n' % name.strip('\n')) | 436 sys.stdout.write('\n%s\n' % name.strip('\n')) |
428 else: | 437 else: |
429 line, self.last_printed_line = self.gen_line(name) | 438 line, self._last_printed_line = self._gen_line(name) |
430 sys.stdout.write(line) | 439 sys.stdout.write(line) |
431 | 440 |
432 if not got_one and self.value_changed: | 441 if not got_one and self._value_changed: |
433 # Make sure a line is printed in that case where statistics changes. | 442 # Make sure a line is printed in that case where statistics changes. |
434 line, self.last_printed_line = self.gen_line('') | 443 line, self._last_printed_line = self._gen_line('') |
435 sys.stdout.write(line) | 444 sys.stdout.write(line) |
436 got_one = True | 445 got_one = True |
437 self.value_changed = False | 446 self._value_changed = False |
438 if got_one: | 447 if got_one: |
439 # Ensure that all the output is flushed to prevent it from getting mixed | 448 # Ensure that all the output is flushed to prevent it from getting mixed |
440 # with other output streams (like the logging streams). | 449 # with other output streams (like the logging streams). |
441 sys.stdout.flush() | 450 sys.stdout.flush() |
442 | 451 |
443 if self.unfinished_commands: | 452 if self.unfinished_commands: |
444 logging.debug('Waiting for the following commands to finish:\n%s', | 453 logging.debug('Waiting for the following commands to finish:\n%s', |
445 '\n'.join(self.unfinished_commands)) | 454 '\n'.join(self.unfinished_commands)) |
446 | 455 |
447 def gen_line(self, name): | 456 def _gen_line(self, name): |
448 """Generates the line to be printed.""" | 457 """Generates the line to be printed.""" |
449 next_line = ('[%*d/%d] %6.2fs %s') % ( | 458 next_line = ('[%s] %6.2fs %s') % ( |
450 len(str(self.size)), self.index, | 459 self._render_columns(), time.time() - self._start, name) |
451 self.size, | |
452 time.time() - self.start, | |
453 name) | |
454 # Fill it with whitespace only if self.use_cr_only is set. | 460 # Fill it with whitespace only if self.use_cr_only is set. |
455 prefix = '' | 461 prefix = '' |
456 if self.use_cr_only and self.last_printed_line: | 462 if self.use_cr_only and self._last_printed_line: |
457 prefix = '\r' | 463 prefix = '\r' |
458 if self.use_cr_only: | 464 if self.use_cr_only: |
459 suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) | 465 suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
460 else: | 466 else: |
461 suffix = '\n' | 467 suffix = '\n' |
462 return '%s%s%s' % (prefix, next_line, suffix), next_line | 468 return '%s%s%s' % (prefix, next_line, suffix), next_line |
463 | 469 |
| 470 def _render_columns(self): |
| 471 """Renders the columns.""" |
| 472 columns_as_str = map(str, self._columns) |
| 473 max_len = max(map(len, columns_as_str)) |
| 474 return '/'.join(i.rjust(max_len) for i in columns_as_str) |
| 475 |
464 | 476 |
465 class QueueWithProgress(Queue.PriorityQueue): | 477 class QueueWithProgress(Queue.PriorityQueue): |
466 """Implements progress support in join().""" | 478 """Implements progress support in join().""" |
467 def __init__(self, maxsize, *args, **kwargs): | 479 def __init__(self, progress, *args, **kwargs): |
468 Queue.PriorityQueue.__init__(self, *args, **kwargs) | 480 Queue.PriorityQueue.__init__(self, *args, **kwargs) |
469 self.progress = Progress(maxsize) | |
470 | |
471 def set_progress(self, progress): | |
472 """Replace the current progress, mainly used when a progress should be | |
473 shared between queues.""" | |
474 self.progress = progress | 481 self.progress = progress |
475 | 482 |
476 def task_done(self): | 483 def task_done(self): |
477 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task | 484 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task |
478 done. | 485 done. |
479 """ | 486 """ |
480 with self.all_tasks_done: | 487 with self.all_tasks_done: |
481 try: | 488 try: |
482 unfinished = self.unfinished_tasks - 1 | 489 unfinished = self.unfinished_tasks - 1 |
483 if unfinished < 0: | 490 if unfinished < 0: |
(...skipping 23 matching lines...) Expand all Loading... |
507 self.progress.print_update() | 514 self.progress.print_update() |
508 # Use a short wait timeout so updates are printed in a timely manner. | 515 # Use a short wait timeout so updates are printed in a timely manner. |
509 self.all_tasks_done.wait(0.1) | 516 self.all_tasks_done.wait(0.1) |
510 self.progress.print_update() | 517 self.progress.print_update() |
511 | 518 |
512 | 519 |
513 class ThreadPoolWithProgress(ThreadPool): | 520 class ThreadPoolWithProgress(ThreadPool): |
514 QUEUE_CLASS = QueueWithProgress | 521 QUEUE_CLASS = QueueWithProgress |
515 | 522 |
516 def __init__(self, progress, *args, **kwargs): | 523 def __init__(self, progress, *args, **kwargs): |
| 524 self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) |
517 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) | 525 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
518 self.tasks.set_progress(progress) | |
519 | 526 |
520 def _output_append(self, out): | 527 def _output_append(self, out): |
521 """Also wakes up the listener on new completed test_case.""" | 528 """Also wakes up the listener on new completed test_case.""" |
522 super(ThreadPoolWithProgress, self)._output_append(out) | 529 super(ThreadPoolWithProgress, self)._output_append(out) |
523 self.tasks.wake_up() | 530 self.tasks.wake_up() |
524 | 531 |
525 | 532 |
526 class DeadlockDetector(object): | 533 class DeadlockDetector(object): |
527 """Context manager that can detect deadlocks. | 534 """Context manager that can detect deadlocks. |
528 | 535 |
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
703 # Multiprocessing | 710 # Multiprocessing |
704 import multiprocessing | 711 import multiprocessing |
705 return multiprocessing.cpu_count() | 712 return multiprocessing.cpu_count() |
706 except: # pylint: disable=W0702 | 713 except: # pylint: disable=W0702 |
707 try: | 714 try: |
708 # Mac OS 10.6 | 715 # Mac OS 10.6 |
709 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 | 716 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 |
710 except: | 717 except: |
711 # Some of the windows builders seem to get here. | 718 # Some of the windows builders seem to get here. |
712 return 4 | 719 return 4 |
OLD | NEW |