Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Classes and functions related to threading.""" | 5 """Classes and functions related to threading.""" |
| 6 | 6 |
| 7 import functools | 7 import functools |
| 8 import inspect | 8 import inspect |
| 9 import logging | 9 import logging |
| 10 import os | 10 import os |
| (...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 365 raise | 365 raise |
| 366 channel.send_exception(e) | 366 channel.send_exception(e) |
| 367 except Exception as e: | 367 except Exception as e: |
| 368 if channel is None: | 368 if channel is None: |
| 369 raise | 369 raise |
| 370 channel.send_exception(e) | 370 channel.send_exception(e) |
| 371 | 371 |
| 372 | 372 |
| 373 class Progress(object): | 373 class Progress(object): |
| 374 """Prints progress and accepts updates thread-safely.""" | 374 """Prints progress and accepts updates thread-safely.""" |
| 375 def __init__(self, size): | 375 def __init__(self, columns): |
| 376 # To be used in the primary thread | 376 """Creates a Progress bar that will updates asynchronously from the worker |
| 377 self.last_printed_line = '' | 377 threads. |
| 378 self.index = 0 | 378 |
| 379 self.start = time.time() | 379 Arguments: |
| 380 self.size = size | 380 columns: list of tuple(name, initialvalue), defines both the number of |
| 381 # Setting it to True forces a print on the first print_update() call. | 381 columns and their initial values. |
| 382 self.value_changed = True | 382 """ |
| 383 assert all( | |
| 384 len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int) | |
| 385 for c in columns), columns | |
| 386 # Members to be used exclusively in the primary thread. | |
| 383 self.use_cr_only = True | 387 self.use_cr_only = True |
| 384 self.unfinished_commands = set() | 388 self.unfinished_commands = set() |
| 389 self.start = time.time() | |
| 390 self._last_printed_line = '' | |
| 391 self._columns = [c[1] for c in columns] | |
| 392 self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns)) | |
| 393 # Setting it to True forces a print on the first print_update() call. | |
| 394 self._value_changed = True | |
| 385 | 395 |
| 386 # To be used in all threads. | 396 # To be used in all threads. |
| 387 self.queued_lines = Queue.Queue() | 397 self._queued_updates = Queue.Queue() |
| 388 | 398 |
| 389 def update_item(self, name, index=0, size=0, raw=False): | 399 def update_item(self, name, raw=False, **kwargs): |
| 390 """Queue information to print out. | 400 """Queue information to print out. |
|
csharp
2013/10/03 16:08:04
Isn't this updating as well as queuing?
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
No, the function is illnamed. Note that this code
| |
| 391 | 401 |
| 392 Arguments: | 402 Arguments: |
| 393 index: increment to add to index. usually 0 or 1. | |
| 394 size: increment to add to size. usually 0 or 1. | |
| 395 raw: if True, prints the data without the header. | 403 raw: if True, prints the data without the header. |
| 404 <name>: argument name is a name of a column. it's value is the increment | |
|
csharp
2013/10/03 16:08:04
I'm a bit confused by this comment, it seems like
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
No, only kwargs. I had forgot to document 'name',
| |
| 405 to the column, value is usually 0 or 1. | |
| 396 """ | 406 """ |
| 397 assert isinstance(name, str) | 407 assert isinstance(name, str) |
| 398 assert isinstance(index, int) | |
| 399 assert isinstance(size, int) | |
| 400 assert isinstance(raw, bool) | 408 assert isinstance(raw, bool) |
| 401 self.queued_lines.put((name, index, size, raw)) | 409 assert all(isinstance(v, int) for v in kwargs.itervalues()) |
| 410 args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v] | |
| 411 self._queued_updates.put((name, raw, args)) | |
| 402 | 412 |
| 403 def print_update(self): | 413 def print_update(self): |
| 404 """Prints the current status.""" | 414 """Prints the current status.""" |
| 405 # Flush all the logging output so it doesn't appear within this output. | 415 # Flush all the logging output so it doesn't appear within this output. |
| 406 for handler in logging.root.handlers: | 416 for handler in logging.root.handlers: |
| 407 handler.flush() | 417 handler.flush() |
| 408 | 418 |
| 409 got_one = False | 419 got_one = False |
| 410 while True: | 420 while True: |
| 411 try: | 421 try: |
| 412 name, index, size, raw = self.queued_lines.get_nowait() | 422 name, raw, args = self._queued_updates.get_nowait() |
| 413 except Queue.Empty: | 423 except Queue.Empty: |
| 414 break | 424 break |
| 415 | 425 |
| 416 self.size += size | 426 for k, v in args: |
| 417 self.index += index | 427 self._columns[k] += v |
| 418 self.value_changed = bool(size or index) | 428 self._value_changed = bool(args) |
| 419 if not name: | 429 if not name: |
| 420 # Even if raw=True, there's nothing to print. | 430 # Even if raw=True, there's nothing to print. |
| 421 continue | 431 continue |
| 422 | 432 |
| 423 got_one = True | 433 got_one = True |
| 424 if raw: | 434 if raw: |
| 425 # Prints the data as-is. | 435 # Prints the data as-is. |
| 426 self.last_printed_line = '' | 436 self._last_printed_line = '' |
| 427 sys.stdout.write('\n%s\n' % name.strip('\n')) | 437 sys.stdout.write('\n%s\n' % name.strip('\n')) |
| 428 else: | 438 else: |
| 429 line, self.last_printed_line = self.gen_line(name) | 439 line, self._last_printed_line = self._gen_line(name) |
| 430 sys.stdout.write(line) | 440 sys.stdout.write(line) |
| 431 | 441 |
| 432 if not got_one and self.value_changed: | 442 if not got_one and self._value_changed: |
| 433 # Make sure a line is printed in that case where statistics changes. | 443 # Make sure a line is printed in that case where statistics changes. |
| 434 line, self.last_printed_line = self.gen_line('') | 444 line, self._last_printed_line = self._gen_line('') |
| 435 sys.stdout.write(line) | 445 sys.stdout.write(line) |
| 436 got_one = True | 446 got_one = True |
| 437 self.value_changed = False | 447 self._value_changed = False |
| 438 if got_one: | 448 if got_one: |
| 439 # Ensure that all the output is flushed to prevent it from getting mixed | 449 # Ensure that all the output is flushed to prevent it from getting mixed |
| 440 # with other output streams (like the logging streams). | 450 # with other output streams (like the logging streams). |
| 441 sys.stdout.flush() | 451 sys.stdout.flush() |
| 442 | 452 |
| 443 if self.unfinished_commands: | 453 if self.unfinished_commands: |
| 444 logging.debug('Waiting for the following commands to finish:\n%s', | 454 logging.debug('Waiting for the following commands to finish:\n%s', |
| 445 '\n'.join(self.unfinished_commands)) | 455 '\n'.join(self.unfinished_commands)) |
| 446 | 456 |
| 447 def gen_line(self, name): | 457 def _gen_line(self, name): |
| 448 """Generates the line to be printed.""" | 458 """Generates the line to be printed.""" |
| 449 next_line = ('[%*d/%d] %6.2fs %s') % ( | 459 next_line = ('[%s] %6.2fs %s') % ( |
| 450 len(str(self.size)), self.index, | 460 self._render_columns(), time.time() - self.start, name) |
| 451 self.size, | |
| 452 time.time() - self.start, | |
| 453 name) | |
| 454 # Fill it with whitespace only if self.use_cr_only is set. | 461 # Fill it with whitespace only if self.use_cr_only is set. |
| 455 prefix = '' | 462 prefix = '' |
| 456 if self.use_cr_only and self.last_printed_line: | 463 if self.use_cr_only and self._last_printed_line: |
| 457 prefix = '\r' | 464 prefix = '\r' |
| 458 if self.use_cr_only: | 465 if self.use_cr_only: |
| 459 suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) | 466 suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
| 460 else: | 467 else: |
| 461 suffix = '\n' | 468 suffix = '\n' |
| 462 return '%s%s%s' % (prefix, next_line, suffix), next_line | 469 return '%s%s%s' % (prefix, next_line, suffix), next_line |
| 463 | 470 |
| 471 def _render_columns(self): | |
| 472 """Renders the columns.""" | |
| 473 columns_as_str = map(str, self._columns) | |
| 474 max_len = max(map(len, columns_as_str)) | |
| 475 return '/'.join(i.rjust(max_len) for i in columns_as_str) | |
| 476 | |
| 464 | 477 |
| 465 class QueueWithProgress(Queue.PriorityQueue): | 478 class QueueWithProgress(Queue.PriorityQueue): |
| 466 """Implements progress support in join().""" | 479 """Implements progress support in join().""" |
| 467 def __init__(self, maxsize, *args, **kwargs): | 480 def __init__(self, progress, *args, **kwargs): |
| 468 Queue.PriorityQueue.__init__(self, *args, **kwargs) | 481 Queue.PriorityQueue.__init__(self, *args, **kwargs) |
| 469 self.progress = Progress(maxsize) | |
| 470 | |
| 471 def set_progress(self, progress): | |
| 472 """Replace the current progress, mainly used when a progress should be | |
| 473 shared between queues.""" | |
| 474 self.progress = progress | 482 self.progress = progress |
| 475 | 483 |
| 476 def task_done(self): | 484 def task_done(self): |
| 477 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task | 485 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task |
| 478 done. | 486 done. |
| 479 """ | 487 """ |
| 480 with self.all_tasks_done: | 488 with self.all_tasks_done: |
| 481 try: | 489 try: |
| 482 unfinished = self.unfinished_tasks - 1 | 490 unfinished = self.unfinished_tasks - 1 |
| 483 if unfinished < 0: | 491 if unfinished < 0: |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 509 # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done | 517 # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done |
| 510 # share the same underlying event so no polling is necessary. | 518 # share the same underlying event so no polling is necessary. |
| 511 self.all_tasks_done.wait(0.1) | 519 self.all_tasks_done.wait(0.1) |
| 512 self.progress.print_update() | 520 self.progress.print_update() |
| 513 | 521 |
| 514 | 522 |
| 515 class ThreadPoolWithProgress(ThreadPool): | 523 class ThreadPoolWithProgress(ThreadPool): |
| 516 QUEUE_CLASS = QueueWithProgress | 524 QUEUE_CLASS = QueueWithProgress |
| 517 | 525 |
| 518 def __init__(self, progress, *args, **kwargs): | 526 def __init__(self, progress, *args, **kwargs): |
| 527 self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) | |
| 519 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) | 528 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
| 520 self.tasks.set_progress(progress) | |
| 521 | 529 |
| 522 def _output_append(self, out): | 530 def _output_append(self, out): |
| 523 """Also wakes up the listener on new completed test_case.""" | 531 """Also wakes up the listener on new completed test_case.""" |
| 524 super(ThreadPoolWithProgress, self)._output_append(out) | 532 super(ThreadPoolWithProgress, self)._output_append(out) |
| 525 self.tasks.wake_up() | 533 self.tasks.wake_up() |
| 526 | 534 |
| 527 | 535 |
| 528 class DeadlockDetector(object): | 536 class DeadlockDetector(object): |
| 529 """Context manager that can detect deadlocks. | 537 """Context manager that can detect deadlocks. |
| 530 | 538 |
| (...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 705 # Multiprocessing | 713 # Multiprocessing |
| 706 import multiprocessing | 714 import multiprocessing |
| 707 return multiprocessing.cpu_count() | 715 return multiprocessing.cpu_count() |
| 708 except: # pylint: disable=W0702 | 716 except: # pylint: disable=W0702 |
| 709 try: | 717 try: |
| 710 # Mac OS 10.6 | 718 # Mac OS 10.6 |
| 711 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 | 719 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 |
| 712 except: | 720 except: |
| 713 # Some of the windows builders seem to get here. | 721 # Some of the windows builders seem to get here. |
| 714 return 4 | 722 return 4 |
| OLD | NEW |