OLD | NEW |
| (Empty) |
1 package task_scheduler | |
2 | |
3 import ( | |
4 "encoding/json" | |
5 "fmt" | |
6 "strings" | |
7 "sync" | |
8 "time" | |
9 | |
10 "go.skia.org/infra/go/gitinfo" | |
11 ) | |
12 | |
13 const ( | |
14 TASKS_CFG_FILE = "infra/bots/tasks.json" | |
15 ) | |
16 | |
17 // ParseTasksCfg parses the given task cfg file contents and returns the config. | |
18 func ParseTasksCfg(contents string) (*TasksCfg, error) { | |
19 var rv TasksCfg | |
20 if err := json.Unmarshal([]byte(contents), &rv); err != nil { | |
21 return nil, fmt.Errorf("Failed to read tasks cfg: could not pars
e file: %s", err) | |
22 } | |
23 | |
24 for _, t := range rv.Tasks { | |
25 if err := t.Validate(&rv); err != nil { | |
26 return nil, err | |
27 } | |
28 } | |
29 | |
30 if err := findCycles(rv.Tasks); err != nil { | |
31 return nil, err | |
32 } | |
33 | |
34 return &rv, nil | |
35 } | |
36 | |
37 // ReadTasksCfg reads the task cfg file from the given repo and returns it. | |
38 func ReadTasksCfg(repo *gitinfo.GitInfo, commit string) (*TasksCfg, error) { | |
39 contents, err := repo.GetFile(TASKS_CFG_FILE, commit) | |
40 if err != nil { | |
41 return nil, fmt.Errorf("Failed to read tasks cfg: could not read
file: %s", err) | |
42 } | |
43 return ParseTasksCfg(contents) | |
44 } | |
45 | |
46 // TasksCfg is a struct which describes all Swarming tasks for a repo at a | |
47 // particular commit. | |
48 type TasksCfg struct { | |
49 // Tasks is a map whose keys are TaskSpec names and values are TaskSpecs | |
50 // detailing the Swarming tasks to run at each commit. | |
51 Tasks map[string]*TaskSpec `json:"tasks"` | |
52 } | |
53 | |
54 // TaskSpec is a struct which describes a Swarming task to run. | |
55 type TaskSpec struct { | |
56 // CipdPackages are CIPD packages which should be installed for the task
. | |
57 CipdPackages []*CipdPackage `json:"cipd_packages"` | |
58 | |
59 // Dependencies are names of other TaskSpecs for tasks which need to run | |
60 // before this task. | |
61 Dependencies []string `json:"dependencies"` | |
62 | |
63 // Dimensions are Swarming bot dimensions which describe the type of bot | |
64 // which may run this task. | |
65 Dimensions []string `json:"dimensions"` | |
66 | |
67 // Isolate is the name of the isolate file used by this task. | |
68 Isolate string `json:"isolate"` | |
69 | |
70 // Priority indicates the relative priority of the task, with 0 < p <= 1 | |
71 Priority float64 `json:"priority"` | |
72 } | |
73 | |
74 // Validate ensures that the TaskSpec is defined properly. | |
75 func (t *TaskSpec) Validate(cfg *TasksCfg) error { | |
76 // Ensure that CIPD packages are specified properly. | |
77 for _, p := range t.CipdPackages { | |
78 if p.Name == "" || p.Path == "" { | |
79 return fmt.Errorf("CIPD packages must have a name, path,
and version.") | |
80 } | |
81 } | |
82 | |
83 // Ensure that the dimensions are specified properly. | |
84 for _, d := range t.Dimensions { | |
85 split := strings.SplitN(d, ":", 2) | |
86 if len(split) != 2 { | |
87 return fmt.Errorf("Dimension %q does not contain a colon
!", d) | |
88 } | |
89 } | |
90 | |
91 // Isolate file is required. | |
92 if t.Isolate == "" { | |
93 return fmt.Errorf("Isolate file is required.") | |
94 } | |
95 | |
96 return nil | |
97 } | |
98 | |
99 // Copy returns a copy of the TaskSpec. | |
100 func (t *TaskSpec) Copy() *TaskSpec { | |
101 cipdPackages := make([]*CipdPackage, 0, len(t.CipdPackages)) | |
102 pkgs := make([]CipdPackage, len(t.CipdPackages)) | |
103 for i, p := range t.CipdPackages { | |
104 pkgs[i] = *p | |
105 cipdPackages = append(cipdPackages, &pkgs[i]) | |
106 } | |
107 deps := make([]string, len(t.Dependencies)) | |
108 copy(deps, t.Dependencies) | |
109 dims := make([]string, len(t.Dimensions)) | |
110 copy(dims, t.Dimensions) | |
111 return &TaskSpec{ | |
112 CipdPackages: cipdPackages, | |
113 Dependencies: deps, | |
114 Dimensions: dims, | |
115 Isolate: t.Isolate, | |
116 Priority: t.Priority, | |
117 } | |
118 } | |
119 | |
120 // CipdPackage is a struct representing a CIPD package which needs to be | |
121 // installed on a bot for a particular task. | |
122 type CipdPackage struct { | |
123 Name string `json:"name"` | |
124 Path string `json:"path"` | |
125 Version int64 `json:"version"` | |
126 } | |
127 | |
128 // taskCfgCache is a struct used for caching tasks cfg files. The user should | |
129 // periodically call Cleanup() to remove old entries. | |
130 type taskCfgCache struct { | |
131 cache map[string]map[string]*TasksCfg | |
132 mtx sync.Mutex | |
133 repos *gitinfo.RepoMap | |
134 } | |
135 | |
136 // newTaskCfgCache returns a taskCfgCache instance. | |
137 func newTaskCfgCache(repos *gitinfo.RepoMap) *taskCfgCache { | |
138 return &taskCfgCache{ | |
139 cache: map[string]map[string]*TasksCfg{}, | |
140 mtx: sync.Mutex{}, | |
141 repos: repos, | |
142 } | |
143 } | |
144 | |
145 // readTasksCfg reads the task cfg file from the given repo and returns it. | |
146 // Stores a cache of already-read task cfg files. Syncs the repo and reads the | |
147 // file if needed. Assumes the caller holds a lock. | |
148 func (c *taskCfgCache) readTasksCfg(repo, commit string) (*TasksCfg, error) { | |
149 r, err := c.repos.Repo(repo) | |
150 if err != nil { | |
151 return nil, fmt.Errorf("Could not read task cfg; failed to check
out repo: %s", err) | |
152 } | |
153 | |
154 if _, ok := c.cache[repo]; !ok { | |
155 c.cache[repo] = map[string]*TasksCfg{} | |
156 } | |
157 if _, ok := c.cache[repo][commit]; !ok { | |
158 cfg, err := ReadTasksCfg(r, commit) | |
159 if err != nil { | |
160 // The tasks.cfg file may not exist for a particular com
mit. | |
161 if strings.Contains(err.Error(), "does not exist in") { | |
162 // In this case, use an empty config. | |
163 cfg = &TasksCfg{ | |
164 Tasks: map[string]*TaskSpec{}, | |
165 } | |
166 } else { | |
167 return nil, err | |
168 } | |
169 } | |
170 c.cache[repo][commit] = cfg | |
171 } | |
172 return c.cache[repo][commit], nil | |
173 } | |
174 | |
175 // GetTaskSpecsForCommits returns a set of TaskSpecs for each of the | |
176 // given set of commits, in the form of nested maps: | |
177 // | |
178 // map[repo_name][commit_hash][task_name]*TaskSpec | |
179 // | |
180 func (c *taskCfgCache) GetTaskSpecsForCommits(commitsByRepo map[string][]string)
(map[string]map[string]map[string]*TaskSpec, error) { | |
181 c.mtx.Lock() | |
182 defer c.mtx.Unlock() | |
183 rv := make(map[string]map[string]map[string]*TaskSpec, len(commitsByRepo
)) | |
184 for repo, commits := range commitsByRepo { | |
185 tasksByCommit := make(map[string]map[string]*TaskSpec, len(commi
ts)) | |
186 for _, commit := range commits { | |
187 cfg, err := c.readTasksCfg(repo, commit) | |
188 if err != nil { | |
189 return nil, err | |
190 } | |
191 // Make a copy of the task specs. | |
192 tasks := make(map[string]*TaskSpec, len(cfg.Tasks)) | |
193 for name, task := range cfg.Tasks { | |
194 tasks[name] = task.Copy() | |
195 } | |
196 tasksByCommit[commit] = tasks | |
197 } | |
198 rv[repo] = tasksByCommit | |
199 } | |
200 return rv, nil | |
201 } | |
202 | |
203 // Cleanup removes cache entries which are outside of our scheduling window. | |
204 func (c *taskCfgCache) Cleanup(period time.Duration) error { | |
205 c.mtx.Lock() | |
206 defer c.mtx.Unlock() | |
207 periodStart := time.Now().Add(-period) | |
208 for repoName, taskCfgsByCommit := range c.cache { | |
209 repo, err := c.repos.Repo(repoName) | |
210 if err != nil { | |
211 return err | |
212 } | |
213 remove := []string{} | |
214 for commit, _ := range taskCfgsByCommit { | |
215 details, err := repo.Details(commit, false) | |
216 if err != nil { | |
217 return err | |
218 } | |
219 if details.Timestamp.Before(periodStart) { | |
220 remove = append(remove, commit) | |
221 } | |
222 } | |
223 for _, commit := range remove { | |
224 delete(c.cache[repoName], commit) | |
225 } | |
226 } | |
227 return nil | |
228 } | |
229 | |
230 // findCycles searches for cyclical dependencies in the task specs and returns | |
231 // an error if any are found. | |
232 func findCycles(tasks map[string]*TaskSpec) error { | |
233 // Create vertex objects with metadata for the depth-first search. | |
234 type vertex struct { | |
235 active bool | |
236 name string | |
237 ts *TaskSpec | |
238 visited bool | |
239 } | |
240 vertices := make(map[string]*vertex, len(tasks)) | |
241 for name, t := range tasks { | |
242 vertices[name] = &vertex{ | |
243 active: false, | |
244 name: name, | |
245 ts: t, | |
246 visited: false, | |
247 } | |
248 } | |
249 | |
250 // Perform a depth-first search of the graph. | |
251 var visit func(*vertex) error | |
252 visit = func(v *vertex) error { | |
253 v.active = true | |
254 v.visited = true | |
255 for _, dep := range v.ts.Dependencies { | |
256 e := vertices[dep] | |
257 if e == nil { | |
258 return fmt.Errorf("Task %q has unknown task %q a
s a dependency.", v.name, dep) | |
259 } | |
260 if !e.visited { | |
261 if err := visit(e); err != nil { | |
262 return err | |
263 } | |
264 } else if e.active { | |
265 return fmt.Errorf("Found a circular dependency i
nvolving %q and %q", v.name, e.name) | |
266 } | |
267 } | |
268 v.active = false | |
269 return nil | |
270 } | |
271 | |
272 for _, v := range vertices { | |
273 if !v.visited { | |
274 if err := visit(v); err != nil { | |
275 return err | |
276 } | |
277 } | |
278 } | |
279 return nil | |
280 } | |
OLD | NEW |