OLD | NEW |
1 """ | 1 """ |
2 The main job wrapper for the server side. | 2 The main job wrapper for the server side. |
3 | 3 |
4 This is the core infrastructure. Derived from the client side job.py | 4 This is the core infrastructure. Derived from the client side job.py |
5 | 5 |
6 Copyright Martin J. Bligh, Andy Whitcroft 2007 | 6 Copyright Martin J. Bligh, Andy Whitcroft 2007 |
7 """ | 7 """ |
8 | 8 |
9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform | 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform |
10 import Queue, threading | 10 import multiprocessing |
11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno | 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno |
12 from autotest_lib.client.bin import sysinfo | 12 from autotest_lib.client.bin import sysinfo |
13 from autotest_lib.client.common_lib import base_job | 13 from autotest_lib.client.common_lib import base_job |
14 from autotest_lib.client.common_lib import error, log, utils, packages | 14 from autotest_lib.client.common_lib import error, log, utils, packages |
15 from autotest_lib.client.common_lib import logging_manager | 15 from autotest_lib.client.common_lib import logging_manager |
16 from autotest_lib.server import test, subcommand, profilers, server_job_utils | 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils |
17 from autotest_lib.server.hosts import abstract_ssh | 17 from autotest_lib.server.hosts import abstract_ssh |
18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils | 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils |
19 | 19 |
20 | 20 |
(...skipping 434 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
455 """ | 455 """ |
456 results = self.parallel_simple(function, machines, timeout=timeout, | 456 results = self.parallel_simple(function, machines, timeout=timeout, |
457 return_results=True) | 457 return_results=True) |
458 success_machines = [] | 458 success_machines = [] |
459 for result, machine in itertools.izip(results, machines): | 459 for result, machine in itertools.izip(results, machines): |
460 if not isinstance(result, Exception): | 460 if not isinstance(result, Exception): |
461 success_machines.append(machine) | 461 success_machines.append(machine) |
462 return success_machines | 462 return success_machines |
463 | 463 |
464 | 464 |
465 def distribute_across_machines(self, tests, machines): | 465 def distribute_across_machines(self, tests, machines, |
| 466 continuous_parsing=False): |
466 """Run each test in tests once using machines. | 467 """Run each test in tests once using machines. |
467 | 468 |
468 Instead of running each test on each machine like parallel_on_machines, | 469 Instead of running each test on each machine like parallel_on_machines, |
469 run each test once across all machines. Put another way, the total | 470 run each test once across all machines. Put another way, the total |
470 number of tests run by parallel_on_machines is len(tests) * | 471 number of tests run by parallel_on_machines is len(tests) * |
471 len(machines). The number of tests run by distribute_across_machines is | 472 len(machines). The number of tests run by distribute_across_machines is |
472 len(tests). | 473 len(tests). |
473 | 474 |
474 Args: | 475 Args: |
475 tests: List of tests to run. | 476 tests: List of tests to run. |
476 machines: list of machines to use. | 477 machines: List of machines to use. |
| 478 continuous_parsing: Bool, if true parse job while running. |
477 """ | 479 """ |
478 # The Queue is thread safe, but since a machine may have to search | 480 # The Queue is thread safe, but since a machine may have to search |
479 # through the queue to find a valid test the lock provides exclusive | 481 # through the queue to find a valid test the lock provides exclusive |
480 # queue access for more than just the get call. | 482 # queue access for more than just the get call. |
481 test_queue = Queue.Queue() | 483 test_queue = multiprocessing.JoinableQueue() |
482 test_queue_lock = threading.Lock() | 484 test_queue_lock = multiprocessing.Lock() |
483 | 485 |
484 machine_workers = [server_job_utils.machine_worker(self, | 486 machine_workers = [server_job_utils.machine_worker(self, |
485 machine, | 487 machine, |
486 self.resultdir, | 488 self.resultdir, |
487 test_queue, | 489 test_queue, |
488 test_queue_lock) | 490 test_queue_lock, |
| 491 continuous_parsing) |
489 for machine in machines] | 492 for machine in machines] |
490 | 493 |
491 # To (potentially) speed up searching for valid tests create a list of | 494 # To (potentially) speed up searching for valid tests create a list of |
492 # unique attribute sets present in the machines for this job. If sets | 495 # unique attribute sets present in the machines for this job. If sets |
493 # were hashable we could just use a dictionary for fast verification. | 496 # were hashable we could just use a dictionary for fast verification. |
494 # This at least reduces the search space from the number of machines to | 497 # This at least reduces the search space from the number of machines to |
495 # the number of unique machines. | 498 # the number of unique machines. |
496 unique_machine_attributes = [] | 499 unique_machine_attributes = [] |
497 for mw in machine_workers: | 500 for mw in machine_workers: |
498 if not mw.attribute_set in unique_machine_attributes: | 501 if not mw.attribute_set in unique_machine_attributes: |
(...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1207 intervals = self.disabled_warnings.setdefault(warning_type, []) | 1210 intervals = self.disabled_warnings.setdefault(warning_type, []) |
1208 if not intervals or intervals[-1][1] is not None: | 1211 if not intervals or intervals[-1][1] is not None: |
1209 intervals.append((int(current_time_func()), None)) | 1212 intervals.append((int(current_time_func()), None)) |
1210 | 1213 |
1211 | 1214 |
1212 def enable_warnings(self, warning_type, current_time_func=time.time): | 1215 def enable_warnings(self, warning_type, current_time_func=time.time): |
1213 """As of now, enables all further warnings of this type.""" | 1216 """As of now, enables all further warnings of this type.""" |
1214 intervals = self.disabled_warnings.get(warning_type, []) | 1217 intervals = self.disabled_warnings.get(warning_type, []) |
1215 if intervals and intervals[-1][1] is None: | 1218 if intervals and intervals[-1][1] is None: |
1216 intervals[-1] = (intervals[-1][0], int(current_time_func())) | 1219 intervals[-1] = (intervals[-1][0], int(current_time_func())) |
OLD | NEW |