| OLD | NEW |
| (Empty) |
| 1 package task_scheduler | |
| 2 | |
| 3 import ( | |
| 4 "encoding/json" | |
| 5 "fmt" | |
| 6 "strings" | |
| 7 "sync" | |
| 8 "time" | |
| 9 | |
| 10 "go.skia.org/infra/go/gitinfo" | |
| 11 ) | |
| 12 | |
| 13 const ( | |
| 14 TASKS_CFG_FILE = "infra/bots/tasks.json" | |
| 15 ) | |
| 16 | |
| 17 // ParseTasksCfg parses the given task cfg file contents and returns the config. | |
| 18 func ParseTasksCfg(contents string) (*TasksCfg, error) { | |
| 19 var rv TasksCfg | |
| 20 if err := json.Unmarshal([]byte(contents), &rv); err != nil { | |
| 21 return nil, fmt.Errorf("Failed to read tasks cfg: could not pars
e file: %s", err) | |
| 22 } | |
| 23 | |
| 24 for _, t := range rv.Tasks { | |
| 25 if err := t.Validate(&rv); err != nil { | |
| 26 return nil, err | |
| 27 } | |
| 28 } | |
| 29 | |
| 30 if err := findCycles(rv.Tasks); err != nil { | |
| 31 return nil, err | |
| 32 } | |
| 33 | |
| 34 return &rv, nil | |
| 35 } | |
| 36 | |
| 37 // ReadTasksCfg reads the task cfg file from the given repo and returns it. | |
| 38 func ReadTasksCfg(repo *gitinfo.GitInfo, commit string) (*TasksCfg, error) { | |
| 39 contents, err := repo.GetFile(TASKS_CFG_FILE, commit) | |
| 40 if err != nil { | |
| 41 return nil, fmt.Errorf("Failed to read tasks cfg: could not read
file: %s", err) | |
| 42 } | |
| 43 return ParseTasksCfg(contents) | |
| 44 } | |
| 45 | |
| 46 // TasksCfg is a struct which describes all Swarming tasks for a repo at a | |
| 47 // particular commit. | |
| 48 type TasksCfg struct { | |
| 49 // Tasks is a map whose keys are TaskSpec names and values are TaskSpecs | |
| 50 // detailing the Swarming tasks to run at each commit. | |
| 51 Tasks map[string]*TaskSpec `json:"tasks"` | |
| 52 } | |
| 53 | |
| 54 // TaskSpec is a struct which describes a Swarming task to run. | |
| 55 type TaskSpec struct { | |
| 56 // CipdPackages are CIPD packages which should be installed for the task
. | |
| 57 CipdPackages []*CipdPackage `json:"cipd_packages"` | |
| 58 | |
| 59 // Dependencies are names of other TaskSpecs for tasks which need to run | |
| 60 // before this task. | |
| 61 Dependencies []string `json:"dependencies"` | |
| 62 | |
| 63 // Dimensions are Swarming bot dimensions which describe the type of bot | |
| 64 // which may run this task. | |
| 65 Dimensions []string `json:"dimensions"` | |
| 66 | |
| 67 // Isolate is the name of the isolate file used by this task. | |
| 68 Isolate string `json:"isolate"` | |
| 69 | |
| 70 // Priority indicates the relative priority of the task, with 0 < p <= 1 | |
| 71 Priority float64 `json:"priority"` | |
| 72 } | |
| 73 | |
| 74 // Validate ensures that the TaskSpec is defined properly. | |
| 75 func (t *TaskSpec) Validate(cfg *TasksCfg) error { | |
| 76 // Ensure that CIPD packages are specified properly. | |
| 77 for _, p := range t.CipdPackages { | |
| 78 if p.Name == "" || p.Path == "" { | |
| 79 return fmt.Errorf("CIPD packages must have a name, path,
and version.") | |
| 80 } | |
| 81 } | |
| 82 | |
| 83 // Ensure that the dimensions are specified properly. | |
| 84 for _, d := range t.Dimensions { | |
| 85 split := strings.SplitN(d, ":", 2) | |
| 86 if len(split) != 2 { | |
| 87 return fmt.Errorf("Dimension %q does not contain a colon
!", d) | |
| 88 } | |
| 89 } | |
| 90 | |
| 91 // Isolate file is required. | |
| 92 if t.Isolate == "" { | |
| 93 return fmt.Errorf("Isolate file is required.") | |
| 94 } | |
| 95 | |
| 96 return nil | |
| 97 } | |
| 98 | |
| 99 // Copy returns a copy of the TaskSpec. | |
| 100 func (t *TaskSpec) Copy() *TaskSpec { | |
| 101 cipdPackages := make([]*CipdPackage, 0, len(t.CipdPackages)) | |
| 102 pkgs := make([]CipdPackage, len(t.CipdPackages)) | |
| 103 for i, p := range t.CipdPackages { | |
| 104 pkgs[i] = *p | |
| 105 cipdPackages = append(cipdPackages, &pkgs[i]) | |
| 106 } | |
| 107 deps := make([]string, len(t.Dependencies)) | |
| 108 copy(deps, t.Dependencies) | |
| 109 dims := make([]string, len(t.Dimensions)) | |
| 110 copy(dims, t.Dimensions) | |
| 111 return &TaskSpec{ | |
| 112 CipdPackages: cipdPackages, | |
| 113 Dependencies: deps, | |
| 114 Dimensions: dims, | |
| 115 Isolate: t.Isolate, | |
| 116 Priority: t.Priority, | |
| 117 } | |
| 118 } | |
| 119 | |
| 120 // CipdPackage is a struct representing a CIPD package which needs to be | |
| 121 // installed on a bot for a particular task. | |
| 122 type CipdPackage struct { | |
| 123 Name string `json:"name"` | |
| 124 Path string `json:"path"` | |
| 125 Version int64 `json:"version"` | |
| 126 } | |
| 127 | |
| 128 // taskCfgCache is a struct used for caching tasks cfg files. The user should | |
| 129 // periodically call Cleanup() to remove old entries. | |
| 130 type taskCfgCache struct { | |
| 131 cache map[string]map[string]*TasksCfg | |
| 132 mtx sync.Mutex | |
| 133 repos *gitinfo.RepoMap | |
| 134 } | |
| 135 | |
| 136 // newTaskCfgCache returns a taskCfgCache instance. | |
| 137 func newTaskCfgCache(repos *gitinfo.RepoMap) *taskCfgCache { | |
| 138 return &taskCfgCache{ | |
| 139 cache: map[string]map[string]*TasksCfg{}, | |
| 140 mtx: sync.Mutex{}, | |
| 141 repos: repos, | |
| 142 } | |
| 143 } | |
| 144 | |
| 145 // readTasksCfg reads the task cfg file from the given repo and returns it. | |
| 146 // Stores a cache of already-read task cfg files. Syncs the repo and reads the | |
| 147 // file if needed. Assumes the caller holds a lock. | |
| 148 func (c *taskCfgCache) readTasksCfg(repo, commit string) (*TasksCfg, error) { | |
| 149 r, err := c.repos.Repo(repo) | |
| 150 if err != nil { | |
| 151 return nil, fmt.Errorf("Could not read task cfg; failed to check
out repo: %s", err) | |
| 152 } | |
| 153 | |
| 154 if _, ok := c.cache[repo]; !ok { | |
| 155 c.cache[repo] = map[string]*TasksCfg{} | |
| 156 } | |
| 157 if _, ok := c.cache[repo][commit]; !ok { | |
| 158 cfg, err := ReadTasksCfg(r, commit) | |
| 159 if err != nil { | |
| 160 // The tasks.cfg file may not exist for a particular com
mit. | |
| 161 if strings.Contains(err.Error(), "does not exist in") { | |
| 162 // In this case, use an empty config. | |
| 163 cfg = &TasksCfg{ | |
| 164 Tasks: map[string]*TaskSpec{}, | |
| 165 } | |
| 166 } else { | |
| 167 return nil, err | |
| 168 } | |
| 169 } | |
| 170 c.cache[repo][commit] = cfg | |
| 171 } | |
| 172 return c.cache[repo][commit], nil | |
| 173 } | |
| 174 | |
| 175 // GetTaskSpecsForCommits returns a set of TaskSpecs for each of the | |
| 176 // given set of commits, in the form of nested maps: | |
| 177 // | |
| 178 // map[repo_name][commit_hash][task_name]*TaskSpec | |
| 179 // | |
| 180 func (c *taskCfgCache) GetTaskSpecsForCommits(commitsByRepo map[string][]string)
(map[string]map[string]map[string]*TaskSpec, error) { | |
| 181 c.mtx.Lock() | |
| 182 defer c.mtx.Unlock() | |
| 183 rv := make(map[string]map[string]map[string]*TaskSpec, len(commitsByRepo
)) | |
| 184 for repo, commits := range commitsByRepo { | |
| 185 tasksByCommit := make(map[string]map[string]*TaskSpec, len(commi
ts)) | |
| 186 for _, commit := range commits { | |
| 187 cfg, err := c.readTasksCfg(repo, commit) | |
| 188 if err != nil { | |
| 189 return nil, err | |
| 190 } | |
| 191 // Make a copy of the task specs. | |
| 192 tasks := make(map[string]*TaskSpec, len(cfg.Tasks)) | |
| 193 for name, task := range cfg.Tasks { | |
| 194 tasks[name] = task.Copy() | |
| 195 } | |
| 196 tasksByCommit[commit] = tasks | |
| 197 } | |
| 198 rv[repo] = tasksByCommit | |
| 199 } | |
| 200 return rv, nil | |
| 201 } | |
| 202 | |
| 203 // Cleanup removes cache entries which are outside of our scheduling window. | |
| 204 func (c *taskCfgCache) Cleanup(period time.Duration) error { | |
| 205 c.mtx.Lock() | |
| 206 defer c.mtx.Unlock() | |
| 207 periodStart := time.Now().Add(-period) | |
| 208 for repoName, taskCfgsByCommit := range c.cache { | |
| 209 repo, err := c.repos.Repo(repoName) | |
| 210 if err != nil { | |
| 211 return err | |
| 212 } | |
| 213 remove := []string{} | |
| 214 for commit, _ := range taskCfgsByCommit { | |
| 215 details, err := repo.Details(commit, false) | |
| 216 if err != nil { | |
| 217 return err | |
| 218 } | |
| 219 if details.Timestamp.Before(periodStart) { | |
| 220 remove = append(remove, commit) | |
| 221 } | |
| 222 } | |
| 223 for _, commit := range remove { | |
| 224 delete(c.cache[repoName], commit) | |
| 225 } | |
| 226 } | |
| 227 return nil | |
| 228 } | |
| 229 | |
| 230 // findCycles searches for cyclical dependencies in the task specs and returns | |
| 231 // an error if any are found. | |
| 232 func findCycles(tasks map[string]*TaskSpec) error { | |
| 233 // Create vertex objects with metadata for the depth-first search. | |
| 234 type vertex struct { | |
| 235 active bool | |
| 236 name string | |
| 237 ts *TaskSpec | |
| 238 visited bool | |
| 239 } | |
| 240 vertices := make(map[string]*vertex, len(tasks)) | |
| 241 for name, t := range tasks { | |
| 242 vertices[name] = &vertex{ | |
| 243 active: false, | |
| 244 name: name, | |
| 245 ts: t, | |
| 246 visited: false, | |
| 247 } | |
| 248 } | |
| 249 | |
| 250 // Perform a depth-first search of the graph. | |
| 251 var visit func(*vertex) error | |
| 252 visit = func(v *vertex) error { | |
| 253 v.active = true | |
| 254 v.visited = true | |
| 255 for _, dep := range v.ts.Dependencies { | |
| 256 e := vertices[dep] | |
| 257 if e == nil { | |
| 258 return fmt.Errorf("Task %q has unknown task %q a
s a dependency.", v.name, dep) | |
| 259 } | |
| 260 if !e.visited { | |
| 261 if err := visit(e); err != nil { | |
| 262 return err | |
| 263 } | |
| 264 } else if e.active { | |
| 265 return fmt.Errorf("Found a circular dependency i
nvolving %q and %q", v.name, e.name) | |
| 266 } | |
| 267 } | |
| 268 v.active = false | |
| 269 return nil | |
| 270 } | |
| 271 | |
| 272 for _, v := range vertices { | |
| 273 if !v.visited { | |
| 274 if err := visit(v); err != nil { | |
| 275 return err | |
| 276 } | |
| 277 } | |
| 278 } | |
| 279 return nil | |
| 280 } | |
| OLD | NEW |