1 """ | |
2 Autotest scheduling utility. | |
3 """ | |
4 | |
5 | |
6 import logging | |
7 | |
8 from autotest_lib.client.common_lib import global_config, utils | |
9 from autotest_lib.frontend.afe import models | |
10 from autotest_lib.scheduler import metahost_scheduler, scheduler_config | |
11 from autotest_lib.scheduler import scheduler_models | |
12 | |
13 | |
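# Note: import_site_function is expected to fall back to the default given
# here (a lambda returning an empty tuple) when no site-specific
# site_metahost_scheduler module is present in the tree.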
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda: ())


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class base_host_scheduler(metahost_scheduler.HostSchedulingUtility):
    # Review thread preserved from the code review:
    #   ericli 2011/02/28 22:41:16: ClassName uppercase.
    #   DaleCurtis 2011/03/01 00:25:36: Done.
24 """Handles the logic for choosing when to run jobs and on which hosts. | |
25 | |
26 This class makes several queries to the database on each tick, building up | |
27 some auxiliary data structures and using them to determine which hosts are | |
28 eligible to run which jobs, taking into account all the various factors that | |
29 affect that. | |
30 | |
31 In the past this was done with one or two very large, complex database | |
32 queries. It has proven much simpler and faster to build these auxiliary | |
33 data structures and perform the logic in Python. | |
34 """ | |
    def __init__(self, db):
        self._db = db
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load the site-specific schedulers selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
                joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                      'ON (afe_hosts.id = active_hqe.host_id AND '
                      'active_hqe.active)',
                where="active_hqe.host_id IS NULL "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    def _get_sql_id_list(self, id_list):
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


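    # Worked example for _process_many2many_dict, assuming the DB layer yields
    # two-column rows: [(1, 10), (1, 11), (2, 10)] becomes
    # {1: set([10, 11]), 2: set([10])}; with flip=True the columns are
    # swapped first, giving {10: set([1, 2]), 11: set([1])}.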
    def _process_many2many_dict(self, rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_host_acls(self, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    def _get_label_hosts(self, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    def _get_labels(self):
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        # use a loop variable that doesn't shadow the metahost_scheduler module
        for scheduler in self._metahost_schedulers:
            scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        for scheduler in self._metahost_schedulers:
            scheduler.tick()


    def hosts_in_label(self, label_id):
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


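    # Note on only_if_needed labels: a host carrying such a label is handed to
    # a metahost job only if the job targeted the label directly or listed it
    # as a dependency; entries with a specific host assigned skip this check
    # entirely, as the early return below shows.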
    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise. If more than one atomic group is
        found in the set of labels, logs an error and returns one of them
        arbitrarily.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                 or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Look up the label ids that an atomic_group is associated with.

        @param atomic_group_id: The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)



    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts: A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry: The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                 supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


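    # Eligibility is the conjunction of four checks: ACL overlap between job
    # and host, dependency labels present on the host, only_if_needed label
    # rules, and matching atomic group membership. Invalid (one-time) hosts
    # bypass all of them.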
    def is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


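    # schedule_entry returns the assigned Host (or None) for entries with a
    # specific host; for metahost entries the chosen metahost scheduler
    # assigns hosts itself, so None is always returned. Site schedulers get
    # first pick because __init__ prepends them to self._metahost_schedulers.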
    def schedule_entry(self, queue_entry):
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # It should never be possible to create such a Job and
            # HostQueueEntry through the frontend. Regardless, we can't
            # process it. Abort it immediately and log an error on the
            # scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                    'Error: job %d synch_count=%d > requested atomic_group %d '
                    'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                    job.id, job.synch_count, atomic_group.id,
                    atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


site_host_scheduler = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_host_scheduler',
        'site_host_scheduler', base_host_scheduler)


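# import_site_class is expected to return the site-provided subclass when
# autotest_lib.scheduler.site_host_scheduler exists, and the default given
# above (base_host_scheduler) otherwise, so HostScheduler below always
# resolves to a concrete scheduler class.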
class HostScheduler(site_host_scheduler):
    pass
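

# Usage sketch (hypothetical driver code; the real caller is the scheduler
# dispatcher, and get_pending_queue_entries below is illustrative only):
#
#     scheduler = HostScheduler(db_connection)
#     scheduler.recovery_on_startup()
#     while True:
#         entries = get_pending_queue_entries()  # hypothetical helper
#         scheduler.refresh(entries)
#         scheduler.tick()
#         for entry in entries:
#             host = scheduler.schedule_entry(entry)  # Host or None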