package task_scheduler

import (
	"fmt"
	"math"
	"path"
	"sort"
	"sync"
	"time"

	swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
	"github.com/skia-dev/glog"
	"go.skia.org/infra/build_scheduler/go/db"
	"go.skia.org/infra/go/buildbot"
	"go.skia.org/infra/go/gitinfo"
	"go.skia.org/infra/go/gitrepo"
	"go.skia.org/infra/go/isolate"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/swarming"
	"go.skia.org/infra/go/timer"
	"go.skia.org/infra/go/util"
)

// TaskScheduler is a struct used for scheduling tasks on bots.
type TaskScheduler struct {
	cache            db.TaskCache
	db               db.DB
	isolate          *isolate.Client
	period           time.Duration
	queue            []*taskCandidate
	queueMtx         sync.RWMutex
	repoMap          *gitinfo.RepoMap
	repos            map[string]*gitrepo.Repo
	swarming         swarming.ApiClient
	taskCfgCache     *taskCfgCache
	timeDecayAmt24Hr float64
	workdir          string
}

// NewTaskScheduler returns a TaskScheduler instance which schedules tasks for
// the given repos using the given Isolate and Swarming clients.
func NewTaskScheduler(d db.DB, cache db.TaskCache, period time.Duration, workdir string, repoNames []string, isolateClient *isolate.Client, swarmingClient swarming.ApiClient) (*TaskScheduler, error) {
	repos := make(map[string]*gitrepo.Repo, len(repoNames))
	rm := gitinfo.NewRepoMap(workdir)
	for _, r := range repoNames {
		repo, err := gitrepo.NewRepo(r, path.Join(workdir, path.Base(r)))
		if err != nil {
			return nil, err
		}
		repos[r] = repo
		if _, err := rm.Repo(r); err != nil {
			return nil, err
		}
	}
	s := &TaskScheduler{
		cache:            cache,
		db:               d,
		isolate:          isolateClient,
		period:           period,
		queue:            []*taskCandidate{},
		queueMtx:         sync.RWMutex{},
		repoMap:          rm,
		repos:            repos,
		swarming:         swarmingClient,
		taskCfgCache:     newTaskCfgCache(rm),
		timeDecayAmt24Hr: 1.0,
		workdir:          workdir,
	}
	return s, nil
}

// Start initiates the TaskScheduler's goroutines for scheduling tasks.
func (s *TaskScheduler) Start() {
	go func() {
		lv := metrics2.NewLiveness("last-successful-task-scheduling")
		for range time.Tick(time.Minute) {
			if err := s.MainLoop(); err != nil {
				glog.Errorf("Failed to run the task scheduler: %s", err)
			} else {
				lv.Reset()
			}
		}
	}()
}
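
// exampleStartScheduler is an illustrative sketch only and is not used by the
// scheduler. It shows how a server might wire up and start a TaskScheduler,
// assuming the db, cache, isolate and swarming clients are constructed
// elsewhere; the workdir, period and repo URL below are placeholders.
func exampleStartScheduler(d db.DB, cache db.TaskCache, isolateClient *isolate.Client, swarmingClient swarming.ApiClient) error {
	repos := []string{"https://skia.googlesource.com/skia.git"}
	s, err := NewTaskScheduler(d, cache, 24*time.Hour, "/tmp/task_scheduler_workdir", repos, isolateClient, swarmingClient)
	if err != nil {
		return err
	}
	// Start kicks off the once-per-minute scheduling loop.
	s.Start()
	return nil
}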

// computeBlamelistRecursive traces through commit history, adding to
// the commits buffer until the blamelist for the task is complete.
//
// Args:
//   - repoName:  Name of the repository.
//   - commit:    Current commit as we're recursing through history.
//   - taskName:  Name of the task.
//   - revision:  Revision at which the task would run.
//   - commits:   Buffer in which to place the blamelist commits as they accumulate.
//   - cache:     TaskCache, for finding previous tasks.
//   - repo:      gitrepo.Repo corresponding to the repository.
//   - stealFrom: Existing Task from which this Task will steal commits, if one exists.
func computeBlamelistRecursive(repoName string, commit *gitrepo.Commit, taskName string, revision *gitrepo.Commit, commits []*gitrepo.Commit, cache db.TaskCache, repo *gitrepo.Repo, stealFrom *db.Task) ([]*gitrepo.Commit, *db.Task, error) {
	// Shortcut in case we missed this case before; if this is the first
	// task for this task spec which has a valid Revision, the blamelist will
	// be the entire Git history. If we find too many commits, assume we've
	// hit this case and just return the Revision as the blamelist.
	if len(commits) > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
		commits = append(commits[:0], commit)
		return commits, nil, nil
	}

	// Determine whether any task already includes this commit.
	prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName)
	if err != nil {
		return nil, nil, err
	}

	// If we're stealing commits from a previous task but the current
	// commit is not in any task's blamelist, we must have scrolled past
	// the beginning of the tasks. Just return.
	if prev == nil && stealFrom != nil {
		return commits, stealFrom, nil
	}

	// If a previous task already included this commit, we have to make a decision.
	if prev != nil {
		// If this Task's Revision is already included in a different
		// Task, then we're either bisecting or retrying a task. We'll
		// "steal" commits from the previous Task's blamelist.
		if len(commits) == 0 {
			stealFrom = prev

			// Another shortcut: If our Revision is the same as the
			// Revision of the Task we're stealing commits from,
			// ie. both tasks ran at the same commit, then this is a
			// retry. Just steal all of the commits without doing
			// any more work.
			if stealFrom.Revision == revision.Hash {
				commits = commits[:0]
				for _, c := range stealFrom.Commits {
					ptr := repo.Get(c)
					if ptr == nil {
						return nil, nil, fmt.Errorf("No such commit: %q", c)
					}
					commits = append(commits, ptr)
				}
				return commits, stealFrom, nil
			}
		}
		if stealFrom == nil || prev != stealFrom {
			// If we've hit a commit belonging to a different task,
			// we're done.
			return commits, stealFrom, nil
		}
	}

	// Add the commit.
	commits = append(commits, commit)

	// Recurse on the commit's parents.
	for _, p := range commit.Parents {
		var err error
		commits, stealFrom, err = computeBlamelistRecursive(repoName, p, taskName, revision, commits, cache, repo, stealFrom)
		if err != nil {
			return nil, nil, err
		}
	}
	return commits, stealFrom, nil
}

// ComputeBlamelist computes the blamelist for a new task, specified by name,
// repo, and revision. Returns the list of commits covered by the task, and any
// previous task from which part or all of the blamelist was "stolen" (see
// below). There are three cases:
//
// 1. The new task tests commits which have not yet been tested. Trace commit
//    history, accumulating commits until we find commits which have been tested
//    by previous tasks.
//
// 2. The new task runs at the same commit as a previous task. This is a retry,
//    so the entire blamelist of the previous task is "stolen".
//
// 3. The new task runs at a commit which is in a previous task's blamelist, but
//    no task has run at the same commit. This is a bisect. Trace commit
//    history, "stealing" commits from the previous task until we find a commit
//    which was covered by a *different* previous task.
//
// Args:
//   - cache:      TaskCache instance.
//   - repo:       gitrepo.Repo instance corresponding to the repository of the task.
//   - name:       Name of the task.
//   - repoName:   Name of the repository for the task.
//   - revision:   Revision at which the task would run.
//   - commitsBuf: Buffer for use as scratch space.
func ComputeBlamelist(cache db.TaskCache, repo *gitrepo.Repo, name, repoName, revision string, commitsBuf []*gitrepo.Commit) ([]string, *db.Task, error) {
	// TODO(borenet): If this is a try job, don't compute the blamelist.

	// If this is the first invocation of a given task spec, save time by
	// setting the blamelist to only be the given revision.
	if !cache.KnownTaskName(repoName, name) {
		return []string{revision}, nil, nil
	}

	commit := repo.Get(revision)
	if commit == nil {
		return nil, nil, fmt.Errorf("No such commit: %q", revision)
	}

	commitsBuf = commitsBuf[:0]

	// Run the helper function to recurse on commit history.
	commits, stealFrom, err := computeBlamelistRecursive(repoName, commit, name, commit, commitsBuf, cache, repo, nil)
	if err != nil {
		return nil, nil, err
	}

	// De-duplicate the commits list. Duplicates are rare but will occur
	// in the case of a task which runs after a short-lived branch is merged,
	// so that the blamelist includes both the branch point and the merge
	// commit. In this case, any commits just before the branch point will
	// be duplicated.
	// TODO(borenet): This has never happened in the Skia repo, but the
	// below 8 lines of code account for ~10% of the execution time of this
	// function, which is the critical path for the scheduler. Consider
	// either ignoring this case or coming up with an alternate solution
	// which moves this logic out of the critical path.
	rv := make([]string, 0, len(commits))
	visited := make(map[*gitrepo.Commit]bool, len(commits))
	for _, c := range commits {
		if !visited[c] {
			rv = append(rv, c.Hash)
			visited[c] = true
		}
	}
	return rv, stealFrom, nil
}
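
// exampleComputeBlamelist is an illustrative sketch only; it is not called by
// the scheduler. It shows how a caller might compute a blamelist for a single
// task spec, assuming the cache and repo were obtained elsewhere. The task
// name, repo name, and revision below are hypothetical placeholders.
func exampleComputeBlamelist(cache db.TaskCache, repo *gitrepo.Repo) error {
	commitsBuf := make([]*gitrepo.Commit, 0, buildbot.MAX_BLAMELIST_COMMITS)
	commits, stealFrom, err := ComputeBlamelist(cache, repo, "Example-Task", "example.git", "abc123", commitsBuf)
	if err != nil {
		return err
	}
	glog.Infof("Blamelist contains %d commits.", len(commits))
	if stealFrom != nil {
		// Retry or bisect: the new task took over part or all of the
		// blamelist of stealFrom.
		glog.Infof("Stole commits from task %s.", stealFrom.Id)
	}
	return nil
}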

// findTaskCandidates goes through the given commits-by-repo map, loads task
// specs for each repo/commit pair, and returns the resulting task candidates,
// keyed by the combination of repo and task name. Candidates we don't want to
// run, eg. because an equivalent task is already pending, running, or
// succeeded, or because their dependencies are not yet met, are filtered out.
func (s *TaskScheduler) findTaskCandidates(commitsByRepo map[string][]string) (map[string][]*taskCandidate, error) {
	defer timer.New("TaskScheduler.findTaskCandidates").Stop()
	// Obtain all possible tasks.
	specs, err := s.taskCfgCache.GetTaskSpecsForCommits(commitsByRepo)
	if err != nil {
		return nil, err
	}
	bySpec := map[string][]*taskCandidate{}
	total := 0
	for repo, commits := range specs {
		for commit, tasks := range commits {
			for name, task := range tasks {
				c := &taskCandidate{
					IsolatedHashes: nil,
					Name:           name,
					Repo:           repo,
					Revision:       commit,
					Score:          0.0,
					TaskSpec:       task,
				}
				// We shouldn't duplicate pending, in-progress,
				// or successfully completed tasks.
				previous, err := s.cache.GetTaskForCommit(c.Repo, c.Revision, c.Name)
				if err != nil {
					return nil, err
				}
				if previous != nil && previous.Revision == c.Revision {
					if previous.Status == db.TASK_STATUS_PENDING || previous.Status == db.TASK_STATUS_RUNNING {
						continue
					}
					if previous.Success() {
						continue
					}
				}

				// Don't consider candidates whose dependencies are not met.
				depsMet, hashes, err := c.allDepsMet(s.cache)
				if err != nil {
					return nil, err
				}
				if !depsMet {
					continue
				}
				c.IsolatedHashes = hashes

				key := fmt.Sprintf("%s|%s", c.Repo, c.Name)
				candidates, ok := bySpec[key]
				if !ok {
					candidates = make([]*taskCandidate, 0, len(commits))
				}
				bySpec[key] = append(candidates, c)
				total++
			}
		}
	}
	glog.Infof("Found %d candidates for %d task specs.", total, len(bySpec))
	return bySpec, nil
}

// processTaskCandidate computes the remaining information about the task
// candidate, eg. blamelists and scoring.
func (s *TaskScheduler) processTaskCandidate(c *taskCandidate, now time.Time, cache *cacheWrapper, commitsBuf []*gitrepo.Commit) error {
	// Compute blamelist.
	repo, ok := s.repos[c.Repo]
	if !ok {
		return fmt.Errorf("No such repo: %s", c.Repo)
	}
	commits, stealingFrom, err := ComputeBlamelist(cache, repo, c.Name, c.Repo, c.Revision, commitsBuf)
	if err != nil {
		return err
	}
	c.Commits = commits
	if stealingFrom != nil {
		c.StealingFromId = stealingFrom.Id
	}

	// Score the candidate.
	// The score for a candidate is based on the "testedness" increase
	// provided by running the task.
	stoleFromCommits := 0
	if stealingFrom != nil {
		stoleFromCommits = len(stealingFrom.Commits)
	}
	score := testednessIncrease(len(c.Commits), stoleFromCommits)

	// Scale the score by other factors, eg. time decay.
	decay, err := s.timeDecayForCommit(now, c.Repo, c.Revision)
	if err != nil {
		return err
	}
	score *= decay

	c.Score = score
	return nil
}
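
// As an illustrative example of the scoring above (not normative; it assumes
// the default timeDecayAmt24Hr of 1.0, ie. no decay): a candidate whose
// blamelist contains 5 previously-untested commits scores
// testednessIncrease(5, 0) = testedness(5) - (5 * -1.0) = 1.8 + 5.0 = 6.8,
// while a retry of an existing task (same blamelist, stolen in full) scores
// testednessIncrease(n, n) = 0.0, so new work always outscores retries.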

// regenerateTaskQueue obtains the set of all eligible task candidates, scores
// them, and prepares them to be triggered.
func (s *TaskScheduler) regenerateTaskQueue() error {
	defer timer.New("TaskScheduler.regenerateTaskQueue").Stop()

	// Find the recent commits to use.
	for _, repoName := range s.repoMap.Repos() {
		r, err := s.repoMap.Repo(repoName)
		if err != nil {
			return err
		}
		if err := r.Reset("HEAD"); err != nil {
			return err
		}
		if err := r.Checkout("master"); err != nil {
			return err
		}
	}
	if err := s.repoMap.Update(); err != nil {
		return err
	}
	from := time.Now().Add(-s.period)
	commits := map[string][]string{}
	for _, repoName := range s.repoMap.Repos() {
		repo, err := s.repoMap.Repo(repoName)
		if err != nil {
			return err
		}
		commits[repoName] = repo.From(from)
	}

	// Find and process task candidates.
	candidates, err := s.findTaskCandidates(commits)
	if err != nil {
		return err
	}
	defer timer.New("process task candidates").Stop()
	now := time.Now()
	processed := make(chan *taskCandidate)
	errs := make(chan error)
	wg := sync.WaitGroup{}
	for _, c := range candidates {
		wg.Add(1)
		go func(candidates []*taskCandidate) {
			defer wg.Done()
			cache := newCacheWrapper(s.cache)
			commitsBuf := make([]*gitrepo.Commit, 0, buildbot.MAX_BLAMELIST_COMMITS)
			for {
				// Find the best candidate.
				idx := -1
				var best *taskCandidate
				for i, candidate := range candidates {
					c := candidate.Copy()
					if err := s.processTaskCandidate(c, now, cache, commitsBuf); err != nil {
						errs <- err
						return
					}
					if best == nil || c.Score > best.Score {
						best = c
						idx = i
					}
				}
				if best == nil {
					return
				}
				processed <- best
				t := best.MakeTask()
				t.Id = best.MakeId()
				cache.insert(t)
				if best.StealingFromId != "" {
					stoleFrom, err := cache.GetTask(best.StealingFromId)
					if err != nil {
						errs <- err
						return
					}
					stole := util.NewStringSet(best.Commits)
					oldC := util.NewStringSet(stoleFrom.Commits)
					newC := oldC.Complement(stole)
					commits := make([]string, 0, len(newC))
					for c := range newC {
						commits = append(commits, c)
					}
					stoleFrom.Commits = commits
					cache.insert(stoleFrom)
				}
				candidates = append(candidates[:idx], candidates[idx+1:]...)
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(processed)
		close(errs)
	}()
	rvCandidates := []*taskCandidate{}
	rvErrs := []error{}
	for {
		select {
		case c, ok := <-processed:
			if ok {
				rvCandidates = append(rvCandidates, c)
			} else {
				processed = nil
			}
		case err, ok := <-errs:
			if ok {
				rvErrs = append(rvErrs, err)
			} else {
				errs = nil
			}
		}
		if processed == nil && errs == nil {
			break
		}
	}

	if len(rvErrs) != 0 {
		return rvErrs[0]
	}
	sort.Sort(taskCandidateSlice(rvCandidates))

	s.queueMtx.Lock()
	defer s.queueMtx.Unlock()
	s.queue = rvCandidates
	return nil
}
460 | |
461 // getCandidatesToSchedule matches the list of free Swarming bots to task | |
462 // candidates in the queue and returns the candidates which should be run. | |
463 // Assumes that the tasks are sorted in decreasing order by score. | |
464 func getCandidatesToSchedule(bots []*swarming_api.SwarmingRpcsBotInfo, tasks []*
taskCandidate) []*taskCandidate { | |
465 defer timer.New("task_scheduler.getCandidatesToSchedule").Stop() | |
466 // Create a bots-by-swarming-dimension mapping. | |
467 botsByDim := map[string]util.StringSet{} | |
468 for _, b := range bots { | |
469 for _, dim := range b.Dimensions { | |
470 for _, val := range dim.Value { | |
471 d := fmt.Sprintf("%s:%s", dim.Key, val) | |
472 if _, ok := botsByDim[d]; !ok { | |
473 botsByDim[d] = util.StringSet{} | |
474 } | |
475 botsByDim[d][b.BotId] = true | |
476 } | |
477 } | |
478 } | |
479 | |
480 // Match bots to tasks. | |
481 // TODO(borenet): Some tasks require a more specialized bot. We should | |
482 // match so that less-specialized tasks don't "steal" more-specialized | |
483 // bots which they don't actually need. | |
484 rv := make([]*taskCandidate, 0, len(bots)) | |
485 for _, c := range tasks { | |
486 // For each dimension of the task, find the set of bots which ma
tches. | |
487 matches := util.StringSet{} | |
488 for i, d := range c.TaskSpec.Dimensions { | |
489 if i == 0 { | |
490 matches = matches.Union(botsByDim[d]) | |
491 } else { | |
492 matches = matches.Intersect(botsByDim[d]) | |
493 } | |
494 } | |
495 if len(matches) > 0 { | |
496 // We're going to run this task. Choose a bot. Sort the | |
497 // bots by ID so that the choice is deterministic. | |
498 choices := make([]string, 0, len(matches)) | |
499 for botId, _ := range matches { | |
500 choices = append(choices, botId) | |
501 } | |
502 sort.Strings(choices) | |
503 bot := choices[0] | |
504 | |
505 // Remove the bot from consideration. | |
506 for dim, subset := range botsByDim { | |
507 delete(subset, bot) | |
508 if len(subset) == 0 { | |
509 delete(botsByDim, dim) | |
510 } | |
511 } | |
512 | |
513 // Force the candidate to run on this bot. | |
514 c.TaskSpec.Dimensions = append(c.TaskSpec.Dimensions, fm
t.Sprintf("id:%s", bot)) | |
515 | |
516 // Add the task to the scheduling list. | |
517 rv = append(rv, c) | |
518 | |
519 // If we've exhausted the bot list, stop here. | |
520 if len(botsByDim) == 0 { | |
521 break | |
522 } | |
523 } | |
524 } | |
525 sort.Sort(taskCandidateSlice(rv)) | |
526 return rv | |
527 } | |
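
// exampleDimensionMatching is an illustrative sketch only (not used by the
// scheduler) of the dimension matching performed above. The bot IDs and
// dimension values are hypothetical.
func exampleDimensionMatching() util.StringSet {
	// Two free bots, indexed by their "key:value" Swarming dimensions.
	botsByDim := map[string]util.StringSet{
		"pool:Skia":   {"bot-linux-1": true, "bot-android-1": true},
		"os:Linux":    {"bot-linux-1": true},
		"os:Android":  {"bot-android-1": true},
		"gpu:GeForce": {"bot-linux-1": true},
	}
	// A candidate requiring {"pool:Skia", "os:Linux", "gpu:GeForce"} matches
	// the intersection of those three bot sets, ie. only bot-linux-1.
	dims := []string{"pool:Skia", "os:Linux", "gpu:GeForce"}
	matches := util.StringSet{}
	for i, d := range dims {
		if i == 0 {
			matches = matches.Union(botsByDim[d])
		} else {
			matches = matches.Intersect(botsByDim[d])
		}
	}
	return matches
}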

// scheduleTasks queries for free Swarming bots and triggers tasks according
// to relative priorities in the queue.
func (s *TaskScheduler) scheduleTasks() error {
	defer timer.New("TaskScheduler.scheduleTasks").Stop()
	// Find free bots, match them with tasks.
	bots, err := getFreeSwarmingBots(s.swarming)
	if err != nil {
		return err
	}
	s.queueMtx.Lock()
	defer s.queueMtx.Unlock()
	schedule := getCandidatesToSchedule(bots, s.queue)

	// First, group by commit hash since we have to isolate the code at
	// a particular revision for each task.
	byRepoCommit := map[string]map[string][]*taskCandidate{}
	for _, c := range schedule {
		if mRepo, ok := byRepoCommit[c.Repo]; !ok {
			byRepoCommit[c.Repo] = map[string][]*taskCandidate{c.Revision: []*taskCandidate{c}}
		} else {
			mRepo[c.Revision] = append(mRepo[c.Revision], c)
		}
	}

	// Isolate the tasks by commit.
	for repoName, commits := range byRepoCommit {
		infraBotsDir := path.Join(s.workdir, repoName, "infra", "bots")
		for commit, candidates := range commits {
			repo, err := s.repoMap.Repo(repoName)
			if err != nil {
				return err
			}
			if err := repo.Checkout(commit); err != nil {
				return err
			}
			tasks := make([]*isolate.Task, 0, len(candidates))
			for _, c := range candidates {
				tasks = append(tasks, c.MakeIsolateTask(infraBotsDir, s.workdir))
			}
			hashes, err := s.isolate.IsolateTasks(tasks)
			if err != nil {
				return err
			}
			if len(hashes) != len(candidates) {
				return fmt.Errorf("IsolateTasks returned an incorrect number of hashes.")
			}
			for i, c := range candidates {
				c.IsolatedInput = hashes[i]
			}
		}
	}

	// Trigger tasks.
	byCandidateId := make(map[string]*db.Task, len(schedule))
	tasksToInsert := make(map[string]*db.Task, len(schedule)*2)
	for _, candidate := range schedule {
		t := candidate.MakeTask()
		if err := s.db.AssignId(t); err != nil {
			return err
		}
		req := candidate.MakeTaskRequest(t.Id)
		resp, err := s.swarming.TriggerTask(req)
		if err != nil {
			return err
		}
		if _, err := t.UpdateFromSwarming(resp.TaskResult); err != nil {
			return err
		}
		byCandidateId[candidate.MakeId()] = t
		tasksToInsert[t.Id] = t
		// If we're stealing commits from another task, find it and adjust
		// its blamelist.
		// TODO(borenet): We're retrieving a cached task which may have been
		// changed since the cache was last updated. We need to handle that.
		if candidate.StealingFromId != "" {
			var stealingFrom *db.Task
			if _, _, _, err := parseId(candidate.StealingFromId); err == nil {
				stealingFrom = byCandidateId[candidate.StealingFromId]
				if stealingFrom == nil {
					return fmt.Errorf("Attempting to backfill a just-triggered candidate but can't find it: %q", candidate.StealingFromId)
				}
			} else {
				var ok bool
				stealingFrom, ok = tasksToInsert[candidate.StealingFromId]
				if !ok {
					stealingFrom, err = s.cache.GetTask(candidate.StealingFromId)
					if err != nil {
						return err
					}
				}
			}
			oldCommits := util.NewStringSet(stealingFrom.Commits)
			stealing := util.NewStringSet(t.Commits)
			stealingFrom.Commits = oldCommits.Complement(stealing).Keys()
			tasksToInsert[stealingFrom.Id] = stealingFrom
		}
	}
	tasks := make([]*db.Task, 0, len(tasksToInsert))
	for _, t := range tasksToInsert {
		tasks = append(tasks, t)
	}

	// Insert the tasks into the database.
	if err := s.db.PutTasks(tasks); err != nil {
		return err
	}

	// Remove the tasks from the queue.
	newQueue := make([]*taskCandidate, 0, len(s.queue)-len(schedule))
	for i, j := 0, 0; i < len(s.queue); {
		if j >= len(schedule) {
			newQueue = append(newQueue, s.queue[i:]...)
			break
		}
		if s.queue[i] == schedule[j] {
			j++
		} else {
			newQueue = append(newQueue, s.queue[i])
		}
		i++
	}
	s.queue = newQueue

	// Note: if regenerateTaskQueue and scheduleTasks are ever decoupled so
	// that the queue is reused by multiple runs of scheduleTasks, we'll need
	// to address the fact that some candidates may still have their
	// StealingFromId pointing to candidates which have been triggered and
	// removed from the queue. In that case, we should just need to write a
	// loop which updates those candidates to use the IDs of the newly-
	// inserted Tasks in the database rather than the candidate ID.

	glog.Infof("Triggered %d tasks on %d bots.", len(schedule), len(bots))
	return nil
}

// MainLoop runs a single end-to-end task scheduling loop.
func (s *TaskScheduler) MainLoop() error {
	defer timer.New("TaskScheduler.MainLoop").Stop()

	glog.Infof("Task Scheduler updating...")
	var e1, e2 error
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := s.updateRepos(); err != nil {
			e1 = err
		}
	}()

	// TODO(borenet): Do we have to fail out of scheduling if we fail to
	// updateUnfinishedTasks? Maybe we can just add a liveness metric and
	// alert if we go too long without updating successfully.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := updateUnfinishedTasks(s.cache, s.db, s.swarming); err != nil {
			e2 = err
		}
	}()
	wg.Wait()

	if e1 != nil {
		return e1
	}
	if e2 != nil {
		return e2
	}

	// Update the task cache.
	if err := s.cache.Update(); err != nil {
		return err
	}

	// Regenerate the queue, schedule tasks.
	// TODO(borenet): Query for free Swarming bots while we're regenerating
	// the queue.
	glog.Infof("Task Scheduler regenerating the queue...")
	if err := s.regenerateTaskQueue(); err != nil {
		return err
	}

	glog.Infof("Task Scheduler scheduling tasks...")
	if err := s.scheduleTasks(); err != nil {
		return err
	}

	// Update the cache again to include the newly-inserted tasks.
	return s.cache.Update()
}

// updateRepos syncs the scheduler's repos.
func (s *TaskScheduler) updateRepos() error {
	for _, r := range s.repos {
		if err := r.Update(); err != nil {
			return err
		}
	}
	return nil
}

// QueueLen returns the length of the queue.
func (s *TaskScheduler) QueueLen() int {
	s.queueMtx.RLock()
	defer s.queueMtx.RUnlock()
	return len(s.queue)
}

// timeDecay24Hr computes a linear time decay multiplier for the given elapsed
// duration, given the requested decay amount at 24 hours. The result is
// clamped so that it never falls below 0.0.
func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
	return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
}
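
// Illustrative values (derived from the formula above, not normative): with
// decayAmt24Hr = 0.9, a commit that landed 12 hours ago gets a multiplier of
// 1.0 - 0.1*0.5 = 0.95, a 24-hour-old commit gets 0.9, a 48-hour-old commit
// gets 0.8, and anything older than 240 hours is clamped to 0.0.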

// timeDecayForCommit computes a multiplier for a task candidate score based
// on how long ago the given commit landed. This allows us to prioritize more
// recent commits.
func (s *TaskScheduler) timeDecayForCommit(now time.Time, repoName, commit string) (float64, error) {
	if s.timeDecayAmt24Hr == 1.0 {
		// Shortcut for special case.
		return 1.0, nil
	}
	repo, err := s.repoMap.Repo(repoName)
	if err != nil {
		return 0.0, err
	}
	d, err := repo.Details(commit, false)
	if err != nil {
		return 0.0, err
	}
	return timeDecay24Hr(s.timeDecayAmt24Hr, now.Sub(d.Timestamp)), nil
}

// testedness computes the total "testedness" of a set of commits covered by a
// task whose blamelist included N commits. The "testedness" of a task spec at
// a given commit is defined as follows:
//
//   -1.0    if no task has ever included this commit for this task spec.
//    1.0    if a task was run for this task spec AT this commit.
//    1.0 / N if a task for this task spec has included this commit, where N is
//            the number of commits included in the task.
//
// This function gives the sum of the testedness for a blamelist of N commits.
func testedness(n int) float64 {
	if n < 0 {
		// This should never happen.
		glog.Errorf("Task score function got a blamelist with %d commits", n)
		return -1.0
	} else if n == 0 {
		// Zero commits have zero testedness.
		return 0.0
	} else if n == 1 {
		return 1.0
	} else {
		return 1.0 + float64(n-1)/float64(n)
	}
}
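
// For example (illustrative values derived from the definition above):
// testedness(1) == 1.0, testedness(2) == 1.5, testedness(4) == 1.75, and
// testedness(n) approaches 2.0 as n grows, since one commit in the blamelist
// is tested directly (1.0) and each of the other n-1 commits contributes 1/n.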

// testednessIncrease computes the increase in "testedness" obtained by running
// a task with the given blamelist length which may have "stolen" commits from
// a previous task with a different blamelist length. To do so, we compute the
// "testedness" for every commit affected by the task, before and after the
// task would run. We subtract the "before" score from the "after" score to
// obtain the "testedness" increase at each commit, then sum them to find the
// total increase in "testedness" obtained by running the task.
func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 {
	// Invalid inputs.
	if blamelistLength <= 0 || stoleFromBlamelistLength < 0 {
		return -1.0
	}

	if stoleFromBlamelistLength == 0 {
		// This task covers previously-untested commits. Previous testedness
		// is -1.0 for each commit in the blamelist.
		beforeTestedness := float64(-blamelistLength)
		afterTestedness := testedness(blamelistLength)
		return afterTestedness - beforeTestedness
	} else if blamelistLength == stoleFromBlamelistLength {
		// This is a retry. It provides no testedness increase, so shortcut
		// here rather than perform the math to obtain the same answer.
		return 0.0
	} else {
		// This is a bisect/backfill.
		beforeTestedness := testedness(stoleFromBlamelistLength)
		afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength)
		return afterTestedness - beforeTestedness
	}
}
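
// Worked examples (illustrative, derived from the function above):
//   - New work: testednessIncrease(4, 0) = testedness(4) - (4 * -1.0)
//     = 1.75 + 4.0 = 5.75.
//   - Retry: testednessIncrease(3, 3) = 0.0.
//   - Bisect: splitting a 4-commit blamelist in half gives
//     testednessIncrease(2, 4) = (testedness(2) + testedness(2)) - testedness(4)
//     = 3.0 - 1.75 = 1.25, so bisects add some value but less than new work.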

// getFreeSwarmingBots returns a slice of free swarming bots.
func getFreeSwarmingBots(s swarming.ApiClient) ([]*swarming_api.SwarmingRpcsBotInfo, error) {
	bots, err := s.ListSkiaBots()
	if err != nil {
		return nil, err
	}
	rv := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots))
	for _, bot := range bots {
		if bot.IsDead {
			continue
		}
		if bot.Quarantined {
			continue
		}
		if bot.TaskId != "" {
			continue
		}
		rv = append(rv, bot)
	}
	return rv, nil
}

// updateUnfinishedTasks queries Swarming for all unfinished tasks and updates
// their status in the DB.
func updateUnfinishedTasks(cache db.TaskCache, d db.DB, s swarming.ApiClient) error {
	tasks, err := cache.UnfinishedTasks()
	if err != nil {
		return err
	}
	sort.Sort(db.TaskSlice(tasks))

	// TODO(borenet): This would be faster if Swarming had a
	// get-multiple-tasks-by-ID endpoint.
	var wg sync.WaitGroup
	errs := make([]error, len(tasks))
	for i, t := range tasks {
		wg.Add(1)
		go func(idx int, t *db.Task) {
			defer wg.Done()
			swarmTask, err := s.GetTask(t.SwarmingTaskId)
			if err != nil {
				errs[idx] = fmt.Errorf("Failed to update unfinished task; failed to get updated task from swarming: %s", err)
				return
			}
			if err := db.UpdateDBFromSwarmingTask(d, swarmTask); err != nil {
				errs[idx] = fmt.Errorf("Failed to update unfinished task: %s", err)
				return
			}
		}(i, t)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}