Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(222)

Side by Side Diff: utils/threading_utils.py

Issue 25530003: Rename load_test to isolateserver_load_test, create swarming_load_test. (Closed) Base URL: https://chromium.googlesource.com/a/chromium/tools/swarm_client@2_exception
Patch Set: Address review comments Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The Chromium Authors. All rights reserved. 1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes and functions related to threading.""" 5 """Classes and functions related to threading."""
6 6
7 import functools 7 import functools
8 import inspect 8 import inspect
9 import logging 9 import logging
10 import os 10 import os
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after
365 raise 365 raise
366 channel.send_exception(e) 366 channel.send_exception(e)
367 except Exception as e: 367 except Exception as e:
368 if channel is None: 368 if channel is None:
369 raise 369 raise
370 channel.send_exception(e) 370 channel.send_exception(e)
371 371
372 372
373 class Progress(object): 373 class Progress(object):
374 """Prints progress and accepts updates thread-safely.""" 374 """Prints progress and accepts updates thread-safely."""
375 def __init__(self, size): 375 def __init__(self, columns):
376 # To be used in the primary thread 376 """Creates a Progress bar that will updates asynchronously from the worker
377 self.last_printed_line = '' 377 threads.
378 self.index = 0 378
379 self.start = time.time() 379 Arguments:
380 self.size = size 380 columns: list of tuple(name, initialvalue), defines both the number of
381 # Setting it to True forces a print on the first print_update() call. 381 columns and their initial values.
382 self.value_changed = True 382 """
383 assert all(
384 len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int)
385 for c in columns), columns
386 # Members to be used exclusively in the primary thread.
383 self.use_cr_only = True 387 self.use_cr_only = True
384 self.unfinished_commands = set() 388 self.unfinished_commands = set()
389 self.start = time.time()
390 self._last_printed_line = ''
391 self._columns = [c[1] for c in columns]
392 self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns))
393 # Setting it to True forces a print on the first print_update() call.
394 self._value_changed = True
385 395
386 # To be used in all threads. 396 # To be used in all threads.
387 self.queued_lines = Queue.Queue() 397 self._queued_updates = Queue.Queue()
388 398
389 def update_item(self, name, index=0, size=0, raw=False): 399 def update_item(self, name, raw=False, **kwargs):
390 """Queue information to print out. 400 """Queue information to print out.
csharp 2013/10/03 16:08:04 Isn't this updating as well as queuing?
Marc-Antoine Ruel (Google) 2013/10/03 16:47:11 No, the function is illnamed. Note that this code
391 401
392 Arguments: 402 Arguments:
393 index: increment to add to index. usually 0 or 1.
394 size: increment to add to size. usually 0 or 1.
395 raw: if True, prints the data without the header. 403 raw: if True, prints the data without the header.
404 <name>: argument name is a name of a column. it's value is the increment
csharp 2013/10/03 16:08:04 I'm a bit confused by this comment, it seems like
Marc-Antoine Ruel (Google) 2013/10/03 16:47:11 No, only kwargs. I had forgot to document 'name',
405 to the column, value is usually 0 or 1.
396 """ 406 """
397 assert isinstance(name, str) 407 assert isinstance(name, str)
398 assert isinstance(index, int)
399 assert isinstance(size, int)
400 assert isinstance(raw, bool) 408 assert isinstance(raw, bool)
401 self.queued_lines.put((name, index, size, raw)) 409 assert all(isinstance(v, int) for v in kwargs.itervalues())
410 args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v]
411 self._queued_updates.put((name, raw, args))
402 412
403 def print_update(self): 413 def print_update(self):
404 """Prints the current status.""" 414 """Prints the current status."""
405 # Flush all the logging output so it doesn't appear within this output. 415 # Flush all the logging output so it doesn't appear within this output.
406 for handler in logging.root.handlers: 416 for handler in logging.root.handlers:
407 handler.flush() 417 handler.flush()
408 418
409 got_one = False 419 got_one = False
410 while True: 420 while True:
411 try: 421 try:
412 name, index, size, raw = self.queued_lines.get_nowait() 422 name, raw, args = self._queued_updates.get_nowait()
413 except Queue.Empty: 423 except Queue.Empty:
414 break 424 break
415 425
416 self.size += size 426 for k, v in args:
417 self.index += index 427 self._columns[k] += v
418 self.value_changed = bool(size or index) 428 self._value_changed = bool(args)
419 if not name: 429 if not name:
420 # Even if raw=True, there's nothing to print. 430 # Even if raw=True, there's nothing to print.
421 continue 431 continue
422 432
423 got_one = True 433 got_one = True
424 if raw: 434 if raw:
425 # Prints the data as-is. 435 # Prints the data as-is.
426 self.last_printed_line = '' 436 self._last_printed_line = ''
427 sys.stdout.write('\n%s\n' % name.strip('\n')) 437 sys.stdout.write('\n%s\n' % name.strip('\n'))
428 else: 438 else:
429 line, self.last_printed_line = self.gen_line(name) 439 line, self._last_printed_line = self._gen_line(name)
430 sys.stdout.write(line) 440 sys.stdout.write(line)
431 441
432 if not got_one and self.value_changed: 442 if not got_one and self._value_changed:
433 # Make sure a line is printed in that case where statistics changes. 443 # Make sure a line is printed in that case where statistics changes.
434 line, self.last_printed_line = self.gen_line('') 444 line, self._last_printed_line = self._gen_line('')
435 sys.stdout.write(line) 445 sys.stdout.write(line)
436 got_one = True 446 got_one = True
437 self.value_changed = False 447 self._value_changed = False
438 if got_one: 448 if got_one:
439 # Ensure that all the output is flushed to prevent it from getting mixed 449 # Ensure that all the output is flushed to prevent it from getting mixed
440 # with other output streams (like the logging streams). 450 # with other output streams (like the logging streams).
441 sys.stdout.flush() 451 sys.stdout.flush()
442 452
443 if self.unfinished_commands: 453 if self.unfinished_commands:
444 logging.debug('Waiting for the following commands to finish:\n%s', 454 logging.debug('Waiting for the following commands to finish:\n%s',
445 '\n'.join(self.unfinished_commands)) 455 '\n'.join(self.unfinished_commands))
446 456
447 def gen_line(self, name): 457 def _gen_line(self, name):
448 """Generates the line to be printed.""" 458 """Generates the line to be printed."""
449 next_line = ('[%*d/%d] %6.2fs %s') % ( 459 next_line = ('[%s] %6.2fs %s') % (
450 len(str(self.size)), self.index, 460 self._render_columns(), time.time() - self.start, name)
451 self.size,
452 time.time() - self.start,
453 name)
454 # Fill it with whitespace only if self.use_cr_only is set. 461 # Fill it with whitespace only if self.use_cr_only is set.
455 prefix = '' 462 prefix = ''
456 if self.use_cr_only and self.last_printed_line: 463 if self.use_cr_only and self._last_printed_line:
457 prefix = '\r' 464 prefix = '\r'
458 if self.use_cr_only: 465 if self.use_cr_only:
459 suffix = ' ' * max(0, len(self.last_printed_line) - len(next_line)) 466 suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line))
460 else: 467 else:
461 suffix = '\n' 468 suffix = '\n'
462 return '%s%s%s' % (prefix, next_line, suffix), next_line 469 return '%s%s%s' % (prefix, next_line, suffix), next_line
463 470
471 def _render_columns(self):
472 """Renders the columns."""
473 columns_as_str = map(str, self._columns)
474 max_len = max(map(len, columns_as_str))
475 return '/'.join(i.rjust(max_len) for i in columns_as_str)
476
464 477
465 class QueueWithProgress(Queue.PriorityQueue): 478 class QueueWithProgress(Queue.PriorityQueue):
466 """Implements progress support in join().""" 479 """Implements progress support in join()."""
467 def __init__(self, maxsize, *args, **kwargs): 480 def __init__(self, progress, *args, **kwargs):
468 Queue.PriorityQueue.__init__(self, *args, **kwargs) 481 Queue.PriorityQueue.__init__(self, *args, **kwargs)
469 self.progress = Progress(maxsize)
470
471 def set_progress(self, progress):
472 """Replace the current progress, mainly used when a progress should be
473 shared between queues."""
474 self.progress = progress 482 self.progress = progress
475 483
476 def task_done(self): 484 def task_done(self):
477 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task 485 """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task
478 done. 486 done.
479 """ 487 """
480 with self.all_tasks_done: 488 with self.all_tasks_done:
481 try: 489 try:
482 unfinished = self.unfinished_tasks - 1 490 unfinished = self.unfinished_tasks - 1
483 if unfinished < 0: 491 if unfinished < 0:
(...skipping 25 matching lines...) Expand all
509 # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done 517 # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
510 # share the same underlying event so no polling is necessary. 518 # share the same underlying event so no polling is necessary.
511 self.all_tasks_done.wait(0.1) 519 self.all_tasks_done.wait(0.1)
512 self.progress.print_update() 520 self.progress.print_update()
513 521
514 522
515 class ThreadPoolWithProgress(ThreadPool): 523 class ThreadPoolWithProgress(ThreadPool):
516 QUEUE_CLASS = QueueWithProgress 524 QUEUE_CLASS = QueueWithProgress
517 525
518 def __init__(self, progress, *args, **kwargs): 526 def __init__(self, progress, *args, **kwargs):
527 self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress)
519 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) 528 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)
520 self.tasks.set_progress(progress)
521 529
522 def _output_append(self, out): 530 def _output_append(self, out):
523 """Also wakes up the listener on new completed test_case.""" 531 """Also wakes up the listener on new completed test_case."""
524 super(ThreadPoolWithProgress, self)._output_append(out) 532 super(ThreadPoolWithProgress, self)._output_append(out)
525 self.tasks.wake_up() 533 self.tasks.wake_up()
526 534
527 535
528 class DeadlockDetector(object): 536 class DeadlockDetector(object):
529 """Context manager that can detect deadlocks. 537 """Context manager that can detect deadlocks.
530 538
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after
705 # Multiprocessing 713 # Multiprocessing
706 import multiprocessing 714 import multiprocessing
707 return multiprocessing.cpu_count() 715 return multiprocessing.cpu_count()
708 except: # pylint: disable=W0702 716 except: # pylint: disable=W0702
709 try: 717 try:
710 # Mac OS 10.6 718 # Mac OS 10.6
711 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 719 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101
712 except: 720 except:
713 # Some of the windows builders seem to get here. 721 # Some of the windows builders seem to get here.
714 return 4 722 return 4
OLDNEW
« tools/swarming_load_test_client.py ('K') | « tools/swarming_load_test_client.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698