OLD | NEW |
---|---|
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Classes and functions related to threading.""" | 5 """Classes and functions related to threading.""" |
6 | 6 |
7 import functools | 7 import functools |
8 import inspect | 8 import inspect |
9 import logging | 9 import logging |
10 import os | 10 import os |
(...skipping 247 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
258 self._is_closed = True | 258 self._is_closed = True |
259 for _ in range(len(self._workers)): | 259 for _ in range(len(self._workers)): |
260 # Enqueueing None causes the worker to stop. | 260 # Enqueueing None causes the worker to stop. |
261 self.tasks.put(None) | 261 self.tasks.put(None) |
262 for t in self._workers: | 262 for t in self._workers: |
263 t.join() | 263 t.join() |
264 logging.debug( | 264 logging.debug( |
265 'Thread pool \'%s\' closed: spawned %d threads total', | 265 'Thread pool \'%s\' closed: spawned %d threads total', |
266 self._prefix, len(self._workers)) | 266 self._prefix, len(self._workers)) |
267 | 267 |
268 def abort(self): | |
269 """Empties the queue. | |
270 | |
271 To be used when the pool should stop early, like when Ctrl-C was detected. | |
272 | |
273 Returns: | |
274 Number of tasks cancelled. | |
275 """ | |
276 index = 0 | |
277 while True: | |
278 try: | |
279 self.tasks.get_nowait() | |
280 self.tasks.task_done() | |
281 index += 1 | |
282 except Queue.Empty: | |
283 return index | |
284 | |
268 def __enter__(self): | 285 def __enter__(self): |
269 """Enables 'with' statement.""" | 286 """Enables 'with' statement.""" |
270 return self | 287 return self |
271 | 288 |
272 def __exit__(self, _exc_type, _exc_value, _traceback): | 289 def __exit__(self, _exc_type, _exc_value, _traceback): |
273 """Enables 'with' statement.""" | 290 """Enables 'with' statement.""" |
274 self.close() | 291 self.close() |
275 | 292 |
276 | 293 |
277 class AutoRetryThreadPool(ThreadPool): | 294 class AutoRetryThreadPool(ThreadPool): |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
362 self.start = time.time() | 379 self.start = time.time() |
363 self.size = size | 380 self.size = size |
364 # Setting it to True forces a print on the first print_update() call. | 381 # Setting it to True forces a print on the first print_update() call. |
365 self.value_changed = True | 382 self.value_changed = True |
366 self.use_cr_only = True | 383 self.use_cr_only = True |
367 self.unfinished_commands = set() | 384 self.unfinished_commands = set() |
368 | 385 |
369 # To be used in all threads. | 386 # To be used in all threads. |
370 self.queued_lines = Queue.Queue() | 387 self.queued_lines = Queue.Queue() |
371 | 388 |
372 def update_item(self, name, index=False, size=False, raw=False): | 389 def update_item(self, name, index=0, size=0, raw=False): |
Marc-Antoine Ruel (Google)
2013/10/02 14:08:31
Eventually I want it to support an arbitrary numbe
| |
373 """Queue information to print out. | 390 """Queue information to print out. |
374 | 391 |
375 Arguments: | 392 Arguments: |
376 index: index should be incremented. | 393 index: increment to add to index. usually 0 or 1. |
377 size: total size should be incremented. | 394 size: increment to add to size. usually 0 or 1. |
378 raw: if True, prints the data without the header. | 395 raw: if True, prints the data without the header. |
379 """ | 396 """ |
397 assert isinstance(name, str) | |
398 assert isinstance(index, int) | |
399 assert isinstance(size, int) | |
400 assert isinstance(raw, bool) | |
380 self.queued_lines.put((name, index, size, raw)) | 401 self.queued_lines.put((name, index, size, raw)) |
381 | 402 |
382 def print_update(self): | 403 def print_update(self): |
383 """Prints the current status.""" | 404 """Prints the current status.""" |
384 # Flush all the logging output so it doesn't appear within this output. | 405 # Flush all the logging output so it doesn't appear within this output. |
385 for handler in logging.root.handlers: | 406 for handler in logging.root.handlers: |
386 handler.flush() | 407 handler.flush() |
387 | 408 |
388 got_one = False | 409 got_one = False |
389 while True: | 410 while True: |
390 try: | 411 try: |
391 name, index, size, raw = self.queued_lines.get_nowait() | 412 name, index, size, raw = self.queued_lines.get_nowait() |
392 except Queue.Empty: | 413 except Queue.Empty: |
393 break | 414 break |
394 | 415 |
395 if size: | 416 self.size += size |
396 self.size += 1 | 417 self.index += index |
397 self.value_changed = True | 418 self.value_changed = bool(size or index) |
398 if index: | |
399 self.index += 1 | |
400 self.value_changed = True | |
401 if not name: | 419 if not name: |
402 # Even if raw=True, there's nothing to print. | 420 # Even if raw=True, there's nothing to print. |
403 continue | 421 continue |
404 | 422 |
405 got_one = True | 423 got_one = True |
406 if raw: | 424 if raw: |
407 # Prints the data as-is. | 425 # Prints the data as-is. |
408 self.last_printed_line = '' | 426 self.last_printed_line = '' |
409 sys.stdout.write('\n%s\n' % name.strip('\n')) | 427 sys.stdout.write('\n%s\n' % name.strip('\n')) |
410 else: | 428 else: |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
480 # between the number of output and the number of input task. | 498 # between the number of output and the number of input task. |
481 with self.all_tasks_done: | 499 with self.all_tasks_done: |
482 self.all_tasks_done.notify_all() | 500 self.all_tasks_done.notify_all() |
483 | 501 |
484 def join(self): | 502 def join(self): |
485 """Calls print_update() whenever possible.""" | 503 """Calls print_update() whenever possible.""" |
486 self.progress.print_update() | 504 self.progress.print_update() |
487 with self.all_tasks_done: | 505 with self.all_tasks_done: |
488 while self.unfinished_tasks: | 506 while self.unfinished_tasks: |
489 self.progress.print_update() | 507 self.progress.print_update() |
490 self.all_tasks_done.wait(60.) | 508 # Use a short wait timeout so updates are printed in a timely manner. |
509 self.all_tasks_done.wait(0.1) | |
Vadim Sh.
2013/10/02 18:19:48
I'm unhappy about polling. I think print_update ca
Marc-Antoine Ruel (Google)
2013/10/02 19:41:26
I agree it's far from awesome but I prefer the Pro
| |
491 self.progress.print_update() | 510 self.progress.print_update() |
492 | 511 |
493 | 512 |
494 class ThreadPoolWithProgress(ThreadPool): | 513 class ThreadPoolWithProgress(ThreadPool): |
495 QUEUE_CLASS = QueueWithProgress | 514 QUEUE_CLASS = QueueWithProgress |
496 | 515 |
497 def __init__(self, progress, *args, **kwargs): | 516 def __init__(self, progress, *args, **kwargs): |
498 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) | 517 super(ThreadPoolWithProgress, self).__init__(*args, **kwargs) |
499 self.tasks.set_progress(progress) | 518 self.tasks.set_progress(progress) |
500 | 519 |
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
684 # Multiprocessing | 703 # Multiprocessing |
685 import multiprocessing | 704 import multiprocessing |
686 return multiprocessing.cpu_count() | 705 return multiprocessing.cpu_count() |
687 except: # pylint: disable=W0702 | 706 except: # pylint: disable=W0702 |
688 try: | 707 try: |
689 # Mac OS 10.6 | 708 # Mac OS 10.6 |
690 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 | 709 return int(os.sysconf('SC_NPROCESSORS_ONLN')) # pylint: disable=E1101 |
691 except: | 710 except: |
692 # Some of the windows builders seem to get here. | 711 # Some of the windows builders seem to get here. |
693 return 4 | 712 return 4 |
OLD | NEW |