Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: server/server_job.py

Issue 6902033: Reduce load on autotest by cutting number of forks and parsers. (Closed) Base URL: ssh://gitrw.chromium.org:9222/autotest.git@master
Patch Set: Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | server/server_job_utils.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 """ 1 """
2 The main job wrapper for the server side. 2 The main job wrapper for the server side.
3 3
4 This is the core infrastructure. Derived from the client side job.py 4 This is the core infrastructure. Derived from the client side job.py
5 5
6 Copyright Martin J. Bligh, Andy Whitcroft 2007 6 Copyright Martin J. Bligh, Andy Whitcroft 2007
7 """ 7 """
8 8
9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
10 import Queue, threading 10 import multiprocessing
11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
12 from autotest_lib.client.bin import sysinfo 12 from autotest_lib.client.bin import sysinfo
13 from autotest_lib.client.common_lib import base_job 13 from autotest_lib.client.common_lib import base_job
14 from autotest_lib.client.common_lib import error, log, utils, packages 14 from autotest_lib.client.common_lib import error, log, utils, packages
15 from autotest_lib.client.common_lib import logging_manager 15 from autotest_lib.client.common_lib import logging_manager
16 from autotest_lib.server import test, subcommand, profilers, server_job_utils 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils
17 from autotest_lib.server.hosts import abstract_ssh 17 from autotest_lib.server.hosts import abstract_ssh
18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
19 19
20 20
(...skipping 434 matching lines...) Expand 10 before | Expand all | Expand 10 after
455 """ 455 """
456 results = self.parallel_simple(function, machines, timeout=timeout, 456 results = self.parallel_simple(function, machines, timeout=timeout,
457 return_results=True) 457 return_results=True)
458 success_machines = [] 458 success_machines = []
459 for result, machine in itertools.izip(results, machines): 459 for result, machine in itertools.izip(results, machines):
460 if not isinstance(result, Exception): 460 if not isinstance(result, Exception):
461 success_machines.append(machine) 461 success_machines.append(machine)
462 return success_machines 462 return success_machines
463 463
464 464
465 def distribute_across_machines(self, tests, machines): 465 def distribute_across_machines(self, tests, machines,
466 continuous_parsing=False):
466 """Run each test in tests once using machines. 467 """Run each test in tests once using machines.
467 468
468 Instead of running each test on each machine like parallel_on_machines, 469 Instead of running each test on each machine like parallel_on_machines,
469 run each test once across all machines. Put another way, the total 470 run each test once across all machines. Put another way, the total
470 number of tests run by parallel_on_machines is len(tests) * 471 number of tests run by parallel_on_machines is len(tests) *
471 len(machines). The number of tests run by distribute_across_machines is 472 len(machines). The number of tests run by distribute_across_machines is
472 len(tests). 473 len(tests).
473 474
474 Args: 475 Args:
475 tests: List of tests to run. 476 tests: List of tests to run.
476 machines: list of machines to use. 477 machines: List of machines to use.
478 continuous_parsing: Bool, if true parse job while running.
477 """ 479 """
478 # The Queue is thread safe, but since a machine may have to search 480 # The Queue is thread safe, but since a machine may have to search
479 # through the queue to find a valid test the lock provides exclusive 481 # through the queue to find a valid test the lock provides exclusive
480 # queue access for more than just the get call. 482 # queue access for more than just the get call.
481 test_queue = Queue.Queue() 483 test_queue = multiprocessing.JoinableQueue()
482 test_queue_lock = threading.Lock() 484 test_queue_lock = multiprocessing.Lock()
483 485
484 machine_workers = [server_job_utils.machine_worker(self, 486 machine_workers = [server_job_utils.machine_worker(self,
485 machine, 487 machine,
486 self.resultdir, 488 self.resultdir,
487 test_queue, 489 test_queue,
488 test_queue_lock) 490 test_queue_lock,
491 continuous_parsing)
489 for machine in machines] 492 for machine in machines]
490 493
491 # To (potentially) speed up searching for valid tests create a list of 494 # To (potentially) speed up searching for valid tests create a list of
492 # unique attribute sets present in the machines for this job. If sets 495 # unique attribute sets present in the machines for this job. If sets
493 # were hashable we could just use a dictionary for fast verification. 496 # were hashable we could just use a dictionary for fast verification.
494 # This at least reduces the search space from the number of machines to 497 # This at least reduces the search space from the number of machines to
495 # the number of unique machines. 498 # the number of unique machines.
496 unique_machine_attributes = [] 499 unique_machine_attributes = []
497 for mw in machine_workers: 500 for mw in machine_workers:
498 if not mw.attribute_set in unique_machine_attributes: 501 if not mw.attribute_set in unique_machine_attributes:
(...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after
1207 intervals = self.disabled_warnings.setdefault(warning_type, []) 1210 intervals = self.disabled_warnings.setdefault(warning_type, [])
1208 if not intervals or intervals[-1][1] is not None: 1211 if not intervals or intervals[-1][1] is not None:
1209 intervals.append((int(current_time_func()), None)) 1212 intervals.append((int(current_time_func()), None))
1210 1213
1211 1214
1212 def enable_warnings(self, warning_type, current_time_func=time.time): 1215 def enable_warnings(self, warning_type, current_time_func=time.time):
1213 """As of now, enables all further warnings of this type.""" 1216 """As of now, enables all further warnings of this type."""
1214 intervals = self.disabled_warnings.get(warning_type, []) 1217 intervals = self.disabled_warnings.get(warning_type, [])
1215 if intervals and intervals[-1][1] is None: 1218 if intervals and intervals[-1][1] is None:
1216 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1219 intervals[-1] = (intervals[-1][0], int(current_time_func()))
OLDNEW
« no previous file with comments | « no previous file | server/server_job_utils.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698