| OLD | NEW |
| 1 """ | 1 """ |
| 2 The main job wrapper for the server side. | 2 The main job wrapper for the server side. |
| 3 | 3 |
| 4 This is the core infrastructure. Derived from the client side job.py | 4 This is the core infrastructure. Derived from the client side job.py |
| 5 | 5 |
| 6 Copyright Martin J. Bligh, Andy Whitcroft 2007 | 6 Copyright Martin J. Bligh, Andy Whitcroft 2007 |
| 7 """ | 7 """ |
| 8 | 8 |
| 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform | 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform |
| 10 import Queue, threading | 10 import multiprocessing |
| 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno | 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno |
| 12 from autotest_lib.client.bin import sysinfo | 12 from autotest_lib.client.bin import sysinfo |
| 13 from autotest_lib.client.common_lib import base_job | 13 from autotest_lib.client.common_lib import base_job |
| 14 from autotest_lib.client.common_lib import error, log, utils, packages | 14 from autotest_lib.client.common_lib import error, log, utils, packages |
| 15 from autotest_lib.client.common_lib import logging_manager | 15 from autotest_lib.client.common_lib import logging_manager |
| 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils | 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils |
| 17 from autotest_lib.server.hosts import abstract_ssh | 17 from autotest_lib.server.hosts import abstract_ssh |
| 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils | 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils |
| 19 | 19 |
| 20 | 20 |
| (...skipping 434 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 455 """ | 455 """ |
| 456 results = self.parallel_simple(function, machines, timeout=timeout, | 456 results = self.parallel_simple(function, machines, timeout=timeout, |
| 457 return_results=True) | 457 return_results=True) |
| 458 success_machines = [] | 458 success_machines = [] |
| 459 for result, machine in itertools.izip(results, machines): | 459 for result, machine in itertools.izip(results, machines): |
| 460 if not isinstance(result, Exception): | 460 if not isinstance(result, Exception): |
| 461 success_machines.append(machine) | 461 success_machines.append(machine) |
| 462 return success_machines | 462 return success_machines |
| 463 | 463 |
| 464 | 464 |
| 465 def distribute_across_machines(self, tests, machines): | 465 def distribute_across_machines(self, tests, machines, |
| 466 continuous_parsing=False): |
| 466 """Run each test in tests once using machines. | 467 """Run each test in tests once using machines. |
| 467 | 468 |
| 468 Instead of running each test on each machine like parallel_on_machines, | 469 Instead of running each test on each machine like parallel_on_machines, |
| 469 run each test once across all machines. Put another way, the total | 470 run each test once across all machines. Put another way, the total |
| 470 number of tests run by parallel_on_machines is len(tests) * | 471 number of tests run by parallel_on_machines is len(tests) * |
| 471 len(machines). The number of tests run by distribute_across_machines is | 472 len(machines). The number of tests run by distribute_across_machines is |
| 472 len(tests). | 473 len(tests). |
| 473 | 474 |
| 474 Args: | 475 Args: |
| 475 tests: List of tests to run. | 476 tests: List of tests to run. |
| 476 machines: list of machines to use. | 477 machines: List of machines to use. |
| 478 continuous_parsing: Bool, if true parse job while running. |
| 477 """ | 479 """ |
| 478 # The Queue is thread safe, but since a machine may have to search | 480 # The Queue is thread safe, but since a machine may have to search |
| 479 # through the queue to find a valid test the lock provides exclusive | 481 # through the queue to find a valid test the lock provides exclusive |
| 480 # queue access for more than just the get call. | 482 # queue access for more than just the get call. |
| 481 test_queue = Queue.Queue() | 483 test_queue = multiprocessing.JoinableQueue() |
| 482 test_queue_lock = threading.Lock() | 484 test_queue_lock = multiprocessing.Lock() |
| 483 | 485 |
| 484 machine_workers = [server_job_utils.machine_worker(self, | 486 machine_workers = [server_job_utils.machine_worker(self, |
| 485 machine, | 487 machine, |
| 486 self.resultdir, | 488 self.resultdir, |
| 487 test_queue, | 489 test_queue, |
| 488 test_queue_lock) | 490 test_queue_lock, |
| 491 continuous_parsing) |
| 489 for machine in machines] | 492 for machine in machines] |
| 490 | 493 |
| 491 # To (potentially) speed up searching for valid tests create a list of | 494 # To (potentially) speed up searching for valid tests create a list of |
| 492 # unique attribute sets present in the machines for this job. If sets | 495 # unique attribute sets present in the machines for this job. If sets |
| 493 # were hashable we could just use a dictionary for fast verification. | 496 # were hashable we could just use a dictionary for fast verification. |
| 494 # This at least reduces the search space from the number of machines to | 497 # This at least reduces the search space from the number of machines to |
| 495 # the number of unique machines. | 498 # the number of unique machines. |
| 496 unique_machine_attributes = [] | 499 unique_machine_attributes = [] |
| 497 for mw in machine_workers: | 500 for mw in machine_workers: |
| 498 if not mw.attribute_set in unique_machine_attributes: | 501 if not mw.attribute_set in unique_machine_attributes: |
| (...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1207 intervals = self.disabled_warnings.setdefault(warning_type, []) | 1210 intervals = self.disabled_warnings.setdefault(warning_type, []) |
| 1208 if not intervals or intervals[-1][1] is not None: | 1211 if not intervals or intervals[-1][1] is not None: |
| 1209 intervals.append((int(current_time_func()), None)) | 1212 intervals.append((int(current_time_func()), None)) |
| 1210 | 1213 |
| 1211 | 1214 |
| 1212 def enable_warnings(self, warning_type, current_time_func=time.time): | 1215 def enable_warnings(self, warning_type, current_time_func=time.time): |
| 1213 """As of now, enables all further warnings of this type.""" | 1216 """As of now, enables all further warnings of this type.""" |
| 1214 intervals = self.disabled_warnings.get(warning_type, []) | 1217 intervals = self.disabled_warnings.get(warning_type, []) |
| 1215 if intervals and intervals[-1][1] is None: | 1218 if intervals and intervals[-1][1] is None: |
| 1216 intervals[-1] = (intervals[-1][0], int(current_time_func())) | 1219 intervals[-1] = (intervals[-1][0], int(current_time_func())) |
| OLD | NEW |