Chromium Code Reviews

Side by Side Diff: scheduler/monitor_db.py

Issue 6597047: Host scheduler refactoring. Move HostScheduler out of monitor_db. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/autotest.git@master
Patch Set: Revert name change. Created 9 years, 9 months ago
OLD | NEW
1 #!/usr/bin/python -u 1 #!/usr/bin/python -u
2 2
3 """ 3 """
4 Autotest scheduler 4 Autotest scheduler
5 """ 5 """
6 6
7 7
8 import common 8 import common
9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal 9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib 10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
11 import itertools, logging, weakref, gc 11 import itertools, logging, weakref, gc
12 12
13 import MySQLdb 13 import MySQLdb
14 14
15 from autotest_lib.scheduler import scheduler_logging_config 15 from autotest_lib.scheduler import scheduler_logging_config
16 from autotest_lib.frontend import setup_django_environment 16 from autotest_lib.frontend import setup_django_environment
17 17
18 import django.db 18 import django.db
19 19
20 from autotest_lib.client.common_lib import global_config, logging_manager 20 from autotest_lib.client.common_lib import global_config, logging_manager
21 from autotest_lib.client.common_lib import host_protections, utils 21 from autotest_lib.client.common_lib import host_protections, utils
22 from autotest_lib.database import database_connection 22 from autotest_lib.database import database_connection
23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection 23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
24 from autotest_lib.frontend.afe import model_attributes 24 from autotest_lib.frontend.afe import model_attributes
25 from autotest_lib.scheduler import drone_manager, drones, email_manager 25 from autotest_lib.scheduler import drone_manager, drones, email_manager
26 from autotest_lib.scheduler import monitor_db_cleanup 26 from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
27 from autotest_lib.scheduler import status_server, scheduler_config 27 from autotest_lib.scheduler import status_server, scheduler_config
28 from autotest_lib.scheduler import gc_stats, metahost_scheduler
29 from autotest_lib.scheduler import scheduler_models 28 from autotest_lib.scheduler import scheduler_models
30 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' 29 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
31 PID_FILE_PREFIX = 'monitor_db' 30 PID_FILE_PREFIX = 'monitor_db'
32 31
33 RESULTS_DIR = '.' 32 RESULTS_DIR = '.'
34 AUTOSERV_NICE_LEVEL = 10 33 AUTOSERV_NICE_LEVEL = 10
35 DB_CONFIG_SECTION = 'AUTOTEST_WEB' 34 DB_CONFIG_SECTION = 'AUTOTEST_WEB'
36 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') 35 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
37 36
38 if os.environ.has_key('AUTOTEST_DIR'): 37 if os.environ.has_key('AUTOTEST_DIR'):
(...skipping 29 matching lines...)
68 """@returns How long to wait for autoserv to write pidfile.""" 67 """@returns How long to wait for autoserv to write pidfile."""
69 pidfile_timeout_mins = global_config.global_config.get_config_value( 68 pidfile_timeout_mins = global_config.global_config.get_config_value(
70 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int) 69 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
71 return pidfile_timeout_mins * 60 70 return pidfile_timeout_mins * 60
72 71
73 72
74 def _site_init_monitor_db_dummy(): 73 def _site_init_monitor_db_dummy():
75 return {} 74 return {}
76 75
77 76
78 get_site_metahost_schedulers = utils.import_site_function(
79 __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
80 'get_metahost_schedulers', lambda : ())
81
82
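The get_site_metahost_schedulers hook removed above uses utils.import_site_function, which resolves a function from an optional site-specific module and falls back to the supplied default (here a lambda returning an empty tuple) when no site module is installed. A minimal sketch of that fallback pattern, assuming a plain importlib lookup rather than autotest's actual path-based resolution:

    import importlib

    def import_site_function(module_name, funcname, default):
        # Prefer the site-specific implementation when it can be imported;
        # otherwise fall back to the caller-supplied default.
        try:
            return getattr(importlib.import_module(module_name), funcname)
        except (ImportError, AttributeError):
            return default

    get_site_metahost_schedulers = import_site_function(
        'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda: ())
    print(get_site_metahost_schedulers())  # () when no site module exists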
83 def _verify_default_drone_set_exists(): 77 def _verify_default_drone_set_exists():
84 if (models.DroneSet.drone_sets_enabled() and 78 if (models.DroneSet.drone_sets_enabled() and
85 not models.DroneSet.default_drone_set_name()): 79 not models.DroneSet.default_drone_set_name()):
86 raise SchedulerError('Drone sets are enabled, but no default is set') 80 raise host_scheduler.SchedulerError(
81 'Drone sets are enabled, but no default is set')
87 82
88 83
89 def _sanity_check(): 84 def _sanity_check():
90 """Make sure the configs are consistent before starting the scheduler""" 85 """Make sure the configs are consistent before starting the scheduler"""
91 _verify_default_drone_set_exists() 86 _verify_default_drone_set_exists()
92 87
93 88
94 def main(): 89 def main():
95 try: 90 try:
96 try: 91 try:
(...skipping 154 matching lines...)
251 if not job: 246 if not job:
252 job = queue_entry.job 247 job = queue_entry.job
253 autoserv_argv += ['-u', job.owner, '-l', job.name] 248 autoserv_argv += ['-u', job.owner, '-l', job.name]
254 if job.is_image_update_job(): 249 if job.is_image_update_job():
255 autoserv_argv += ['--image', job.update_image_path] 250 autoserv_argv += ['--image', job.update_image_path]
256 if verbose: 251 if verbose:
257 autoserv_argv.append('--verbose') 252 autoserv_argv.append('--verbose')
258 return autoserv_argv + extra_args 253 return autoserv_argv + extra_args
259 254
260 255
261 class SchedulerError(Exception):
262 """Raised by HostScheduler when an inconsistent state occurs."""
263
264
265 class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
266 """Handles the logic for choosing when to run jobs and on which hosts.
267
268 This class makes several queries to the database on each tick, building up
269 some auxiliary data structures and using them to determine which hosts are
270 eligible to run which jobs, taking into account the various factors
271 that affect eligibility.
272
273 In the past this was done with one or two very large, complex database
274 queries. It has proven much simpler and faster to build these auxiliary
275 data structures and perform the logic in Python.
276 """
277 def __init__(self):
278 self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()
279
280 # load site-specific scheduler selected in global_config
281 site_schedulers_str = global_config.global_config.get_config_value(
282 scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
283 default='')
284 site_schedulers = set(site_schedulers_str.split(','))
285 for scheduler in get_site_metahost_schedulers():
286 if type(scheduler).__name__ in site_schedulers:
287 # always prepend, so site schedulers take precedence
288 self._metahost_schedulers = (
289 [scheduler] + self._metahost_schedulers)
290 logging.info('Metahost schedulers: %s',
291 ', '.join(type(scheduler).__name__ for scheduler
292 in self._metahost_schedulers))
293
294
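The __init__ above applies a simple precedence rule: any site scheduler whose class name appears in the site_metahost_schedulers config value is prepended to the list, so it is consulted before the stock metahost schedulers. A toy illustration with hypothetical scheduler names:

    site_schedulers_str = 'SiteMetahostScheduler'    # from global_config
    site_schedulers = set(site_schedulers_str.split(','))

    metahost_schedulers = ['LabelMetahostScheduler']
    for name in ('SiteMetahostScheduler', 'UnselectedScheduler'):
        if name in site_schedulers:
            # always prepend, so site schedulers take precedence
            metahost_schedulers = [name] + metahost_schedulers
    print(metahost_schedulers)
    # ['SiteMetahostScheduler', 'LabelMetahostScheduler']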
295 def _get_ready_hosts(self):
296 # avoid any host with a currently active queue entry against it
297 hosts = scheduler_models.Host.fetch(
298 joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
299 'ON (afe_hosts.id = active_hqe.host_id AND '
300 'active_hqe.active)',
301 where="active_hqe.host_id IS NULL "
302 "AND NOT afe_hosts.locked "
303 "AND (afe_hosts.status IS NULL "
304 "OR afe_hosts.status = 'Ready')")
305 return dict((host.id, host) for host in hosts)
306
307
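_get_ready_hosts relies on the classic LEFT JOIN / IS NULL anti-join: every host is joined against its active queue entries, and only rows with no match survive. A self-contained sqlite3 demonstration of the same query shape, with toy data and simplified columns:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.executescript('''
        CREATE TABLE afe_hosts (id INTEGER, locked INTEGER, status TEXT);
        CREATE TABLE afe_host_queue_entries (host_id INTEGER, active INTEGER);
        INSERT INTO afe_hosts VALUES (1, 0, 'Ready');
        INSERT INTO afe_hosts VALUES (2, 0, 'Ready');
        INSERT INTO afe_host_queue_entries VALUES (2, 1);  -- host 2 is busy
    ''')
    rows = conn.execute(
        "SELECT afe_hosts.id FROM afe_hosts "
        "LEFT JOIN afe_host_queue_entries AS active_hqe "
        "ON (afe_hosts.id = active_hqe.host_id AND active_hqe.active) "
        "WHERE active_hqe.host_id IS NULL "
        "AND NOT afe_hosts.locked "
        "AND (afe_hosts.status IS NULL OR afe_hosts.status = 'Ready')")
    print([row[0] for row in rows])   # [1]: host 2 has an active entry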
308 @staticmethod
309 def _get_sql_id_list(id_list):
310 return ','.join(str(item_id) for item_id in id_list)
311
312
313 @classmethod
314 def _get_many2many_dict(cls, query, id_list, flip=False):
315 if not id_list:
316 return {}
317 query %= cls._get_sql_id_list(id_list)
318 rows = _db.execute(query)
319 return cls._process_many2many_dict(rows, flip)
320
321
322 @staticmethod
323 def _process_many2many_dict(rows, flip=False):
324 result = {}
325 for row in rows:
326 left_id, right_id = int(row[0]), int(row[1])
327 if flip:
328 left_id, right_id = right_id, left_id
329 result.setdefault(left_id, set()).add(right_id)
330 return result
331
332
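A worked example of _process_many2many_dict: each (left, right) row is folded into a dict of sets, and flip=True swaps the columns to produce the inverse mapping. The row values here are made up:

    rows = [(1, 10), (1, 11), (2, 10)]      # e.g. (job_id, aclgroup_id)

    def process_many2many(rows, flip=False):
        result = {}
        for left_id, right_id in rows:
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result

    print(process_many2many(rows))             # {1: {10, 11}, 2: {10}}
    print(process_many2many(rows, flip=True))  # {10: {1, 2}, 11: {1}}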
333 @classmethod
334 def _get_job_acl_groups(cls, job_ids):
335 query = """
336 SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
337 FROM afe_jobs
338 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
339 INNER JOIN afe_acl_groups_users ON
340 afe_acl_groups_users.user_id = afe_users.id
341 WHERE afe_jobs.id IN (%s)
342 """
343 return cls._get_many2many_dict(query, job_ids)
344
345
346 @classmethod
347 def _get_job_ineligible_hosts(cls, job_ids):
348 query = """
349 SELECT job_id, host_id
350 FROM afe_ineligible_host_queues
351 WHERE job_id IN (%s)
352 """
353 return cls._get_many2many_dict(query, job_ids)
354
355
356 @classmethod
357 def _get_job_dependencies(cls, job_ids):
358 query = """
359 SELECT job_id, label_id
360 FROM afe_jobs_dependency_labels
361 WHERE job_id IN (%s)
362 """
363 return cls._get_many2many_dict(query, job_ids)
364
365
366 @classmethod
367 def _get_host_acls(cls, host_ids):
368 query = """
369 SELECT host_id, aclgroup_id
370 FROM afe_acl_groups_hosts
371 WHERE host_id IN (%s)
372 """
373 return cls._get_many2many_dict(query, host_ids)
374
375
376 @classmethod
377 def _get_label_hosts(cls, host_ids):
378 if not host_ids:
379 return {}, {}
380 query = """
381 SELECT label_id, host_id
382 FROM afe_hosts_labels
383 WHERE host_id IN (%s)
384 """ % cls._get_sql_id_list(host_ids)
385 rows = _db.execute(query)
386 labels_to_hosts = cls._process_many2many_dict(rows)
387 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
388 return labels_to_hosts, hosts_to_labels
389
390
391 @classmethod
392 def _get_labels(cls):
393 return dict((label.id, label) for label
394 in scheduler_models.Label.fetch())
395
396
397 def recovery_on_startup(self):
398 for metahost_scheduler in self._metahost_schedulers:
399 metahost_scheduler.recovery_on_startup()
400
401
402 def refresh(self, pending_queue_entries):
403 self._hosts_available = self._get_ready_hosts()
404
405 relevant_jobs = [queue_entry.job_id
406 for queue_entry in pending_queue_entries]
407 self._job_acls = self._get_job_acl_groups(relevant_jobs)
408 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
409 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
410
411 host_ids = self._hosts_available.keys()
412 self._host_acls = self._get_host_acls(host_ids)
413 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
414
415 self._labels = self._get_labels()
416
417
418 def tick(self):
419 for metahost_scheduler in self._metahost_schedulers:
420 metahost_scheduler.tick()
421
422
423 def hosts_in_label(self, label_id):
424 return set(self._label_hosts.get(label_id, ()))
425
426
427 def remove_host_from_label(self, host_id, label_id):
428 self._label_hosts[label_id].remove(host_id)
429
430
431 def pop_host(self, host_id):
432 return self._hosts_available.pop(host_id)
433
434
435 def ineligible_hosts_for_entry(self, queue_entry):
436 return set(self._ineligible_hosts.get(queue_entry.job_id, ()))
437
438
439 def _is_acl_accessible(self, host_id, queue_entry):
440 job_acls = self._job_acls.get(queue_entry.job_id, set())
441 host_acls = self._host_acls.get(host_id, set())
442 return len(host_acls.intersection(job_acls)) > 0
443
444
445 def _check_job_dependencies(self, job_dependencies, host_labels):
446 missing = job_dependencies - host_labels
447 return len(missing) == 0
448
449
450 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
451 queue_entry):
452 if not queue_entry.meta_host:
453 # bypass only_if_needed labels when a specific host is selected
454 return True
455
456 for label_id in host_labels:
457 label = self._labels[label_id]
458 if not label.only_if_needed:
459 # we don't care about non-only_if_needed labels
460 continue
461 if queue_entry.meta_host == label_id:
462 # if the label was requested in a metahost it's OK
463 continue
464 if label_id not in job_dependencies:
465 return False
466 return True
467
468
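To make the only_if_needed rule concrete: a metahost entry may use a host carrying an only_if_needed label only if the job depends on that label or requested it as the metahost, while entries pinned to a specific host bypass the check entirely. A self-contained sketch with hypothetical label data:

    class Label(object):
        def __init__(self, label_id, only_if_needed):
            self.id = label_id
            self.only_if_needed = only_if_needed

    labels = {7: Label(7, True), 8: Label(8, False)}

    def only_if_needed_ok(host_labels, job_dependencies, meta_host):
        if not meta_host:
            return True        # specific-host entries bypass the check
        for label_id in host_labels:
            label = labels[label_id]
            if not label.only_if_needed or meta_host == label_id:
                continue       # irrelevant label, or explicitly requested
            if label_id not in job_dependencies:
                return False   # only_if_needed label the job never asked for
        return True

    print(only_if_needed_ok({7, 8}, set(), meta_host=8))   # False
    print(only_if_needed_ok({7, 8}, {7}, meta_host=8))     # True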
469 def _check_atomic_group_labels(self, host_labels, queue_entry):
470 """
471 Determine if the given HostQueueEntry's atomic group settings are okay
472 to schedule on a host with the given labels.
473
474 @param host_labels: A list of label ids that the host has.
475 @param queue_entry: The HostQueueEntry being considered for the host.
476
477 @returns True if atomic group settings are okay, False otherwise.
478 """
479 return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
480 queue_entry.atomic_group_id)
481
482
483 def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
484 """
485 Return the atomic group label id for a host with the given set of
486 labels if any, or None otherwise. Logs an error if more than
487 one atomic group is found in the set of labels.
488
489 @param host_labels: A list of label ids that the host has.
490 @param queue_entry: The HostQueueEntry we're testing. Only used for
491 extra info in a potential logged error message.
492
493 @returns The id of the atomic group found on a label in host_labels
494 or None if no atomic group label is found.
495 """
496 atomic_labels = [self._labels[label_id] for label_id in host_labels
497 if self._labels[label_id].atomic_group_id is not None]
498 atomic_ids = set(label.atomic_group_id for label in atomic_labels)
499 if not atomic_ids:
500 return None
501 if len(atomic_ids) > 1:
502 logging.error('More than one Atomic Group on HQE "%s" via: %r',
503 queue_entry, atomic_labels)
504 return atomic_ids.pop()
505
506
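A worked example of _get_host_atomic_group_id with stub labels: labels carrying an atomic_group_id are collected into a set of distinct ids, exactly one of which is expected; with more than one, the code above logs an error and still returns an arbitrary member.

    class Label(object):
        def __init__(self, atomic_group_id=None):
            self.atomic_group_id = atomic_group_id

    labels = {1: Label(), 2: Label(atomic_group_id=5), 3: Label()}
    host_labels = [1, 2, 3]

    atomic_ids = set(labels[i].atomic_group_id for i in host_labels
                     if labels[i].atomic_group_id is not None)
    print(atomic_ids.pop() if atomic_ids else None)   # 5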
507 def _get_atomic_group_labels(self, atomic_group_id):
508 """
509 Lookup the label ids that an atomic_group is associated with.
510
511 @param atomic_group_id - The id of the AtomicGroup to look up.
512
513 @returns A generator yielding Label ids for this atomic group.
514 """
515 return (id for id, label in self._labels.iteritems()
516 if label.atomic_group_id == atomic_group_id
517 and not label.invalid)
518
519
520 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
521 """
522 @param group_hosts - A sequence of Host ids to test for usability
523 and eligibility against the Job associated with queue_entry.
524 @param queue_entry - The HostQueueEntry that these hosts are being
525 tested for eligibility against.
526
527 @returns A subset of group_hosts Host ids that are eligible for the
528 supplied queue_entry.
529 """
530 return set(host_id for host_id in group_hosts
531 if self.is_host_usable(host_id)
532 and self.is_host_eligible_for_job(host_id, queue_entry))
533
534
535 def is_host_eligible_for_job(self, host_id, queue_entry):
536 if self._is_host_invalid(host_id):
537 # if an invalid host is scheduled for a job, it's a one-time host
538 # and it therefore bypasses eligibility checks. note this can only
539 # happen for non-metahosts, because invalid hosts have their label
540 # relationships cleared.
541 return True
542
543 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
544 host_labels = self._host_labels.get(host_id, set())
545
546 return (self._is_acl_accessible(host_id, queue_entry) and
547 self._check_job_dependencies(job_dependencies, host_labels) and
548 self._check_only_if_needed_labels(
549 job_dependencies, host_labels, queue_entry) and
550 self._check_atomic_group_labels(host_labels, queue_entry))
551
552
553 def _is_host_invalid(self, host_id):
554 host_object = self._hosts_available.get(host_id, None)
555 return host_object and host_object.invalid
556
557
558 def _schedule_non_metahost(self, queue_entry):
559 if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
560 return None
561 return self._hosts_available.pop(queue_entry.host_id, None)
562
563
564 def is_host_usable(self, host_id):
565 if host_id not in self._hosts_available:
566 # host was already used during this scheduling cycle
567 return False
568 if self._hosts_available[host_id].invalid:
569 # Invalid hosts cannot be used for metahosts. They're included in
570 # the original query because they can be used by non-metahosts.
571 return False
572 return True
573
574
575 def schedule_entry(self, queue_entry):
576 if queue_entry.host_id is not None:
577 return self._schedule_non_metahost(queue_entry)
578
579 for scheduler in self._metahost_schedulers:
580 if scheduler.can_schedule_metahost(queue_entry):
581 scheduler.schedule_metahost(queue_entry, self)
582 return None
583
584 raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
585
586
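schedule_entry treats the metahost schedulers as a chain of responsibility: the first scheduler that claims the entry handles it, and an unclaimed metahost entry is a SchedulerError. A stripped-down sketch with stub objects:

    class StubMetahostScheduler(object):
        def can_schedule_metahost(self, entry):
            return entry.get('meta_host') is not None
        def schedule_metahost(self, entry, scheduling_utility):
            print('scheduled entry %d by label' % entry['id'])

    def schedule_entry(entry, schedulers):
        for scheduler in schedulers:
            if scheduler.can_schedule_metahost(entry):
                scheduler.schedule_metahost(entry, None)
                return
        raise Exception('No metahost scheduler to handle %r' % entry)

    schedule_entry({'id': 42, 'meta_host': 7}, [StubMetahostScheduler()])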
587 def find_eligible_atomic_group(self, queue_entry):
588 """
589 Given an atomic group host queue entry, locate an appropriate group
590 of hosts for the associated job to run on.
591
592 The caller is responsible for creating new HQEs for the additional
593 hosts returned in order to run the actual job on them.
594
595 @returns A list of Host instances in a ready state to satisfy this
596 atomic group scheduling. Hosts will all belong to the same
597 atomic group label as specified by the queue_entry.
598 An empty list will be returned if no suitable atomic
599 group could be found.
600
601 TODO(gps): what is responsible for kicking off any attempted repairs on
602 a group of hosts? not this function, but something needs to. We do
603 not communicate that reason for returning [] outside of here...
604 For now, we'll just be unschedulable if enough hosts within one group
605 enter Repair Failed state.
606 """
607 assert queue_entry.atomic_group_id is not None
608 job = queue_entry.job
609 assert job.synch_count and job.synch_count > 0
610 atomic_group = queue_entry.atomic_group
611 if job.synch_count > atomic_group.max_number_of_machines:
612 # Such a Job and HostQueueEntry should never be possible to
613 # create using the frontend. Regardless, we can't process it.
614 # Abort it immediately and log an error on the scheduler.
615 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
616 logging.error(
617 'Error: job %d synch_count=%d > requested atomic_group %d '
618 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
619 job.id, job.synch_count, atomic_group.id,
620 atomic_group.max_number_of_machines, queue_entry.id)
621 return []
622 hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
623 ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
624
625 # Look in each label associated with atomic_group until we find one with
626 # enough hosts to satisfy the job.
627 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
628 group_hosts = set(self.hosts_in_label(atomic_label_id))
629 if queue_entry.meta_host is not None:
630 # If we have a metahost label, only allow its hosts.
631 group_hosts.intersection_update(hosts_in_label)
632 group_hosts -= ineligible_host_ids
633 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
634 group_hosts, queue_entry)
635
636 # Job.synch_count is treated as "minimum synch count" when
637 # scheduling for an atomic group of hosts. The atomic group
638 # number of machines is the maximum to pick out of a single
639 # atomic group label for scheduling at one time.
640 min_hosts = job.synch_count
641 max_hosts = atomic_group.max_number_of_machines
642
643 if len(eligible_host_ids_in_group) < min_hosts:
644 # Not enough eligible hosts in this atomic group label.
645 continue
646
647 eligible_hosts_in_group = [self._hosts_available[id]
648 for id in eligible_host_ids_in_group]
649 # So that they show up in a sane order when viewing the job.
650 eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)
651
652 # Limit ourselves to scheduling the atomic group size.
653 if len(eligible_hosts_in_group) > max_hosts:
654 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
655
656 # Remove the selected hosts from our cached internal state
657 # of available hosts in order to return the Host objects.
658 host_list = []
659 for host in eligible_hosts_in_group:
660 hosts_in_label.discard(host.id)
661 self._hosts_available.pop(host.id)
662 host_list.append(host)
663 return host_list
664
665 return []
666
667
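The sizing rule in find_eligible_atomic_group, restated with made-up numbers: synch_count acts as the minimum group size, and the atomic group's max_number_of_machines caps how many hosts are taken from a single label:

    min_hosts = 3    # job.synch_count
    max_hosts = 5    # atomic_group.max_number_of_machines
    eligible = [10, 11, 12, 13, 14, 15, 16]   # eligible hosts in one label

    if len(eligible) < min_hosts:
        chosen = []                            # too few: try the next label
    else:
        chosen = sorted(eligible)[:max_hosts]  # cap at the group maximum
    print(chosen)                              # [10, 11, 12, 13, 14]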
668 site_host_scheduler = utils.import_site_class(__file__,
669 "autotest_lib.scheduler.site_host_scheduler",
670 "site_host_scheduler", BaseHostScheduler)
671
672
673 class HostScheduler(site_host_scheduler):
674 pass
675
676
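import_site_class follows the same convention as import_site_function: use the class from the optional site module when present, otherwise the supplied default, with the one-line HostScheduler subclass inheriting from whichever was chosen. A standalone sketch of the pattern (the except branch is what runs when no site module is installed):

    class BaseHostScheduler(object):
        '''Stands in for the full scheduler class above.'''

    try:
        from autotest_lib.scheduler.site_host_scheduler import (
            site_host_scheduler)              # optional site override
    except ImportError:
        site_host_scheduler = BaseHostScheduler

    class HostScheduler(site_host_scheduler):
        pass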
677 class Dispatcher(object): 256 class Dispatcher(object):
678 def __init__(self): 257 def __init__(self):
679 self._agents = [] 258 self._agents = []
680 self._last_clean_time = time.time() 259 self._last_clean_time = time.time()
681 self._host_scheduler = HostScheduler() 260 self._host_scheduler = host_scheduler.HostScheduler(_db)
682 user_cleanup_time = scheduler_config.config.clean_interval 261 user_cleanup_time = scheduler_config.config.clean_interval
683 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( 262 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
684 _db, user_cleanup_time) 263 _db, user_cleanup_time)
685 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db) 264 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
686 self._host_agents = {} 265 self._host_agents = {}
687 self._queue_entry_agents = {} 266 self._queue_entry_agents = {}
688 self._tick_count = 0 267 self._tick_count = 0
689 self._last_garbage_stats_time = time.time() 268 self._last_garbage_stats_time = time.time()
690 self._seconds_between_garbage_stats = 60 * ( 269 self._seconds_between_garbage_stats = 60 * (
691 global_config.global_config.get_config_value( 270 global_config.global_config.get_config_value(
(...skipping 168 matching lines...)
860 if queue_entry.is_hostless(): 439 if queue_entry.is_hostless():
861 return HostlessQueueTask(queue_entry=queue_entry) 440 return HostlessQueueTask(queue_entry=queue_entry)
862 return QueueTask(queue_entries=task_entries) 441 return QueueTask(queue_entries=task_entries)
863 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: 442 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
864 return GatherLogsTask(queue_entries=task_entries) 443 return GatherLogsTask(queue_entries=task_entries)
865 if queue_entry.status == models.HostQueueEntry.Status.PARSING: 444 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
866 return FinalReparseTask(queue_entries=task_entries) 445 return FinalReparseTask(queue_entries=task_entries)
867 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: 446 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
868 return ArchiveResultsTask(queue_entries=task_entries) 447 return ArchiveResultsTask(queue_entries=task_entries)
869 448
870 raise SchedulerError('_get_agent_task_for_queue_entry got entry with ' 449 raise host_scheduler.SchedulerError(
871 'invalid status %s: %s' 450 '_get_agent_task_for_queue_entry got entry with '
872 % (queue_entry.status, queue_entry)) 451 'invalid status %s: %s' % (queue_entry.status, queue_entry))
873 452
874 453
875 def _check_for_duplicate_host_entries(self, task_entries): 454 def _check_for_duplicate_host_entries(self, task_entries):
876 non_host_statuses = (models.HostQueueEntry.Status.PARSING, 455 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
877 models.HostQueueEntry.Status.ARCHIVING) 456 models.HostQueueEntry.Status.ARCHIVING)
878 for task_entry in task_entries: 457 for task_entry in task_entries:
879 using_host = (task_entry.host is not None 458 using_host = (task_entry.host is not None
880 and task_entry.status not in non_host_statuses) 459 and task_entry.status not in non_host_statuses)
881 if using_host: 460 if using_host:
882 self._assert_host_has_no_agent(task_entry) 461 self._assert_host_has_no_agent(task_entry)
883 462
884 463
885 def _assert_host_has_no_agent(self, entry): 464 def _assert_host_has_no_agent(self, entry):
886 """ 465 """
887 @param entry: a HostQueueEntry or a SpecialTask 466 @param entry: a HostQueueEntry or a SpecialTask
888 """ 467 """
889 if self.host_has_agent(entry.host): 468 if self.host_has_agent(entry.host):
890 agent = tuple(self._host_agents.get(entry.host.id))[0] 469 agent = tuple(self._host_agents.get(entry.host.id))[0]
891 raise SchedulerError( 470 raise host_scheduler.SchedulerError(
892 'While scheduling %s, host %s already has a host agent %s' 471 'While scheduling %s, host %s already has a host agent %s'
893 % (entry, entry.host, agent.task)) 472 % (entry, entry.host, agent.task))
894 473
895 474
896 def _get_agent_task_for_special_task(self, special_task): 475 def _get_agent_task_for_special_task(self, special_task):
897 """ 476 """
898 Construct an AgentTask class to run the given SpecialTask and add it 477 Construct an AgentTask class to run the given SpecialTask and add it
899 to this dispatcher. 478 to this dispatcher.
900 @param special_task: a models.SpecialTask instance 479 @param special_task: a models.SpecialTask instance
901 @returns an AgentTask to run this SpecialTask 480 @returns an AgentTask to run this SpecialTask
902 """ 481 """
903 self._assert_host_has_no_agent(special_task) 482 self._assert_host_has_no_agent(special_task)
904 483
905 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask) 484 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
906 for agent_task_class in special_agent_task_classes: 485 for agent_task_class in special_agent_task_classes:
907 if agent_task_class.TASK_TYPE == special_task.task: 486 if agent_task_class.TASK_TYPE == special_task.task:
908 return agent_task_class(task=special_task) 487 return agent_task_class(task=special_task)
909 488
910 raise SchedulerError('No AgentTask class for task', str(special_task)) 489 raise host_scheduler.SchedulerError(
490 'No AgentTask class for task', str(special_task))
911 491
912 492
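_get_agent_task_for_special_task matches the SpecialTask against the TASK_TYPE attribute each AgentTask class advertises. A reduced sketch with stub classes:

    class CleanupTask(object):
        TASK_TYPE = 'Cleanup'

    class VerifyTask(object):
        TASK_TYPE = 'Verify'

    class RepairTask(object):
        TASK_TYPE = 'Repair'

    def agent_task_for(task_type):
        for cls in (CleanupTask, VerifyTask, RepairTask):
            if cls.TASK_TYPE == task_type:
                return cls()
        raise Exception('No AgentTask class for task %r' % task_type)

    print(type(agent_task_for('Verify')).__name__)   # VerifyTask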
913 def _register_pidfiles(self, agent_tasks): 493 def _register_pidfiles(self, agent_tasks):
914 for agent_task in agent_tasks: 494 for agent_task in agent_tasks:
915 agent_task.register_necessary_pidfiles() 495 agent_task.register_necessary_pidfiles()
916 496
917 497
918 def _recover_tasks(self, agent_tasks): 498 def _recover_tasks(self, agent_tasks):
919 orphans = _drone_manager.get_orphaned_autoserv_processes() 499 orphans = _drone_manager.get_orphaned_autoserv_processes()
920 500
(...skipping 44 matching lines...)
965 special_tasks = models.SpecialTask.objects.filter( 545 special_tasks = models.SpecialTask.objects.filter(
966 task__in=(models.SpecialTask.Task.CLEANUP, 546 task__in=(models.SpecialTask.Task.CLEANUP,
967 models.SpecialTask.Task.VERIFY), 547 models.SpecialTask.Task.VERIFY),
968 queue_entry__id=queue_entry.id, 548 queue_entry__id=queue_entry.id,
969 is_complete=False) 549 is_complete=False)
970 if special_tasks.count() == 0: 550 if special_tasks.count() == 0:
971 unrecovered_hqes.append(queue_entry) 551 unrecovered_hqes.append(queue_entry)
972 552
973 if unrecovered_hqes: 553 if unrecovered_hqes:
974 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) 554 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
975 raise SchedulerError( 555 raise host_scheduler.SchedulerError(
976 '%d unrecovered verifying host queue entries:\n%s' % 556 '%d unrecovered verifying host queue entries:\n%s' %
977 (len(unrecovered_hqes), message)) 557 (len(unrecovered_hqes), message))
978 558
979 559
980 def _get_prioritized_special_tasks(self): 560 def _get_prioritized_special_tasks(self):
981 """ 561 """
982 Returns all queued SpecialTasks prioritized for repair first, then 562 Returns all queued SpecialTasks prioritized for repair first, then
983 cleanup, then verify. 563 cleanup, then verify.
984 """ 564 """
985 queued_tasks = models.SpecialTask.objects.filter(is_active=False, 565 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
(...skipping 788 matching lines...)
1774 logging.info('Recovering process %s for %s at %s' 1354 logging.info('Recovering process %s for %s at %s'
1775 % (self.monitor.get_process(), type(self).__name__, 1355 % (self.monitor.get_process(), type(self).__name__,
1776 self._working_directory())) 1356 self._working_directory()))
1777 1357
1778 1358
1779 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, 1359 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1780 allowed_host_statuses=None): 1360 allowed_host_statuses=None):
1781 class_name = self.__class__.__name__ 1361 class_name = self.__class__.__name__
1782 for entry in queue_entries: 1362 for entry in queue_entries:
1783 if entry.status not in allowed_hqe_statuses: 1363 if entry.status not in allowed_hqe_statuses:
1784 raise SchedulerError('%s attempting to start ' 1364 raise host_scheduler.SchedulerError(
1785 'entry with invalid status %s: %s' 1365 '%s attempting to start entry with invalid status %s: '
1786 % (class_name, entry.status, entry)) 1366 '%s' % (class_name, entry.status, entry))
1787 invalid_host_status = ( 1367 invalid_host_status = (
1788 allowed_host_statuses is not None 1368 allowed_host_statuses is not None
1789 and entry.host.status not in allowed_host_statuses) 1369 and entry.host.status not in allowed_host_statuses)
1790 if invalid_host_status: 1370 if invalid_host_status:
1791 raise SchedulerError('%s attempting to start on queue ' 1371 raise host_scheduler.SchedulerError(
1792 'entry with invalid host status %s: %s' 1372 '%s attempting to start on queue entry with invalid '
1793 % (class_name, entry.host.status, entry)) 1373 'host status %s: %s'
1374 % (class_name, entry.host.status, entry))
1794 1375
1795 1376
1796 class TaskWithJobKeyvals(object): 1377 class TaskWithJobKeyvals(object):
1797 """AgentTask mixin providing functionality to help with job keyval files.""" 1378 """AgentTask mixin providing functionality to help with job keyval files."""
1798 _KEYVAL_FILE = 'keyval' 1379 _KEYVAL_FILE = 'keyval'
1799 def _format_keyval(self, key, value): 1380 def _format_keyval(self, key, value):
1800 return '%s=%s' % (key, value) 1381 return '%s=%s' % (key, value)
1801 1382
1802 1383
1803 def _keyval_path(self): 1384 def _keyval_path(self):
(...skipping 805 matching lines...) Expand 10 before | Expand all | Expand 10 after
2609 paired_process = self._paired_with_monitor().get_process() 2190 paired_process = self._paired_with_monitor().get_process()
2610 _drone_manager.write_lines_to_file( 2191 _drone_manager.write_lines_to_file(
2611 failed_file, ['Archiving failed with exit code %s' 2192 failed_file, ['Archiving failed with exit code %s'
2612 % self.monitor.exit_code()], 2193 % self.monitor.exit_code()],
2613 paired_with_process=paired_process) 2194 paired_with_process=paired_process)
2614 self._set_all_statuses(self._final_status()) 2195 self._set_all_statuses(self._final_status())
2615 2196
2616 2197
2617 if __name__ == '__main__': 2198 if __name__ == '__main__':
2618 main() 2199 main()