Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: build_scheduler/go/task_scheduler/specs.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package task_scheduler
2
3 import (
4 "encoding/json"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "go.skia.org/infra/go/gitinfo"
11 )
12
13 const (
14 TASKS_CFG_FILE = "infra/bots/tasks.json"
15 )
16
17 // ParseTasksCfg parses the given task cfg file contents and returns the config.
18 func ParseTasksCfg(contents string) (*TasksCfg, error) {
19 var rv TasksCfg
20 if err := json.Unmarshal([]byte(contents), &rv); err != nil {
21 return nil, fmt.Errorf("Failed to read tasks cfg: could not pars e file: %s", err)
22 }
23
24 for _, t := range rv.Tasks {
25 if err := t.Validate(&rv); err != nil {
26 return nil, err
27 }
28 }
29
30 if err := findCycles(rv.Tasks); err != nil {
31 return nil, err
32 }
33
34 return &rv, nil
35 }
36
37 // ReadTasksCfg reads the task cfg file from the given repo and returns it.
38 func ReadTasksCfg(repo *gitinfo.GitInfo, commit string) (*TasksCfg, error) {
39 contents, err := repo.GetFile(TASKS_CFG_FILE, commit)
40 if err != nil {
41 return nil, fmt.Errorf("Failed to read tasks cfg: could not read file: %s", err)
42 }
43 return ParseTasksCfg(contents)
44 }
45
46 // TasksCfg is a struct which describes all Swarming tasks for a repo at a
47 // particular commit.
48 type TasksCfg struct {
49 // Tasks is a map whose keys are TaskSpec names and values are TaskSpecs
50 // detailing the Swarming tasks to run at each commit.
51 Tasks map[string]*TaskSpec `json:"tasks"`
52 }
53
54 // TaskSpec is a struct which describes a Swarming task to run.
55 type TaskSpec struct {
56 // CipdPackages are CIPD packages which should be installed for the task .
57 CipdPackages []*CipdPackage `json:"cipd_packages"`
58
59 // Dependencies are names of other TaskSpecs for tasks which need to run
60 // before this task.
61 Dependencies []string `json:"dependencies"`
62
63 // Dimensions are Swarming bot dimensions which describe the type of bot
64 // which may run this task.
65 Dimensions []string `json:"dimensions"`
66
67 // Isolate is the name of the isolate file used by this task.
68 Isolate string `json:"isolate"`
69
70 // Priority indicates the relative priority of the task, with 0 < p <= 1
71 Priority float64 `json:"priority"`
72 }
73
74 // Validate ensures that the TaskSpec is defined properly.
75 func (t *TaskSpec) Validate(cfg *TasksCfg) error {
76 // Ensure that CIPD packages are specified properly.
77 for _, p := range t.CipdPackages {
78 if p.Name == "" || p.Path == "" {
79 return fmt.Errorf("CIPD packages must have a name, path, and version.")
80 }
81 }
82
83 // Ensure that the dimensions are specified properly.
84 for _, d := range t.Dimensions {
85 split := strings.SplitN(d, ":", 2)
86 if len(split) != 2 {
87 return fmt.Errorf("Dimension %q does not contain a colon !", d)
88 }
89 }
90
91 // Isolate file is required.
92 if t.Isolate == "" {
93 return fmt.Errorf("Isolate file is required.")
94 }
95
96 return nil
97 }
98
99 // Copy returns a copy of the TaskSpec.
100 func (t *TaskSpec) Copy() *TaskSpec {
101 cipdPackages := make([]*CipdPackage, 0, len(t.CipdPackages))
102 pkgs := make([]CipdPackage, len(t.CipdPackages))
103 for i, p := range t.CipdPackages {
104 pkgs[i] = *p
105 cipdPackages = append(cipdPackages, &pkgs[i])
106 }
107 deps := make([]string, len(t.Dependencies))
108 copy(deps, t.Dependencies)
109 dims := make([]string, len(t.Dimensions))
110 copy(dims, t.Dimensions)
111 return &TaskSpec{
112 CipdPackages: cipdPackages,
113 Dependencies: deps,
114 Dimensions: dims,
115 Isolate: t.Isolate,
116 Priority: t.Priority,
117 }
118 }
119
120 // CipdPackage is a struct representing a CIPD package which needs to be
121 // installed on a bot for a particular task.
122 type CipdPackage struct {
123 Name string `json:"name"`
124 Path string `json:"path"`
125 Version int64 `json:"version"`
126 }
127
128 // taskCfgCache is a struct used for caching tasks cfg files. The user should
129 // periodically call Cleanup() to remove old entries.
130 type taskCfgCache struct {
131 cache map[string]map[string]*TasksCfg
132 mtx sync.Mutex
133 repos *gitinfo.RepoMap
134 }
135
136 // newTaskCfgCache returns a taskCfgCache instance.
137 func newTaskCfgCache(repos *gitinfo.RepoMap) *taskCfgCache {
138 return &taskCfgCache{
139 cache: map[string]map[string]*TasksCfg{},
140 mtx: sync.Mutex{},
141 repos: repos,
142 }
143 }
144
145 // readTasksCfg reads the task cfg file from the given repo and returns it.
146 // Stores a cache of already-read task cfg files. Syncs the repo and reads the
147 // file if needed. Assumes the caller holds a lock.
148 func (c *taskCfgCache) readTasksCfg(repo, commit string) (*TasksCfg, error) {
149 r, err := c.repos.Repo(repo)
150 if err != nil {
151 return nil, fmt.Errorf("Could not read task cfg; failed to check out repo: %s", err)
152 }
153
154 if _, ok := c.cache[repo]; !ok {
155 c.cache[repo] = map[string]*TasksCfg{}
156 }
157 if _, ok := c.cache[repo][commit]; !ok {
158 cfg, err := ReadTasksCfg(r, commit)
159 if err != nil {
160 // The tasks.cfg file may not exist for a particular com mit.
161 if strings.Contains(err.Error(), "does not exist in") {
162 // In this case, use an empty config.
163 cfg = &TasksCfg{
164 Tasks: map[string]*TaskSpec{},
165 }
166 } else {
167 return nil, err
168 }
169 }
170 c.cache[repo][commit] = cfg
171 }
172 return c.cache[repo][commit], nil
173 }
174
175 // GetTaskSpecsForCommits returns a set of TaskSpecs for each of the
176 // given set of commits, in the form of nested maps:
177 //
178 // map[repo_name][commit_hash][task_name]*TaskSpec
179 //
180 func (c *taskCfgCache) GetTaskSpecsForCommits(commitsByRepo map[string][]string) (map[string]map[string]map[string]*TaskSpec, error) {
181 c.mtx.Lock()
182 defer c.mtx.Unlock()
183 rv := make(map[string]map[string]map[string]*TaskSpec, len(commitsByRepo ))
184 for repo, commits := range commitsByRepo {
185 tasksByCommit := make(map[string]map[string]*TaskSpec, len(commi ts))
186 for _, commit := range commits {
187 cfg, err := c.readTasksCfg(repo, commit)
188 if err != nil {
189 return nil, err
190 }
191 // Make a copy of the task specs.
192 tasks := make(map[string]*TaskSpec, len(cfg.Tasks))
193 for name, task := range cfg.Tasks {
194 tasks[name] = task.Copy()
195 }
196 tasksByCommit[commit] = tasks
197 }
198 rv[repo] = tasksByCommit
199 }
200 return rv, nil
201 }
202
203 // Cleanup removes cache entries which are outside of our scheduling window.
204 func (c *taskCfgCache) Cleanup(period time.Duration) error {
205 c.mtx.Lock()
206 defer c.mtx.Unlock()
207 periodStart := time.Now().Add(-period)
208 for repoName, taskCfgsByCommit := range c.cache {
209 repo, err := c.repos.Repo(repoName)
210 if err != nil {
211 return err
212 }
213 remove := []string{}
214 for commit, _ := range taskCfgsByCommit {
215 details, err := repo.Details(commit, false)
216 if err != nil {
217 return err
218 }
219 if details.Timestamp.Before(periodStart) {
220 remove = append(remove, commit)
221 }
222 }
223 for _, commit := range remove {
224 delete(c.cache[repoName], commit)
225 }
226 }
227 return nil
228 }
229
230 // findCycles searches for cyclical dependencies in the task specs and returns
231 // an error if any are found.
232 func findCycles(tasks map[string]*TaskSpec) error {
233 // Create vertex objects with metadata for the depth-first search.
234 type vertex struct {
235 active bool
236 name string
237 ts *TaskSpec
238 visited bool
239 }
240 vertices := make(map[string]*vertex, len(tasks))
241 for name, t := range tasks {
242 vertices[name] = &vertex{
243 active: false,
244 name: name,
245 ts: t,
246 visited: false,
247 }
248 }
249
250 // Perform a depth-first search of the graph.
251 var visit func(*vertex) error
252 visit = func(v *vertex) error {
253 v.active = true
254 v.visited = true
255 for _, dep := range v.ts.Dependencies {
256 e := vertices[dep]
257 if e == nil {
258 return fmt.Errorf("Task %q has unknown task %q a s a dependency.", v.name, dep)
259 }
260 if !e.visited {
261 if err := visit(e); err != nil {
262 return err
263 }
264 } else if e.active {
265 return fmt.Errorf("Found a circular dependency i nvolving %q and %q", v.name, e.name)
266 }
267 }
268 v.active = false
269 return nil
270 }
271
272 for _, v := range vertices {
273 if !v.visited {
274 if err := visit(v); err != nil {
275 return err
276 }
277 }
278 }
279 return nil
280 }
OLDNEW
« no previous file with comments | « build_scheduler/go/task_scheduler/perftest/perftest.go ('k') | build_scheduler/go/task_scheduler/specs_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698