OLD | NEW |
---|---|
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Classes and functions related to threading.""" | 5 """Classes and functions related to threading.""" |
6 | 6 |
7 import functools | 7 import functools |
8 import inspect | 8 import inspect |
9 import logging | 9 import logging |
10 import os | 10 import os |
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
365 raise | 365 raise |
366 channel.send_exception(e) | 366 channel.send_exception(e) |
367 except Exception as e: | 367 except Exception as e: |
368 if channel is None: | 368 if channel is None: |
369 raise | 369 raise |
370 channel.send_exception(e) | 370 channel.send_exception(e) |
371 | 371 |
372 | 372 |
373 class Progress(object): | 373 class Progress(object): |
374 """Prints progress and accepts updates thread-safely.""" | 374 """Prints progress and accepts updates thread-safely.""" |
375 def __init__(self, size): | 375 def __init__(self, columns): |
376 # To be used in the primary thread | 376 """Creates a Progress bar that will updates asynchronously from the worker |
377 self.last_printed_line = '' | 377 threads. |
378 self.index = 0 | 378 |
379 self.start = time.time() | 379 Arguments: |
380 self.size = size | 380 columns: list of tuple(name, initialvalue), defines both the number of |
381 # Setting it to True forces a print on the first print_update() call. | 381 columns and their initial values. |
382 self.value_changed = True | 382 """ |
383 assert all( | |
384 len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int) | |
385 for c in columns), columns | |
386 # Members to be used exclusively in the primary thread. | |
383 self.use_cr_only = True | 387 self.use_cr_only = True |
384 self.unfinished_commands = set() | 388 self.unfinished_commands = set() |
389 self.start = time.time() | |
390 self._last_printed_line = '' | |
391 self._columns = [c[1] for c in columns] | |
392 self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns)) | |
393 # Setting it to True forces a print on the first print_update() call. | |
394 self._value_changed = True | |
385 | 395 |
386 # To be used in all threads. | 396 # To be used in all threads. |
387 self.queued_lines = Queue.Queue() | 397 self._queued_updates = Queue.Queue() |
388 | 398 |
389 def update_item(self, name, index=0, size=0, raw=False): | 399 def update_item(self, name, raw=False, **kwargs): |
390 """Queue information to print out. | 400 """Queue information to print out. |
csharp
2013/10/03 16:08:04
Isn't this updating as well as queuing?
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
No, the function is illnamed. Note that this code
| |
391 | 401 |
392 Arguments: | 402 Arguments: |
393 index: increment to add to index. usually 0 or 1. | |
394 size: increment to add to size. usually 0 or 1. | |
395 raw: if True, prints the data without the header. | 403 raw: if True, prints the data without the header. |
404 <name>: argument name is a name of a column. it's value is the increment | |
csharp
2013/10/03 16:08:04
I'm a bit confused by this comment, it seems like
Marc-Antoine Ruel (Google)
2013/10/03 16:47:11
No, only kwargs. I had forgot to document 'name',
| |
405 to the column, value is usually 0 or 1. | |
396 """ | 406 """ |
397 assert isinstance(name, str) | 407 assert isinstance(name, str) |
398 assert isinstance(index, int) | |
399 assert isinstance(size, int) | |
400 assert isinstance(raw, bool) | 408 assert isinstance(raw, bool) |
401 self.queued_lines.put((name, index, size, raw)) | 409 assert all(isinstance(v, int) for v in kwargs.itervalues()) |
410 args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v] | |
411 self._queued_updates.put((name, raw, args)) | |
402 | 412 |
403 def print_update(self): | 413 def print_update(self): |
404 """Prints the current status.""" | 414 """Prints the current status.""" |
405 # Flush all the logging output so it doesn't appear within this output. | 415 # Flush all the logging output so it doesn't appear within this output. |
406 for handler in logging.root.handlers: | 416 for handler in logging.root.handlers: |
407 handler.flush() | 417 handler.flush() |
408 | 418 |
409 got_one = False | 419 got_one = False |
410 while True: | 420 while True: |
411 try: | 421 try: |
412 name, index, size, raw = self.queued_lines.get_nowait() | 422 name, raw, args = self._queued_updates.get_nowait() |
413 except Queue.Empty: | 423 except Queue.Empty: |
414 break | 424 break |
415 | 425 |
416 self.size += size | 426 for k, v in args: |
417 self.index += index | 427 self._columns[k] += v |
418 self.value_changed = bool(size or index) | 428 self._value_changed = bool(args) |
419 if not name: | 429 if not name: |
420 # Even if raw=True, there's nothing to print. | 430 # Even if raw=True, there's nothing to print. |
421 continue | 431 continue |
422 | 432 |
423 got_one = True | 433 got_one = True |
424 if raw: | 434 if raw: |
425 # Prints the data as-is. | 435 # Prints the data as-is. |
426 self.last_printed_line = '' | 436 self._last_printed_line = '' |
427 sys.stdout.write('\n%s\n' % name.strip('\n')) | 437 sys.stdout.write('\n%s\n' % name.strip('\n')) |
428 else: | 438 else: |
429 line, self.last_printed_line = self.gen_line(name) | 439 line, self._last_printed_line = self._gen_line(name) |
430 sys.stdout.write(line) | 440 sys.stdout.write(line) |
431 | 441 |
432 if not got_one and self.value_changed: | 442 if not got_one and self._value_changed: |
433 # Make sure a line is printed in that case where statistics changes. | 443 # Make sure a line is printed in that case where statistics changes. |
434 line, self.last_printed_line = self.gen_line('') | 444 line, self._last_printed_line = self._gen_line('') |
435 sys.stdout.write(line) | 445 sys.stdout.write(line) |
436 got_one = True | 446 got_one = True |
437 self.value_changed = False | 447 self._value_changed = False |
438 if got_one: | 448 if got_one: |
439 # Ensure that all the output is flushed to prevent it from getting mixed | 449 # Ensure that all the output is flushed to prevent it from getting mixed |
440 # with other output streams (like the logging streams). | 450 # with other output streams (like the logging streams). |
441 sys.stdout.flush() | 451 sys.stdout.flush() |
442 | 452 |
443 if self.unfinished_commands: | 453 if self.unfinished_commands: |
444 logging.debug('Waiting for the following commands to finish:\n%s', | 454 logging.debug('Waiting for the following commands to finish:\n%s', |
445 '\n'.join(self.unfinished_commands)) | 455 '\n'.join(self.unfinished_commands)) |
446 | 456 |
447 def gen_line(self, name): | 457 def _gen_line(self, name): |
448 """Generates the line to be printed.""" | 458 """Generates the line to be printed.""" |
449 next_line = ('[%*d/%d] %6.2fs %s') % ( | 459 next_line = ('[%s] %6.2fs %s') % ( |
450 len(str(self.size)), self.index, | 460 self._render_columns(), time.time() - self.start, name) |
451 self.size, | |
452 time.time() - self.start, | |
453 name) | |
454 # Fill it with whitespace only if self.use_cr_only is set. | 461 # Fill it with whitespace only if self.use_cr_only is set. |
455 prefix = '' | 462 prefix = '' |
456 if self.use_cr_only and self.last_printed_line: | 463 if self.use_cr_only and self._last_printed_line: |
457 prefix = '\r' | 464 prefix = '\r' |
458 if self.use_cr_only: | 465 if self.use_cr_only: |
459 suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) | 466 suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line)) |
460 else: | 467 else: |
461 suffix = '\n' | 468 suffix = '\n' |
462 return '%s%s%s' % (prefix, next_line, suffix), next_line | 469 return '%s%s%s' % (prefix, next_line, suffix), next_line |
463 | 470 |
471 def _render_columns(self): | |
472 """Renders the columns.""" | |
473 columns_as_str = map(str, self._columns) | |
474 max_len = max(map(len, columns_as_str)) | |
475 return '/'.join(i.rjust(max_len) for i in columns_as_str) | |
476 | |
464 | 477 |
465 class QueueWithProgress(Queue.PriorityQueue): | 478 class QueueWithProgress(Queue.PriorityQueue): |
466 """Implements progress support in join().""" | 479 """Implements progress support in join().""" |
467 def __init__(self, maxsize, *args, **kwargs): | 480 def __init__(self, progress, *args, **kwargs): |
468 Queue.PriorityQueue.__init__(self, *args, **kwargs) | 481 Queue.PriorityQueue.__init__(self, *args, **kwargs) |
469 self.progress = Progress(maxsize) | |
470 | |
471 def set_progress(self, progress): | |
472 """Replace the current progress, mainly used when a progress should be | |
473 shared between queues.""" | |
474 self.progress = progress | 482 self.progress = progress |
475 | 483 |
476 def task_done(self): | 484 def task_done(self): |
477 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task | 485 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task |
478 done. | 486 done. |
479 """ | 487 """ |
480 with self.all_tasks_done: | 488 with self.all_tasks_done: |
481 try: | 489 try: |
482 unfinished = self.unfinished_tasks - 1 | 490 unfinished = self.unfinished_tasks - 1 |
483 if unfinished < 0: | 491 if unfinished < 0: |
(...skipping 25 matching lines...) Expand all Loading... | |
509 # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done | 517 # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done |
510 # share the same underlying event so no polling is necessary. | 518 # share the same underlying event so no polling is necessary. |
511 self.all_tasks_done.wait(0.1) | 519 self.all_tasks_done.wait(0.1) |
512 self.progress.print_update() | 520 self.progress.print_update() |
513 | 521 |
514 | 522 |
515 class ThreadPoolWithProgress(ThreadPool): | 523 class ThreadPoolWithProgress(ThreadPool): |
516 QUEUE_CLASS = QueueWithProgress | 524 QUEUE_CLASS = QueueWithProgress |
517 | 525 |
518 def __init__(self, progress, *args, **kwargs): | 526 def __init__(self, progress, *args, **kwargs): |
527 self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress) | |
519 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) | 528 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
520 self.tasks.set_progress(progress) | |
521 | 529 |
522 def _output_append(self, out): | 530 def _output_append(self, out): |
523 """Also wakes up the listener on new completed test_case.""" | 531 """Also wakes up the listener on new completed test_case.""" |
524 super(ThreadPoolWithProgress, self)._output_append(out) | 532 super(ThreadPoolWithProgress, self)._output_append(out) |
525 self.tasks.wake_up() | 533 self.tasks.wake_up() |
526 | 534 |
527 | 535 |
528 class DeadlockDetector(object): | 536 class DeadlockDetector(object): |
529 """Context manager that can detect deadlocks. | 537 """Context manager that can detect deadlocks. |
530 | 538 |
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
705 # Multiprocessing | 713 # Multiprocessing |
706 import multiprocessing | 714 import multiprocessing |
707 return multiprocessing.cpu_count() | 715 return multiprocessing.cpu_count() |
708 except: # pylint: disable=W0702 | 716 except: # pylint: disable=W0702 |
709 try: | 717 try: |
710 # Mac OS 10.6 | 718 # Mac OS 10.6 |
711 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 | 719 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 |
712 except: | 720 except: |
713 # Some of the windows builders seem to get here. | 721 # Some of the windows builders seem to get here. |
714 return 4 | 722 return 4 |
OLD | NEW |