| OLD | NEW |
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes and functions related to threading.""" | 5 """Classes and functions related to threading.""" |
| 6 | 6 |
| 7 import functools | 7 import functools |
| 8 import inspect | 8 import inspect |
| 9 import logging | 9 import logging |
| 10 import os | 10 import os |
| (...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 365 raise | 365 raise |
| 366 channel.send_exception(e) | 366 channel.send_exception(e) |
| 367 except Exception as e: | 367 except Exception as e: |
| 368 if channel is None: | 368 if channel is None: |
| 369 raise | 369 raise |
| 370 channel.send_exception(e) | 370 channel.send_exception(e) |
| 371 | 371 |
| 372 | 372 |
| 373 class Progress(object): | 373 class Progress(object): |
| 374 """Prints progress and accepts updates thread-safely.""" | 374 """Prints progress and accepts updates thread-safely.""" |
| 375 def __init__(self, size): | 375 def __init__(self, initial_values): |
| 376 # To be used in the primary thread | 376 """Creates a Progress bar that will updates asynchronously from the worker |
| 377 self.last_printed_line = '' | 377 threads. |
| 378 self.index = 0 | 378 |
| 379 self.start = time.time() | 379 Arguments: |
| 380 self.size = size | 380 initial_values: list of int, defines both the number of columns and their |
| 381 # Setting it to True forces a print on the first print_update() call. | 381 initial values. |
| 382 self.value_changed = True | 382 """ |
| 383 assert all(isinstance(i, int) for i in initial_values) |
| 384 # Members to be used exclusively in the primary thread. |
| 383 self.use_cr_only = True | 385 self.use_cr_only = True |
| 384 self.unfinished_commands = set() | 386 self.unfinished_commands = set() |
| 387 self._start = time.time() |
| 388 self._last_printed_line = '' |
| 389 self._columns = initial_values[:] |
| 390 # Setting it to True forces a print on the first print_update() call. |
| 391 self._value_changed = True |
| 385 | 392 |
| 386 # To be used in all threads. | 393 # To be used in all threads. |
| 387 self.queued_lines = Queue.Queue() | 394 self._queued_updates = Queue.Queue() |
| 388 | 395 |
| 389 def update_item(self, name, index=0, size=0, raw=False): | 396 def update_item(self, name, raw=False, **kwargs): |
| 390 """Queue information to print out. | 397 """Queue information to print out. |
| 391 | 398 |
| 392 Arguments: | 399 Arguments: |
| 393 index: increment to add to index. usually 0 or 1. | |
| 394 size: increment to add to size. usually 0 or 1. | |
| 395 raw: if True, prints the data without the header. | 400 raw: if True, prints the data without the header. |
| 401 colN: increments column N to add to index, 0 based, value is usually 0 or |
| 402 1. |
| 396 """ | 403 """ |
| 397 assert isinstance(name, str) | 404 assert isinstance(name, str) |
| 398 assert isinstance(index, int) | |
| 399 assert isinstance(size, int) | |
| 400 assert isinstance(raw, bool) | 405 assert isinstance(raw, bool) |
| 401 self.queued_lines.put((name, index, size, raw)) | 406 assert all( |
| 407 k.startswith('col') and len(k) == 4 and k[3].isdigit() for k in kwargs) |
| 408 assert all(isinstance(v, int) for v in kwargs.itervalues()) |
| 409 args = [(int(k[3]), v) for k, v in kwargs.iteritems()] |
| 410 self._queued_updates.put((name, raw, args)) |
| 402 | 411 |
| 403 def print_update(self): | 412 def print_update(self): |
| 404 """Prints the current status.""" | 413 """Prints the current status.""" |
| 405 # Flush all the logging output so it doesn't appear within this output. | 414 # Flush all the logging output so it doesn't appear within this output. |
| 406 for handler in logging.root.handlers: | 415 for handler in logging.root.handlers: |
| 407 handler.flush() | 416 handler.flush() |
| 408 | 417 |
| 409 got_one = False | 418 got_one = False |
| 410 while True: | 419 while True: |
| 411 try: | 420 try: |
| 412 name, index, size, raw = self.queued_lines.get_nowait() | 421 name, raw, args = self._queued_updates.get_nowait() |
| 413 except Queue.Empty: | 422 except Queue.Empty: |
| 414 break | 423 break |
| 415 | 424 |
| 416 self.size += size | 425 for k, v in args: |
| 417 self.index += index | 426 self._columns[k] += v |
| 418 self.value_changed = bool(size or index) | 427 self._value_changed = bool(args) |
| 419 if not name: | 428 if not name: |
| 420 # Even if raw=True, there's nothing to print. | 429 # Even if raw=True, there's nothing to print. |
| 421 continue | 430 continue |
| 422 | 431 |
| 423 got_one = True | 432 got_one = True |
| 424 if raw: | 433 if raw: |
| 425 # Prints the data as-is. | 434 # Prints the data as-is. |
| 426 self.last_printed_line = '' | 435 self._last_printed_line = '' |
| 427 sys.stdout.write('\n%s\n' % name.strip('\n')) | 436 sys.stdout.write('\n%s\n' % name.strip('\n')) |
| 428 else: | 437 else: |
| 429 line, self.last_printed_line = self.gen_line(name) | 438 line, self._last_printed_line = self._gen_line(name) |
| 430 sys.stdout.write(line) | 439 sys.stdout.write(line) |
| 431 | 440 |
| 432 if not got_one and self.value_changed: | 441 if not got_one and self._value_changed: |
| 433 # Make sure a line is printed in that case where statistics changes. | 442 # Make sure a line is printed in that case where statistics changes. |
| 434 line, self.last_printed_line = self.gen_line('') | 443 line, self._last_printed_line = self._gen_line('') |
| 435 sys.stdout.write(line) | 444 sys.stdout.write(line) |
| 436 got_one = True | 445 got_one = True |
| 437 self.value_changed = False | 446 self._value_changed = False |
| 438 if got_one: | 447 if got_one: |
| 439 # Ensure that all the output is flushed to prevent it from getting mixed | 448 # Ensure that all the output is flushed to prevent it from getting mixed |
| 440 # with other output streams (like the logging streams). | 449 # with other output streams (like the logging streams). |
| 441 sys.stdout.flush() | 450 sys.stdout.flush() |
| 442 | 451 |
| 443 if self.unfinished_commands: | 452 if self.unfinished_commands: |
| 444 logging.debug('Waiting for the following commands to finish:\n%s', | 453 logging.debug('Waiting for the following commands to finish:\n%s', |
| 445 '\n'.join(self.unfinished_commands)) | 454 '\n'.join(self.unfinished_commands)) |
| 446 | 455 |
| 447 def gen_line(self, name): | 456 def _gen_line(self, name): |
| 448 """Generates the line to be printed.""" | 457 """Generates the line to be printed.""" |
| 449 next_line = ('[%*d/%d] %6.2fs %s') % ( | 458 next_line = ('[%s] %6.2fs %s') % ( |
| 450 len(str(self.size)), self.index, | 459 self._render_columns(), time.time() - self._start, name) |
| 451 self.size, | |
| 452 time.time() - self.start, | |
| 453 name) | |
| 454 # Fill it with whitespace only if self.use_cr_only is set. | 460 # Fill it with whitespace only if self.use_cr_only is set. |
| 455 prefix = '' | 461 prefix = '' |
| 456 if self.use_cr_only and self.last_printed_line: | 462 if self.use_cr_only and self._last_printed_line: |
| 457 prefix = '\r' | 463 prefix = '\r' |
| 458 if self.use_cr_only: | 464 if self.use_cr_only: |
| 459 suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) | 465 suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
| 460 else: | 466 else: |
| 461 suffix = '\n' | 467 suffix = '\n' |
| 462 return '%s%s%s' % (prefix, next_line, suffix), next_line | 468 return '%s%s%s' % (prefix, next_line, suffix), next_line |
| 463 | 469 |
| 470 def _render_columns(self): |
| 471 """Renders the columns.""" |
| 472 columns_as_str = map(str, self._columns) |
| 473 max_len = max(map(len, columns_as_str)) |
| 474 return '/'.join(i.rjust(max_len) for i in columns_as_str) |
| 475 |
| 464 | 476 |
| 465 class QueueWithProgress(Queue.PriorityQueue): | 477 class QueueWithProgress(Queue.PriorityQueue): |
| 466 """Implements progress support in join().""" | 478 """Implements progress support in join().""" |
| 467 def __init__(self, maxsize, *args, **kwargs): | 479 def __init__(self, progress, *args, **kwargs): |
| 468 Queue.PriorityQueue.__init__(self, *args, **kwargs) | 480 Queue.PriorityQueue.__init__(self, *args, **kwargs) |
| 469 self.progress = Progress(maxsize) | |
| 470 | |
| 471 def set_progress(self, progress): | |
| 472 """Replace the current progress, mainly used when a progress should be | |
| 473 shared between queues.""" | |
| 474 self.progress = progress | 481 self.progress = progress |
| 475 | 482 |
| 476 def task_done(self): | 483 def task_done(self): |
| 477 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task | 484 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task |
| 478 done. | 485 done. |
| 479 """ | 486 """ |
| 480 with self.all_tasks_done: | 487 with self.all_tasks_done: |
| 481 try: | 488 try: |
| 482 unfinished = self.unfinished_tasks - 1 | 489 unfinished = self.unfinished_tasks - 1 |
| 483 if unfinished < 0: | 490 if unfinished < 0: |
| (...skipping 23 matching lines...) Expand all Loading... |
| 507 self.progress.print_update() | 514 self.progress.print_update() |
| 508 # Use a short wait timeout so updates are printed in a timely manner. | 515 # Use a short wait timeout so updates are printed in a timely manner. |
| 509 self.all_tasks_done.wait(0.1) | 516 self.all_tasks_done.wait(0.1) |
| 510 self.progress.print_update() | 517 self.progress.print_update() |
| 511 | 518 |
| 512 | 519 |
| 513 class ThreadPoolWithProgress(ThreadPool): | 520 class ThreadPoolWithProgress(ThreadPool): |
| 514 QUEUE_CLASS = QueueWithProgress | 521 QUEUE_CLASS = QueueWithProgress |
| 515 | 522 |
| 516 def __init__(self, progress, *args, **kwargs): | 523 def __init__(self, progress, *args, **kwargs): |
| 524 self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) |
| 517 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) | 525 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
| 518 self.tasks.set_progress(progress) | |
| 519 | 526 |
| 520 def _output_append(self, out): | 527 def _output_append(self, out): |
| 521 """Also wakes up the listener on new completed test_case.""" | 528 """Also wakes up the listener on new completed test_case.""" |
| 522 super(ThreadPoolWithProgress, self)._output_append(out) | 529 super(ThreadPoolWithProgress, self)._output_append(out) |
| 523 self.tasks.wake_up() | 530 self.tasks.wake_up() |
| 524 | 531 |
| 525 | 532 |
| 526 class DeadlockDetector(object): | 533 class DeadlockDetector(object): |
| 527 """Context manager that can detect deadlocks. | 534 """Context manager that can detect deadlocks. |
| 528 | 535 |
| (...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 703 # Multiprocessing | 710 # Multiprocessing |
| 704 import multiprocessing | 711 import multiprocessing |
| 705 return multiprocessing.cpu_count() | 712 return multiprocessing.cpu_count() |
| 706 except: # pylint: disable=W0702 | 713 except: # pylint: disable=W0702 |
| 707 try: | 714 try: |
| 708 # Mac OS 10.6 | 715 # Mac OS 10.6 |
| 709 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 | 716 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 |
| 710 except: | 717 except: |
| 711 # Some of the windows builders seem to get here. | 718 # Some of the windows builders seem to get here. |
| 712 return 4 | 719 return 4 |
| OLD | NEW |