Chromium Code Reviews

Side by Side Diff: build_scheduler/go/task_scheduler/task_scheduler.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
1 package task_scheduler
2
3 import (
4 "fmt"
5 "math"
6 "path"
7 "sort"
8 "sync"
9 "time"
10
11 swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
12 "github.com/skia-dev/glog"
13 "go.skia.org/infra/build_scheduler/go/db"
14 "go.skia.org/infra/go/buildbot"
15 "go.skia.org/infra/go/gitinfo"
16 "go.skia.org/infra/go/gitrepo"
17 "go.skia.org/infra/go/isolate"
18 "go.skia.org/infra/go/metrics2"
19 "go.skia.org/infra/go/swarming"
20 "go.skia.org/infra/go/timer"
21 "go.skia.org/infra/go/util"
22 )
23
24 // TaskScheduler is a struct used for scheduling tasks on bots.
25 type TaskScheduler struct {
26 cache db.TaskCache
27 db db.DB
28 isolate *isolate.Client
29 period time.Duration
30 queue []*taskCandidate
31 queueMtx sync.RWMutex
32 repoMap *gitinfo.RepoMap
33 repos map[string]*gitrepo.Repo
34 swarming swarming.ApiClient
35 taskCfgCache *taskCfgCache
36 timeDecayAmt24Hr float64
37 workdir string
38 }
39
40 func NewTaskScheduler(d db.DB, cache db.TaskCache, period time.Duration, workdir string, repoNames []string, isolateClient *isolate.Client, swarmingClient swarming.ApiClient) (*TaskScheduler, error) {
41 repos := make(map[string]*gitrepo.Repo, len(repoNames))
42 rm := gitinfo.NewRepoMap(workdir)
43 for _, r := range repoNames {
44 repo, err := gitrepo.NewRepo(r, path.Join(workdir, path.Base(r)))
45 if err != nil {
46 return nil, err
47 }
48 repos[r] = repo
49 if _, err := rm.Repo(r); err != nil {
50 return nil, err
51 }
52 }
53 s := &TaskScheduler{
54 cache: cache,
55 db: d,
56 isolate: isolateClient,
57 period: period,
58 queue: []*taskCandidate{},
59 queueMtx: sync.RWMutex{},
60 repoMap: rm,
61 repos: repos,
62 swarming: swarmingClient,
63 taskCfgCache: newTaskCfgCache(rm),
64 timeDecayAmt24Hr: 1.0,
65 workdir: workdir,
66 }
67 return s, nil
68 }
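// A minimal construction/usage sketch (hypothetical workdir and repo URL; the
// db, cache, isolate, and swarming clients are assumed to be created elsewhere):
//
//	s, err := NewTaskScheduler(d, cache, 24*time.Hour, "/tmp/task_scheduler_workdir",
//		[]string{"https://skia.googlesource.com/skia.git"}, isolateClient, swarmingClient)
//	if err != nil {
//		glog.Fatal(err)
//	}
//	s.Start()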
69
70 // Start initiates the TaskScheduler's goroutines for scheduling tasks.
71 func (s *TaskScheduler) Start() {
72 go func() {
73 lv := metrics2.NewLiveness("last-successful-task-scheduling")
74 for _ = range time.Tick(time.Minute) {
75 if err := s.MainLoop(); err != nil {
76 glog.Errorf("Failed to run the task scheduler: %s", err)
77 } else {
78 lv.Reset()
79 }
80 }
81 }()
82 }
83
84 // computeBlamelistRecursive traces through commit history, adding to
85 // the commits buffer until the blamelist for the task is complete.
86 //
87 // Args:
88 // - repoName: Name of the repository.
89 // - commit: Current commit as we're recursing through history.
90 // - taskName: Name of the task.
91 // - revision: Revision at which the task would run.
92 // - commits: Buffer in which to place the blamelist commits as they accumulate.
93 // - cache: TaskCache, for finding previous tasks.
94 // - repo: gitrepo.Repo corresponding to the repository.
95 // - stealFrom: Existing Task from which this Task will steal commits, if one exists.
96 func computeBlamelistRecursive(repoName string, commit *gitrepo.Commit, taskName string, revision *gitrepo.Commit, commits []*gitrepo.Commit, cache db.TaskCache, repo *gitrepo.Repo, stealFrom *db.Task) ([]*gitrepo.Commit, *db.Task, error) {
97 // Shortcut in case we missed this case before; if this is the first
98 // task for this task spec which has a valid Revision, the blamelist will
99 // be the entire Git history. If we find too many commits, assume we've
100 // hit this case and just return the Revision as the blamelist.
101 if len(commits) > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
102 commits = append(commits[:0], commit)
103 return commits, nil, nil
104 }
105
106 // Determine whether any task already includes this commit.
107 prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName)
108 if err != nil {
109 return nil, nil, err
110 }
111
112 // If we're stealing commits from a previous task but the current
113 // commit is not in any task's blamelist, we must have scrolled past
114 // the beginning of the tasks. Just return.
115 if prev == nil && stealFrom != nil {
116 return commits, stealFrom, nil
117 }
118
119 // If a previous task already included this commit, we have to make a decision.
120 if prev != nil {
121 // If this Task's Revision is already included in a different
122 // Task, then we're either bisecting or retrying a task. We'll
123 // "steal" commits from the previous Task's blamelist.
124 if len(commits) == 0 {
125 stealFrom = prev
126
127 // Another shortcut: If our Revision is the same as the
128 // Revision of the Task we're stealing commits from,
129 // ie. both tasks ran at the same commit, then this is a
130 // retry. Just steal all of the commits without doing
131 // any more work.
132 if stealFrom.Revision == revision.Hash {
133 commits = commits[:0]
134 for _, c := range stealFrom.Commits {
135 ptr := repo.Get(c)
136 if ptr == nil {
137 return nil, nil, fmt.Errorf("No such commit: %q", c)
138 }
139 commits = append(commits, ptr)
140 }
141 return commits, stealFrom, nil
142 }
143 }
144 if stealFrom == nil || prev != stealFrom {
145 // If we've hit a commit belonging to a different task,
146 // we're done.
147 return commits, stealFrom, nil
148 }
149 }
150
151 // Add the commit.
152 commits = append(commits, commit)
153
154 // Recurse on the commit's parents.
155 for _, p := range commit.Parents {
156 var err error
157 commits, stealFrom, err = computeBlamelistRecursive(repoName, p, taskName, revision, commits, cache, repo, stealFrom)
158 if err != nil {
159 return nil, nil, err
160 }
161 }
162 return commits, stealFrom, nil
163 }
164
165 // ComputeBlamelist computes the blamelist for a new task, specified by name,
166 // repo, and revision. Returns the list of commits covered by the task, and any
167 // previous task from which part or all of the blamelist was "stolen" (see
168 // below). There are three cases:
169 //
170 // 1. The new task tests commits which have not yet been tested. Trace commit
171 // history, accumulating commits until we find commits which have been tested
172 // by previous tasks.
173 //
174 // 2. The new task runs at the same commit as a previous task. This is a retry,
175 // so the entire blamelist of the previous task is "stolen".
176 //
177 // 3. The new task runs at a commit which is in a previous task's blamelist, but
178 // no task has run at the same commit. This is a bisect. Trace commit
179 // history, "stealing" commits from the previous task until we find a commit
180 // which was covered by a *different* previous task.
181 //
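// As a hypothetical illustration of the three cases, consider a linear
// history A -> B -> C -> D for a task spec "Test":
//   - "Test" previously ran at B with blamelist {A, B}; a new "Test" at D
//     accumulates {C, D} and stops once it reaches B (case 1).
//   - A second "Test" at D is a retry and "steals" the entire {C, D}
//     blamelist from the first (case 2).
//   - A new "Test" at C, where the task at D covered {C, D}, "steals" just
//     {C} and leaves the task at D with {D} (case 3).
//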
182 // Args:
183 // - cache: TaskCache instance.
184 // - repo: gitrepo.Repo instance corresponding to the repository of the task.
185 // - name: Name of the task.
186 // - repoName: Name of the repository for the task.
187 // - revision: Revision at which the task would run.
188 // - commitsBuf: Buffer for use as scratch space.
189 func ComputeBlamelist(cache db.TaskCache, repo *gitrepo.Repo, name, repoName, revision string, commitsBuf []*gitrepo.Commit) ([]string, *db.Task, error) {
190 // TODO(borenet): If this is a try job, don't compute the blamelist.
191
192 // If this is the first invocation of a given task spec, save time by
193 // setting the blamelist to only be c.Revision.
194 if !cache.KnownTaskName(repoName, name) {
195 return []string{revision}, nil, nil
196 }
197
198 commit := repo.Get(revision)
199 if commit == nil {
200 return nil, nil, fmt.Errorf("No such commit: %q", revision)
201 }
202
203 commitsBuf = commitsBuf[:0]
204
205 // Run the helper function to recurse on commit history.
206 commits, stealFrom, err := computeBlamelistRecursive(repoName, commit, name, commit, commitsBuf, cache, repo, nil)
207 if err != nil {
208 return nil, nil, err
209 }
210
211 // De-duplicate the commits list. Duplicates are rare but will occur
212 // in the case of a task which runs after a short-lived branch is merged
213 // so that the blamelist includes both the branch point and the merge
214 // commit. In this case, any commits just before the branch point will
215 // be duplicated.
216 // TODO(borenet): This has never happened in the Skia repo, but the
217 // below 8 lines of code account for ~10% of the execution time of this
218 // function, which is the critical path for the scheduler. Consider
219 // either ignoring this case or coming up with an alternate solution which
220 // moves this logic out of the critical path.
221 rv := make([]string, 0, len(commits))
222 visited := make(map[*gitrepo.Commit]bool, len(commits))
223 for _, c := range commits {
224 if !visited[c] {
225 rv = append(rv, c.Hash)
226 visited[c] = true
227 }
228 }
229 return rv, stealFrom, nil
230 }
231
232 // findTaskCandidates goes through the given commits-by-repo map, loads task
233 // specs for each repo/commit pair, and filters out candidates which we don't
234 // want to run: tasks which are already pending, running, or successfully
235 // completed, and tasks whose dependencies are not met. It returns the
236 // remaining candidates, grouped by task spec, along with any error
237 // encountered.
238 func (s *TaskScheduler) findTaskCandidates(commitsByRepo map[string][]string) (map[string][]*taskCandidate, error) {
239 defer timer.New("TaskScheduler.findTaskCandidates").Stop()
240 // Obtain all possible tasks.
241 specs, err := s.taskCfgCache.GetTaskSpecsForCommits(commitsByRepo)
242 if err != nil {
243 return nil, err
244 }
245 bySpec := map[string][]*taskCandidate{}
246 total := 0
247 for repo, commits := range specs {
248 for commit, tasks := range commits {
249 for name, task := range tasks {
250 c := &taskCandidate{
251 IsolatedHashes: nil,
252 Name: name,
253 Repo: repo,
254 Revision: commit,
255 Score: 0.0,
256 TaskSpec: task,
257 }
258 // We shouldn't duplicate pending, in-progress,
259 // or successfully completed tasks.
260 previous, err := s.cache.GetTaskForCommit(c.Repo, c.Revision, c.Name)
261 if err != nil {
262 return nil, err
263 }
264 if previous != nil && previous.Revision == c.Revision {
265 if previous.Status == db.TASK_STATUS_PENDING || previous.Status == db.TASK_STATUS_RUNNING {
266 continue
267 }
268 if previous.Success() {
269 continue
270 }
271 }
272
273 // Don't consider candidates whose dependencies are not met.
274 depsMet, hashes, err := c.allDepsMet(s.cache)
275 if err != nil {
276 return nil, err
277 }
278 if !depsMet {
279 continue
280 }
281 c.IsolatedHashes = hashes
282
283 key := fmt.Sprintf("%s|%s", c.Repo, c.Name)
284 candidates, ok := bySpec[key]
285 if !ok {
286 candidates = make([]*taskCandidate, 0, len(commits))
287 }
288 bySpec[key] = append(candidates, c)
289 total++
290 }
291 }
292 }
293 glog.Infof("Found %d candidates in %d specs:", total, len(bySpec))
294 return bySpec, nil
295 }
296
297 // processTaskCandidate computes the remaining information about the task
298 // candidate, eg. blamelists and scoring.
299 func (s *TaskScheduler) processTaskCandidate(c *taskCandidate, now time.Time, cache *cacheWrapper, commitsBuf []*gitrepo.Commit) error {
300 // Compute blamelist.
301 repo, ok := s.repos[c.Repo]
302 if !ok {
303 return fmt.Errorf("No such repo: %s", c.Repo)
304 }
305 commits, stealingFrom, err := ComputeBlamelist(cache, repo, c.Name, c.Repo, c.Revision, commitsBuf)
306 if err != nil {
307 return err
308 }
309 c.Commits = commits
310 if stealingFrom != nil {
311 c.StealingFromId = stealingFrom.Id
312 }
313
314 // Score the candidates.
315 // The score for a candidate is based on the "testedness" increase
316 // provided by running the task.
317 stoleFromCommits := 0
318 if stealingFrom != nil {
319 stoleFromCommits = len(stealingFrom.Commits)
320 }
321 score := testednessIncrease(len(c.Commits), stoleFromCommits)
322
323 // Scale the score by other factors, eg. time decay.
324 decay, err := s.timeDecayForCommit(now, c.Repo, c.Revision)
325 if err != nil {
326 return err
327 }
328 score *= decay
329
330 c.Score = score
331 return nil
332 }
333
334 // regenerateTaskQueue obtains the set of all eligible task candidates, scores
335 // them, and prepares them to be triggered.
336 func (s *TaskScheduler) regenerateTaskQueue() error {
337 defer timer.New("TaskScheduler.regenerateTaskQueue").Stop()
338
339 // Find the recent commits to use.
340 for _, repoName := range s.repoMap.Repos() {
341 r, err := s.repoMap.Repo(repoName)
342 if err != nil {
343 return err
344 }
345 if err := r.Reset("HEAD"); err != nil {
346 return err
347 }
348 if err := r.Checkout("master"); err != nil {
349 return err
350 }
351 }
352 if err := s.repoMap.Update(); err != nil {
353 return err
354 }
355 from := time.Now().Add(-s.period)
356 commits := map[string][]string{}
357 for _, repoName := range s.repoMap.Repos() {
358 repo, err := s.repoMap.Repo(repoName)
359 if err != nil {
360 return err
361 }
362 commits[repoName] = repo.From(from)
363 }
364
365 // Find and process task candidates.
366 candidates, err := s.findTaskCandidates(commits)
367 if err != nil {
368 return err
369 }
370 defer timer.New("process task candidates").Stop()
371 now := time.Now()
372 processed := make(chan *taskCandidate)
373 errs := make(chan error)
374 wg := sync.WaitGroup{}
375 for _, c := range candidates {
376 wg.Add(1)
377 go func(candidates []*taskCandidate) {
378 defer wg.Done()
379 cache := newCacheWrapper(s.cache)
380 commitsBuf := make([]*gitrepo.Commit, 0, buildbot.MAX_BLAMELIST_COMMITS)
381 for {
382 // Find the best candidate.
383 idx := -1
384 var best *taskCandidate
385 for i, candidate := range candidates {
386 c := candidate.Copy()
387 if err := s.processTaskCandidate(c, now, cache, commitsBuf); err != nil {
388 errs <- err
389 return
390 }
391 if best == nil || c.Score > best.Score {
392 best = c
393 idx = i
394 }
395 }
396 if best == nil {
397 return
398 }
399 processed <- best
400 t := best.MakeTask()
401 t.Id = best.MakeId()
402 cache.insert(t)
403 if best.StealingFromId != "" {
404 stoleFrom, err := cache.GetTask(best.StealingFromId)
405 if err != nil {
406 errs <- err
407 return
408 }
409 stole := util.NewStringSet(best.Commits)
410 oldC := util.NewStringSet(stoleFrom.Commits)
411 newC := oldC.Complement(stole)
412 commits := make([]string, 0, len(newC))
413 for c, _ := range newC {
414 commits = append(commits, c)
415 }
416 stoleFrom.Commits = commits
417 cache.insert(stoleFrom)
418 }
419 candidates = append(candidates[:idx], candidates[idx+1:]...)
420 }
421 }(c)
422 }
423 go func() {
424 wg.Wait()
425 close(processed)
426 close(errs)
427 }()
428 rvCandidates := []*taskCandidate{}
429 rvErrs := []error{}
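// Drain both channels until they are closed. Setting a channel variable to
// nil after it closes removes it from the select below (a receive on a nil
// channel blocks forever), so the loop exits once both are drained.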
430 for {
431 select {
432 case c, ok := <-processed:
433 if ok {
434 rvCandidates = append(rvCandidates, c)
435 } else {
436 processed = nil
437 }
438 case err, ok := <-errs:
439 if ok {
440 rvErrs = append(rvErrs, err)
441 } else {
442 errs = nil
443 }
444 }
445 if processed == nil && errs == nil {
446 break
447 }
448 }
449
450 if len(rvErrs) != 0 {
451 return rvErrs[0]
452 }
453 sort.Sort(taskCandidateSlice(rvCandidates))
454
455 s.queueMtx.Lock()
456 defer s.queueMtx.Unlock()
457 s.queue = rvCandidates
458 return nil
459 }
460
461 // getCandidatesToSchedule matches the list of free Swarming bots to task
462 // candidates in the queue and returns the candidates which should be run.
463 // Assumes that the tasks are sorted in decreasing order by score.
464 func getCandidatesToSchedule(bots []*swarming_api.SwarmingRpcsBotInfo, tasks []*taskCandidate) []*taskCandidate {
465 defer timer.New("task_scheduler.getCandidatesToSchedule").Stop()
466 // Create a bots-by-swarming-dimension mapping.
467 botsByDim := map[string]util.StringSet{}
468 for _, b := range bots {
469 for _, dim := range b.Dimensions {
470 for _, val := range dim.Value {
471 d := fmt.Sprintf("%s:%s", dim.Key, val)
472 if _, ok := botsByDim[d]; !ok {
473 botsByDim[d] = util.StringSet{}
474 }
475 botsByDim[d][b.BotId] = true
476 }
477 }
478 }
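// For example, a hypothetical bot "skia-bot-1" reporting dimensions
// {"os": ["Linux", "Ubuntu"], "gpu": ["none"]} ends up in the sets for
// "os:Linux", "os:Ubuntu", and "gpu:none".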
479
480 // Match bots to tasks.
481 // TODO(borenet): Some tasks require a more specialized bot. We should
482 // match so that less-specialized tasks don't "steal" more-specialized
483 // bots which they don't actually need.
484 rv := make([]*taskCandidate, 0, len(bots))
485 for _, c := range tasks {
486 // For each dimension of the task, find the set of bots which matches.
487 matches := util.StringSet{}
488 for i, d := range c.TaskSpec.Dimensions {
489 if i == 0 {
490 matches = matches.Union(botsByDim[d])
491 } else {
492 matches = matches.Intersect(botsByDim[d])
493 }
494 }
495 if len(matches) > 0 {
496 // We're going to run this task. Choose a bot. Sort the
497 // bots by ID so that the choice is deterministic.
498 choices := make([]string, 0, len(matches))
499 for botId, _ := range matches {
500 choices = append(choices, botId)
501 }
502 sort.Strings(choices)
503 bot := choices[0]
504
505 // Remove the bot from consideration.
506 for dim, subset := range botsByDim {
507 delete(subset, bot)
508 if len(subset) == 0 {
509 delete(botsByDim, dim)
510 }
511 }
512
513 // Force the candidate to run on this bot.
514 c.TaskSpec.Dimensions = append(c.TaskSpec.Dimensions, fmt.Sprintf("id:%s", bot))
515
516 // Add the task to the scheduling list.
517 rv = append(rv, c)
518
519 // If we've exhausted the bot list, stop here.
520 if len(botsByDim) == 0 {
521 break
522 }
523 }
524 }
525 sort.Sort(taskCandidateSlice(rv))
526 return rv
527 }
528
529 // scheduleTasks queries for free Swarming bots and triggers tasks according
530 // to relative priorities in the queue.
531 func (s *TaskScheduler) scheduleTasks() error {
532 defer timer.New("TaskScheduler.scheduleTasks").Stop()
533 // Find free bots, match them with tasks.
534 bots, err := getFreeSwarmingBots(s.swarming)
535 if err != nil {
536 return err
537 }
538 s.queueMtx.Lock()
539 defer s.queueMtx.Unlock()
540 schedule := getCandidatesToSchedule(bots, s.queue)
541
542 // First, group by repo and commit hash, since we have to isolate the code
543 // at a particular revision for each task.
544 byRepoCommit := map[string]map[string][]*taskCandidate{}
545 for _, c := range schedule {
546 if mRepo, ok := byRepoCommit[c.Repo]; !ok {
547 byRepoCommit[c.Repo] = map[string][]*taskCandidate{c.Revision: []*taskCandidate{c}}
548 } else {
549 mRepo[c.Revision] = append(mRepo[c.Revision], c)
550 }
551 }
552
553 // Isolate the tasks by commit.
554 for repoName, commits := range byRepoCommit {
555 infraBotsDir := path.Join(s.workdir, repoName, "infra", "bots")
556 for commit, candidates := range commits {
557 repo, err := s.repoMap.Repo(repoName)
558 if err != nil {
559 return err
560 }
561 if err := repo.Checkout(commit); err != nil {
562 return err
563 }
564 tasks := make([]*isolate.Task, 0, len(candidates))
565 for _, c := range candidates {
566 tasks = append(tasks, c.MakeIsolateTask(infraBotsDir, s.workdir))
567 }
568 hashes, err := s.isolate.IsolateTasks(tasks)
569 if err != nil {
570 return err
571 }
572 if len(hashes) != len(candidates) {
573 return fmt.Errorf("IsolateTasks returned incorrect number of hashes.")
574 }
575 for i, c := range candidates {
576 c.IsolatedInput = hashes[i]
577 }
578 }
579 }
580
581 // Trigger tasks.
582 byCandidateId := make(map[string]*db.Task, len(schedule))
583 tasksToInsert := make(map[string]*db.Task, len(schedule)*2)
584 for _, candidate := range schedule {
585 t := candidate.MakeTask()
586 if err := s.db.AssignId(t); err != nil {
587 return err
588 }
589 req := candidate.MakeTaskRequest(t.Id)
590 resp, err := s.swarming.TriggerTask(req)
591 if err != nil {
592 return err
593 }
594 if _, err := t.UpdateFromSwarming(resp.TaskResult); err != nil {
595 return err
596 }
597 byCandidateId[candidate.MakeId()] = t
598 tasksToInsert[t.Id] = t
599 // If we're stealing commits from another task, find it and adjust
600 // its blamelist.
601 // TODO(borenet): We're retrieving a cached task which may have been
602 // changed since the cache was last updated. We need to handle that.
603 if candidate.StealingFromId != "" {
604 var stealingFrom *db.Task
605 if _, _, _, err := parseId(candidate.StealingFromId); err == nil {
606 stealingFrom = byCandidateId[candidate.StealingFromId]
607 if stealingFrom == nil {
608 return fmt.Errorf("Attempting to backfill a just-triggered candidate but can't find it: %q", candidate.StealingFromId)
609 }
610 } else {
611 var ok bool
612 stealingFrom, ok = tasksToInsert[candidate.StealingFromId]
613 if !ok {
614 stealingFrom, err = s.cache.GetTask(candidate.StealingFromId)
615 if err != nil {
616 return err
617 }
618 }
619 }
620 oldCommits := util.NewStringSet(stealingFrom.Commits)
621 stealing := util.NewStringSet(t.Commits)
622 stealingFrom.Commits = oldCommits.Complement(stealing).Keys()
623 tasksToInsert[stealingFrom.Id] = stealingFrom
624 }
625 }
626 tasks := make([]*db.Task, 0, len(tasksToInsert))
627 for _, t := range tasksToInsert {
628 tasks = append(tasks, t)
629 }
630
631 // Insert the tasks into the database.
632 if err := s.db.PutTasks(tasks); err != nil {
633 return err
634 }
635
636 // Remove the tasks from the queue.
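// For example (hypothetical candidates), if the queue is [c1, c2, c3, c4] and
// schedule is [c2, c4], the new queue is [c1, c3]. This assumes the scheduled
// candidates appear in the same relative order as they do in the queue.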
637 newQueue := make([]*taskCandidate, 0, len(s.queue)-len(schedule))
638 for i, j := 0, 0; i < len(s.queue); {
639 if j >= len(schedule) {
640 newQueue = append(newQueue, s.queue[i:]...)
641 break
642 }
643 if s.queue[i] == schedule[j] {
644 j++
645 } else {
646 newQueue = append(newQueue, s.queue[i])
647 }
648 i++
649 }
650 s.queue = newQueue
651
652 // Note: if regenerateTaskQueue and scheduleTasks are ever decoupled so that
653 // the queue is reused by multiple runs of scheduleTasks, we'll need to
654 // address the fact that some candidates may still have their
655 // StealingFromId pointing to candidates which have been triggered and
656 // removed from the queue. In that case, we would just need to write a
657 // loop which updates those candidates to use the IDs of the newly-
658 // inserted Tasks in the database rather than the candidate ID.
659
660 glog.Infof("Triggered %d tasks on %d bots.", len(schedule), len(bots))
661 return nil
662 }
663
664 // MainLoop runs a single end-to-end task scheduling loop.
665 func (s *TaskScheduler) MainLoop() error {
666 defer timer.New("TaskScheduler.MainLoop").Stop()
667
668 glog.Infof("Task Scheduler updating...")
669 var e1, e2 error
670 var wg sync.WaitGroup
671
672 wg.Add(1)
673 go func() {
674 defer wg.Done()
675 if err := s.updateRepos(); err != nil {
676 e1 = err
677 }
678 }()
679
680 // TODO(borenet): Do we have to fail out of scheduling if we fail to
681 // updateUnfinishedTasks? Maybe we can just add a liveness metric and
682 // alert if we go too long without updating successfully.
683 wg.Add(1)
684 go func() {
685 defer wg.Done()
686 if err := updateUnfinishedTasks(s.cache, s.db, s.swarming); err != nil {
687 e2 = err
688 }
689 }()
690 wg.Wait()
691
692 if e1 != nil {
693 return e1
694 }
695 if e2 != nil {
696 return e2
697 }
698
699 // Update the task cache.
700 if err := s.cache.Update(); err != nil {
701 return err
702 }
703
704 // Regenerate the queue, schedule tasks.
705 // TODO(borenet): Query for free Swarming bots while we're regenerating
706 // the queue.
707 glog.Infof("Task Scheduler regenerating the queue...")
708 if err := s.regenerateTaskQueue(); err != nil {
709 return err
710 }
711
712 glog.Infof("Task Scheduler scheduling tasks...")
713 if err := s.scheduleTasks(); err != nil {
714 return err
715 }
716
717 // Update the cache again to include the newly-inserted tasks.
718 return s.cache.Update()
719 }
720
721 // updateRepos syncs the scheduler's repos.
722 func (s *TaskScheduler) updateRepos() error {
723 for _, r := range s.repos {
724 if err := r.Update(); err != nil {
725 return err
726 }
727 }
728 return nil
729 }
730
731 // QueueLen returns the length of the queue.
732 func (s *TaskScheduler) QueueLen() int {
733 s.queueMtx.RLock()
734 defer s.queueMtx.RUnlock()
735 return len(s.queue)
736 }
737
738 // timeDecay24Hr computes a linear time decay amount for the given duration,
739 // given the requested decay amount at 24 hours.
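// For example, with decayAmt24Hr = 0.5, a commit which landed 12 hours ago is
// scaled by 1.0 - 0.5*(12/24) = 0.75, one which landed 24 hours ago by 0.5,
// and anything 48 or more hours old is clamped to 0.0.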
740 func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
741 return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
742 }
743
744 // timeDecayForCommit computes a multiplier for a task candidate score based
745 // on how long ago the given commit landed. This allows us to prioritize more
746 // recent commits.
747 func (s *TaskScheduler) timeDecayForCommit(now time.Time, repoName, commit string) (float64, error) {
748 if s.timeDecayAmt24Hr == 1.0 {
749 // Shortcut for special case.
750 return 1.0, nil
751 }
752 repo, err := s.repoMap.Repo(repoName)
753 if err != nil {
754 return 0.0, err
755 }
756 d, err := repo.Details(commit, false)
757 if err != nil {
758 return 0.0, err
759 }
760 return timeDecay24Hr(s.timeDecayAmt24Hr, now.Sub(d.Timestamp)), nil
761 }
762
763 // testedness computes the total "testedness" of a set of commits covered by a
764 // task whose blamelist included N commits. The "testedness" of a task spec at a
765 // given commit is defined as follows:
766 //
767 // -1.0 if no task has ever included this commit for this task spec.
768 // 1.0 if a task was run for this task spec AT this commit.
769 // 1.0 / N if a task for this task spec has included this commit, where N is
770 // the number of commits included in the task.
771 //
772 // This function gives the sum of the testedness for a blamelist of N commits.
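// For example, testedness(1) = 1.0, testedness(2) = 1.5, and testedness(4) =
// 1.75; the total approaches but never reaches 2.0 as N grows, so finer-
// grained coverage always adds some value.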
773 func testedness(n int) float64 {
774 if n < 0 {
775 // This should never happen.
776 glog.Errorf("Task score function got a blamelist with %d commits", n)
777 return -1.0
778 } else if n == 0 {
779 // Zero commits have zero testedness.
780 return 0.0
781 } else if n == 1 {
782 return 1.0
783 } else {
784 return 1.0 + float64(n-1)/float64(n)
785 }
786 }
787
788 // testednessIncrease computes the increase in "testedness" obtained by running
789 // a task with the given blamelist length which may have "stolen" commits from
790 // a previous task with a different blamelist length. To do so, we compute the
791 // "testedness" for every commit affected by the task, before and after the
792 // task would run. We subtract the "before" score from the "after" score to
793 // obtain the "testedness" increase at each commit, then sum them to find the
794 // total increase in "testedness" obtained by running the task.
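// Two worked examples: a task covering 3 previously-untested commits scores
// testedness(3) - (-3.0) ~= 1.67 + 3.0 ~= 4.67, while a bisect which splits a
// previous 4-commit blamelist into two halves of 2 scores
// (testedness(2) + testedness(2)) - testedness(4) = 3.0 - 1.75 = 1.25.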
795 func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 {
796 // Invalid inputs.
797 if blamelistLength <= 0 || stoleFromBlamelistLength < 0 {
798 return -1.0
799 }
800
801 if stoleFromBlamelistLength == 0 {
802 // This task covers previously-untested commits. Previous testedness
803 // is -1.0 for each commit in the blamelist.
804 beforeTestedness := float64(-blamelistLength)
805 afterTestedness := testedness(blamelistLength)
806 return afterTestedness - beforeTestedness
807 } else if blamelistLength == stoleFromBlamelistLength {
808 // This is a retry. It provides no testedness increase, so we shortcut here
809 // rather than performing the math to obtain the same answer.
810 return 0.0
811 } else {
812 // This is a bisect/backfill.
813 beforeTestedness := testedness(stoleFromBlamelistLength)
814 afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength)
815 return afterTestedness - beforeTestedness
816 }
817 }
818
819 // getFreeSwarmingBots returns a slice of free swarming bots.
820 func getFreeSwarmingBots(s swarming.ApiClient) ([]*swarming_api.SwarmingRpcsBotInfo, error) {
821 bots, err := s.ListSkiaBots()
822 if err != nil {
823 return nil, err
824 }
825 rv := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots))
826 for _, bot := range bots {
827 if bot.IsDead {
828 continue
829 }
830 if bot.Quarantined {
831 continue
832 }
833 if bot.TaskId != "" {
834 continue
835 }
836 rv = append(rv, bot)
837 }
838 return rv, nil
839 }
840
841 // updateUnfinishedTasks queries Swarming for all unfinished tasks and updates
842 // their status in the DB.
843 func updateUnfinishedTasks(cache db.TaskCache, d db.DB, s swarming.ApiClient) error {
844 tasks, err := cache.UnfinishedTasks()
845 if err != nil {
846 return err
847 }
848 sort.Sort(db.TaskSlice(tasks))
849
850 // TODO(borenet): This would be faster if Swarming had a
851 // get-multiple-tasks-by-ID endpoint.
852 var wg sync.WaitGroup
853 errs := make([]error, len(tasks))
854 for i, t := range tasks {
855 wg.Add(1)
856 go func(idx int, t *db.Task) {
857 defer wg.Done()
858 swarmTask, err := s.GetTask(t.SwarmingTaskId)
859 if err != nil {
860 errs[idx] = fmt.Errorf("Failed to update unfinished task; failed to get updated task from swarming: %s", err)
861 return
862 }
863 if err := db.UpdateDBFromSwarmingTask(d, swarmTask); err != nil {
864 errs[idx] = fmt.Errorf("Failed to update unfinished task: %s", err)
865 return
866 }
867 }(i, t)
868 }
869 wg.Wait()
870 for _, err := range errs {
871 if err != nil {
872 return err
873 }
874 }
875 return nil
876 }
