Index: build_scheduler/go/task_scheduler/specs.go |
diff --git a/build_scheduler/go/task_scheduler/specs.go b/build_scheduler/go/task_scheduler/specs.go |
deleted file mode 100644 |
index 295ca97ebb98f4f7667796cf149ef7a27dc140cc..0000000000000000000000000000000000000000 |
--- a/build_scheduler/go/task_scheduler/specs.go |
+++ /dev/null |
@@ -1,280 +0,0 @@ |
-package task_scheduler |
- |
-import ( |
- "encoding/json" |
- "fmt" |
- "strings" |
- "sync" |
- "time" |
- |
- "go.skia.org/infra/go/gitinfo" |
-) |
- |
-const ( |
- TASKS_CFG_FILE = "infra/bots/tasks.json" |
-) |
- |
-// ParseTasksCfg parses the given task cfg file contents and returns the config. |
-func ParseTasksCfg(contents string) (*TasksCfg, error) { |
- var rv TasksCfg |
- if err := json.Unmarshal([]byte(contents), &rv); err != nil { |
- return nil, fmt.Errorf("Failed to read tasks cfg: could not parse file: %s", err) |
- } |
- |
- for _, t := range rv.Tasks { |
- if err := t.Validate(&rv); err != nil { |
- return nil, err |
- } |
- } |
- |
- if err := findCycles(rv.Tasks); err != nil { |
- return nil, err |
- } |
- |
- return &rv, nil |
-} |
- |
-// ReadTasksCfg reads the task cfg file from the given repo and returns it. |
-func ReadTasksCfg(repo *gitinfo.GitInfo, commit string) (*TasksCfg, error) { |
- contents, err := repo.GetFile(TASKS_CFG_FILE, commit) |
- if err != nil { |
- return nil, fmt.Errorf("Failed to read tasks cfg: could not read file: %s", err) |
- } |
- return ParseTasksCfg(contents) |
-} |
- |
-// TasksCfg is a struct which describes all Swarming tasks for a repo at a |
-// particular commit. |
-type TasksCfg struct { |
- // Tasks is a map whose keys are TaskSpec names and values are TaskSpecs |
- // detailing the Swarming tasks to run at each commit. |
- Tasks map[string]*TaskSpec `json:"tasks"` |
-} |
- |
-// TaskSpec is a struct which describes a Swarming task to run. |
-type TaskSpec struct { |
- // CipdPackages are CIPD packages which should be installed for the task. |
- CipdPackages []*CipdPackage `json:"cipd_packages"` |
- |
- // Dependencies are names of other TaskSpecs for tasks which need to run |
- // before this task. |
- Dependencies []string `json:"dependencies"` |
- |
- // Dimensions are Swarming bot dimensions which describe the type of bot |
- // which may run this task. |
- Dimensions []string `json:"dimensions"` |
- |
- // Isolate is the name of the isolate file used by this task. |
- Isolate string `json:"isolate"` |
- |
- // Priority indicates the relative priority of the task, with 0 < p <= 1 |
- Priority float64 `json:"priority"` |
-} |
- |
-// Validate ensures that the TaskSpec is defined properly. |
-func (t *TaskSpec) Validate(cfg *TasksCfg) error { |
- // Ensure that CIPD packages are specified properly. |
- for _, p := range t.CipdPackages { |
- if p.Name == "" || p.Path == "" { |
- return fmt.Errorf("CIPD packages must have a name, path, and version.") |
- } |
- } |
- |
- // Ensure that the dimensions are specified properly. |
- for _, d := range t.Dimensions { |
- split := strings.SplitN(d, ":", 2) |
- if len(split) != 2 { |
- return fmt.Errorf("Dimension %q does not contain a colon!", d) |
- } |
- } |
- |
- // Isolate file is required. |
- if t.Isolate == "" { |
- return fmt.Errorf("Isolate file is required.") |
- } |
- |
- return nil |
-} |
- |
-// Copy returns a copy of the TaskSpec. |
-func (t *TaskSpec) Copy() *TaskSpec { |
- cipdPackages := make([]*CipdPackage, 0, len(t.CipdPackages)) |
- pkgs := make([]CipdPackage, len(t.CipdPackages)) |
- for i, p := range t.CipdPackages { |
- pkgs[i] = *p |
- cipdPackages = append(cipdPackages, &pkgs[i]) |
- } |
- deps := make([]string, len(t.Dependencies)) |
- copy(deps, t.Dependencies) |
- dims := make([]string, len(t.Dimensions)) |
- copy(dims, t.Dimensions) |
- return &TaskSpec{ |
- CipdPackages: cipdPackages, |
- Dependencies: deps, |
- Dimensions: dims, |
- Isolate: t.Isolate, |
- Priority: t.Priority, |
- } |
-} |
- |
-// CipdPackage is a struct representing a CIPD package which needs to be |
-// installed on a bot for a particular task. |
-type CipdPackage struct { |
- Name string `json:"name"` |
- Path string `json:"path"` |
- Version int64 `json:"version"` |
-} |
- |
-// taskCfgCache is a struct used for caching tasks cfg files. The user should |
-// periodically call Cleanup() to remove old entries. |
-type taskCfgCache struct { |
- cache map[string]map[string]*TasksCfg |
- mtx sync.Mutex |
- repos *gitinfo.RepoMap |
-} |
- |
-// newTaskCfgCache returns a taskCfgCache instance. |
-func newTaskCfgCache(repos *gitinfo.RepoMap) *taskCfgCache { |
- return &taskCfgCache{ |
- cache: map[string]map[string]*TasksCfg{}, |
- mtx: sync.Mutex{}, |
- repos: repos, |
- } |
-} |
- |
-// readTasksCfg reads the task cfg file from the given repo and returns it. |
-// Stores a cache of already-read task cfg files. Syncs the repo and reads the |
-// file if needed. Assumes the caller holds a lock. |
-func (c *taskCfgCache) readTasksCfg(repo, commit string) (*TasksCfg, error) { |
- r, err := c.repos.Repo(repo) |
- if err != nil { |
- return nil, fmt.Errorf("Could not read task cfg; failed to check out repo: %s", err) |
- } |
- |
- if _, ok := c.cache[repo]; !ok { |
- c.cache[repo] = map[string]*TasksCfg{} |
- } |
- if _, ok := c.cache[repo][commit]; !ok { |
- cfg, err := ReadTasksCfg(r, commit) |
- if err != nil { |
- // The tasks.cfg file may not exist for a particular commit. |
- if strings.Contains(err.Error(), "does not exist in") { |
- // In this case, use an empty config. |
- cfg = &TasksCfg{ |
- Tasks: map[string]*TaskSpec{}, |
- } |
- } else { |
- return nil, err |
- } |
- } |
- c.cache[repo][commit] = cfg |
- } |
- return c.cache[repo][commit], nil |
-} |
- |
-// GetTaskSpecsForCommits returns a set of TaskSpecs for each of the |
-// given set of commits, in the form of nested maps: |
-// |
-// map[repo_name][commit_hash][task_name]*TaskSpec |
-// |
-func (c *taskCfgCache) GetTaskSpecsForCommits(commitsByRepo map[string][]string) (map[string]map[string]map[string]*TaskSpec, error) { |
- c.mtx.Lock() |
- defer c.mtx.Unlock() |
- rv := make(map[string]map[string]map[string]*TaskSpec, len(commitsByRepo)) |
- for repo, commits := range commitsByRepo { |
- tasksByCommit := make(map[string]map[string]*TaskSpec, len(commits)) |
- for _, commit := range commits { |
- cfg, err := c.readTasksCfg(repo, commit) |
- if err != nil { |
- return nil, err |
- } |
- // Make a copy of the task specs. |
- tasks := make(map[string]*TaskSpec, len(cfg.Tasks)) |
- for name, task := range cfg.Tasks { |
- tasks[name] = task.Copy() |
- } |
- tasksByCommit[commit] = tasks |
- } |
- rv[repo] = tasksByCommit |
- } |
- return rv, nil |
-} |
- |
-// Cleanup removes cache entries which are outside of our scheduling window. |
-func (c *taskCfgCache) Cleanup(period time.Duration) error { |
- c.mtx.Lock() |
- defer c.mtx.Unlock() |
- periodStart := time.Now().Add(-period) |
- for repoName, taskCfgsByCommit := range c.cache { |
- repo, err := c.repos.Repo(repoName) |
- if err != nil { |
- return err |
- } |
- remove := []string{} |
- for commit, _ := range taskCfgsByCommit { |
- details, err := repo.Details(commit, false) |
- if err != nil { |
- return err |
- } |
- if details.Timestamp.Before(periodStart) { |
- remove = append(remove, commit) |
- } |
- } |
- for _, commit := range remove { |
- delete(c.cache[repoName], commit) |
- } |
- } |
- return nil |
-} |
- |
-// findCycles searches for cyclical dependencies in the task specs and returns |
-// an error if any are found. |
-func findCycles(tasks map[string]*TaskSpec) error { |
- // Create vertex objects with metadata for the depth-first search. |
- type vertex struct { |
- active bool |
- name string |
- ts *TaskSpec |
- visited bool |
- } |
- vertices := make(map[string]*vertex, len(tasks)) |
- for name, t := range tasks { |
- vertices[name] = &vertex{ |
- active: false, |
- name: name, |
- ts: t, |
- visited: false, |
- } |
- } |
- |
- // Perform a depth-first search of the graph. |
- var visit func(*vertex) error |
- visit = func(v *vertex) error { |
- v.active = true |
- v.visited = true |
- for _, dep := range v.ts.Dependencies { |
- e := vertices[dep] |
- if e == nil { |
- return fmt.Errorf("Task %q has unknown task %q as a dependency.", v.name, dep) |
- } |
- if !e.visited { |
- if err := visit(e); err != nil { |
- return err |
- } |
- } else if e.active { |
- return fmt.Errorf("Found a circular dependency involving %q and %q", v.name, e.name) |
- } |
- } |
- v.active = false |
- return nil |
- } |
- |
- for _, v := range vertices { |
- if !v.visited { |
- if err := visit(v); err != nil { |
- return err |
- } |
- } |
- } |
- return nil |
-} |