| OLD | NEW |
| 1 #!/usr/bin/python -u | 1 #!/usr/bin/python -u |
| 2 | 2 |
| 3 """ | 3 """ |
| 4 Autotest scheduler | 4 Autotest scheduler |
| 5 """ | 5 """ |
| 6 | 6 |
| 7 | 7 |
| 8 import common | 8 import common |
| 9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal | 9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal |
| 10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib | 10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib |
| 11 import itertools, logging, weakref, gc | 11 import itertools, logging, weakref, gc |
| 12 | 12 |
| 13 import MySQLdb | 13 import MySQLdb |
| 14 | 14 |
| 15 from autotest_lib.scheduler import scheduler_logging_config | 15 from autotest_lib.scheduler import scheduler_logging_config |
| 16 from autotest_lib.frontend import setup_django_environment | 16 from autotest_lib.frontend import setup_django_environment |
| 17 | 17 |
| 18 import django.db | 18 import django.db |
| 19 | 19 |
| 20 from autotest_lib.client.common_lib import global_config, logging_manager | 20 from autotest_lib.client.common_lib import global_config, logging_manager |
| 21 from autotest_lib.client.common_lib import host_protections, utils | 21 from autotest_lib.client.common_lib import host_protections, utils |
| 22 from autotest_lib.database import database_connection | 22 from autotest_lib.database import database_connection |
| 23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection | 23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection |
| 24 from autotest_lib.frontend.afe import model_attributes | 24 from autotest_lib.frontend.afe import model_attributes |
| 25 from autotest_lib.scheduler import drone_manager, drones, email_manager | 25 from autotest_lib.scheduler import drone_manager, drones, email_manager |
| 26 from autotest_lib.scheduler import monitor_db_cleanup | 26 from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup |
| 27 from autotest_lib.scheduler import status_server, scheduler_config | 27 from autotest_lib.scheduler import status_server, scheduler_config |
| 28 from autotest_lib.scheduler import gc_stats, metahost_scheduler | |
| 29 from autotest_lib.scheduler import scheduler_models | 28 from autotest_lib.scheduler import scheduler_models |
| 30 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' | 29 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' |
| 31 PID_FILE_PREFIX = 'monitor_db' | 30 PID_FILE_PREFIX = 'monitor_db' |
| 32 | 31 |
| 33 RESULTS_DIR = '.' | 32 RESULTS_DIR = '.' |
| 34 AUTOSERV_NICE_LEVEL = 10 | 33 AUTOSERV_NICE_LEVEL = 10 |
| 35 DB_CONFIG_SECTION = 'AUTOTEST_WEB' | 34 DB_CONFIG_SECTION = 'AUTOTEST_WEB' |
| 36 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') | 35 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') |
| 37 | 36 |
| 38 if os.environ.has_key('AUTOTEST_DIR'): | 37 if os.environ.has_key('AUTOTEST_DIR'): |
| (...skipping 29 matching lines...) |
| 68 """@returns How long to wait for autoserv to write pidfile.""" | 67 """@returns How long to wait for autoserv to write pidfile.""" |
| 69 pidfile_timeout_mins = global_config.global_config.get_config_value( | 68 pidfile_timeout_mins = global_config.global_config.get_config_value( |
| 70 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int) | 69 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int) |
| 71 return pidfile_timeout_mins * 60 | 70 return pidfile_timeout_mins * 60 |
| 72 | 71 |
| 73 | 72 |
| 74 def _site_init_monitor_db_dummy(): | 73 def _site_init_monitor_db_dummy(): |
| 75 return {} | 74 return {} |
| 76 | 75 |
| 77 | 76 |
| 78 get_site_metahost_schedulers = utils.import_site_function( | |
| 79 __file__, 'autotest_lib.scheduler.site_metahost_scheduler', | |
| 80 'get_metahost_schedulers', lambda : ()) | |
| 81 | |
| 82 | |
| 83 def _verify_default_drone_set_exists(): | 77 def _verify_default_drone_set_exists(): |
| 84 if (models.DroneSet.drone_sets_enabled() and | 78 if (models.DroneSet.drone_sets_enabled() and |
| 85 not models.DroneSet.default_drone_set_name()): | 79 not models.DroneSet.default_drone_set_name()): |
| 86 raise SchedulerError('Drone sets are enabled, but no default is set') | 80 raise host_scheduler.SchedulerError( |
| | 81 'Drone sets are enabled, but no default is set') |
| 87 | 82 |
| 88 | 83 |
| 89 def _sanity_check(): | 84 def _sanity_check(): |
| 90 """Make sure the configs are consistent before starting the scheduler""" | 85 """Make sure the configs are consistent before starting the scheduler""" |
| 91 _verify_default_drone_set_exists() | 86 _verify_default_drone_set_exists() |
| 92 | 87 |
| 93 | 88 |
| 94 def main(): | 89 def main(): |
| 95 try: | 90 try: |
| 96 try: | 91 try: |
| (...skipping 154 matching lines...) |
| 251 if not job: | 246 if not job: |
| 252 job = queue_entry.job | 247 job = queue_entry.job |
| 253 autoserv_argv += ['-u', job.owner, '-l', job.name] | 248 autoserv_argv += ['-u', job.owner, '-l', job.name] |
| 254 if job.is_image_update_job(): | 249 if job.is_image_update_job(): |
| 255 autoserv_argv += ['--image', job.update_image_path] | 250 autoserv_argv += ['--image', job.update_image_path] |
| 256 if verbose: | 251 if verbose: |
| 257 autoserv_argv.append('--verbose') | 252 autoserv_argv.append('--verbose') |
| 258 return autoserv_argv + extra_args | 253 return autoserv_argv + extra_args |
| 259 | 254 |
| 260 | 255 |
| 261 class SchedulerError(Exception): | |
| 262 """Raised by HostScheduler when an inconsistent state occurs.""" | |
| 263 | |
| 264 | |
| 265 class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility): | |
| 266 """Handles the logic for choosing when to run jobs and on which hosts. | |
| 267 | |
| 268 This class makes several queries to the database on each tick, building up | |
| 269 some auxiliary data structures and using them to determine which hosts are | |
| 270 eligible to run which jobs, taking into account all the various factors that | |
| 271 affect that. | |
| 272 | |
| 273 In the past this was done with one or two very large, complex database | |
| 274 queries. It has proven much simpler and faster to build these auxiliary | |
| 275 data structures and perform the logic in Python. | |
| 276 """ | |
| 277 def __init__(self): | |
| 278 self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers() | |
| 279 | |
| 280 # load site-specific scheduler selected in global_config | |
| 281 site_schedulers_str = global_config.global_config.get_config_value( | |
| 282 scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers', | |
| 283 default='') | |
| 284 site_schedulers = set(site_schedulers_str.split(',')) | |
| 285 for scheduler in get_site_metahost_schedulers(): | |
| 286 if type(scheduler).__name__ in site_schedulers: | |
| 287 # always prepend, so site schedulers take precedence | |
| 288 self._metahost_schedulers = ( | |
| 289 [scheduler] + self._metahost_schedulers) | |
| 290 logging.info('Metahost schedulers: %s', | |
| 291 ', '.join(type(scheduler).__name__ for scheduler | |
| 292 in self._metahost_schedulers)) | |
| 293 | |
| 294 | |
| 295 def _get_ready_hosts(self): | |
| 296 # avoid any host with a currently active queue entry against it | |
| 297 hosts = scheduler_models.Host.fetch( | |
| 298 joins='LEFT JOIN afe_host_queue_entries AS active_hqe ' | |
| 299 'ON (afe_hosts.id = active_hqe.host_id AND ' | |
| 300 'active_hqe.active)', | |
| 301 where="active_hqe.host_id IS NULL " | |
| 302 "AND NOT afe_hosts.locked " | |
| 303 "AND (afe_hosts.status IS NULL " | |
| 304 "OR afe_hosts.status = 'Ready')") | |
| 305 return dict((host.id, host) for host in hosts) | |
| 306 | |
| 307 | |
| 308 @staticmethod | |
| 309 def _get_sql_id_list(id_list): | |
| 310 return ','.join(str(item_id) for item_id in id_list) | |
| 311 | |
| 312 | |
| 313 @classmethod | |
| 314 def _get_many2many_dict(cls, query, id_list, flip=False): | |
| 315 if not id_list: | |
| 316 return {} | |
| 317 query %= cls._get_sql_id_list(id_list) | |
| 318 rows = _db.execute(query) | |
| 319 return cls._process_many2many_dict(rows, flip) | |
| 320 | |
| 321 | |
| 322 @staticmethod | |
| 323 def _process_many2many_dict(rows, flip=False): | |
| 324 result = {} | |
| 325 for row in rows: | |
| 326 left_id, right_id = int(row[0]), int(row[1]) | |
| 327 if flip: | |
| 328 left_id, right_id = right_id, left_id | |
| 329 result.setdefault(left_id, set()).add(right_id) | |
| 330 return result | |
| 331 | |
| 332 | |
| 333 @classmethod | |
| 334 def _get_job_acl_groups(cls, job_ids): | |
| 335 query = """ | |
| 336 SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id | |
| 337 FROM afe_jobs | |
| 338 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner | |
| 339 INNER JOIN afe_acl_groups_users ON | |
| 340 afe_acl_groups_users.user_id = afe_users.id | |
| 341 WHERE afe_jobs.id IN (%s) | |
| 342 """ | |
| 343 return cls._get_many2many_dict(query, job_ids) | |
| 344 | |
| 345 | |
| 346 @classmethod | |
| 347 def _get_job_ineligible_hosts(cls, job_ids): | |
| 348 query = """ | |
| 349 SELECT job_id, host_id | |
| 350 FROM afe_ineligible_host_queues | |
| 351 WHERE job_id IN (%s) | |
| 352 """ | |
| 353 return cls._get_many2many_dict(query, job_ids) | |
| 354 | |
| 355 | |
| 356 @classmethod | |
| 357 def _get_job_dependencies(cls, job_ids): | |
| 358 query = """ | |
| 359 SELECT job_id, label_id | |
| 360 FROM afe_jobs_dependency_labels | |
| 361 WHERE job_id IN (%s) | |
| 362 """ | |
| 363 return cls._get_many2many_dict(query, job_ids) | |
| 364 | |
| 365 | |
| 366 @classmethod | |
| 367 def _get_host_acls(cls, host_ids): | |
| 368 query = """ | |
| 369 SELECT host_id, aclgroup_id | |
| 370 FROM afe_acl_groups_hosts | |
| 371 WHERE host_id IN (%s) | |
| 372 """ | |
| 373 return cls._get_many2many_dict(query, host_ids) | |
| 374 | |
| 375 | |
| 376 @classmethod | |
| 377 def _get_label_hosts(cls, host_ids): | |
| 378 if not host_ids: | |
| 379 return {}, {} | |
| 380 query = """ | |
| 381 SELECT label_id, host_id | |
| 382 FROM afe_hosts_labels | |
| 383 WHERE host_id IN (%s) | |
| 384 """ % cls._get_sql_id_list(host_ids) | |
| 385 rows = _db.execute(query) | |
| 386 labels_to_hosts = cls._process_many2many_dict(rows) | |
| 387 hosts_to_labels = cls._process_many2many_dict(rows, flip=True) | |
| 388 return labels_to_hosts, hosts_to_labels | |
| 389 | |
| 390 | |
| 391 @classmethod | |
| 392 def _get_labels(cls): | |
| 393 return dict((label.id, label) for label | |
| 394 in scheduler_models.Label.fetch()) | |
| 395 | |
| 396 | |
| 397 def recovery_on_startup(self): | |
| 398 for metahost_scheduler in self._metahost_schedulers: | |
| 399 metahost_scheduler.recovery_on_startup() | |
| 400 | |
| 401 | |
| 402 def refresh(self, pending_queue_entries): | |
| 403 self._hosts_available = self._get_ready_hosts() | |
| 404 | |
| 405 relevant_jobs = [queue_entry.job_id | |
| 406 for queue_entry in pending_queue_entries] | |
| 407 self._job_acls = self._get_job_acl_groups(relevant_jobs) | |
| 408 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs) | |
| 409 self._job_dependencies = self._get_job_dependencies(relevant_jobs) | |
| 410 | |
| 411 host_ids = self._hosts_available.keys() | |
| 412 self._host_acls = self._get_host_acls(host_ids) | |
| 413 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids) | |
| 414 | |
| 415 self._labels = self._get_labels() | |
| 416 | |
| 417 | |
| 418 def tick(self): | |
| 419 for metahost_scheduler in self._metahost_schedulers: | |
| 420 metahost_scheduler.tick() | |
| 421 | |
| 422 | |
| 423 def hosts_in_label(self, label_id): | |
| 424 return set(self._label_hosts.get(label_id, ())) | |
| 425 | |
| 426 | |
| 427 def remove_host_from_label(self, host_id, label_id): | |
| 428 self._label_hosts[label_id].remove(host_id) | |
| 429 | |
| 430 | |
| 431 def pop_host(self, host_id): | |
| 432 return self._hosts_available.pop(host_id) | |
| 433 | |
| 434 | |
| 435 def ineligible_hosts_for_entry(self, queue_entry): | |
| 436 return set(self._ineligible_hosts.get(queue_entry.job_id, ())) | |
| 437 | |
| 438 | |
| 439 def _is_acl_accessible(self, host_id, queue_entry): | |
| 440 job_acls = self._job_acls.get(queue_entry.job_id, set()) | |
| 441 host_acls = self._host_acls.get(host_id, set()) | |
| 442 return len(host_acls.intersection(job_acls)) > 0 | |
| 443 | |
| 444 | |
| 445 def _check_job_dependencies(self, job_dependencies, host_labels): | |
| 446 missing = job_dependencies - host_labels | |
| 447 return len(missing) == 0 | |
| 448 | |
| 449 | |
| 450 def _check_only_if_needed_labels(self, job_dependencies, host_labels, | |
| 451 queue_entry): | |
| 452 if not queue_entry.meta_host: | |
| 453 # bypass only_if_needed labels when a specific host is selected | |
| 454 return True | |
| 455 | |
| 456 for label_id in host_labels: | |
| 457 label = self._labels[label_id] | |
| 458 if not label.only_if_needed: | |
| 459 # we don't care about non-only_if_needed labels | |
| 460 continue | |
| 461 if queue_entry.meta_host == label_id: | |
| 462 # if the label was requested in a metahost it's OK | |
| 463 continue | |
| 464 if label_id not in job_dependencies: | |
| 465 return False | |
| 466 return True | |
| 467 | |
| 468 | |
| 469 def _check_atomic_group_labels(self, host_labels, queue_entry): | |
| 470 """ | |
| 471 Determine if the given HostQueueEntry's atomic group settings are okay | |
| 472 to schedule on a host with the given labels. | |
| 473 | |
| 474 @param host_labels: A list of label ids that the host has. | |
| 475 @param queue_entry: The HostQueueEntry being considered for the host. | |
| 476 | |
| 477 @returns True if atomic group settings are okay, False otherwise. | |
| 478 """ | |
| 479 return (self._get_host_atomic_group_id(host_labels, queue_entry) == | |
| 480 queue_entry.atomic_group_id) | |
| 481 | |
| 482 | |
| 483 def _get_host_atomic_group_id(self, host_labels, queue_entry=None): | |
| 484 """ | |
| 485 Return the atomic group label id for a host with the given set of | |
| 486 labels if any, or None otherwise. Logs an error if more than | |
| 487 one atomic group is found in the set of labels. | |
| 488 | |
| 489 @param host_labels: A list of label ids that the host has. | |
| 490 @param queue_entry: The HostQueueEntry we're testing. Only used for | |
| 491 extra info in a potential logged error message. | |
| 492 | |
| 493 @returns The id of the atomic group found on a label in host_labels | |
| 494 or None if no atomic group label is found. | |
| 495 """ | |
| 496 atomic_labels = [self._labels[label_id] for label_id in host_labels | |
| 497 if self._labels[label_id].atomic_group_id is not None] | |
| 498 atomic_ids = set(label.atomic_group_id for label in atomic_labels) | |
| 499 if not atomic_ids: | |
| 500 return None | |
| 501 if len(atomic_ids) > 1: | |
| 502 logging.error('More than one Atomic Group on HQE "%s" via: %r', | |
| 503 queue_entry, atomic_labels) | |
| 504 return atomic_ids.pop() | |
| 505 | |
| 506 | |
| 507 def _get_atomic_group_labels(self, atomic_group_id): | |
| 508 """ | |
| 509 Lookup the label ids that an atomic_group is associated with. | |
| 510 | |
| 511 @param atomic_group_id - The id of the AtomicGroup to look up. | |
| 512 | |
| 513 @returns A generator yielding Label ids for this atomic group. | |
| 514 """ | |
| 515 return (id for id, label in self._labels.iteritems() | |
| 516 if label.atomic_group_id == atomic_group_id | |
| 517 and not label.invalid) | |
| 518 | |
| 519 | |
| 520 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry): | |
| 521 """ | |
| 522 @param group_hosts - A sequence of Host ids to test for usability | |
| 523 and eligibility against the Job associated with queue_entry. | |
| 524 @param queue_entry - The HostQueueEntry that these hosts are being | |
| 525 tested for eligibility against. | |
| 526 | |
| 527 @returns A subset of group_hosts Host ids that are eligible for the | |
| 528 supplied queue_entry. | |
| 529 """ | |
| 530 return set(host_id for host_id in group_hosts | |
| 531 if self.is_host_usable(host_id) | |
| 532 and self.is_host_eligible_for_job(host_id, queue_entry)) | |
| 533 | |
| 534 | |
| 535 def is_host_eligible_for_job(self, host_id, queue_entry): | |
| 536 if self._is_host_invalid(host_id): | |
| 537 # if an invalid host is scheduled for a job, it's a one-time host | |
| 538 # and it therefore bypasses eligibility checks. note this can only | |
| 539 # happen for non-metahosts, because invalid hosts have their label | |
| 540 # relationships cleared. | |
| 541 return True | |
| 542 | |
| 543 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set()) | |
| 544 host_labels = self._host_labels.get(host_id, set()) | |
| 545 | |
| 546 return (self._is_acl_accessible(host_id, queue_entry) and | |
| 547 self._check_job_dependencies(job_dependencies, host_labels) and | |
| 548 self._check_only_if_needed_labels( | |
| 549 job_dependencies, host_labels, queue_entry) and | |
| 550 self._check_atomic_group_labels(host_labels, queue_entry)) | |
| 551 | |
| 552 | |
| 553 def _is_host_invalid(self, host_id): | |
| 554 host_object = self._hosts_available.get(host_id, None) | |
| 555 return host_object and host_object.invalid | |
| 556 | |
| 557 | |
| 558 def _schedule_non_metahost(self, queue_entry): | |
| 559 if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry): | |
| 560 return None | |
| 561 return self._hosts_available.pop(queue_entry.host_id, None) | |
| 562 | |
| 563 | |
| 564 def is_host_usable(self, host_id): | |
| 565 if host_id not in self._hosts_available: | |
| 566 # host was already used during this scheduling cycle | |
| 567 return False | |
| 568 if self._hosts_available[host_id].invalid: | |
| 569 # Invalid hosts cannot be used for metahosts. They're included in | |
| 570 # the original query because they can be used by non-metahosts. | |
| 571 return False | |
| 572 return True | |
| 573 | |
| 574 | |
| 575 def schedule_entry(self, queue_entry): | |
| 576 if queue_entry.host_id is not None: | |
| 577 return self._schedule_non_metahost(queue_entry) | |
| 578 | |
| 579 for scheduler in self._metahost_schedulers: | |
| 580 if scheduler.can_schedule_metahost(queue_entry): | |
| 581 scheduler.schedule_metahost(queue_entry, self) | |
| 582 return None | |
| 583 | |
| 584 raise SchedulerError('No metahost scheduler to handle %s' % queue_entry) | |
| 585 | |
| 586 | |
| 587 def find_eligible_atomic_group(self, queue_entry): | |
| 588 """ | |
| 589 Given an atomic group host queue entry, locate an appropriate group | |
| 590 of hosts for the associated job to run on. | |
| 591 | |
| 592 The caller is responsible for creating new HQEs for the additional | |
| 593 hosts returned in order to run the actual job on them. | |
| 594 | |
| 595 @returns A list of Host instances in a ready state to satisfy this | |
| 596 atomic group scheduling. Hosts will all belong to the same | |
| 597 atomic group label as specified by the queue_entry. | |
| 598 An empty list will be returned if no suitable atomic | |
| 599 group could be found. | |
| 600 | |
| 601 TODO(gps): what is responsible for kicking off any attempted repairs on | |
| 602 a group of hosts? not this function, but something needs to. We do | |
| 603 not communicate that reason for returning [] outside of here... | |
| 604 For now, we'll just be unschedulable if enough hosts within one group | |
| 605 enter Repair Failed state. | |
| 606 """ | |
| 607 assert queue_entry.atomic_group_id is not None | |
| 608 job = queue_entry.job | |
| 609 assert job.synch_count and job.synch_count > 0 | |
| 610 atomic_group = queue_entry.atomic_group | |
| 611 if job.synch_count > atomic_group.max_number_of_machines: | |
| 612 # Such a Job and HostQueueEntry should never be possible to | |
| 613 # create using the frontend. Regardless, we can't process it. | |
| 614 # Abort it immediately and log an error on the scheduler. | |
| 615 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED) | |
| 616 logging.error( | |
| 617 'Error: job %d synch_count=%d > requested atomic_group %d ' | |
| 618 'max_number_of_machines=%d. Aborted host_queue_entry %d.', | |
| 619 job.id, job.synch_count, atomic_group.id, | |
| 620 atomic_group.max_number_of_machines, queue_entry.id) | |
| 621 return [] | |
| 622 hosts_in_label = self.hosts_in_label(queue_entry.meta_host) | |
| 623 ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry) | |
| 624 | |
| 625 # Look in each label associated with atomic_group until we find one with | |
| 626 # enough hosts to satisfy the job. | |
| 627 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id): | |
| 628 group_hosts = set(self.hosts_in_label(atomic_label_id)) | |
| 629 if queue_entry.meta_host is not None: | |
| 630 # If we have a metahost label, only allow its hosts. | |
| 631 group_hosts.intersection_update(hosts_in_label) | |
| 632 group_hosts -= ineligible_host_ids | |
| 633 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group( | |
| 634 group_hosts, queue_entry) | |
| 635 | |
| 636 # Job.synch_count is treated as "minimum synch count" when | |
| 637 # scheduling for an atomic group of hosts. The atomic group | |
| 638 # number of machines is the maximum to pick out of a single | |
| 639 # atomic group label for scheduling at one time. | |
| 640 min_hosts = job.synch_count | |
| 641 max_hosts = atomic_group.max_number_of_machines | |
| 642 | |
| 643 if len(eligible_host_ids_in_group) < min_hosts: | |
| 644 # Not enough eligible hosts in this atomic group label. | |
| 645 continue | |
| 646 | |
| 647 eligible_hosts_in_group = [self._hosts_available[id] | |
| 648 for id in eligible_host_ids_in_group] | |
| 649 # So that they show up in a sane order when viewing the job. | |
| 650 eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort) | |
| 651 | |
| 652 # Limit ourselves to scheduling the atomic group size. | |
| 653 if len(eligible_hosts_in_group) > max_hosts: | |
| 654 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts] | |
| 655 | |
| 656 # Remove the selected hosts from our cached internal state | |
| 657 # of available hosts in order to return the Host objects. | |
| 658 host_list = [] | |
| 659 for host in eligible_hosts_in_group: | |
| 660 hosts_in_label.discard(host.id) | |
| 661 self._hosts_available.pop(host.id) | |
| 662 host_list.append(host) | |
| 663 return host_list | |
| 664 | |
| 665 return [] | |
| 666 | |
| 667 | |
| 668 site_host_scheduler = utils.import_site_class(__file__, | |
| 669 "autotest_lib.scheduler.site_host_scheduler", | |
| 670 "site_host_scheduler", BaseHostScheduler) | |
| 671 | |
| 672 | |
| 673 class HostScheduler(site_host_scheduler): | |
| 674 pass | |
| 675 | |
| 676 | |
| 677 class Dispatcher(object): | 256 class Dispatcher(object): |
| 678 def __init__(self): | 257 def __init__(self): |
| 679 self._agents = [] | 258 self._agents = [] |
| 680 self._last_clean_time = time.time() | 259 self._last_clean_time = time.time() |
| 681 self._host_scheduler = HostScheduler() | 260 self._host_scheduler = host_scheduler.HostScheduler(_db) |
| 682 user_cleanup_time = scheduler_config.config.clean_interval | 261 user_cleanup_time = scheduler_config.config.clean_interval |
| 683 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( | 262 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( |
| 684 _db, user_cleanup_time) | 263 _db, user_cleanup_time) |
| 685 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db) | 264 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db) |
| 686 self._host_agents = {} | 265 self._host_agents = {} |
| 687 self._queue_entry_agents = {} | 266 self._queue_entry_agents = {} |
| 688 self._tick_count = 0 | 267 self._tick_count = 0 |
| 689 self._last_garbage_stats_time = time.time() | 268 self._last_garbage_stats_time = time.time() |
| 690 self._seconds_between_garbage_stats = 60 * ( | 269 self._seconds_between_garbage_stats = 60 * ( |
| 691 global_config.global_config.get_config_value( | 270 global_config.global_config.get_config_value( |
| (...skipping 168 matching lines...) |
| 860 if queue_entry.is_hostless(): | 439 if queue_entry.is_hostless(): |
| 861 return HostlessQueueTask(queue_entry=queue_entry) | 440 return HostlessQueueTask(queue_entry=queue_entry) |
| 862 return QueueTask(queue_entries=task_entries) | 441 return QueueTask(queue_entries=task_entries) |
| 863 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: | 442 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: |
| 864 return GatherLogsTask(queue_entries=task_entries) | 443 return GatherLogsTask(queue_entries=task_entries) |
| 865 if queue_entry.status == models.HostQueueEntry.Status.PARSING: | 444 if queue_entry.status == models.HostQueueEntry.Status.PARSING: |
| 866 return FinalReparseTask(queue_entries=task_entries) | 445 return FinalReparseTask(queue_entries=task_entries) |
| 867 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: | 446 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: |
| 868 return ArchiveResultsTask(queue_entries=task_entries) | 447 return ArchiveResultsTask(queue_entries=task_entries) |
| 869 | 448 |
| 870 raise SchedulerError('_get_agent_task_for_queue_entry got entry with ' | 449 raise host_scheduler.SchedulerError( |
| 871 'invalid status %s: %s' | 450 '_get_agent_task_for_queue_entry got entry with ' |
| 872 % (queue_entry.status, queue_entry)) | 451 'invalid status %s: %s' % (queue_entry.status, queue_entry)) |
| 873 | 452 |
| 874 | 453 |
| 875 def _check_for_duplicate_host_entries(self, task_entries): | 454 def _check_for_duplicate_host_entries(self, task_entries): |
| 876 non_host_statuses = (models.HostQueueEntry.Status.PARSING, | 455 non_host_statuses = (models.HostQueueEntry.Status.PARSING, |
| 877 models.HostQueueEntry.Status.ARCHIVING) | 456 models.HostQueueEntry.Status.ARCHIVING) |
| 878 for task_entry in task_entries: | 457 for task_entry in task_entries: |
| 879 using_host = (task_entry.host is not None | 458 using_host = (task_entry.host is not None |
| 880 and task_entry.status not in non_host_statuses) | 459 and task_entry.status not in non_host_statuses) |
| 881 if using_host: | 460 if using_host: |
| 882 self._assert_host_has_no_agent(task_entry) | 461 self._assert_host_has_no_agent(task_entry) |
| 883 | 462 |
| 884 | 463 |
| 885 def _assert_host_has_no_agent(self, entry): | 464 def _assert_host_has_no_agent(self, entry): |
| 886 """ | 465 """ |
| 887 @param entry: a HostQueueEntry or a SpecialTask | 466 @param entry: a HostQueueEntry or a SpecialTask |
| 888 """ | 467 """ |
| 889 if self.host_has_agent(entry.host): | 468 if self.host_has_agent(entry.host): |
| 890 agent = tuple(self._host_agents.get(entry.host.id))[0] | 469 agent = tuple(self._host_agents.get(entry.host.id))[0] |
| 891 raise SchedulerError( | 470 raise host_scheduler.SchedulerError( |
| 892 'While scheduling %s, host %s already has a host agent %s' | 471 'While scheduling %s, host %s already has a host agent %s' |
| 893 % (entry, entry.host, agent.task)) | 472 % (entry, entry.host, agent.task)) |
| 894 | 473 |
| 895 | 474 |
| 896 def _get_agent_task_for_special_task(self, special_task): | 475 def _get_agent_task_for_special_task(self, special_task): |
| 897 """ | 476 """ |
| 898 Construct an AgentTask class to run the given SpecialTask and add it | 477 Construct an AgentTask class to run the given SpecialTask and add it |
| 899 to this dispatcher. | 478 to this dispatcher. |
| 900 @param special_task: a models.SpecialTask instance | 479 @param special_task: a models.SpecialTask instance |
| 901 @returns an AgentTask to run this SpecialTask | 480 @returns an AgentTask to run this SpecialTask |
| 902 """ | 481 """ |
| 903 self._assert_host_has_no_agent(special_task) | 482 self._assert_host_has_no_agent(special_task) |
| 904 | 483 |
| 905 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask) | 484 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask) |
| 906 for agent_task_class in special_agent_task_classes: | 485 for agent_task_class in special_agent_task_classes: |
| 907 if agent_task_class.TASK_TYPE == special_task.task: | 486 if agent_task_class.TASK_TYPE == special_task.task: |
| 908 return agent_task_class(task=special_task) | 487 return agent_task_class(task=special_task) |
| 909 | 488 |
| 910 raise SchedulerError('No AgentTask class for task', str(special_task)) | 489 raise host_scheduler.SchedulerError( |
| | 490 'No AgentTask class for task', str(special_task)) |
| 911 | 491 |
| 912 | 492 |
| 913 def _register_pidfiles(self, agent_tasks): | 493 def _register_pidfiles(self, agent_tasks): |
| 914 for agent_task in agent_tasks: | 494 for agent_task in agent_tasks: |
| 915 agent_task.register_necessary_pidfiles() | 495 agent_task.register_necessary_pidfiles() |
| 916 | 496 |
| 917 | 497 |
| 918 def _recover_tasks(self, agent_tasks): | 498 def _recover_tasks(self, agent_tasks): |
| 919 orphans = _drone_manager.get_orphaned_autoserv_processes() | 499 orphans = _drone_manager.get_orphaned_autoserv_processes() |
| 920 | 500 |
| (...skipping 44 matching lines...) |
| 965 special_tasks = models.SpecialTask.objects.filter( | 545 special_tasks = models.SpecialTask.objects.filter( |
| 966 task__in=(models.SpecialTask.Task.CLEANUP, | 546 task__in=(models.SpecialTask.Task.CLEANUP, |
| 967 models.SpecialTask.Task.VERIFY), | 547 models.SpecialTask.Task.VERIFY), |
| 968 queue_entry__id=queue_entry.id, | 548 queue_entry__id=queue_entry.id, |
| 969 is_complete=False) | 549 is_complete=False) |
| 970 if special_tasks.count() == 0: | 550 if special_tasks.count() == 0: |
| 971 unrecovered_hqes.append(queue_entry) | 551 unrecovered_hqes.append(queue_entry) |
| 972 | 552 |
| 973 if unrecovered_hqes: | 553 if unrecovered_hqes: |
| 974 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) | 554 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) |
| 975 raise SchedulerError( | 555 raise host_scheduler.SchedulerError( |
| 976 '%d unrecovered verifying host queue entries:\n%s' % | 556 '%d unrecovered verifying host queue entries:\n%s' % |
| 977 (len(unrecovered_hqes), message)) | 557 (len(unrecovered_hqes), message)) |
| 978 | 558 |
| 979 | 559 |
| 980 def _get_prioritized_special_tasks(self): | 560 def _get_prioritized_special_tasks(self): |
| 981 """ | 561 """ |
| 982 Returns all queued SpecialTasks prioritized for repair first, then | 562 Returns all queued SpecialTasks prioritized for repair first, then |
| 983 cleanup, then verify. | 563 cleanup, then verify. |
| 984 """ | 564 """ |
| 985 queued_tasks = models.SpecialTask.objects.filter(is_active=False, | 565 queued_tasks = models.SpecialTask.objects.filter(is_active=False, |
| (...skipping 788 matching lines...) |
| 1774 logging.info('Recovering process %s for %s at %s' | 1354 logging.info('Recovering process %s for %s at %s' |
| 1775 % (self.monitor.get_process(), type(self).__name__, | 1355 % (self.monitor.get_process(), type(self).__name__, |
| 1776 self._working_directory())) | 1356 self._working_directory())) |
| 1777 | 1357 |
| 1778 | 1358 |
| 1779 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, | 1359 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, |
| 1780 allowed_host_statuses=None): | 1360 allowed_host_statuses=None): |
| 1781 class_name = self.__class__.__name__ | 1361 class_name = self.__class__.__name__ |
| 1782 for entry in queue_entries: | 1362 for entry in queue_entries: |
| 1783 if entry.status not in allowed_hqe_statuses: | 1363 if entry.status not in allowed_hqe_statuses: |
| 1784 raise SchedulerError('%s attempting to start ' | 1364 raise host_scheduler.SchedulerError( |
| 1785 'entry with invalid status %s: %s' | 1365 '%s attempting to start entry with invalid status %s: ' |
| 1786 % (class_name, entry.status, entry)) | 1366 '%s' % (class_name, entry.status, entry)) |
| 1787 invalid_host_status = ( | 1367 invalid_host_status = ( |
| 1788 allowed_host_statuses is not None | 1368 allowed_host_statuses is not None |
| 1789 and entry.host.status not in allowed_host_statuses) | 1369 and entry.host.status not in allowed_host_statuses) |
| 1790 if invalid_host_status: | 1370 if invalid_host_status: |
| 1791 raise SchedulerError('%s attempting to start on queue ' | 1371 raise host_scheduler.SchedulerError( |
| 1792 'entry with invalid host status %s: %s' | 1372 '%s attempting to start on queue entry with invalid ' |
| 1793 % (class_name, entry.host.status, entry)) | 1373 'host status %s: %s' |
| | 1374 % (class_name, entry.host.status, entry)) |
| 1794 | 1375 |
| 1795 | 1376 |
| 1796 class TaskWithJobKeyvals(object): | 1377 class TaskWithJobKeyvals(object): |
| 1797 """AgentTask mixin providing functionality to help with job keyval files.""" | 1378 """AgentTask mixin providing functionality to help with job keyval files.""" |
| 1798 _KEYVAL_FILE = 'keyval' | 1379 _KEYVAL_FILE = 'keyval' |
| 1799 def _format_keyval(self, key, value): | 1380 def _format_keyval(self, key, value): |
| 1800 return '%s=%s' % (key, value) | 1381 return '%s=%s' % (key, value) |
| 1801 | 1382 |
| 1802 | 1383 |
| 1803 def _keyval_path(self): | 1384 def _keyval_path(self): |
| (...skipping 805 matching lines...) |
| 2609 paired_process = self._paired_with_monitor().get_process() | 2190 paired_process = self._paired_with_monitor().get_process() |
| 2610 _drone_manager.write_lines_to_file( | 2191 _drone_manager.write_lines_to_file( |
| 2611 failed_file, ['Archiving failed with exit code %s' | 2192 failed_file, ['Archiving failed with exit code %s' |
| 2612 % self.monitor.exit_code()], | 2193 % self.monitor.exit_code()], |
| 2613 paired_with_process=paired_process) | 2194 paired_with_process=paired_process) |
| 2614 self._set_all_statuses(self._final_status()) | 2195 self._set_all_statuses(self._final_status()) |
| 2615 | 2196 |
| 2616 | 2197 |
| 2617 if __name__ == '__main__': | 2198 if __name__ == '__main__': |
| 2618 main() | 2199 main() |
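
A note on the scheduling helpers this change relocates into the `host_scheduler` module: `_process_many2many_dict` collapses two-column SQL rows into a dict of sets, and its `flip` flag lets a single query feed both directions of a mapping (this is how `_get_label_hosts` derives both `labels_to_hosts` and `hosts_to_labels` from one result set). A minimal standalone sketch, with hypothetical rows standing in for the `afe_hosts_labels` query:

```python
def process_many2many_dict(rows, flip=False):
    """Collapse (left_id, right_id) rows into {left_id: set(right_ids)};
    with flip=True the mapping is inverted."""
    result = {}
    for row in rows:
        left_id, right_id = int(row[0]), int(row[1])
        if flip:
            left_id, right_id = right_id, left_id
        result.setdefault(left_id, set()).add(right_id)
    return result

# Hypothetical (label_id, host_id) rows.
rows = [(1, 10), (1, 11), (2, 10)]
labels_to_hosts = process_many2many_dict(rows)             # {1: {10, 11}, 2: {10}}
hosts_to_labels = process_many2many_dict(rows, flip=True)  # {10: {1, 2}, 11: {1}}
```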
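Similarly, `is_host_eligible_for_job` is a conjunction of cheap set operations over the caches built in `refresh()` rather than one large SQL join. A condensed sketch of the ACL and dependency checks with hypothetical ids (the full method also applies the only_if_needed and atomic-group checks shown above):

```python
def is_host_eligible(job_acls, host_acls, job_dependencies, host_labels):
    """A job may run on a host only if they share at least one ACL group
    and every label the job depends on is present on the host."""
    acl_ok = bool(job_acls & host_acls)
    deps_ok = job_dependencies.issubset(host_labels)
    return acl_ok and deps_ok

# Hypothetical ids: job in ACL groups {1, 2}, host in {2};
# job depends on label 5, host carries labels {5, 6}.
print(is_host_eligible({1, 2}, {2}, {5}, {5, 6}))  # True
```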
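Finally, the `site_host_scheduler = utils.import_site_class(...)` lines this change deletes follow Autotest's site-extension pattern: load a site-provided subclass if one exists, otherwise fall back to the base class. A simplified conceptual sketch of that fallback (the real helper in `autotest_lib.client.common_lib.utils` takes the calling file and a dotted module path, so this signature is illustrative only):

```python
def import_site_class(module_name, class_name, default_class):
    """Return the named class from a site-specific module if that module
    can be imported, else the default; sites extend scheduler behavior
    without patching core code."""
    try:
        module = __import__(module_name, fromlist=[class_name])
    except ImportError:
        return default_class
    return getattr(module, class_name, default_class)

# With no site module installed, callers simply get the base class:
# HostScheduler = import_site_class('site_host_scheduler',
#                                   'site_host_scheduler', BaseHostScheduler)
```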