| Index: build_scheduler/go/task_scheduler/task_scheduler.go
|
| diff --git a/build_scheduler/go/task_scheduler/task_scheduler.go b/build_scheduler/go/task_scheduler/task_scheduler.go
|
| deleted file mode 100644
|
| index 12fdffc7614a185411a1942bfe090147768c0885..0000000000000000000000000000000000000000
|
| --- a/build_scheduler/go/task_scheduler/task_scheduler.go
|
| +++ /dev/null
|
| @@ -1,876 +0,0 @@
|
| -package task_scheduler
|
| -
|
| -import (
|
| - "fmt"
|
| - "math"
|
| - "path"
|
| - "sort"
|
| - "sync"
|
| - "time"
|
| -
|
| - swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
|
| - "github.com/skia-dev/glog"
|
| - "go.skia.org/infra/build_scheduler/go/db"
|
| - "go.skia.org/infra/go/buildbot"
|
| - "go.skia.org/infra/go/gitinfo"
|
| - "go.skia.org/infra/go/gitrepo"
|
| - "go.skia.org/infra/go/isolate"
|
| - "go.skia.org/infra/go/metrics2"
|
| - "go.skia.org/infra/go/swarming"
|
| - "go.skia.org/infra/go/timer"
|
| - "go.skia.org/infra/go/util"
|
| -)
|
| -
|
| -// TaskScheduler is a struct used for scheduling tasks on bots.
|
| -type TaskScheduler struct {
|
| - cache db.TaskCache
|
| - db db.DB
|
| - isolate *isolate.Client
|
| - period time.Duration
|
| - queue []*taskCandidate
|
| - queueMtx sync.RWMutex
|
| - repoMap *gitinfo.RepoMap
|
| - repos map[string]*gitrepo.Repo
|
| - swarming swarming.ApiClient
|
| - taskCfgCache *taskCfgCache
|
| - timeDecayAmt24Hr float64
|
| - workdir string
|
| -}
|
| -
|
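| -// NewTaskScheduler returns a TaskScheduler instance which schedules tasks
|
| -// for the given repos, using the given DB, cache, isolate, and Swarming clients.
|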
| -func NewTaskScheduler(d db.DB, cache db.TaskCache, period time.Duration, workdir string, repoNames []string, isolateClient *isolate.Client, swarmingClient swarming.ApiClient) (*TaskScheduler, error) {
|
| - repos := make(map[string]*gitrepo.Repo, len(repoNames))
|
| - rm := gitinfo.NewRepoMap(workdir)
|
| - for _, r := range repoNames {
|
| - repo, err := gitrepo.NewRepo(r, path.Join(workdir, path.Base(r)))
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - repos[r] = repo
|
| - if _, err := rm.Repo(r); err != nil {
|
| - return nil, err
|
| - }
|
| - }
|
| - s := &TaskScheduler{
|
| - cache: cache,
|
| - db: d,
|
| - isolate: isolateClient,
|
| - period: period,
|
| - queue: []*taskCandidate{},
|
| - queueMtx: sync.RWMutex{},
|
| - repoMap: rm,
|
| - repos: repos,
|
| - swarming: swarmingClient,
|
| - taskCfgCache: newTaskCfgCache(rm),
|
| - timeDecayAmt24Hr: 1.0,
|
| - workdir: workdir,
|
| - }
|
| - return s, nil
|
| -}
|
| -
|
| -// Start initiates the TaskScheduler's goroutines for scheduling tasks.
|
| -func (s *TaskScheduler) Start() {
|
| - go func() {
|
| - lv := metrics2.NewLiveness("last-successful-task-scheduling")
|
| - for range time.Tick(time.Minute) {
|
| - if err := s.MainLoop(); err != nil {
|
| - glog.Errorf("Failed to run the task scheduler: %s", err)
|
| - } else {
|
| - lv.Reset()
|
| - }
|
| - }
|
| - }()
|
| -}
|
| -
|
| -// computeBlamelistRecursive traces through commit history, adding to
|
| -// the commits buffer until the blamelist for the task is complete.
|
| -//
|
| -// Args:
|
| -// - repoName: Name of the repository.
|
| -// - commit: Current commit as we're recursing through history.
|
| -// - taskName: Name of the task.
|
| -// - revision: Revision at which the task would run.
|
| -// - commits: Buffer in which to place the blamelist commits as they accumulate.
|
| -// - cache: TaskCache, for finding previous tasks.
|
| -// - repo: gitrepo.Repo corresponding to the repository.
|
| -// - stealFrom: Existing Task from which this Task will steal commits, if one exists.
|
| -func computeBlamelistRecursive(repoName string, commit *gitrepo.Commit, taskName string, revision *gitrepo.Commit, commits []*gitrepo.Commit, cache db.TaskCache, repo *gitrepo.Repo, stealFrom *db.Task) ([]*gitrepo.Commit, *db.Task, error) {
|
| - // Shortcut in case we missed this case before: if this is the first
|
| - // task for this task spec which has a valid Revision, the blamelist will
|
| - // be the entire Git history. If we find too many commits, assume we've
|
| - // hit this case and just return the Revision as the blamelist.
|
| - if len(commits) > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
|
| - commits = append(commits[:0], revision)
|
| - return commits, nil, nil
|
| - }
|
| -
|
| - // Determine whether any task already includes this commit.
|
| - prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName)
|
| - if err != nil {
|
| - return nil, nil, err
|
| - }
|
| -
|
| - // If we're stealing commits from a previous task but the current
|
| - // commit is not in any task's blamelist, we must have scrolled past
|
| - // the beginning of the tasks. Just return.
|
| - if prev == nil && stealFrom != nil {
|
| - return commits, stealFrom, nil
|
| - }
|
| -
|
| - // If a previous task already included this commit, we have to make a decision.
|
| - if prev != nil {
|
| - // If this Task's Revision is already included in a different
|
| - // Task, then we're either bisecting or retrying a task. We'll
|
| - // "steal" commits from the previous Task's blamelist.
|
| - if len(commits) == 0 {
|
| - stealFrom = prev
|
| -
|
| - // Another shortcut: If our Revision is the same as the
|
| - // Revision of the Task we're stealing commits from,
|
| - // ie. both tasks ran at the same commit, then this is a
|
| - // retry. Just steal all of the commits without doing
|
| - // any more work.
|
| - if stealFrom.Revision == revision.Hash {
|
| - commits = commits[:0]
|
| - for _, c := range stealFrom.Commits {
|
| - ptr := repo.Get(c)
|
| - if ptr == nil {
|
| - return nil, nil, fmt.Errorf("No such commit: %q", c)
|
| - }
|
| - commits = append(commits, ptr)
|
| - }
|
| - return commits, stealFrom, nil
|
| - }
|
| - }
|
| - if stealFrom == nil || prev != stealFrom {
|
| - // If we've hit a commit belonging to a different task,
|
| - // we're done.
|
| - return commits, stealFrom, nil
|
| - }
|
| - }
|
| -
|
| - // Add the commit.
|
| - commits = append(commits, commit)
|
| -
|
| - // Recurse on the commit's parents.
|
| - for _, p := range commit.Parents {
|
| - var err error
|
| - commits, stealFrom, err = computeBlamelistRecursive(repoName, p, taskName, revision, commits, cache, repo, stealFrom)
|
| - if err != nil {
|
| - return nil, nil, err
|
| - }
|
| - }
|
| - return commits, stealFrom, nil
|
| -}
|
| -
|
| -// ComputeBlamelist computes the blamelist for a new task, specified by name,
|
| -// repo, and revision. Returns the list of commits covered by the task, and any
|
| -// previous task from which part or all of the blamelist was "stolen" (see
|
| -// below). There are three cases:
|
| -//
|
| -// 1. The new task tests commits which have not yet been tested. Trace commit
|
| -// history, accumulating commits until we find commits which have been tested
|
| -// by previous tasks.
|
| -//
|
| -// 2. The new task runs at the same commit as a previous task. This is a retry,
|
| -// so the entire blamelist of the previous task is "stolen".
|
| -//
|
| -// 3. The new task runs at a commit which is in a previous task's blamelist, but
|
| -// no task has run at the same commit. This is a bisect. Trace commit
|
| -// history, "stealing" commits from the previous task until we find a commit
|
| -// which was covered by a *different* previous task.
|
| -//
|
| -// Args:
|
| -// - cache: TaskCache instance.
|
| -// - repo: gitrepo.Repo instance corresponding to the repository of the task.
|
| -// - name: Name of the task.
|
| -// - repoName: Name of the repository for the task.
|
| -// - revision: Revision at which the task would run.
|
| -// - commitsBuf: Buffer for use as scratch space.
|
| -func ComputeBlamelist(cache db.TaskCache, repo *gitrepo.Repo, name, repoName, revision string, commitsBuf []*gitrepo.Commit) ([]string, *db.Task, error) {
|
| - // TODO(borenet): If this is a try job, don't compute the blamelist.
|
| -
|
| - // If this is the first invocation of a given task spec, save time by
|
| -// setting the blamelist to only the given revision.
|
| - if !cache.KnownTaskName(repoName, name) {
|
| - return []string{revision}, nil, nil
|
| - }
|
| -
|
| - commit := repo.Get(revision)
|
| - if commit == nil {
|
| - return nil, nil, fmt.Errorf("No such commit: %q", revision)
|
| - }
|
| -
|
| - commitsBuf = commitsBuf[:0]
|
| -
|
| - // Run the helper function to recurse on commit history.
|
| - commits, stealFrom, err := computeBlamelistRecursive(repoName, commit, name, commit, commitsBuf, cache, repo, nil)
|
| - if err != nil {
|
| - return nil, nil, err
|
| - }
|
| -
|
| - // De-duplicate the commits list. Duplicates are rare but will occur
|
| - // in the case of a task which runs after a short-lived branch is merged
|
| - // so that the blamelist includes both the branch point and the merge
|
| - // commit. In this case, any commits just before the branch point will
|
| - // be duplicated.
|
| - // TODO(borenet): This has never happened in the Skia repo, but the
|
| - // below 8 lines of code account for ~10% of the execution time of this
|
| - // function, which is the critical path for the scheduler. Consider
|
| - // either ignoring this case or coming up with an alternate solution which
|
| - // moves this logic out of the critical path.
|
| - rv := make([]string, 0, len(commits))
|
| - visited := make(map[*gitrepo.Commit]bool, len(commits))
|
| - for _, c := range commits {
|
| - if !visited[c] {
|
| - rv = append(rv, c.Hash)
|
| - visited[c] = true
|
| - }
|
| - }
|
| - return rv, stealFrom, nil
|
| -}
|
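The three blamelist cases above can be exercised in a deliberately simplified model. The sketch below is not the production algorithm (it assumes a linear history and ignores MAX_BLAMELIST_COMMITS, the task cache, and de-duplication); it walks a hypothetical history c1..c5 in which a previous task t1 ran at c3 with blamelist {c1, c2, c3}:

```go
package main

import "fmt"

// Toy linear history, child -> parent: c5 -> c4 -> c3 -> c2 -> c1.
var parent = map[string]string{"c5": "c4", "c4": "c3", "c3": "c2", "c2": "c1"}

// covered maps each commit to the ID of the previous task whose blamelist
// includes it (hypothetical data).
var covered = map[string]string{"c1": "t1", "c2": "t1", "c3": "t1"}

// blamelist walks history from rev, accumulating commits until it reaches
// commits owned by a different task (case 1). If rev itself is already
// covered, every accumulated commit is "stolen" from that task (cases 2/3).
func blamelist(rev string) (commits []string, stealFrom string) {
	stealFrom = covered[rev]
	for c := rev; c != ""; c = parent[c] {
		if covered[c] != stealFrom {
			break
		}
		commits = append(commits, c)
	}
	return commits, stealFrom
}

func main() {
	fmt.Println(blamelist("c5")) // case 1 (new commits): [c5 c4], steals nothing
	fmt.Println(blamelist("c3")) // case 2 (retry):       [c3 c2 c1], all stolen from t1
	fmt.Println(blamelist("c2")) // case 3 (bisect):      [c2 c1], stolen from t1
}
```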
| -
|
| -// findTaskCandidates goes through the given commits-by-repo map, loads task
|
| -// specs from each repo/commit pair, and returns the resulting candidates,
|
| -// grouped by task spec, filtering out candidates which we don't want to run
|
| -// (e.g. duplicates of pending, running, or already-successful tasks).
|
| -func (s *TaskScheduler) findTaskCandidates(commitsByRepo map[string][]string) (map[string][]*taskCandidate, error) {
|
| - defer timer.New("TaskScheduler.findTaskCandidates").Stop()
|
| - // Obtain all possible tasks.
|
| - specs, err := s.taskCfgCache.GetTaskSpecsForCommits(commitsByRepo)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - bySpec := map[string][]*taskCandidate{}
|
| - total := 0
|
| - for repo, commits := range specs {
|
| - for commit, tasks := range commits {
|
| - for name, task := range tasks {
|
| - c := &taskCandidate{
|
| - IsolatedHashes: nil,
|
| - Name: name,
|
| - Repo: repo,
|
| - Revision: commit,
|
| - Score: 0.0,
|
| - TaskSpec: task,
|
| - }
|
| - // We shouldn't duplicate pending, in-progress,
|
| - // or successfully completed tasks.
|
| - previous, err := s.cache.GetTaskForCommit(c.Repo, c.Revision, c.Name)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - if previous != nil && previous.Revision == c.Revision {
|
| - if previous.Status == db.TASK_STATUS_PENDING || previous.Status == db.TASK_STATUS_RUNNING {
|
| - continue
|
| - }
|
| - if previous.Success() {
|
| - continue
|
| - }
|
| - }
|
| -
|
| - // Don't consider candidates whose dependencies are not met.
|
| - depsMet, hashes, err := c.allDepsMet(s.cache)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - if !depsMet {
|
| - continue
|
| - }
|
| - c.IsolatedHashes = hashes
|
| -
|
| - key := fmt.Sprintf("%s|%s", c.Repo, c.Name)
|
| - candidates, ok := bySpec[key]
|
| - if !ok {
|
| - candidates = make([]*taskCandidate, 0, len(commits))
|
| - }
|
| - bySpec[key] = append(candidates, c)
|
| - total++
|
| - }
|
| - }
|
| - }
|
| - glog.Infof("Found %d candidates in %d specs", total, len(bySpec))
|
| - return bySpec, nil
|
| -}
|
| -
|
| -// processTaskCandidate computes the remaining information about the task
|
| -// candidate, e.g. blamelists and scoring.
|
| -func (s *TaskScheduler) processTaskCandidate(c *taskCandidate, now time.Time, cache *cacheWrapper, commitsBuf []*gitrepo.Commit) error {
|
| - // Compute blamelist.
|
| - repo, ok := s.repos[c.Repo]
|
| - if !ok {
|
| - return fmt.Errorf("No such repo: %s", c.Repo)
|
| - }
|
| - commits, stealingFrom, err := ComputeBlamelist(cache, repo, c.Name, c.Repo, c.Revision, commitsBuf)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - c.Commits = commits
|
| - if stealingFrom != nil {
|
| - c.StealingFromId = stealingFrom.Id
|
| - }
|
| -
|
| - // Score the candidates.
|
| - // The score for a candidate is based on the "testedness" increase
|
| - // provided by running the task.
|
| - stoleFromCommits := 0
|
| - if stealingFrom != nil {
|
| - stoleFromCommits = len(stealingFrom.Commits)
|
| - }
|
| - score := testednessIncrease(len(c.Commits), stoleFromCommits)
|
| -
|
| - // Scale the score by other factors, e.g. time decay.
|
| - decay, err := s.timeDecayForCommit(now, c.Repo, c.Revision)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - score *= decay
|
| -
|
| - c.Score = score
|
| - return nil
|
| -}
|
| -
|
| -// regenerateTaskQueue obtains the set of all eligible task candidates, scores
|
| -// them, and prepares them to be triggered.
|
| -func (s *TaskScheduler) regenerateTaskQueue() error {
|
| - defer timer.New("TaskScheduler.regenerateTaskQueue").Stop()
|
| -
|
| - // Find the recent commits to use.
|
| - for _, repoName := range s.repoMap.Repos() {
|
| - r, err := s.repoMap.Repo(repoName)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if err := r.Reset("HEAD"); err != nil {
|
| - return err
|
| - }
|
| - if err := r.Checkout("master"); err != nil {
|
| - return err
|
| - }
|
| - }
|
| - if err := s.repoMap.Update(); err != nil {
|
| - return err
|
| - }
|
| - from := time.Now().Add(-s.period)
|
| - commits := map[string][]string{}
|
| - for _, repoName := range s.repoMap.Repos() {
|
| - repo, err := s.repoMap.Repo(repoName)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - commits[repoName] = repo.From(from)
|
| - }
|
| -
|
| - // Find and process task candidates.
|
| - candidates, err := s.findTaskCandidates(commits)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - defer timer.New("process task candidates").Stop()
|
| - now := time.Now()
|
| - processed := make(chan *taskCandidate)
|
| - errs := make(chan error)
|
| - wg := sync.WaitGroup{}
|
| - for _, c := range candidates {
|
| - wg.Add(1)
|
| - go func(candidates []*taskCandidate) {
|
| - defer wg.Done()
|
| - cache := newCacheWrapper(s.cache)
|
| - commitsBuf := make([]*gitrepo.Commit, 0, buildbot.MAX_BLAMELIST_COMMITS)
|
| - for {
|
| - // Find the best candidate.
|
| - idx := -1
|
| - var best *taskCandidate
|
| - for i, candidate := range candidates {
|
| - c := candidate.Copy()
|
| - if err := s.processTaskCandidate(c, now, cache, commitsBuf); err != nil {
|
| - errs <- err
|
| - return
|
| - }
|
| - if best == nil || c.Score > best.Score {
|
| - best = c
|
| - idx = i
|
| - }
|
| - }
|
| - if best == nil {
|
| - return
|
| - }
|
| - processed <- best
|
| - t := best.MakeTask()
|
| - t.Id = best.MakeId()
|
| - cache.insert(t)
|
| - if best.StealingFromId != "" {
|
| - stoleFrom, err := cache.GetTask(best.StealingFromId)
|
| - if err != nil {
|
| - errs <- err
|
| - return
|
| - }
|
| - stole := util.NewStringSet(best.Commits)
|
| - oldC := util.NewStringSet(stoleFrom.Commits)
|
| - newC := oldC.Complement(stole)
|
| - commits := make([]string, 0, len(newC))
|
| - for c := range newC {
|
| - commits = append(commits, c)
|
| - }
|
| - stoleFrom.Commits = commits
|
| - cache.insert(stoleFrom)
|
| - }
|
| - candidates = append(candidates[:idx], candidates[idx+1:]...)
|
| - }
|
| - }(c)
|
| - }
|
| - go func() {
|
| - wg.Wait()
|
| - close(processed)
|
| - close(errs)
|
| - }()
|
| - rvCandidates := []*taskCandidate{}
|
| - rvErrs := []error{}
|
| - for {
|
| - select {
|
| - case c, ok := <-processed:
|
| - if ok {
|
| - rvCandidates = append(rvCandidates, c)
|
| - } else {
|
| - processed = nil
|
| - }
|
| - case err, ok := <-errs:
|
| - if ok {
|
| - rvErrs = append(rvErrs, err)
|
| - } else {
|
| - errs = nil
|
| - }
|
| - }
|
| - if processed == nil && errs == nil {
|
| - break
|
| - }
|
| - }
|
| -
|
| - if len(rvErrs) != 0 {
|
| - return rvErrs[0]
|
| - }
|
| - sort.Sort(taskCandidateSlice(rvCandidates))
|
| -
|
| - s.queueMtx.Lock()
|
| - defer s.queueMtx.Unlock()
|
| - s.queue = rvCandidates
|
| - return nil
|
| -}
|
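The fan-out above relies on a Go drain idiom: workers send results and errors on two channels, both channels are closed only after the WaitGroup drains, and the receiver nils out each channel as it closes so the select stops polling it (receiving from a nil channel blocks forever). A minimal standalone sketch of that idiom, with hypothetical integer payloads in place of task candidates:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	results := make(chan int)
	errs := make(chan error)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if n == 2 {
				errs <- fmt.Errorf("worker %d failed", n)
				return
			}
			results <- n * 10
		}(i)
	}
	// Close both channels only after every worker has returned.
	go func() {
		wg.Wait()
		close(results)
		close(errs)
	}()
	// Nil-ing a closed channel removes it from the select.
	for results != nil || errs != nil {
		select {
		case r, ok := <-results:
			if !ok {
				results = nil
				continue
			}
			fmt.Println("result:", r)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Println("error:", err)
		}
	}
}
```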
| -
|
| -// getCandidatesToSchedule matches the list of free Swarming bots to task
|
| -// candidates in the queue and returns the candidates which should be run.
|
| -// Assumes that the tasks are sorted in decreasing order by score.
|
| -func getCandidatesToSchedule(bots []*swarming_api.SwarmingRpcsBotInfo, tasks []*taskCandidate) []*taskCandidate {
|
| - defer timer.New("task_scheduler.getCandidatesToSchedule").Stop()
|
| - // Create a bots-by-swarming-dimension mapping.
|
| - botsByDim := map[string]util.StringSet{}
|
| - for _, b := range bots {
|
| - for _, dim := range b.Dimensions {
|
| - for _, val := range dim.Value {
|
| - d := fmt.Sprintf("%s:%s", dim.Key, val)
|
| - if _, ok := botsByDim[d]; !ok {
|
| - botsByDim[d] = util.StringSet{}
|
| - }
|
| - botsByDim[d][b.BotId] = true
|
| - }
|
| - }
|
| - }
|
| -
|
| - // Match bots to tasks.
|
| - // TODO(borenet): Some tasks require a more specialized bot. We should
|
| - // match so that less-specialized tasks don't "steal" more-specialized
|
| - // bots which they don't actually need.
|
| - rv := make([]*taskCandidate, 0, len(bots))
|
| - for _, c := range tasks {
|
| - // For each dimension of the task, find the set of bots which matches.
|
| - matches := util.StringSet{}
|
| - for i, d := range c.TaskSpec.Dimensions {
|
| - if i == 0 {
|
| - matches = matches.Union(botsByDim[d])
|
| - } else {
|
| - matches = matches.Intersect(botsByDim[d])
|
| - }
|
| - }
|
| - if len(matches) > 0 {
|
| - // We're going to run this task. Choose a bot. Sort the
|
| - // bots by ID so that the choice is deterministic.
|
| - choices := make([]string, 0, len(matches))
|
| - for botId := range matches {
|
| - choices = append(choices, botId)
|
| - }
|
| - sort.Strings(choices)
|
| - bot := choices[0]
|
| -
|
| - // Remove the bot from consideration.
|
| - for dim, subset := range botsByDim {
|
| - delete(subset, bot)
|
| - if len(subset) == 0 {
|
| - delete(botsByDim, dim)
|
| - }
|
| - }
|
| -
|
| - // Force the candidate to run on this bot.
|
| - c.TaskSpec.Dimensions = append(c.TaskSpec.Dimensions, fmt.Sprintf("id:%s", bot))
|
| -
|
| - // Add the task to the scheduling list.
|
| - rv = append(rv, c)
|
| -
|
| - // If we've exhausted the bot list, stop here.
|
| - if len(botsByDim) == 0 {
|
| - break
|
| - }
|
| - }
|
| - }
|
| - sort.Sort(taskCandidateSlice(rv))
|
| - return rv
|
| -}
|
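The matching above is an inverted index: each "key:value" dimension maps to the set of bots advertising it, and a task's eligible bots are the intersection of the sets for all of its required dimensions. A minimal sketch of that lookup, using a stand-in for util.StringSet and hypothetical bot IDs and dimensions:

```go
package main

import "fmt"

// stringSet is a minimal stand-in for util.StringSet from the real code.
type stringSet map[string]bool

func (s stringSet) intersect(o stringSet) stringSet {
	rv := stringSet{}
	for k := range s {
		if o[k] {
			rv[k] = true
		}
	}
	return rv
}

func main() {
	// Inverted index from dimension to the bots which advertise it.
	botsByDim := map[string]stringSet{
		"os:Linux":   {"bot-1": true, "bot-2": true},
		"gpu:none":   {"bot-1": true},
		"os:Windows": {"bot-3": true},
	}

	// A task's eligible bots are the intersection over its dimensions.
	dims := []string{"os:Linux", "gpu:none"}
	matches := botsByDim[dims[0]]
	for _, d := range dims[1:] {
		matches = matches.intersect(botsByDim[d])
	}
	fmt.Println(matches) // map[bot-1:true]
}
```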
| -
|
| -// scheduleTasks queries for free Swarming bots and triggers tasks according
|
| -// to relative priorities in the queue.
|
| -func (s *TaskScheduler) scheduleTasks() error {
|
| - defer timer.New("TaskScheduler.scheduleTasks").Stop()
|
| - // Find free bots, match them with tasks.
|
| - bots, err := getFreeSwarmingBots(s.swarming)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - s.queueMtx.Lock()
|
| - defer s.queueMtx.Unlock()
|
| - schedule := getCandidatesToSchedule(bots, s.queue)
|
| -
|
| - // First, group by commit hash since we have to isolate the code at
|
| - // a particular revision for each task.
|
| - byRepoCommit := map[string]map[string][]*taskCandidate{}
|
| - for _, c := range schedule {
|
| - if mRepo, ok := byRepoCommit[c.Repo]; !ok {
|
| - byRepoCommit[c.Repo] = map[string][]*taskCandidate{c.Revision: []*taskCandidate{c}}
|
| - } else {
|
| - mRepo[c.Revision] = append(mRepo[c.Revision], c)
|
| - }
|
| - }
|
| -
|
| - // Isolate the tasks by commit.
|
| - for repoName, commits := range byRepoCommit {
|
| - infraBotsDir := path.Join(s.workdir, repoName, "infra", "bots")
|
| - for commit, candidates := range commits {
|
| - repo, err := s.repoMap.Repo(repoName)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if err := repo.Checkout(commit); err != nil {
|
| - return err
|
| - }
|
| - tasks := make([]*isolate.Task, 0, len(candidates))
|
| - for _, c := range candidates {
|
| - tasks = append(tasks, c.MakeIsolateTask(infraBotsDir, s.workdir))
|
| - }
|
| - hashes, err := s.isolate.IsolateTasks(tasks)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if len(hashes) != len(candidates) {
|
| - return fmt.Errorf("IsolateTasks returned incorrect number of hashes.")
|
| - }
|
| - for i, c := range candidates {
|
| - c.IsolatedInput = hashes[i]
|
| - }
|
| - }
|
| - }
|
| -
|
| - // Trigger tasks.
|
| - byCandidateId := make(map[string]*db.Task, len(schedule))
|
| - tasksToInsert := make(map[string]*db.Task, len(schedule)*2)
|
| - for _, candidate := range schedule {
|
| - t := candidate.MakeTask()
|
| - if err := s.db.AssignId(t); err != nil {
|
| - return err
|
| - }
|
| - req := candidate.MakeTaskRequest(t.Id)
|
| - resp, err := s.swarming.TriggerTask(req)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if _, err := t.UpdateFromSwarming(resp.TaskResult); err != nil {
|
| - return err
|
| - }
|
| - byCandidateId[candidate.MakeId()] = t
|
| - tasksToInsert[t.Id] = t
|
| - // If we're stealing commits from another task, find it and adjust
|
| - // its blamelist.
|
| - // TODO(borenet): We're retrieving a cached task which may have been
|
| - // changed since the cache was last updated. We need to handle that.
|
| - if candidate.StealingFromId != "" {
|
| - var stealingFrom *db.Task
|
| - if _, _, _, err := parseId(candidate.StealingFromId); err == nil {
|
| - stealingFrom = byCandidateId[candidate.StealingFromId]
|
| - if stealingFrom == nil {
|
| - return fmt.Errorf("Attempting to backfill a just-triggered candidate but can't find it: %q", candidate.StealingFromId)
|
| - }
|
| - } else {
|
| - var ok bool
|
| - stealingFrom, ok = tasksToInsert[candidate.StealingFromId]
|
| - if !ok {
|
| - stealingFrom, err = s.cache.GetTask(candidate.StealingFromId)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - }
|
| - }
|
| - oldCommits := util.NewStringSet(stealingFrom.Commits)
|
| - stealing := util.NewStringSet(t.Commits)
|
| - stealingFrom.Commits = oldCommits.Complement(stealing).Keys()
|
| - tasksToInsert[stealingFrom.Id] = stealingFrom
|
| - }
|
| - }
|
| - tasks := make([]*db.Task, 0, len(tasksToInsert))
|
| - for _, t := range tasksToInsert {
|
| - tasks = append(tasks, t)
|
| - }
|
| -
|
| - // Insert the tasks into the database.
|
| - if err := s.db.PutTasks(tasks); err != nil {
|
| - return err
|
| - }
|
| -
|
| - // Remove the tasks from the queue.
|
| - newQueue := make([]*taskCandidate, 0, len(s.queue)-len(schedule))
|
| - for i, j := 0, 0; i < len(s.queue); {
|
| - if j >= len(schedule) {
|
| - newQueue = append(newQueue, s.queue[i:]...)
|
| - break
|
| - }
|
| - if s.queue[i] == schedule[j] {
|
| - j++
|
| - } else {
|
| - newQueue = append(newQueue, s.queue[i])
|
| - }
|
| - i++
|
| - }
|
| - s.queue = newQueue
|
| -
|
| - // Note: if regenerateTaskQueue and scheduleTasks are ever decoupled so that
|
| - // the queue is reused by multiple runs of scheduleTasks, we'll need to
|
| - // address the fact that some candidates may still have their
|
| - // StealingFromId pointing to candidates which have been triggered and
|
| - // removed from the queue. In that case, we should just need to write a
|
| - // loop which updates those candidates to use the IDs of the newly-
|
| - // inserted Tasks in the database rather than the candidate ID.
|
| -
|
| - glog.Infof("Triggered %d tasks on %d bots.", len(schedule), len(bots))
|
| - return nil
|
| -}
|
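The queue-removal loop near the end of scheduleTasks works because schedule preserves the queue's relative order (both are sorted with the same comparator), so a single merge-style pass with two indices filters the triggered candidates out. The same one-pass subsequence filter on plain strings, as a standalone sketch:

```go
package main

import "fmt"

// removeSubsequence filters the elements of sub out of queue in one pass,
// assuming sub appears in queue in the same relative order.
func removeSubsequence(queue, sub []string) []string {
	rv := make([]string, 0, len(queue)-len(sub))
	j := 0
	for _, q := range queue {
		if j < len(sub) && q == sub[j] {
			j++ // scheduled; skip it
			continue
		}
		rv = append(rv, q)
	}
	return rv
}

func main() {
	queue := []string{"a", "b", "c", "d", "e"}
	fmt.Println(removeSubsequence(queue, []string{"b", "d"})) // [a c e]
}
```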
| -
|
| -// MainLoop runs a single end-to-end task scheduling loop.
|
| -func (s *TaskScheduler) MainLoop() error {
|
| - defer timer.New("TaskScheduler.MainLoop").Stop()
|
| -
|
| - glog.Infof("Task Scheduler updating...")
|
| - var e1, e2 error
|
| - var wg sync.WaitGroup
|
| -
|
| - wg.Add(1)
|
| - go func() {
|
| - defer wg.Done()
|
| - if err := s.updateRepos(); err != nil {
|
| - e1 = err
|
| - }
|
| - }()
|
| -
|
| - // TODO(borenet): Do we have to fail out of scheduling if we fail to
|
| - // updateUnfinishedTasks? Maybe we can just add a liveness metric and
|
| - // alert if we go too long without updating successfully.
|
| - wg.Add(1)
|
| - go func() {
|
| - defer wg.Done()
|
| - if err := updateUnfinishedTasks(s.cache, s.db, s.swarming); err != nil {
|
| - e2 = err
|
| - }
|
| - }()
|
| - wg.Wait()
|
| -
|
| - if e1 != nil {
|
| - return e1
|
| - }
|
| - if e2 != nil {
|
| - return e2
|
| - }
|
| -
|
| - // Update the task cache.
|
| - if err := s.cache.Update(); err != nil {
|
| - return err
|
| - }
|
| -
|
| - // Regenerate the queue, schedule tasks.
|
| - // TODO(borenet): Query for free Swarming bots while we're regenerating
|
| - // the queue.
|
| - glog.Infof("Task Scheduler regenerating the queue...")
|
| - if err := s.regenerateTaskQueue(); err != nil {
|
| - return err
|
| - }
|
| -
|
| - glog.Infof("Task Scheduler scheduling tasks...")
|
| - if err := s.scheduleTasks(); err != nil {
|
| - return err
|
| - }
|
| -
|
| - // Update the cache again to include the newly-inserted tasks.
|
| - return s.cache.Update()
|
| -}
|
| -
|
| -// updateRepos syncs the scheduler's repos.
|
| -func (s *TaskScheduler) updateRepos() error {
|
| - for _, r := range s.repos {
|
| - if err := r.Update(); err != nil {
|
| - return err
|
| - }
|
| - }
|
| - return nil
|
| -}
|
| -
|
| -// QueueLen returns the length of the queue.
|
| -func (s *TaskScheduler) QueueLen() int {
|
| - s.queueMtx.RLock()
|
| - defer s.queueMtx.RUnlock()
|
| - return len(s.queue)
|
| -}
|
| -
|
| -// timeDecay24Hr computes a linear time decay amount for the given duration,
|
| -// given the requested decay amount at 24 hours.
|
| -func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
|
| - return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
|
| -}
|
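The decay is linear in elapsed time: 1.0 at zero elapsed, decayAmt24Hr at 24 hours, clamped at zero (which a 0.9 setting reaches at 240 hours). A few sample values, reusing the function above verbatim and assuming decayAmt24Hr = 0.9:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// timeDecay24Hr is copied from task_scheduler.go above.
func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
	return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
}

func main() {
	for _, d := range []time.Duration{0, 12 * time.Hour, 24 * time.Hour, 48 * time.Hour, 240 * time.Hour} {
		fmt.Printf("%s: %.2f\n", d, timeDecay24Hr(0.9, d))
	}
	// Prints: 0s: 1.00, 12h0m0s: 0.95, 24h0m0s: 0.90,
	// 48h0m0s: 0.80, 240h0m0s: 0.00
}
```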
| -
|
| -// timeDecayForCommit computes a multiplier for a task candidate score based
|
| -// on how long ago the given commit landed. This allows us to prioritize more
|
| -// recent commits.
|
| -func (s *TaskScheduler) timeDecayForCommit(now time.Time, repoName, commit string) (float64, error) {
|
| - if s.timeDecayAmt24Hr == 1.0 {
|
| - // Shortcut for special case.
|
| - return 1.0, nil
|
| - }
|
| - repo, err := s.repoMap.Repo(repoName)
|
| - if err != nil {
|
| - return 0.0, err
|
| - }
|
| - d, err := repo.Details(commit, false)
|
| - if err != nil {
|
| - return 0.0, err
|
| - }
|
| - return timeDecay24Hr(s.timeDecayAmt24Hr, now.Sub(d.Timestamp)), nil
|
| -}
|
| -
|
| -// testedness computes the total "testedness" of a set of commits covered by a
|
| -// task whose blamelist included N commits. The "testedness" of a task spec at a
|
| -// given commit is defined as follows:
|
| -//
|
| -// -1.0 if no task has ever included this commit for this task spec.
|
| -// 1.0 if a task was run for this task spec AT this commit.
|
| -// 1.0 / N if a task for this task spec has included this commit, where N is
|
| -// the number of commits included in the task.
|
| -//
|
| -// This function gives the sum of the testedness for a blamelist of N commits.
|
| -func testedness(n int) float64 {
|
| - if n < 0 {
|
| - // This should never happen.
|
| - glog.Errorf("Task score function got a blamelist with %d commits", n)
|
| - return -1.0
|
| - } else if n == 0 {
|
| - // Zero commits have zero testedness.
|
| - return 0.0
|
| - } else if n == 1 {
|
| - return 1.0
|
| - } else {
|
| - return 1.0 + float64(n-1)/float64(n)
|
| - }
|
| -}
|
| -
|
| -// testednessIncrease computes the increase in "testedness" obtained by running
|
| -// a task with the given blamelist length which may have "stolen" commits from
|
| -// a previous task with a different blamelist length. To do so, we compute the
|
| -// "testedness" for every commit affected by the task, before and after the
|
| -// task would run. We subtract the "before" score from the "after" score to
|
| -// obtain the "testedness" increase at each commit, then sum them to find the
|
| -// total increase in "testedness" obtained by running the task.
|
| -func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 {
|
| - // Invalid inputs.
|
| - if blamelistLength <= 0 || stoleFromBlamelistLength < 0 {
|
| - return -1.0
|
| - }
|
| -
|
| - if stoleFromBlamelistLength == 0 {
|
| - // This task covers previously-untested commits. Previous testedness
|
| - // is -1.0 for each commit in the blamelist.
|
| - beforeTestedness := float64(-blamelistLength)
|
| - afterTestedness := testedness(blamelistLength)
|
| - return afterTestedness - beforeTestedness
|
| - } else if blamelistLength == stoleFromBlamelistLength {
|
| - // This is a retry. It provides no testedness increase, so shortcut here
|
| - // rather than perform the math to obtain the same answer.
|
| - return 0.0
|
| - } else {
|
| - // This is a bisect/backfill.
|
| - beforeTestedness := testedness(stoleFromBlamelistLength)
|
| - afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength)
|
| - return afterTestedness - beforeTestedness
|
| - }
|
| -}
|
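Concretely, under the definition above, a backfill which splits a 4-commit blamelist in half raises total testedness from 1.75 to 3.0, for an increase of 1.25. The sketch below reproduces the two functions above in simplified form (input validation dropped) and prints the three cases:

```go
package main

import "fmt"

// testedness of a blamelist of n commits: 1 point for the commit the task
// ran at, plus (n-1)/n spread across the rest.
func testedness(n int) float64 {
	switch {
	case n <= 0:
		return 0.0
	case n == 1:
		return 1.0
	default:
		return 1.0 + float64(n-1)/float64(n)
	}
}

func testednessIncrease(blamelist, stoleFrom int) float64 {
	if stoleFrom == 0 {
		// Previously-untested commits score -1.0 each.
		return testedness(blamelist) - float64(-blamelist)
	}
	if blamelist == stoleFrom {
		return 0.0 // retry
	}
	// Bisect/backfill: the old task keeps the commits not stolen.
	return testedness(blamelist) + testedness(stoleFrom-blamelist) - testedness(stoleFrom)
}

func main() {
	fmt.Printf("new commits (3, 0): %.2f\n", testednessIncrease(3, 0)) // 4.67
	fmt.Printf("retry       (4, 4): %.2f\n", testednessIncrease(4, 4)) // 0.00
	fmt.Printf("bisect      (2, 4): %.2f\n", testednessIncrease(2, 4)) // 1.25
}
```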
| -
|
| -// getFreeSwarmingBots returns a slice of free swarming bots.
|
| -func getFreeSwarmingBots(s swarming.ApiClient) ([]*swarming_api.SwarmingRpcsBotInfo, error) {
|
| - bots, err := s.ListSkiaBots()
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - rv := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots))
|
| - for _, bot := range bots {
|
| - if bot.IsDead {
|
| - continue
|
| - }
|
| - if bot.Quarantined {
|
| - continue
|
| - }
|
| - if bot.TaskId != "" {
|
| - continue
|
| - }
|
| - rv = append(rv, bot)
|
| - }
|
| - return rv, nil
|
| -}
|
| -
|
| -// updateUnfinishedTasks queries Swarming for all unfinished tasks and updates
|
| -// their status in the DB.
|
| -func updateUnfinishedTasks(cache db.TaskCache, d db.DB, s swarming.ApiClient) error {
|
| - tasks, err := cache.UnfinishedTasks()
|
| - if err != nil {
|
| - return err
|
| - }
|
| - sort.Sort(db.TaskSlice(tasks))
|
| -
|
| - // TODO(borenet): This would be faster if Swarming had a
|
| - // get-multiple-tasks-by-ID endpoint.
|
| - var wg sync.WaitGroup
|
| - errs := make([]error, len(tasks))
|
| - for i, t := range tasks {
|
| - wg.Add(1)
|
| - go func(idx int, t *db.Task) {
|
| - defer wg.Done()
|
| - swarmTask, err := s.GetTask(t.SwarmingTaskId)
|
| - if err != nil {
|
| - errs[idx] = fmt.Errorf("Failed to update unfinished task; failed to get updated task from swarming: %s", err)
|
| - return
|
| - }
|
| - if err := db.UpdateDBFromSwarmingTask(d, swarmTask); err != nil {
|
| - errs[idx] = fmt.Errorf("Failed to update unfinished task: %s", err)
|
| - return
|
| - }
|
| - }(i, t)
|
| - }
|
| - wg.Wait()
|
| - for _, err := range errs {
|
| - if err != nil {
|
| - return err
|
| - }
|
| - }
|
| - return nil
|
| -}
|