OLD | NEW |
1 #!/usr/bin/python -u | 1 #!/usr/bin/python -u |
2 | 2 |
3 """ | 3 """ |
4 Autotest scheduler | 4 Autotest scheduler |
5 """ | 5 """ |
6 | 6 |
7 | 7 |
8 import common | 8 import common |
9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal | 9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal |
10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib | 10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib |
11 import itertools, logging, weakref, gc | 11 import itertools, logging, weakref, gc |
12 | 12 |
13 import MySQLdb | 13 import MySQLdb |
14 | 14 |
15 from autotest_lib.scheduler import scheduler_logging_config | 15 from autotest_lib.scheduler import scheduler_logging_config |
16 from autotest_lib.frontend import setup_django_environment | 16 from autotest_lib.frontend import setup_django_environment |
17 | 17 |
18 import django.db | 18 import django.db |
19 | 19 |
20 from autotest_lib.client.common_lib import global_config, logging_manager | 20 from autotest_lib.client.common_lib import global_config, logging_manager |
21 from autotest_lib.client.common_lib import host_protections, utils | 21 from autotest_lib.client.common_lib import host_protections, utils |
22 from autotest_lib.database import database_connection | 22 from autotest_lib.database import database_connection |
23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection | 23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection |
24 from autotest_lib.frontend.afe import model_attributes | 24 from autotest_lib.frontend.afe import model_attributes |
25 from autotest_lib.scheduler import drone_manager, drones, email_manager | 25 from autotest_lib.scheduler import drone_manager, drones, email_manager |
26 from autotest_lib.scheduler import monitor_db_cleanup | 26 from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup |
27 from autotest_lib.scheduler import status_server, scheduler_config | 27 from autotest_lib.scheduler import status_server, scheduler_config |
28 from autotest_lib.scheduler import gc_stats, metahost_scheduler | |
29 from autotest_lib.scheduler import scheduler_models | 28 from autotest_lib.scheduler import scheduler_models |
30 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' | 29 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' |
31 PID_FILE_PREFIX = 'monitor_db' | 30 PID_FILE_PREFIX = 'monitor_db' |
32 | 31 |
33 RESULTS_DIR = '.' | 32 RESULTS_DIR = '.' |
34 AUTOSERV_NICE_LEVEL = 10 | 33 AUTOSERV_NICE_LEVEL = 10 |
35 DB_CONFIG_SECTION = 'AUTOTEST_WEB' | 34 DB_CONFIG_SECTION = 'AUTOTEST_WEB' |
36 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') | 35 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') |
37 | 36 |
38 if os.environ.has_key('AUTOTEST_DIR'): | 37 if os.environ.has_key('AUTOTEST_DIR'): |
(...skipping 29 matching lines...) Expand all Loading... |
68 """@returns How long to wait for autoserv to write pidfile.""" | 67 """@returns How long to wait for autoserv to write pidfile.""" |
69 pidfile_timeout_mins = global_config.global_config.get_config_value( | 68 pidfile_timeout_mins = global_config.global_config.get_config_value( |
70 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int) | 69 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int) |
71 return pidfile_timeout_mins * 60 | 70 return pidfile_timeout_mins * 60 |
72 | 71 |
73 | 72 |
74 def _site_init_monitor_db_dummy(): | 73 def _site_init_monitor_db_dummy(): |
75 return {} | 74 return {} |
76 | 75 |
77 | 76 |
78 get_site_metahost_schedulers = utils.import_site_function( | |
79 __file__, 'autotest_lib.scheduler.site_metahost_scheduler', | |
80 'get_metahost_schedulers', lambda : ()) | |
81 | |
82 | |
83 def _verify_default_drone_set_exists(): | 77 def _verify_default_drone_set_exists(): |
84 if (models.DroneSet.drone_sets_enabled() and | 78 if (models.DroneSet.drone_sets_enabled() and |
85 not models.DroneSet.default_drone_set_name()): | 79 not models.DroneSet.default_drone_set_name()): |
86 raise SchedulerError('Drone sets are enabled, but no default is set') | 80 raise host_scheduler.SchedulerError( |
| 81 'Drone sets are enabled, but no default is set') |
87 | 82 |
88 | 83 |
89 def _sanity_check(): | 84 def _sanity_check(): |
90 """Make sure the configs are consistent before starting the scheduler""" | 85 """Make sure the configs are consistent before starting the scheduler""" |
91 _verify_default_drone_set_exists() | 86 _verify_default_drone_set_exists() |
92 | 87 |
93 | 88 |
94 def main(): | 89 def main(): |
95 try: | 90 try: |
96 try: | 91 try: |
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
251 if not job: | 246 if not job: |
252 job = queue_entry.job | 247 job = queue_entry.job |
253 autoserv_argv += ['-u', job.owner, '-l', job.name] | 248 autoserv_argv += ['-u', job.owner, '-l', job.name] |
254 if job.is_image_update_job(): | 249 if job.is_image_update_job(): |
255 autoserv_argv += ['--image', job.update_image_path] | 250 autoserv_argv += ['--image', job.update_image_path] |
256 if verbose: | 251 if verbose: |
257 autoserv_argv.append('--verbose') | 252 autoserv_argv.append('--verbose') |
258 return autoserv_argv + extra_args | 253 return autoserv_argv + extra_args |
259 | 254 |
260 | 255 |
261 class SchedulerError(Exception): | |
262 """Raised by HostScheduler when an inconsistent state occurs.""" | |
263 | |
264 | |
265 class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility): | |
266 """Handles the logic for choosing when to run jobs and on which hosts. | |
267 | |
268 This class makes several queries to the database on each tick, building up | |
269 some auxiliary data structures and using them to determine which hosts are | |
270 eligible to run which jobs, taking into account all the various factors that | |
271 affect that. | |
272 | |
273 In the past this was done with one or two very large, complex database | |
274 queries. It has proven much simpler and faster to build these auxiliary | |
275 data structures and perform the logic in Python. | |
276 """ | |
277 def __init__(self): | |
278 self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers() | |
279 | |
280 # load site-specific scheduler selected in global_config | |
281 site_schedulers_str = global_config.global_config.get_config_value( | |
282 scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers', | |
283 default='') | |
284 site_schedulers = set(site_schedulers_str.split(',')) | |
285 for scheduler in get_site_metahost_schedulers(): | |
286 if type(scheduler).__name__ in site_schedulers: | |
287 # always prepend, so site schedulers take precedence | |
288 self._metahost_schedulers = ( | |
289 [scheduler] + self._metahost_schedulers) | |
290 logging.info('Metahost schedulers: %s', | |
291 ', '.join(type(scheduler).__name__ for scheduler | |
292 in self._metahost_schedulers)) | |
293 | |
294 | |
295 def _get_ready_hosts(self): | |
296 # avoid any host with a currently active queue entry against it | |
297 hosts = scheduler_models.Host.fetch( | |
298 joins='LEFT JOIN afe_host_queue_entries AS active_hqe ' | |
299 'ON (afe_hosts.id = active_hqe.host_id AND ' | |
300 'active_hqe.active)', | |
301 where="active_hqe.host_id IS NULL " | |
302 "AND NOT afe_hosts.locked " | |
303 "AND (afe_hosts.status IS NULL " | |
304 "OR afe_hosts.status = 'Ready')") | |
305 return dict((host.id, host) for host in hosts) | |
306 | |
307 | |
308 @staticmethod | |
309 def _get_sql_id_list(id_list): | |
310 return ','.join(str(item_id) for item_id in id_list) | |
311 | |
312 | |
313 @classmethod | |
314 def _get_many2many_dict(cls, query, id_list, flip=False): | |
315 if not id_list: | |
316 return {} | |
317 query %= cls._get_sql_id_list(id_list) | |
318 rows = _db.execute(query) | |
319 return cls._process_many2many_dict(rows, flip) | |
320 | |
321 | |
322 @staticmethod | |
323 def _process_many2many_dict(rows, flip=False): | |
324 result = {} | |
325 for row in rows: | |
326 left_id, right_id = int(row[0]), int(row[1]) | |
327 if flip: | |
328 left_id, right_id = right_id, left_id | |
329 result.setdefault(left_id, set()).add(right_id) | |
330 return result | |
331 | |
332 | |
333 @classmethod | |
334 def _get_job_acl_groups(cls, job_ids): | |
335 query = """ | |
336 SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id | |
337 FROM afe_jobs | |
338 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner | |
339 INNER JOIN afe_acl_groups_users ON | |
340 afe_acl_groups_users.user_id = afe_users.id | |
341 WHERE afe_jobs.id IN (%s) | |
342 """ | |
343 return cls._get_many2many_dict(query, job_ids) | |
344 | |
345 | |
346 @classmethod | |
347 def _get_job_ineligible_hosts(cls, job_ids): | |
348 query = """ | |
349 SELECT job_id, host_id | |
350 FROM afe_ineligible_host_queues | |
351 WHERE job_id IN (%s) | |
352 """ | |
353 return cls._get_many2many_dict(query, job_ids) | |
354 | |
355 | |
356 @classmethod | |
357 def _get_job_dependencies(cls, job_ids): | |
358 query = """ | |
359 SELECT job_id, label_id | |
360 FROM afe_jobs_dependency_labels | |
361 WHERE job_id IN (%s) | |
362 """ | |
363 return cls._get_many2many_dict(query, job_ids) | |
364 | |
365 | |
366 @classmethod | |
367 def _get_host_acls(cls, host_ids): | |
368 query = """ | |
369 SELECT host_id, aclgroup_id | |
370 FROM afe_acl_groups_hosts | |
371 WHERE host_id IN (%s) | |
372 """ | |
373 return cls._get_many2many_dict(query, host_ids) | |
374 | |
375 | |
376 @classmethod | |
377 def _get_label_hosts(cls, host_ids): | |
378 if not host_ids: | |
379 return {}, {} | |
380 query = """ | |
381 SELECT label_id, host_id | |
382 FROM afe_hosts_labels | |
383 WHERE host_id IN (%s) | |
384 """ % cls._get_sql_id_list(host_ids) | |
385 rows = _db.execute(query) | |
386 labels_to_hosts = cls._process_many2many_dict(rows) | |
387 hosts_to_labels = cls._process_many2many_dict(rows, flip=True) | |
388 return labels_to_hosts, hosts_to_labels | |
389 | |
390 | |
391 @classmethod | |
392 def _get_labels(cls): | |
393 return dict((label.id, label) for label | |
394 in scheduler_models.Label.fetch()) | |
395 | |
396 | |
397 def recovery_on_startup(self): | |
398 for metahost_scheduler in self._metahost_schedulers: | |
399 metahost_scheduler.recovery_on_startup() | |
400 | |
401 | |
402 def refresh(self, pending_queue_entries): | |
403 self._hosts_available = self._get_ready_hosts() | |
404 | |
405 relevant_jobs = [queue_entry.job_id | |
406 for queue_entry in pending_queue_entries] | |
407 self._job_acls = self._get_job_acl_groups(relevant_jobs) | |
408 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs) | |
409 self._job_dependencies = self._get_job_dependencies(relevant_jobs) | |
410 | |
411 host_ids = self._hosts_available.keys() | |
412 self._host_acls = self._get_host_acls(host_ids) | |
413 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids) | |
414 | |
415 self._labels = self._get_labels() | |
416 | |
417 | |
418 def tick(self): | |
419 for metahost_scheduler in self._metahost_schedulers: | |
420 metahost_scheduler.tick() | |
421 | |
422 | |
423 def hosts_in_label(self, label_id): | |
424 return set(self._label_hosts.get(label_id, ())) | |
425 | |
426 | |
427 def remove_host_from_label(self, host_id, label_id): | |
428 self._label_hosts[label_id].remove(host_id) | |
429 | |
430 | |
431 def pop_host(self, host_id): | |
432 return self._hosts_available.pop(host_id) | |
433 | |
434 | |
435 def ineligible_hosts_for_entry(self, queue_entry): | |
436 return set(self._ineligible_hosts.get(queue_entry.job_id, ())) | |
437 | |
438 | |
439 def _is_acl_accessible(self, host_id, queue_entry): | |
440 job_acls = self._job_acls.get(queue_entry.job_id, set()) | |
441 host_acls = self._host_acls.get(host_id, set()) | |
442 return len(host_acls.intersection(job_acls)) > 0 | |
443 | |
444 | |
445 def _check_job_dependencies(self, job_dependencies, host_labels): | |
446 missing = job_dependencies - host_labels | |
447 return len(missing) == 0 | |
448 | |
449 | |
450 def _check_only_if_needed_labels(self, job_dependencies, host_labels, | |
451 queue_entry): | |
452 if not queue_entry.meta_host: | |
453 # bypass only_if_needed labels when a specific host is selected | |
454 return True | |
455 | |
456 for label_id in host_labels: | |
457 label = self._labels[label_id] | |
458 if not label.only_if_needed: | |
459 # we don't care about non-only_if_needed labels | |
460 continue | |
461 if queue_entry.meta_host == label_id: | |
462 # if the label was requested in a metahost it's OK | |
463 continue | |
464 if label_id not in job_dependencies: | |
465 return False | |
466 return True | |
467 | |
468 | |
469 def _check_atomic_group_labels(self, host_labels, queue_entry): | |
470 """ | |
471 Determine if the given HostQueueEntry's atomic group settings are okay | |
472 to schedule on a host with the given labels. | |
473 | |
474 @param host_labels: A list of label ids that the host has. | |
475 @param queue_entry: The HostQueueEntry being considered for the host. | |
476 | |
477 @returns True if atomic group settings are okay, False otherwise. | |
478 """ | |
479 return (self._get_host_atomic_group_id(host_labels, queue_entry) == | |
480 queue_entry.atomic_group_id) | |
481 | |
482 | |
483 def _get_host_atomic_group_id(self, host_labels, queue_entry=None): | |
484 """ | |
485 Return the atomic group label id for a host with the given set of | |
486 labels if any, or None otherwise. Raises an exception if more than | |
487 one atomic group is found in the set of labels. | |
488 | |
489 @param host_labels: A list of label ids that the host has. | |
490 @param queue_entry: The HostQueueEntry we're testing. Only used for | |
491 extra info in a potential logged error message. | |
492 | |
493 @returns The id of the atomic group found on a label in host_labels | |
494 or None if no atomic group label is found. | |
495 """ | |
496 atomic_labels = [self._labels[label_id] for label_id in host_labels | |
497 if self._labels[label_id].atomic_group_id is not None] | |
498 atomic_ids = set(label.atomic_group_id for label in atomic_labels) | |
499 if not atomic_ids: | |
500 return None | |
501 if len(atomic_ids) > 1: | |
502 logging.error('More than one Atomic Group on HQE "%s" via: %r', | |
503 queue_entry, atomic_labels) | |
504 return atomic_ids.pop() | |
505 | |
506 | |
507 def _get_atomic_group_labels(self, atomic_group_id): | |
508 """ | |
509 Lookup the label ids that an atomic_group is associated with. | |
510 | |
511 @param atomic_group_id - The id of the AtomicGroup to look up. | |
512 | |
513 @returns A generator yielding Label ids for this atomic group. | |
514 """ | |
515 return (id for id, label in self._labels.iteritems() | |
516 if label.atomic_group_id == atomic_group_id | |
517 and not label.invalid) | |
518 | |
519 | |
520 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry): | |
521 """ | |
522 @param group_hosts - A sequence of Host ids to test for usability | |
523 and eligibility against the Job associated with queue_entry. | |
524 @param queue_entry - The HostQueueEntry that these hosts are being | |
525 tested for eligibility against. | |
526 | |
527 @returns A subset of group_hosts Host ids that are eligible for the | |
528 supplied queue_entry. | |
529 """ | |
530 return set(host_id for host_id in group_hosts | |
531 if self.is_host_usable(host_id) | |
532 and self.is_host_eligible_for_job(host_id, queue_entry)) | |
533 | |
534 | |
535 def is_host_eligible_for_job(self, host_id, queue_entry): | |
536 if self._is_host_invalid(host_id): | |
537 # if an invalid host is scheduled for a job, it's a one-time host | |
538 # and it therefore bypasses eligibility checks. note this can only | |
539 # happen for non-metahosts, because invalid hosts have their label | |
540 # relationships cleared. | |
541 return True | |
542 | |
543 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set()) | |
544 host_labels = self._host_labels.get(host_id, set()) | |
545 | |
546 return (self._is_acl_accessible(host_id, queue_entry) and | |
547 self._check_job_dependencies(job_dependencies, host_labels) and | |
548 self._check_only_if_needed_labels( | |
549 job_dependencies, host_labels, queue_entry) and | |
550 self._check_atomic_group_labels(host_labels, queue_entry)) | |
551 | |
552 | |
553 def _is_host_invalid(self, host_id): | |
554 host_object = self._hosts_available.get(host_id, None) | |
555 return host_object and host_object.invalid | |
556 | |
557 | |
558 def _schedule_non_metahost(self, queue_entry): | |
559 if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry): | |
560 return None | |
561 return self._hosts_available.pop(queue_entry.host_id, None) | |
562 | |
563 | |
564 def is_host_usable(self, host_id): | |
565 if host_id not in self._hosts_available: | |
566 # host was already used during this scheduling cycle | |
567 return False | |
568 if self._hosts_available[host_id].invalid: | |
569 # Invalid hosts cannot be used for metahosts. They're included in | |
570 # the original query because they can be used by non-metahosts. | |
571 return False | |
572 return True | |
573 | |
574 | |
575 def schedule_entry(self, queue_entry): | |
576 if queue_entry.host_id is not None: | |
577 return self._schedule_non_metahost(queue_entry) | |
578 | |
579 for scheduler in self._metahost_schedulers: | |
580 if scheduler.can_schedule_metahost(queue_entry): | |
581 scheduler.schedule_metahost(queue_entry, self) | |
582 return None | |
583 | |
584 raise SchedulerError('No metahost scheduler to handle %s' % queue_entry) | |
585 | |
586 | |
587 def find_eligible_atomic_group(self, queue_entry): | |
588 """ | |
589 Given an atomic group host queue entry, locate an appropriate group | |
590 of hosts for the associated job to run on. | |
591 | |
592 The caller is responsible for creating new HQEs for the additional | |
593 hosts returned in order to run the actual job on them. | |
594 | |
595 @returns A list of Host instances in a ready state to satisfy this | |
596 atomic group scheduling. Hosts will all belong to the same | |
597 atomic group label as specified by the queue_entry. | |
598 An empty list will be returned if no suitable atomic | |
599 group could be found. | |
600 | |
601 TODO(gps): what is responsible for kicking off any attempted repairs on | |
602 a group of hosts? not this function, but something needs to. We do | |
603 not communicate that reason for returning [] outside of here... | |
604 For now, we'll just be unschedulable if enough hosts within one group | |
605 enter Repair Failed state. | |
606 """ | |
607 assert queue_entry.atomic_group_id is not None | |
608 job = queue_entry.job | |
609 assert job.synch_count and job.synch_count > 0 | |
610 atomic_group = queue_entry.atomic_group | |
611 if job.synch_count > atomic_group.max_number_of_machines: | |
612 # Such a Job and HostQueueEntry should never be possible to | |
613 # create using the frontend. Regardless, we can't process it. | |
614 # Abort it immediately and log an error on the scheduler. | |
615 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED) | |
616 logging.error( | |
617 'Error: job %d synch_count=%d > requested atomic_group %d ' | |
618 'max_number_of_machines=%d. Aborted host_queue_entry %d.', | |
619 job.id, job.synch_count, atomic_group.id, | |
620 atomic_group.max_number_of_machines, queue_entry.id) | |
621 return [] | |
622 hosts_in_label = self.hosts_in_label(queue_entry.meta_host) | |
623 ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry) | |
624 | |
625 # Look in each label associated with atomic_group until we find one with | |
626 # enough hosts to satisfy the job. | |
627 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id): | |
628 group_hosts = set(self.hosts_in_label(atomic_label_id)) | |
629 if queue_entry.meta_host is not None: | |
630 # If we have a metahost label, only allow its hosts. | |
631 group_hosts.intersection_update(hosts_in_label) | |
632 group_hosts -= ineligible_host_ids | |
633 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group( | |
634 group_hosts, queue_entry) | |
635 | |
636 # Job.synch_count is treated as "minimum synch count" when | |
637 # scheduling for an atomic group of hosts. The atomic group | |
638 # number of machines is the maximum to pick out of a single | |
639 # atomic group label for scheduling at one time. | |
640 min_hosts = job.synch_count | |
641 max_hosts = atomic_group.max_number_of_machines | |
642 | |
643 if len(eligible_host_ids_in_group) < min_hosts: | |
644 # Not enough eligible hosts in this atomic group label. | |
645 continue | |
646 | |
647 eligible_hosts_in_group = [self._hosts_available[id] | |
648 for id in eligible_host_ids_in_group] | |
649 # So that they show up in a sane order when viewing the job. | |
650 eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort) | |
651 | |
652 # Limit ourselves to scheduling the atomic group size. | |
653 if len(eligible_hosts_in_group) > max_hosts: | |
654 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts] | |
655 | |
656 # Remove the selected hosts from our cached internal state | |
657 # of available hosts in order to return the Host objects. | |
658 host_list = [] | |
659 for host in eligible_hosts_in_group: | |
660 hosts_in_label.discard(host.id) | |
661 self._hosts_available.pop(host.id) | |
662 host_list.append(host) | |
663 return host_list | |
664 | |
665 return [] | |
666 | |
667 | |
668 site_host_scheduler = utils.import_site_class(__file__, | |
669 "autotest_lib.scheduler.site_host_scheduler", | |
670 "site_host_scheduler", BaseHostScheduler) | |
671 | |
672 | |
673 class HostScheduler(site_host_scheduler): | |
674 pass | |
675 | |
676 | |
677 class Dispatcher(object): | 256 class Dispatcher(object): |
678 def __init__(self): | 257 def __init__(self): |
679 self._agents = [] | 258 self._agents = [] |
680 self._last_clean_time = time.time() | 259 self._last_clean_time = time.time() |
681 self._host_scheduler = HostScheduler() | 260 self._host_scheduler = host_scheduler.HostScheduler(_db) |
682 user_cleanup_time = scheduler_config.config.clean_interval | 261 user_cleanup_time = scheduler_config.config.clean_interval |
683 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( | 262 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( |
684 _db, user_cleanup_time) | 263 _db, user_cleanup_time) |
685 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db) | 264 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db) |
686 self._host_agents = {} | 265 self._host_agents = {} |
687 self._queue_entry_agents = {} | 266 self._queue_entry_agents = {} |
688 self._tick_count = 0 | 267 self._tick_count = 0 |
689 self._last_garbage_stats_time = time.time() | 268 self._last_garbage_stats_time = time.time() |
690 self._seconds_between_garbage_stats = 60 * ( | 269 self._seconds_between_garbage_stats = 60 * ( |
691 global_config.global_config.get_config_value( | 270 global_config.global_config.get_config_value( |
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
860 if queue_entry.is_hostless(): | 439 if queue_entry.is_hostless(): |
861 return HostlessQueueTask(queue_entry=queue_entry) | 440 return HostlessQueueTask(queue_entry=queue_entry) |
862 return QueueTask(queue_entries=task_entries) | 441 return QueueTask(queue_entries=task_entries) |
863 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: | 442 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: |
864 return GatherLogsTask(queue_entries=task_entries) | 443 return GatherLogsTask(queue_entries=task_entries) |
865 if queue_entry.status == models.HostQueueEntry.Status.PARSING: | 444 if queue_entry.status == models.HostQueueEntry.Status.PARSING: |
866 return FinalReparseTask(queue_entries=task_entries) | 445 return FinalReparseTask(queue_entries=task_entries) |
867 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: | 446 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: |
868 return ArchiveResultsTask(queue_entries=task_entries) | 447 return ArchiveResultsTask(queue_entries=task_entries) |
869 | 448 |
870 raise SchedulerError('_get_agent_task_for_queue_entry got entry with ' | 449 raise host_scheduler.SchedulerError( |
871 'invalid status %s: %s' | 450 '_get_agent_task_for_queue_entry got entry with ' |
872 % (queue_entry.status, queue_entry)) | 451 'invalid status %s: %s' % (queue_entry.status, queue_entry)) |
873 | 452 |
874 | 453 |
875 def _check_for_duplicate_host_entries(self, task_entries): | 454 def _check_for_duplicate_host_entries(self, task_entries): |
876 non_host_statuses = (models.HostQueueEntry.Status.PARSING, | 455 non_host_statuses = (models.HostQueueEntry.Status.PARSING, |
877 models.HostQueueEntry.Status.ARCHIVING) | 456 models.HostQueueEntry.Status.ARCHIVING) |
878 for task_entry in task_entries: | 457 for task_entry in task_entries: |
879 using_host = (task_entry.host is not None | 458 using_host = (task_entry.host is not None |
880 and task_entry.status not in non_host_statuses) | 459 and task_entry.status not in non_host_statuses) |
881 if using_host: | 460 if using_host: |
882 self._assert_host_has_no_agent(task_entry) | 461 self._assert_host_has_no_agent(task_entry) |
883 | 462 |
884 | 463 |
885 def _assert_host_has_no_agent(self, entry): | 464 def _assert_host_has_no_agent(self, entry): |
886 """ | 465 """ |
887 @param entry: a HostQueueEntry or a SpecialTask | 466 @param entry: a HostQueueEntry or a SpecialTask |
888 """ | 467 """ |
889 if self.host_has_agent(entry.host): | 468 if self.host_has_agent(entry.host): |
890 agent = tuple(self._host_agents.get(entry.host.id))[0] | 469 agent = tuple(self._host_agents.get(entry.host.id))[0] |
891 raise SchedulerError( | 470 raise host_scheduler.SchedulerError( |
892 'While scheduling %s, host %s already has a host agent %s' | 471 'While scheduling %s, host %s already has a host agent %s' |
893 % (entry, entry.host, agent.task)) | 472 % (entry, entry.host, agent.task)) |
894 | 473 |
895 | 474 |
896 def _get_agent_task_for_special_task(self, special_task): | 475 def _get_agent_task_for_special_task(self, special_task): |
897 """ | 476 """ |
898 Construct an AgentTask class to run the given SpecialTask and add it | 477 Construct an AgentTask class to run the given SpecialTask and add it |
899 to this dispatcher. | 478 to this dispatcher. |
900 @param special_task: a models.SpecialTask instance | 479 @param special_task: a models.SpecialTask instance |
901 @returns an AgentTask to run this SpecialTask | 480 @returns an AgentTask to run this SpecialTask |
902 """ | 481 """ |
903 self._assert_host_has_no_agent(special_task) | 482 self._assert_host_has_no_agent(special_task) |
904 | 483 |
905 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask) | 484 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask) |
906 for agent_task_class in special_agent_task_classes: | 485 for agent_task_class in special_agent_task_classes: |
907 if agent_task_class.TASK_TYPE == special_task.task: | 486 if agent_task_class.TASK_TYPE == special_task.task: |
908 return agent_task_class(task=special_task) | 487 return agent_task_class(task=special_task) |
909 | 488 |
910 raise SchedulerError('No AgentTask class for task', str(special_task)) | 489 raise host_scheduler.SchedulerError( |
| 490 'No AgentTask class for task', str(special_task)) |
911 | 491 |
912 | 492 |
913 def _register_pidfiles(self, agent_tasks): | 493 def _register_pidfiles(self, agent_tasks): |
914 for agent_task in agent_tasks: | 494 for agent_task in agent_tasks: |
915 agent_task.register_necessary_pidfiles() | 495 agent_task.register_necessary_pidfiles() |
916 | 496 |
917 | 497 |
918 def _recover_tasks(self, agent_tasks): | 498 def _recover_tasks(self, agent_tasks): |
919 orphans = _drone_manager.get_orphaned_autoserv_processes() | 499 orphans = _drone_manager.get_orphaned_autoserv_processes() |
920 | 500 |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
965 special_tasks = models.SpecialTask.objects.filter( | 545 special_tasks = models.SpecialTask.objects.filter( |
966 task__in=(models.SpecialTask.Task.CLEANUP, | 546 task__in=(models.SpecialTask.Task.CLEANUP, |
967 models.SpecialTask.Task.VERIFY), | 547 models.SpecialTask.Task.VERIFY), |
968 queue_entry__id=queue_entry.id, | 548 queue_entry__id=queue_entry.id, |
969 is_complete=False) | 549 is_complete=False) |
970 if special_tasks.count() == 0: | 550 if special_tasks.count() == 0: |
971 unrecovered_hqes.append(queue_entry) | 551 unrecovered_hqes.append(queue_entry) |
972 | 552 |
973 if unrecovered_hqes: | 553 if unrecovered_hqes: |
974 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) | 554 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) |
975 raise SchedulerError( | 555 raise host_scheduler.SchedulerError( |
976 '%d unrecovered verifying host queue entries:\n%s' % | 556 '%d unrecovered verifying host queue entries:\n%s' % |
977 (len(unrecovered_hqes), message)) | 557 (len(unrecovered_hqes), message)) |
978 | 558 |
979 | 559 |
980 def _get_prioritized_special_tasks(self): | 560 def _get_prioritized_special_tasks(self): |
981 """ | 561 """ |
982 Returns all queued SpecialTasks prioritized for repair first, then | 562 Returns all queued SpecialTasks prioritized for repair first, then |
983 cleanup, then verify. | 563 cleanup, then verify. |
984 """ | 564 """ |
985 queued_tasks = models.SpecialTask.objects.filter(is_active=False, | 565 queued_tasks = models.SpecialTask.objects.filter(is_active=False, |
(...skipping 788 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1774 logging.info('Recovering process %s for %s at %s' | 1354 logging.info('Recovering process %s for %s at %s' |
1775 % (self.monitor.get_process(), type(self).__name__, | 1355 % (self.monitor.get_process(), type(self).__name__, |
1776 self._working_directory())) | 1356 self._working_directory())) |
1777 | 1357 |
1778 | 1358 |
1779 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, | 1359 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, |
1780 allowed_host_statuses=None): | 1360 allowed_host_statuses=None): |
1781 class_name = self.__class__.__name__ | 1361 class_name = self.__class__.__name__ |
1782 for entry in queue_entries: | 1362 for entry in queue_entries: |
1783 if entry.status not in allowed_hqe_statuses: | 1363 if entry.status not in allowed_hqe_statuses: |
1784 raise SchedulerError('%s attempting to start ' | 1364 raise host_scheduler.SchedulerError( |
1785 'entry with invalid status %s: %s' | 1365 '%s attempting to start entry with invalid status %s: ' |
1786 % (class_name, entry.status, entry)) | 1366 '%s' % (class_name, entry.status, entry)) |
1787 invalid_host_status = ( | 1367 invalid_host_status = ( |
1788 allowed_host_statuses is not None | 1368 allowed_host_statuses is not None |
1789 and entry.host.status not in allowed_host_statuses) | 1369 and entry.host.status not in allowed_host_statuses) |
1790 if invalid_host_status: | 1370 if invalid_host_status: |
1791 raise SchedulerError('%s attempting to start on queue ' | 1371 raise host_scheduler.SchedulerError( |
1792 'entry with invalid host status %s: %s' | 1372 '%s attempting to start on queue entry with invalid ' |
1793 % (class_name, entry.host.status, entry)) | 1373 'host status %s: %s' |
| 1374 % (class_name, entry.host.status, entry)) |
1794 | 1375 |
1795 | 1376 |
1796 class TaskWithJobKeyvals(object): | 1377 class TaskWithJobKeyvals(object): |
1797 """AgentTask mixin providing functionality to help with job keyval files.""" | 1378 """AgentTask mixin providing functionality to help with job keyval files.""" |
1798 _KEYVAL_FILE = 'keyval' | 1379 _KEYVAL_FILE = 'keyval' |
1799 def _format_keyval(self, key, value): | 1380 def _format_keyval(self, key, value): |
1800 return '%s=%s' % (key, value) | 1381 return '%s=%s' % (key, value) |
1801 | 1382 |
1802 | 1383 |
1803 def _keyval_path(self): | 1384 def _keyval_path(self): |
(...skipping 805 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2609 paired_process = self._paired_with_monitor().get_process() | 2190 paired_process = self._paired_with_monitor().get_process() |
2610 _drone_manager.write_lines_to_file( | 2191 _drone_manager.write_lines_to_file( |
2611 failed_file, ['Archiving failed with exit code %s' | 2192 failed_file, ['Archiving failed with exit code %s' |
2612 % self.monitor.exit_code()], | 2193 % self.monitor.exit_code()], |
2613 paired_with_process=paired_process) | 2194 paired_with_process=paired_process) |
2614 self._set_all_statuses(self._final_status()) | 2195 self._set_all_statuses(self._final_status()) |
2615 | 2196 |
2616 | 2197 |
2617 if __name__ == '__main__': | 2198 if __name__ == '__main__': |
2618 main() | 2199 main() |
OLD | NEW |