package task_scheduler

import (
	"fmt"
	"math"
	"path"
	"sort"
	"sync"
	"time"

	swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
	"github.com/skia-dev/glog"
	"go.skia.org/infra/build_scheduler/go/db"
	"go.skia.org/infra/go/buildbot"
	"go.skia.org/infra/go/gitinfo"
	"go.skia.org/infra/go/gitrepo"
	"go.skia.org/infra/go/isolate"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/swarming"
	"go.skia.org/infra/go/timer"
	"go.skia.org/infra/go/util"
)

// TaskScheduler is a struct used for scheduling tasks on bots.
type TaskScheduler struct {
	cache            db.TaskCache
	db               db.DB
	isolate          *isolate.Client
	period           time.Duration
	queue            []*taskCandidate
	queueMtx         sync.RWMutex
	repoMap          *gitinfo.RepoMap
	repos            map[string]*gitrepo.Repo
	swarming         swarming.ApiClient
	taskCfgCache     *taskCfgCache
	timeDecayAmt24Hr float64
	workdir          string
}

// NewTaskScheduler returns a TaskScheduler which schedules tasks for the
// given repos, checked out under workdir, using the given DB, task cache,
// Isolate client, and Swarming client. The period indicates how far back in
// commit history to search for task candidates.
func NewTaskScheduler(d db.DB, cache db.TaskCache, period time.Duration, workdir string, repoNames []string, isolateClient *isolate.Client, swarmingClient swarming.ApiClient) (*TaskScheduler, error) {
	repos := make(map[string]*gitrepo.Repo, len(repoNames))
	rm := gitinfo.NewRepoMap(workdir)
	for _, r := range repoNames {
		repo, err := gitrepo.NewRepo(r, path.Join(workdir, path.Base(r)))
		if err != nil {
			return nil, err
		}
		repos[r] = repo
		if _, err := rm.Repo(r); err != nil {
			return nil, err
		}
	}
	s := &TaskScheduler{
		cache:            cache,
		db:               d,
		isolate:          isolateClient,
		period:           period,
		queue:            []*taskCandidate{},
		queueMtx:         sync.RWMutex{},
		repoMap:          rm,
		repos:            repos,
		swarming:         swarmingClient,
		taskCfgCache:     newTaskCfgCache(rm),
		timeDecayAmt24Hr: 1.0,
		workdir:          workdir,
	}
	return s, nil
}

// Start initiates the TaskScheduler's goroutines for scheduling tasks.
func (s *TaskScheduler) Start() {
	go func() {
		lv := metrics2.NewLiveness("last-successful-task-scheduling")
		for range time.Tick(time.Minute) {
			if err := s.MainLoop(); err != nil {
				glog.Errorf("Failed to run the task scheduler: %s", err)
			} else {
				lv.Reset()
			}
		}
	}()
}

// computeBlamelistRecursive traces through commit history, adding to
// the commits buffer until the blamelist for the task is complete.
//
// Args:
// - repoName: Name of the repository.
// - commit: Current commit as we're recursing through history.
// - taskName: Name of the task.
// - revision: Revision at which the task would run.
// - commits: Buffer in which to place the blamelist commits as they accumulate.
// - cache: TaskCache, for finding previous tasks.
// - repo: gitrepo.Repo corresponding to the repository.
// - stealFrom: Existing Task from which this Task will steal commits, if one exists.
func computeBlamelistRecursive(repoName string, commit *gitrepo.Commit, taskName string, revision *gitrepo.Commit, commits []*gitrepo.Commit, cache db.TaskCache, repo *gitrepo.Repo, stealFrom *db.Task) ([]*gitrepo.Commit, *db.Task, error) {
	// Shortcut in case we missed this case before; if this is the first
	// task for this task spec which has a valid Revision, the blamelist will
	// be the entire Git history. If we find too many commits, assume we've
	// hit this case and just return the Revision as the blamelist.
	if len(commits) > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
		commits = append(commits[:0], commit)
		return commits, nil, nil
	}

	// Determine whether any task already includes this commit.
	prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName)
	if err != nil {
		return nil, nil, err
	}

	// If we're stealing commits from a previous task but the current
	// commit is not in any task's blamelist, we must have scrolled past
	// the beginning of the tasks. Just return.
	if prev == nil && stealFrom != nil {
		return commits, stealFrom, nil
	}

	// If a previous task already included this commit, we have to make a decision.
	if prev != nil {
		// If this Task's Revision is already included in a different
		// Task, then we're either bisecting or retrying a task. We'll
		// "steal" commits from the previous Task's blamelist.
		if len(commits) == 0 {
			stealFrom = prev

			// Another shortcut: If our Revision is the same as the
			// Revision of the Task we're stealing commits from,
			// ie. both tasks ran at the same commit, then this is a
			// retry. Just steal all of the commits without doing
			// any more work.
			if stealFrom.Revision == revision.Hash {
				commits = commits[:0]
				for _, c := range stealFrom.Commits {
					ptr := repo.Get(c)
					if ptr == nil {
						return nil, nil, fmt.Errorf("No such commit: %q", c)
					}
					commits = append(commits, ptr)
				}
				return commits, stealFrom, nil
			}
		}
		if stealFrom == nil || prev != stealFrom {
			// If we've hit a commit belonging to a different task,
			// we're done.
			return commits, stealFrom, nil
		}
	}

	// Add the commit.
	commits = append(commits, commit)

	// Recurse on the commit's parents.
	for _, p := range commit.Parents {
		var err error
		commits, stealFrom, err = computeBlamelistRecursive(repoName, p, taskName, revision, commits, cache, repo, stealFrom)
		if err != nil {
			return nil, nil, err
		}
	}
	return commits, stealFrom, nil
}

// ComputeBlamelist computes the blamelist for a new task, specified by name,
// repo, and revision. Returns the list of commits covered by the task, and the
// previous task from which part or all of the blamelist was "stolen", if any
// (see below). There are three cases:
//
// 1. The new task tests commits which have not yet been tested. Trace commit
// history, accumulating commits until we find commits which have been tested
// by previous tasks.
//
// 2. The new task runs at the same commit as a previous task. This is a retry,
// so the entire blamelist of the previous task is "stolen".
//
// 3. The new task runs at a commit which is in a previous task's blamelist, but
// no task has run at the same commit. This is a bisect. Trace commit
// history, "stealing" commits from the previous task until we find a commit
// which was covered by a *different* previous task.
//
// Args:
// - cache: TaskCache instance.
// - repo: gitrepo.Repo instance corresponding to the repository of the task.
// - name: Name of the task.
// - repoName: Name of the repository for the task.
// - revision: Revision at which the task would run.
// - commitsBuf: Buffer for use as scratch space.
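//
// As a hypothetical illustration (commit names invented, not from a real
// repo): suppose a task previously ran at commit E with blamelist
// {A, B, C, D, E}. A new task at E is case 2 (a retry) and steals all five
// commits. A new task at C, where no task has run, is case 3 (a bisect): it
// steals {A, B, C}, leaving {D, E} on the old task. A new task at a later,
// uncovered commit F is case 1, and its blamelist is just {F}.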
func ComputeBlamelist(cache db.TaskCache, repo *gitrepo.Repo, name, repoName, revision string, commitsBuf []*gitrepo.Commit) ([]string, *db.Task, error) {
	// TODO(borenet): If this is a try job, don't compute the blamelist.

	// If this is the first invocation of a given task spec, save time by
	// setting the blamelist to only be c.Revision.
	if !cache.KnownTaskName(repoName, name) {
		return []string{revision}, nil, nil
	}

	commit := repo.Get(revision)
	if commit == nil {
		return nil, nil, fmt.Errorf("No such commit: %q", revision)
	}

	commitsBuf = commitsBuf[:0]

	// Run the helper function to recurse on commit history.
	commits, stealFrom, err := computeBlamelistRecursive(repoName, commit, name, commit, commitsBuf, cache, repo, nil)
	if err != nil {
		return nil, nil, err
	}

	// De-duplicate the commits list. Duplicates are rare but will occur
	// in the case of a task which runs after a short-lived branch is merged,
	// so that the blamelist includes both the branch point and the merge
	// commit. In this case, any commits just before the branch point will
	// be duplicated.
	// TODO(borenet): This has never happened in the Skia repo, but the
	// below 8 lines of code account for ~10% of the execution time of this
	// function, which is the critical path for the scheduler. Consider
	// either ignoring this case or coming up with an alternate solution
	// which moves this logic out of the critical path.
	rv := make([]string, 0, len(commits))
	visited := make(map[*gitrepo.Commit]bool, len(commits))
	for _, c := range commits {
		if !visited[c] {
			rv = append(rv, c.Hash)
			visited[c] = true
		}
	}
	return rv, stealFrom, nil
}

// findTaskCandidates goes through the given commits-by-repo, loads task specs
// from each repo/commit pair, and filters out candidates which we don't want
// to run, eg. tasks which are already pending, running, or succeeded, or
// whose dependencies are not met. It returns the remaining candidates, keyed
// by "<repo>|<task name>".
func (s *TaskScheduler) findTaskCandidates(commitsByRepo map[string][]string) (map[string][]*taskCandidate, error) {
	defer timer.New("TaskScheduler.findTaskCandidates").Stop()
	// Obtain all possible tasks.
	specs, err := s.taskCfgCache.GetTaskSpecsForCommits(commitsByRepo)
	if err != nil {
		return nil, err
	}
	bySpec := map[string][]*taskCandidate{}
	total := 0
	for repo, commits := range specs {
		for commit, tasks := range commits {
			for name, task := range tasks {
				c := &taskCandidate{
					IsolatedHashes: nil,
					Name:           name,
					Repo:           repo,
					Revision:       commit,
					Score:          0.0,
					TaskSpec:       task,
				}
				// We shouldn't duplicate pending, in-progress,
				// or successfully completed tasks.
				previous, err := s.cache.GetTaskForCommit(c.Repo, c.Revision, c.Name)
				if err != nil {
					return nil, err
				}
				if previous != nil && previous.Revision == c.Revision {
					if previous.Status == db.TASK_STATUS_PENDING || previous.Status == db.TASK_STATUS_RUNNING {
						continue
					}
					if previous.Success() {
						continue
					}
				}

				// Don't consider candidates whose dependencies are not met.
				depsMet, hashes, err := c.allDepsMet(s.cache)
				if err != nil {
					return nil, err
				}
				if !depsMet {
					continue
				}
				c.IsolatedHashes = hashes

				key := fmt.Sprintf("%s|%s", c.Repo, c.Name)
				candidates, ok := bySpec[key]
				if !ok {
					candidates = make([]*taskCandidate, 0, len(commits))
				}
				bySpec[key] = append(candidates, c)
				total++
			}
		}
	}
	glog.Infof("Found %d candidates in %d specs.", total, len(bySpec))
	return bySpec, nil
}

// processTaskCandidate computes the remaining information about the task
// candidate, eg. blamelists and scoring.
func (s *TaskScheduler) processTaskCandidate(c *taskCandidate, now time.Time, cache *cacheWrapper, commitsBuf []*gitrepo.Commit) error {
	// Compute blamelist.
	repo, ok := s.repos[c.Repo]
	if !ok {
		return fmt.Errorf("No such repo: %s", c.Repo)
	}
	commits, stealingFrom, err := ComputeBlamelist(cache, repo, c.Name, c.Repo, c.Revision, commitsBuf)
	if err != nil {
		return err
	}
	c.Commits = commits
	if stealingFrom != nil {
		c.StealingFromId = stealingFrom.Id
	}

	// Score the candidate.
	// The score for a candidate is based on the "testedness" increase
	// provided by running the task.
	stoleFromCommits := 0
	if stealingFrom != nil {
		stoleFromCommits = len(stealingFrom.Commits)
	}
	score := testednessIncrease(len(c.Commits), stoleFromCommits)

	// Scale the score by other factors, eg. time decay.
	decay, err := s.timeDecayForCommit(now, c.Repo, c.Revision)
	if err != nil {
		return err
	}
	score *= decay

	c.Score = score
	return nil
}

// regenerateTaskQueue obtains the set of all eligible task candidates, scores
// them, and prepares them to be triggered.
func (s *TaskScheduler) regenerateTaskQueue() error {
	defer timer.New("TaskScheduler.regenerateTaskQueue").Stop()

	// Find the recent commits to use.
	for _, repoName := range s.repoMap.Repos() {
		r, err := s.repoMap.Repo(repoName)
		if err != nil {
			return err
		}
		if err := r.Reset("HEAD"); err != nil {
			return err
		}
		if err := r.Checkout("master"); err != nil {
			return err
		}
	}
	if err := s.repoMap.Update(); err != nil {
		return err
	}
	from := time.Now().Add(-s.period)
	commits := map[string][]string{}
	for _, repoName := range s.repoMap.Repos() {
		repo, err := s.repoMap.Repo(repoName)
		if err != nil {
			return err
		}
		commits[repoName] = repo.From(from)
	}

	// Find and process task candidates.
	candidates, err := s.findTaskCandidates(commits)
	if err != nil {
		return err
	}
	defer timer.New("process task candidates").Stop()
	now := time.Now()
	processed := make(chan *taskCandidate)
	errs := make(chan error)
	wg := sync.WaitGroup{}
	for _, c := range candidates {
		wg.Add(1)
		go func(candidates []*taskCandidate) {
			defer wg.Done()
			cache := newCacheWrapper(s.cache)
			commitsBuf := make([]*gitrepo.Commit, 0, buildbot.MAX_BLAMELIST_COMMITS)
			for {
				// Find the best candidate.
				idx := -1
				var best *taskCandidate
				for i, candidate := range candidates {
					c := candidate.Copy()
					if err := s.processTaskCandidate(c, now, cache, commitsBuf); err != nil {
						errs <- err
						return
					}
					if best == nil || c.Score > best.Score {
						best = c
						idx = i
					}
				}
				if best == nil {
					return
				}
				processed <- best
				t := best.MakeTask()
				t.Id = best.MakeId()
				cache.insert(t)
				if best.StealingFromId != "" {
					stoleFrom, err := cache.GetTask(best.StealingFromId)
					if err != nil {
						errs <- err
						return
					}
					stole := util.NewStringSet(best.Commits)
					oldC := util.NewStringSet(stoleFrom.Commits)
					newC := oldC.Complement(stole)
					commits := make([]string, 0, len(newC))
					for c := range newC {
						commits = append(commits, c)
					}
					stoleFrom.Commits = commits
					cache.insert(stoleFrom)
				}
				candidates = append(candidates[:idx], candidates[idx+1:]...)
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(processed)
		close(errs)
	}()
	rvCandidates := []*taskCandidate{}
	rvErrs := []error{}
	for {
		select {
		case c, ok := <-processed:
			if ok {
				rvCandidates = append(rvCandidates, c)
			} else {
				processed = nil
			}
		case err, ok := <-errs:
			if ok {
				rvErrs = append(rvErrs, err)
			} else {
				errs = nil
			}
		}
		if processed == nil && errs == nil {
			break
		}
	}

	if len(rvErrs) != 0 {
		return rvErrs[0]
	}
	sort.Sort(taskCandidateSlice(rvCandidates))

	s.queueMtx.Lock()
	defer s.queueMtx.Unlock()
	s.queue = rvCandidates
	return nil
}

// getCandidatesToSchedule matches the list of free Swarming bots to task
// candidates in the queue and returns the candidates which should be run.
// Assumes that the tasks are sorted in decreasing order by score.
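//
// For example (hypothetical dimensions, not taken from a real config): a
// candidate whose TaskSpec.Dimensions are ["os:Linux", "gpu:none"] is matched
// against the intersection of the set of bots advertising "os:Linux" and the
// set of bots advertising "gpu:none"; the lexicographically-first matching
// bot ID is chosen and pinned via an added "id:<bot>" dimension.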
func getCandidatesToSchedule(bots []*swarming_api.SwarmingRpcsBotInfo, tasks []*taskCandidate) []*taskCandidate {
	defer timer.New("task_scheduler.getCandidatesToSchedule").Stop()
	// Create a bots-by-swarming-dimension mapping.
	botsByDim := map[string]util.StringSet{}
	for _, b := range bots {
		for _, dim := range b.Dimensions {
			for _, val := range dim.Value {
				d := fmt.Sprintf("%s:%s", dim.Key, val)
				if _, ok := botsByDim[d]; !ok {
					botsByDim[d] = util.StringSet{}
				}
				botsByDim[d][b.BotId] = true
			}
		}
	}

	// Match bots to tasks.
	// TODO(borenet): Some tasks require a more specialized bot. We should
	// match so that less-specialized tasks don't "steal" more-specialized
	// bots which they don't actually need.
	rv := make([]*taskCandidate, 0, len(bots))
	for _, c := range tasks {
		// For each dimension of the task, find the set of bots which matches.
		matches := util.StringSet{}
		for i, d := range c.TaskSpec.Dimensions {
			if i == 0 {
				matches = matches.Union(botsByDim[d])
			} else {
				matches = matches.Intersect(botsByDim[d])
			}
		}
		if len(matches) > 0 {
			// We're going to run this task. Choose a bot. Sort the
			// bots by ID so that the choice is deterministic.
			choices := make([]string, 0, len(matches))
			for botId := range matches {
				choices = append(choices, botId)
			}
			sort.Strings(choices)
			bot := choices[0]

			// Remove the bot from consideration.
			for dim, subset := range botsByDim {
				delete(subset, bot)
				if len(subset) == 0 {
					delete(botsByDim, dim)
				}
			}

			// Force the candidate to run on this bot.
			c.TaskSpec.Dimensions = append(c.TaskSpec.Dimensions, fmt.Sprintf("id:%s", bot))

			// Add the task to the scheduling list.
			rv = append(rv, c)

			// If we've exhausted the bot list, stop here.
			if len(botsByDim) == 0 {
				break
			}
		}
	}
	sort.Sort(taskCandidateSlice(rv))
	return rv
}

// scheduleTasks queries for free Swarming bots and triggers tasks according
// to relative priorities in the queue.
func (s *TaskScheduler) scheduleTasks() error {
	defer timer.New("TaskScheduler.scheduleTasks").Stop()
	// Find free bots, match them with tasks.
	bots, err := getFreeSwarmingBots(s.swarming)
	if err != nil {
		return err
	}
	s.queueMtx.Lock()
	defer s.queueMtx.Unlock()
	schedule := getCandidatesToSchedule(bots, s.queue)

	// First, group by commit hash since we have to isolate the code at
	// a particular revision for each task.
	byRepoCommit := map[string]map[string][]*taskCandidate{}
	for _, c := range schedule {
		if mRepo, ok := byRepoCommit[c.Repo]; !ok {
			byRepoCommit[c.Repo] = map[string][]*taskCandidate{c.Revision: []*taskCandidate{c}}
		} else {
			mRepo[c.Revision] = append(mRepo[c.Revision], c)
		}
	}

	// Isolate the tasks by commit.
	for repoName, commits := range byRepoCommit {
		infraBotsDir := path.Join(s.workdir, repoName, "infra", "bots")
		for commit, candidates := range commits {
			repo, err := s.repoMap.Repo(repoName)
			if err != nil {
				return err
			}
			if err := repo.Checkout(commit); err != nil {
				return err
			}
			tasks := make([]*isolate.Task, 0, len(candidates))
			for _, c := range candidates {
				tasks = append(tasks, c.MakeIsolateTask(infraBotsDir, s.workdir))
			}
			hashes, err := s.isolate.IsolateTasks(tasks)
			if err != nil {
				return err
			}
			if len(hashes) != len(candidates) {
				return fmt.Errorf("IsolateTasks returned an incorrect number of hashes.")
			}
			for i, c := range candidates {
				c.IsolatedInput = hashes[i]
			}
		}
	}

	// Trigger tasks.
	byCandidateId := make(map[string]*db.Task, len(schedule))
	tasksToInsert := make(map[string]*db.Task, len(schedule)*2)
	for _, candidate := range schedule {
		t := candidate.MakeTask()
		if err := s.db.AssignId(t); err != nil {
			return err
		}
		req := candidate.MakeTaskRequest(t.Id)
		resp, err := s.swarming.TriggerTask(req)
		if err != nil {
			return err
		}
		if _, err := t.UpdateFromSwarming(resp.TaskResult); err != nil {
			return err
		}
		byCandidateId[candidate.MakeId()] = t
		tasksToInsert[t.Id] = t
		// If we're stealing commits from another task, find it and adjust
		// its blamelist.
		// TODO(borenet): We're retrieving a cached task which may have been
		// changed since the cache was last updated. We need to handle that.
		if candidate.StealingFromId != "" {
			var stealingFrom *db.Task
			if _, _, _, err := parseId(candidate.StealingFromId); err == nil {
				stealingFrom = byCandidateId[candidate.StealingFromId]
				if stealingFrom == nil {
					return fmt.Errorf("Attempting to backfill a just-triggered candidate but can't find it: %q", candidate.StealingFromId)
				}
			} else {
				var ok bool
				stealingFrom, ok = tasksToInsert[candidate.StealingFromId]
				if !ok {
					stealingFrom, err = s.cache.GetTask(candidate.StealingFromId)
					if err != nil {
						return err
					}
				}
			}
			oldCommits := util.NewStringSet(stealingFrom.Commits)
			stealing := util.NewStringSet(t.Commits)
			stealingFrom.Commits = oldCommits.Complement(stealing).Keys()
			tasksToInsert[stealingFrom.Id] = stealingFrom
		}
	}
	tasks := make([]*db.Task, 0, len(tasksToInsert))
	for _, t := range tasksToInsert {
		tasks = append(tasks, t)
	}

	// Insert the tasks into the database.
	if err := s.db.PutTasks(tasks); err != nil {
		return err
	}

	// Remove the tasks from the queue.
	newQueue := make([]*taskCandidate, 0, len(s.queue)-len(schedule))
	for i, j := 0, 0; i < len(s.queue); {
		if j >= len(schedule) {
			newQueue = append(newQueue, s.queue[i:]...)
			break
		}
		if s.queue[i] == schedule[j] {
			j++
		} else {
			newQueue = append(newQueue, s.queue[i])
		}
		i++
	}
	s.queue = newQueue

	// Note: if regenerateTaskQueue and scheduleTasks are ever decoupled so that
	// the queue is reused by multiple runs of scheduleTasks, we'll need to
	// address the fact that some candidates may still have their
	// StealingFromId pointing to candidates which have been triggered and
	// removed from the queue. In that case, we should just need to write a
	// loop which updates those candidates to use the IDs of the newly-
	// inserted Tasks in the database rather than the candidate ID.

	glog.Infof("Triggered %d tasks on %d bots.", len(schedule), len(bots))
	return nil
}

// MainLoop runs a single end-to-end task scheduling loop.
func (s *TaskScheduler) MainLoop() error {
	defer timer.New("TaskScheduler.MainLoop").Stop()

	glog.Infof("Task Scheduler updating...")
	var e1, e2 error
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := s.updateRepos(); err != nil {
			e1 = err
		}
	}()

	// TODO(borenet): Do we have to fail out of scheduling if we fail to
	// updateUnfinishedTasks? Maybe we can just add a liveness metric and
	// alert if we go too long without updating successfully.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := updateUnfinishedTasks(s.cache, s.db, s.swarming); err != nil {
			e2 = err
		}
	}()
	wg.Wait()

	if e1 != nil {
		return e1
	}
	if e2 != nil {
		return e2
	}

	// Update the task cache.
	if err := s.cache.Update(); err != nil {
		return err
	}

	// Regenerate the queue, schedule tasks.
	// TODO(borenet): Query for free Swarming bots while we're regenerating
	// the queue.
	glog.Infof("Task Scheduler regenerating the queue...")
	if err := s.regenerateTaskQueue(); err != nil {
		return err
	}

	glog.Infof("Task Scheduler scheduling tasks...")
	if err := s.scheduleTasks(); err != nil {
		return err
	}

	// Update the cache again to include the newly-inserted tasks.
	return s.cache.Update()
}

// updateRepos syncs the scheduler's repos.
func (s *TaskScheduler) updateRepos() error {
	for _, r := range s.repos {
		if err := r.Update(); err != nil {
			return err
		}
	}
	return nil
}

// QueueLen returns the length of the queue.
func (s *TaskScheduler) QueueLen() int {
	s.queueMtx.RLock()
	defer s.queueMtx.RUnlock()
	return len(s.queue)
}

// timeDecay24Hr computes a linear time decay amount for the given duration,
// given the requested decay amount at 24 hours.
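//
// For example (values chosen for illustration only): with decayAmt24Hr = 0.9,
// an elapsed time of 12 hours gives 1.0 - 0.1*(12/24) = 0.95, 24 hours gives
// 0.9, and 48 hours gives 0.8; the result is clamped at 0.0 for very old
// commits.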
func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
	return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
}

// timeDecayForCommit computes a multiplier for a task candidate score based
// on how long ago the given commit landed. This allows us to prioritize more
// recent commits.
func (s *TaskScheduler) timeDecayForCommit(now time.Time, repoName, commit string) (float64, error) {
	if s.timeDecayAmt24Hr == 1.0 {
		// Shortcut for special case.
		return 1.0, nil
	}
	repo, err := s.repoMap.Repo(repoName)
	if err != nil {
		return 0.0, err
	}
	d, err := repo.Details(commit, false)
	if err != nil {
		return 0.0, err
	}
	return timeDecay24Hr(s.timeDecayAmt24Hr, now.Sub(d.Timestamp)), nil
}

// testedness computes the total "testedness" of a set of commits covered by a
// task whose blamelist included N commits. The "testedness" of a task spec at a
// given commit is defined as follows:
//
// -1.0 if no task has ever included this commit for this task spec.
// 1.0 if a task was run for this task spec AT this commit.
// 1.0 / N if a task for this task spec has included this commit, where N is
// the number of commits included in the task.
//
// This function gives the sum of the testedness for a blamelist of N commits.
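//
// For example, a blamelist of 4 commits has testedness
// 1.0 + 3 * (1.0 / 4) = 1.75: the task ran at one of the commits and merely
// included the other three.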
func testedness(n int) float64 {
	if n < 0 {
		// This should never happen.
		glog.Errorf("Task score function got a blamelist with %d commits", n)
		return -1.0
	} else if n == 0 {
		// Zero commits have zero testedness.
		return 0.0
	} else if n == 1 {
		return 1.0
	} else {
		return 1.0 + float64(n-1)/float64(n)
	}
}

// testednessIncrease computes the increase in "testedness" obtained by running
// a task with the given blamelist length which may have "stolen" commits from
// a previous task with a different blamelist length. To do so, we compute the
// "testedness" for every commit affected by the task, before and after the
// task would run. We subtract the "before" score from the "after" score to
// obtain the "testedness" increase at each commit, then sum them to find the
// total increase in "testedness" obtained by running the task.
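//
// For example: a task covering 3 previously-untested commits scores
// testedness(3) - (-3.0) = 1.67 + 3.0 = 4.67; a retry scores 0.0; and a bisect
// which splits a previous 6-commit blamelist into 2 + 4 scores
// (testedness(2) + testedness(4)) - testedness(6) = (1.5 + 1.75) - 1.83 = 1.42.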
func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 {
	// Invalid inputs.
	if blamelistLength <= 0 || stoleFromBlamelistLength < 0 {
		return -1.0
	}

	if stoleFromBlamelistLength == 0 {
		// This task covers previously-untested commits. Previous testedness
		// is -1.0 for each commit in the blamelist.
		beforeTestedness := float64(-blamelistLength)
		afterTestedness := testedness(blamelistLength)
		return afterTestedness - beforeTestedness
	} else if blamelistLength == stoleFromBlamelistLength {
		// This is a retry. It provides no testedness increase, so shortcut here
		// rather than perform the math to obtain the same answer.
		return 0.0
	} else {
		// This is a bisect/backfill.
		beforeTestedness := testedness(stoleFromBlamelistLength)
		afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength)
		return afterTestedness - beforeTestedness
	}
}

// getFreeSwarmingBots returns a slice of free swarming bots.
func getFreeSwarmingBots(s swarming.ApiClient) ([]*swarming_api.SwarmingRpcsBotInfo, error) {
	bots, err := s.ListSkiaBots()
	if err != nil {
		return nil, err
	}
	rv := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots))
	for _, bot := range bots {
		if bot.IsDead {
			continue
		}
		if bot.Quarantined {
			continue
		}
		if bot.TaskId != "" {
			continue
		}
		rv = append(rv, bot)
	}
	return rv, nil
}

// updateUnfinishedTasks queries Swarming for all unfinished tasks and updates
// their status in the DB.
func updateUnfinishedTasks(cache db.TaskCache, d db.DB, s swarming.ApiClient) error {
	tasks, err := cache.UnfinishedTasks()
	if err != nil {
		return err
	}
	sort.Sort(db.TaskSlice(tasks))

	// TODO(borenet): This would be faster if Swarming had a
	// get-multiple-tasks-by-ID endpoint.
	var wg sync.WaitGroup
	errs := make([]error, len(tasks))
	for i, t := range tasks {
		wg.Add(1)
		go func(idx int, t *db.Task) {
			defer wg.Done()
			swarmTask, err := s.GetTask(t.SwarmingTaskId)
			if err != nil {
				errs[idx] = fmt.Errorf("Failed to update unfinished task; failed to get updated task from swarming: %s", err)
				return
			}
			if err := db.UpdateDBFromSwarmingTask(d, swarmTask); err != nil {
				errs[idx] = fmt.Errorf("Failed to update unfinished task: %s", err)
				return
			}
		}(i, t)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}