OLD | NEW |
| (Empty) |
1 package task_scheduler | |
2 | |
3 import ( | |
4 "encoding/json" | |
5 "fmt" | |
6 "io/ioutil" | |
7 "math" | |
8 "os" | |
9 "path" | |
10 "path/filepath" | |
11 "runtime" | |
12 "sort" | |
13 "strings" | |
14 "testing" | |
15 "time" | |
16 | |
17 swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1" | |
18 assert "github.com/stretchr/testify/require" | |
19 "go.skia.org/infra/build_scheduler/go/db" | |
20 "go.skia.org/infra/go/buildbot" | |
21 "go.skia.org/infra/go/exec" | |
22 "go.skia.org/infra/go/gitinfo" | |
23 "go.skia.org/infra/go/gitrepo" | |
24 "go.skia.org/infra/go/isolate" | |
25 "go.skia.org/infra/go/swarming" | |
26 "go.skia.org/infra/go/testutils" | |
27 "go.skia.org/infra/go/util" | |
28 ) | |
29 | |
30 func makeTask(name, repo, revision string) *db.Task { | |
31 return &db.Task{ | |
32 Commits: []string{revision}, | |
33 Created: time.Now(), | |
34 Name: name, | |
35 Repo: repo, | |
36 Revision: revision, | |
37 } | |
38 } | |
39 | |
40 func makeSwarmingRpcsTaskRequestMetadata(t *testing.T, task *db.Task) *swarming_
api.SwarmingRpcsTaskRequestMetadata { | |
41 tag := func(k, v string) string { | |
42 return fmt.Sprintf("%s:%s", k, v) | |
43 } | |
44 ts := func(t time.Time) string { | |
45 return t.Format(swarming.TIMESTAMP_FORMAT) | |
46 } | |
47 abandoned := "" | |
48 state := db.SWARMING_STATE_PENDING | |
49 failed := false | |
50 switch task.Status { | |
51 case db.TASK_STATUS_MISHAP: | |
52 state = db.SWARMING_STATE_BOT_DIED | |
53 abandoned = ts(task.Finished) | |
54 case db.TASK_STATUS_RUNNING: | |
55 state = db.SWARMING_STATE_RUNNING | |
56 case db.TASK_STATUS_FAILURE: | |
57 state = db.SWARMING_STATE_COMPLETED | |
58 failed = true | |
59 case db.TASK_STATUS_SUCCESS: | |
60 state = db.SWARMING_STATE_COMPLETED | |
61 case db.TASK_STATUS_PENDING: | |
62 // noop | |
63 default: | |
64 assert.FailNow(t, "Unknown task status: %s", task.Status) | |
65 } | |
66 return &swarming_api.SwarmingRpcsTaskRequestMetadata{ | |
67 Request: &swarming_api.SwarmingRpcsTaskRequest{}, | |
68 TaskId: task.SwarmingTaskId, | |
69 TaskResult: &swarming_api.SwarmingRpcsTaskResult{ | |
70 AbandonedTs: abandoned, | |
71 CreatedTs: ts(task.Created), | |
72 CompletedTs: ts(task.Finished), | |
73 Failure: failed, | |
74 OutputsRef: &swarming_api.SwarmingRpcsFilesRef{ | |
75 Isolated: "???", | |
76 }, | |
77 StartedTs: ts(task.Started), | |
78 State: state, | |
79 Tags: []string{ | |
80 tag(db.SWARMING_TAG_ID, task.Id), | |
81 tag(db.SWARMING_TAG_NAME, task.Name), | |
82 tag(db.SWARMING_TAG_REPO, task.Repo), | |
83 tag(db.SWARMING_TAG_REVISION, task.Revision), | |
84 }, | |
85 TaskId: task.SwarmingTaskId, | |
86 }, | |
87 } | |
88 } | |
89 | |
90 func TestFindTaskCandidates(t *testing.T) { | |
91 testutils.SkipIfShort(t) | |
92 | |
93 // Setup. | |
94 tr := util.NewTempRepo() | |
95 defer tr.Cleanup() | |
96 d := db.NewInMemoryDB() | |
97 cache, err := db.NewTaskCache(d, time.Hour) | |
98 assert.NoError(t, err) | |
99 | |
100 // The test repo has two commits. The first commit adds a tasks.cfg file | |
101 // with two task specs: a build task and a test task, the test task | |
102 // depending on the build task. The second commit adds a perf task spec, | |
103 // which also depends on the build task. Therefore, there are five total | |
104 // possible tasks we could run: | |
105 // | |
106 // Build@c1, Test@c1, Build@c2, Test@c2, Perf@c2 | |
107 // | |
108 c1 := "c06ac6093d3029dffe997e9d85e8e61fee5f87b9" | |
109 c2 := "0f87799ac791b8d8573e93694d05b05a65e09668" | |
110 buildTask := "Build-Ubuntu-GCC-Arm7-Release-Android" | |
111 testTask := "Test-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release" | |
112 perfTask := "Perf-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release" | |
113 repo := "skia.git" | |
114 commits := map[string][]string{ | |
115 repo: []string{c1, c2}, | |
116 } | |
117 | |
118 assert.NoError(t, err) | |
119 isolateClient, err := isolate.NewClient(tr.Dir) | |
120 assert.NoError(t, err) | |
121 isolateClient.ServerUrl = isolate.FAKE_SERVER_URL | |
122 swarmingClient := swarming.NewTestClient() | |
123 s, err := NewTaskScheduler(d, cache, time.Duration(math.MaxInt64), tr.Di
r, []string{"skia.git"}, isolateClient, swarmingClient) | |
124 assert.NoError(t, err) | |
125 | |
126 // Check the initial set of task candidates. The two Build tasks | |
127 // should be the only ones available. | |
128 c, err := s.findTaskCandidates(commits) | |
129 assert.NoError(t, err) | |
130 assert.Equal(t, 1, len(c)) | |
131 key := fmt.Sprintf("%s|%s", repo, buildTask) | |
132 assert.Equal(t, 2, len(c[key])) | |
133 for _, candidate := range c[key] { | |
134 assert.Equal(t, candidate.Name, buildTask) | |
135 } | |
136 | |
137 // Insert a the Build task at c1 (1 dependent) into the database, | |
138 // transition through various states. | |
139 var t1 *db.Task | |
140 for _, candidates := range c { // Order not guaranteed; find the right c
andidate. | |
141 for _, candidate := range candidates { | |
142 if candidate.Revision == c1 { | |
143 t1 = makeTask(candidate.Name, candidate.Repo, ca
ndidate.Revision) | |
144 break | |
145 } | |
146 } | |
147 } | |
148 assert.NotNil(t, t1) | |
149 | |
150 // We shouldn't duplicate pending or running tasks. | |
151 for _, status := range []db.TaskStatus{db.TASK_STATUS_PENDING, db.TASK_S
TATUS_RUNNING} { | |
152 t1.Status = status | |
153 assert.NoError(t, d.PutTask(t1)) | |
154 assert.NoError(t, cache.Update()) | |
155 | |
156 c, err = s.findTaskCandidates(commits) | |
157 assert.NoError(t, err) | |
158 assert.Equal(t, 1, len(c)) | |
159 for _, candidates := range c { | |
160 for _, candidate := range candidates { | |
161 assert.Equal(t, candidate.Name, buildTask) | |
162 assert.Equal(t, c2, candidate.Revision) | |
163 } | |
164 } | |
165 } | |
166 | |
167 // The task failed. Ensure that its dependents are not candidates, but | |
168 // the task itself is back in the list of candidates, in case we want | |
169 // to retry. | |
170 t1.Status = db.TASK_STATUS_FAILURE | |
171 assert.NoError(t, d.PutTask(t1)) | |
172 assert.NoError(t, cache.Update()) | |
173 | |
174 c, err = s.findTaskCandidates(commits) | |
175 assert.NoError(t, err) | |
176 assert.Equal(t, 1, len(c)) | |
177 for _, candidates := range c { | |
178 assert.Equal(t, 2, len(candidates)) | |
179 for _, candidate := range candidates { | |
180 assert.Equal(t, candidate.Name, buildTask) | |
181 } | |
182 } | |
183 | |
184 // The task succeeded. Ensure that its dependents are candidates and | |
185 // the task itself is not. | |
186 t1.Status = db.TASK_STATUS_SUCCESS | |
187 t1.IsolatedOutput = "fake isolated hash" | |
188 assert.NoError(t, d.PutTask(t1)) | |
189 assert.NoError(t, cache.Update()) | |
190 | |
191 c, err = s.findTaskCandidates(commits) | |
192 assert.NoError(t, err) | |
193 assert.Equal(t, 2, len(c)) | |
194 for _, candidates := range c { | |
195 for _, candidate := range candidates { | |
196 assert.False(t, t1.Name == candidate.Name && t1.Revision
== candidate.Revision) | |
197 } | |
198 } | |
199 | |
200 // Create the other Build task. | |
201 var t2 *db.Task | |
202 for _, candidates := range c { | |
203 for _, candidate := range candidates { | |
204 if candidate.Revision == c2 && strings.HasPrefix(candida
te.Name, "Build-") { | |
205 t2 = makeTask(candidate.Name, candidate.Repo, ca
ndidate.Revision) | |
206 break | |
207 } | |
208 } | |
209 } | |
210 assert.NotNil(t, t2) | |
211 t2.Status = db.TASK_STATUS_SUCCESS | |
212 t2.IsolatedOutput = "fake isolated hash" | |
213 assert.NoError(t, d.PutTask(t2)) | |
214 assert.NoError(t, cache.Update()) | |
215 | |
216 // All test and perf tasks are now candidates, no build tasks. | |
217 c, err = s.findTaskCandidates(commits) | |
218 assert.NoError(t, err) | |
219 assert.Equal(t, 2, len(c)) | |
220 assert.Equal(t, 2, len(c[fmt.Sprintf("%s|%s", repo, testTask)])) | |
221 assert.Equal(t, 1, len(c[fmt.Sprintf("%s|%s", repo, perfTask)])) | |
222 for _, candidates := range c { | |
223 for _, candidate := range candidates { | |
224 assert.NotEqual(t, candidate.Name, buildTask) | |
225 } | |
226 } | |
227 } | |
228 | |
229 func TestTestedness(t *testing.T) { | |
230 tc := []struct { | |
231 in int | |
232 out float64 | |
233 }{ | |
234 { | |
235 in: -1, | |
236 out: -1.0, | |
237 }, | |
238 { | |
239 in: 0, | |
240 out: 0.0, | |
241 }, | |
242 { | |
243 in: 1, | |
244 out: 1.0, | |
245 }, | |
246 { | |
247 in: 2, | |
248 out: 1.0 + 1.0/2.0, | |
249 }, | |
250 { | |
251 in: 3, | |
252 out: 1.0 + float64(2.0)/float64(3.0), | |
253 }, | |
254 { | |
255 in: 4, | |
256 out: 1.0 + 3.0/4.0, | |
257 }, | |
258 { | |
259 in: 4096, | |
260 out: 1.0 + float64(4095)/float64(4096), | |
261 }, | |
262 } | |
263 for i, c := range tc { | |
264 assert.Equal(t, c.out, testedness(c.in), fmt.Sprintf("test case
#%d", i)) | |
265 } | |
266 } | |
267 | |
268 func TestTestednessIncrease(t *testing.T) { | |
269 tc := []struct { | |
270 a int | |
271 b int | |
272 out float64 | |
273 }{ | |
274 // Invalid cases. | |
275 { | |
276 a: -1, | |
277 b: 10, | |
278 out: -1.0, | |
279 }, | |
280 { | |
281 a: 10, | |
282 b: -1, | |
283 out: -1.0, | |
284 }, | |
285 { | |
286 a: 0, | |
287 b: -1, | |
288 out: -1.0, | |
289 }, | |
290 { | |
291 a: 0, | |
292 b: 0, | |
293 out: -1.0, | |
294 }, | |
295 // Invalid because if we're re-running at already-tested commits | |
296 // then we should have a blamelist which is at most the size of | |
297 // the blamelist of the previous task. We naturally get negative | |
298 // testedness increase in these cases. | |
299 { | |
300 a: 2, | |
301 b: 1, | |
302 out: -0.5, | |
303 }, | |
304 // Testing only new commits. | |
305 { | |
306 a: 1, | |
307 b: 0, | |
308 out: 1.0 + 1.0, | |
309 }, | |
310 { | |
311 a: 2, | |
312 b: 0, | |
313 out: 2.0 + (1.0 + 1.0/2.0), | |
314 }, | |
315 { | |
316 a: 3, | |
317 b: 0, | |
318 out: 3.0 + (1.0 + float64(2.0)/float64(3.0)), | |
319 }, | |
320 { | |
321 a: 4096, | |
322 b: 0, | |
323 out: 4096.0 + (1.0 + float64(4095.0)/float64(4096.0)), | |
324 }, | |
325 // Retries. | |
326 { | |
327 a: 1, | |
328 b: 1, | |
329 out: 0.0, | |
330 }, | |
331 { | |
332 a: 2, | |
333 b: 2, | |
334 out: 0.0, | |
335 }, | |
336 { | |
337 a: 3, | |
338 b: 3, | |
339 out: 0.0, | |
340 }, | |
341 { | |
342 a: 4096, | |
343 b: 4096, | |
344 out: 0.0, | |
345 }, | |
346 // Bisect/backfills. | |
347 { | |
348 a: 1, | |
349 b: 2, | |
350 out: 0.5, // (1 + 1) - (1 + 1/2) | |
351 }, | |
352 { | |
353 a: 1, | |
354 b: 3, | |
355 out: float64(2.5) - (1.0 + float64(2.0)/float64(3.0)), | |
356 }, | |
357 { | |
358 a: 5, | |
359 b: 10, | |
360 out: 2.0*(1.0+float64(4.0)/float64(5.0)) - (1.0 + float6
4(9.0)/float64(10.0)), | |
361 }, | |
362 } | |
363 for i, c := range tc { | |
364 assert.Equal(t, c.out, testednessIncrease(c.a, c.b), fmt.Sprintf
("test case #%d", i)) | |
365 } | |
366 } | |
367 | |
368 func TestComputeBlamelist(t *testing.T) { | |
369 testutils.SkipIfShort(t) | |
370 | |
371 // Setup. | |
372 _, filename, _, _ := runtime.Caller(0) | |
373 // Use the test repo from the buildbot package, since it's already set | |
374 // up for this type of test. | |
375 zipfile := filepath.Join(filepath.Dir(filename), "..", "..", "..", "go",
"buildbot", "testdata", "testrepo.zip") | |
376 tr := util.NewTempRepoFrom(zipfile) | |
377 defer tr.Cleanup() | |
378 d := db.NewInMemoryDB() | |
379 cache, err := db.NewTaskCache(d, time.Hour) | |
380 assert.NoError(t, err) | |
381 | |
382 // The test repo is laid out like this: | |
383 // | |
384 // * 06eb2a58139d3ff764f10232d5c8f9362d55e20f I (HEAD, master, Task #4
) | |
385 // * ecb424466a4f3b040586a062c15ed58356f6590e F (Task #3) | |
386 // |\ | |
387 // | * d30286d2254716d396073c177a754f9e152bbb52 H | |
388 // | * 8d2d1247ef5d2b8a8d3394543df6c12a85881296 G (Task #2) | |
389 // * | 67635e7015d74b06c00154f7061987f426349d9f E | |
390 // * | 6d4811eddfa637fac0852c3a0801b773be1f260d D (Task #1) | |
391 // * | d74dfd42a48325ab2f3d4a97278fc283036e0ea4 C (Task #6) | |
392 // |/ | |
393 // * 4b822ebb7cedd90acbac6a45b897438746973a87 B (Task #0) | |
394 // * 051955c355eb742550ddde4eccc3e90b6dc5b887 A | |
395 // | |
396 hashes := map[rune]string{ | |
397 'A': "051955c355eb742550ddde4eccc3e90b6dc5b887", | |
398 'B': "4b822ebb7cedd90acbac6a45b897438746973a87", | |
399 'C': "d74dfd42a48325ab2f3d4a97278fc283036e0ea4", | |
400 'D': "6d4811eddfa637fac0852c3a0801b773be1f260d", | |
401 'E': "67635e7015d74b06c00154f7061987f426349d9f", | |
402 'F': "ecb424466a4f3b040586a062c15ed58356f6590e", | |
403 'G': "8d2d1247ef5d2b8a8d3394543df6c12a85881296", | |
404 'H': "d30286d2254716d396073c177a754f9e152bbb52", | |
405 'I': "06eb2a58139d3ff764f10232d5c8f9362d55e20f", | |
406 } | |
407 | |
408 // Test cases. Each test case builds on the previous cases. | |
409 testCases := []struct { | |
410 Revision string | |
411 Expected []string | |
412 StoleFromIdx int | |
413 }{ | |
414 // 0. The first task. | |
415 { | |
416 Revision: hashes['B'], | |
417 Expected: []string{hashes['B']}, // Task #0 is limit
ed to a single commit. | |
418 StoleFromIdx: -1, | |
419 }, | |
420 // 1. On a linear set of commits, with at least one previous tas
k. | |
421 { | |
422 Revision: hashes['D'], | |
423 Expected: []string{hashes['D'], hashes['C']}, | |
424 StoleFromIdx: -1, | |
425 }, | |
426 // 2. The first task on a new branch. | |
427 { | |
428 Revision: hashes['G'], | |
429 Expected: []string{hashes['G']}, | |
430 StoleFromIdx: -1, | |
431 }, | |
432 // 3. After a merge. | |
433 { | |
434 Revision: hashes['F'], | |
435 Expected: []string{hashes['E'], hashes['H'], hashes[
'F']}, | |
436 StoleFromIdx: -1, | |
437 }, | |
438 // 4. One last "normal" task. | |
439 { | |
440 Revision: hashes['I'], | |
441 Expected: []string{hashes['I']}, | |
442 StoleFromIdx: -1, | |
443 }, | |
444 // 5. No Revision. | |
445 { | |
446 Revision: "", | |
447 Expected: []string{}, | |
448 StoleFromIdx: -1, | |
449 }, | |
450 // 6. Steal commits from a previously-ingested task. | |
451 { | |
452 Revision: hashes['C'], | |
453 Expected: []string{hashes['C']}, | |
454 StoleFromIdx: 1, | |
455 }, | |
456 } | |
457 name := "Test-Ubuntu12-ShuttleA-GTX660-x86-Release" | |
458 repoName := "skia.git" | |
459 repo, err := gitrepo.NewRepo(repoName, path.Join(tr.Dir, repoName)) | |
460 assert.NoError(t, err) | |
461 ids := make([]string, len(testCases)) | |
462 commitsBuf := make([]*gitrepo.Commit, 0, buildbot.MAX_BLAMELIST_COMMITS) | |
463 for i, tc := range testCases { | |
464 // Ensure that we get the expected blamelist. | |
465 commits, stoleFrom, err := ComputeBlamelist(cache, repo, name, r
epoName, tc.Revision, commitsBuf) | |
466 if tc.Revision == "" { | |
467 assert.Error(t, err) | |
468 continue | |
469 } else { | |
470 assert.NoError(t, err) | |
471 } | |
472 sort.Strings(commits) | |
473 testutils.AssertDeepEqual(t, tc.Expected, commits) | |
474 if tc.StoleFromIdx >= 0 { | |
475 assert.NotNil(t, stoleFrom) | |
476 assert.Equal(t, ids[tc.StoleFromIdx], stoleFrom.Id) | |
477 } else { | |
478 assert.Nil(t, stoleFrom) | |
479 } | |
480 | |
481 // Insert the task into the DB. | |
482 c := &taskCandidate{ | |
483 Name: name, | |
484 Repo: repoName, | |
485 Revision: tc.Revision, | |
486 } | |
487 task := c.MakeTask() | |
488 task.Commits = commits | |
489 task.Created = time.Now() | |
490 if stoleFrom != nil { | |
491 // Re-insert the stoleFrom task without the commits | |
492 // which were stolen from it. | |
493 stoleFromCommits := make([]string, 0, len(stoleFrom.Comm
its)-len(commits)) | |
494 for _, commit := range stoleFrom.Commits { | |
495 if !util.In(commit, task.Commits) { | |
496 stoleFromCommits = append(stoleFromCommi
ts, commit) | |
497 } | |
498 } | |
499 stoleFrom.Commits = stoleFromCommits | |
500 assert.NoError(t, d.PutTasks([]*db.Task{task, stoleFrom}
)) | |
501 } else { | |
502 assert.NoError(t, d.PutTask(task)) | |
503 } | |
504 ids[i] = task.Id | |
505 assert.NoError(t, cache.Update()) | |
506 } | |
507 | |
508 // Extra: ensure that task #6 really stole the commit from #1. | |
509 task, err := cache.GetTask(ids[1]) | |
510 assert.NoError(t, err) | |
511 assert.False(t, util.In(hashes['C'], task.Commits), fmt.Sprintf("Expecte
d not to find %s in %v", hashes['C'], task.Commits)) | |
512 } | |
513 | |
514 func TestTimeDecay24Hr(t *testing.T) { | |
515 tc := []struct { | |
516 decayAmt24Hr float64 | |
517 elapsed time.Duration | |
518 out float64 | |
519 }{ | |
520 { | |
521 decayAmt24Hr: 1.0, | |
522 elapsed: 10 * time.Hour, | |
523 out: 1.0, | |
524 }, | |
525 { | |
526 decayAmt24Hr: 0.5, | |
527 elapsed: 0 * time.Hour, | |
528 out: 1.0, | |
529 }, | |
530 { | |
531 decayAmt24Hr: 0.5, | |
532 elapsed: 24 * time.Hour, | |
533 out: 0.5, | |
534 }, | |
535 { | |
536 decayAmt24Hr: 0.5, | |
537 elapsed: 12 * time.Hour, | |
538 out: 0.75, | |
539 }, | |
540 { | |
541 decayAmt24Hr: 0.5, | |
542 elapsed: 36 * time.Hour, | |
543 out: 0.25, | |
544 }, | |
545 { | |
546 decayAmt24Hr: 0.5, | |
547 elapsed: 48 * time.Hour, | |
548 out: 0.0, | |
549 }, | |
550 { | |
551 decayAmt24Hr: 0.5, | |
552 elapsed: 72 * time.Hour, | |
553 out: 0.0, | |
554 }, | |
555 } | |
556 for i, c := range tc { | |
557 assert.Equal(t, c.out, timeDecay24Hr(c.decayAmt24Hr, c.elapsed),
fmt.Sprintf("test case #%d", i)) | |
558 } | |
559 } | |
560 | |
561 func TestRegenerateTaskQueue(t *testing.T) { | |
562 testutils.SkipIfShort(t) | |
563 | |
564 // Setup. | |
565 tr := util.NewTempRepo() | |
566 defer tr.Cleanup() | |
567 d := db.NewInMemoryDB() | |
568 cache, err := db.NewTaskCache(d, time.Hour) | |
569 assert.NoError(t, err) | |
570 | |
571 // The test repo has two commits. The first commit adds a tasks.cfg file | |
572 // with two task specs: a build task and a test task, the test task | |
573 // depending on the build task. The second commit adds a perf task spec, | |
574 // which also depends on the build task. Therefore, there are five total | |
575 // possible tasks we could run: | |
576 // | |
577 // Build@c1, Test@c1, Build@c2, Test@c2, Perf@c2 | |
578 // | |
579 c1 := "c06ac6093d3029dffe997e9d85e8e61fee5f87b9" | |
580 c2 := "0f87799ac791b8d8573e93694d05b05a65e09668" | |
581 buildTask := "Build-Ubuntu-GCC-Arm7-Release-Android" | |
582 testTask := "Test-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release" | |
583 perfTask := "Perf-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release" | |
584 repoName := "skia.git" | |
585 | |
586 assert.NoError(t, err) | |
587 isolateClient, err := isolate.NewClient(tr.Dir) | |
588 assert.NoError(t, err) | |
589 isolateClient.ServerUrl = isolate.FAKE_SERVER_URL | |
590 swarmingClient := swarming.NewTestClient() | |
591 s, err := NewTaskScheduler(d, cache, time.Duration(math.MaxInt64), tr.Di
r, []string{repoName}, isolateClient, swarmingClient) | |
592 assert.NoError(t, err) | |
593 | |
594 // Ensure that the queue is initially empty. | |
595 assert.Equal(t, 0, len(s.queue)) | |
596 | |
597 // Regenerate the task queue. | |
598 assert.NoError(t, s.regenerateTaskQueue()) | |
599 assert.Equal(t, 2, len(s.queue)) // Two Build tasks. | |
600 | |
601 testSort := func() { | |
602 // Ensure that we sorted correctly. | |
603 if len(s.queue) == 0 { | |
604 return | |
605 } | |
606 highScore := s.queue[0].Score | |
607 for _, c := range s.queue { | |
608 assert.True(t, highScore >= c.Score) | |
609 highScore = c.Score | |
610 } | |
611 } | |
612 testSort() | |
613 | |
614 // Since we haven't run any task yet, we should have the two Build | |
615 // tasks, each with a blamelist of 1 commit (since we don't go past | |
616 // taskCandidate.Revision when computing blamelists when we haven't run | |
617 // a given task spec before), and a score of 2.0. | |
618 for _, c := range s.queue { | |
619 assert.Equal(t, buildTask, c.Name) | |
620 assert.Equal(t, []string{c.Revision}, c.Commits) | |
621 assert.Equal(t, 2.0, c.Score) | |
622 } | |
623 | |
624 // Insert one of the tasks. | |
625 var t1 *db.Task | |
626 for _, c := range s.queue { // Order not guaranteed; find the right cand
idate. | |
627 if c.Revision == c1 { | |
628 t1 = makeTask(c.Name, c.Repo, c.Revision) | |
629 break | |
630 } | |
631 } | |
632 assert.NotNil(t, t1) | |
633 t1.Status = db.TASK_STATUS_SUCCESS | |
634 t1.IsolatedOutput = "fake isolated hash" | |
635 assert.NoError(t, d.PutTask(t1)) | |
636 assert.NoError(t, cache.Update()) | |
637 | |
638 // Regenerate the task queue. | |
639 assert.NoError(t, s.regenerateTaskQueue()) | |
640 | |
641 // Now we expect the queue to contain the other Build task and the one | |
642 // Test task we unblocked by running the first Build task. | |
643 assert.Equal(t, 2, len(s.queue)) | |
644 testSort() | |
645 for _, c := range s.queue { | |
646 assert.Equal(t, 2.0, c.Score) | |
647 assert.Equal(t, []string{c.Revision}, c.Commits) | |
648 } | |
649 buildIdx := 0 | |
650 testIdx := 1 | |
651 if s.queue[1].Name == buildTask { | |
652 buildIdx = 1 | |
653 testIdx = 0 | |
654 } | |
655 assert.Equal(t, buildTask, s.queue[buildIdx].Name) | |
656 assert.Equal(t, c2, s.queue[buildIdx].Revision) | |
657 | |
658 assert.Equal(t, testTask, s.queue[testIdx].Name) | |
659 assert.Equal(t, c1, s.queue[testIdx].Revision) | |
660 | |
661 // Run the other Build task. | |
662 t2 := makeTask(s.queue[buildIdx].Name, s.queue[buildIdx].Repo, s.queue[b
uildIdx].Revision) | |
663 t2.Status = db.TASK_STATUS_SUCCESS | |
664 t2.IsolatedOutput = "fake isolated hash" | |
665 assert.NoError(t, d.PutTask(t2)) | |
666 assert.NoError(t, cache.Update()) | |
667 | |
668 // Regenerate the task queue. | |
669 assert.NoError(t, s.regenerateTaskQueue()) | |
670 assert.Equal(t, 3, len(s.queue)) | |
671 testSort() | |
672 perfIdx := -1 | |
673 for i, c := range s.queue { | |
674 if c.Name == perfTask { | |
675 perfIdx = i | |
676 } else { | |
677 assert.Equal(t, c.Name, testTask) | |
678 } | |
679 assert.Equal(t, 2.0, c.Score) | |
680 assert.Equal(t, []string{c.Revision}, c.Commits) | |
681 } | |
682 assert.True(t, perfIdx > -1) | |
683 | |
684 // Run the Test task at tip of tree, but make its blamelist cover both | |
685 // commits. | |
686 t3 := makeTask(testTask, repoName, c2) | |
687 t3.Commits = append(t3.Commits, c1) | |
688 t3.Status = db.TASK_STATUS_SUCCESS | |
689 t3.IsolatedOutput = "fake isolated hash" | |
690 assert.NoError(t, d.PutTask(t3)) | |
691 assert.NoError(t, cache.Update()) | |
692 | |
693 // Regenerate the task queue. | |
694 assert.NoError(t, s.regenerateTaskQueue()) | |
695 | |
696 // Now we expect the queue to contain one Test and one Perf task. The | |
697 // Test task is a backfill, and should have a score of 0.5. | |
698 assert.Equal(t, 2, len(s.queue)) | |
699 testSort() | |
700 // First candidate should be the perf task. | |
701 assert.Equal(t, perfTask, s.queue[0].Name) | |
702 assert.Equal(t, 2.0, s.queue[0].Score) | |
703 // The test task is next, a backfill. | |
704 assert.Equal(t, testTask, s.queue[1].Name) | |
705 assert.Equal(t, 0.5, s.queue[1].Score) | |
706 } | |
707 | |
708 func makeTaskCandidate(name string, dims []string) *taskCandidate { | |
709 return &taskCandidate{ | |
710 Name: name, | |
711 TaskSpec: &TaskSpec{ | |
712 Dimensions: dims, | |
713 }, | |
714 } | |
715 } | |
716 | |
717 func makeSwarmingBot(id string, dims []string) *swarming_api.SwarmingRpcsBotInfo
{ | |
718 d := make([]*swarming_api.SwarmingRpcsStringListPair, 0, len(dims)) | |
719 for _, s := range dims { | |
720 split := strings.SplitN(s, ":", 2) | |
721 d = append(d, &swarming_api.SwarmingRpcsStringListPair{ | |
722 Key: split[0], | |
723 Value: []string{split[1]}, | |
724 }) | |
725 } | |
726 return &swarming_api.SwarmingRpcsBotInfo{ | |
727 BotId: id, | |
728 Dimensions: d, | |
729 } | |
730 } | |
731 | |
732 func TestGetCandidatesToSchedule(t *testing.T) { | |
733 // Empty lists. | |
734 rv := getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{}, []*
taskCandidate{}) | |
735 assert.Equal(t, 0, len(rv)) | |
736 | |
737 t1 := makeTaskCandidate("task1", []string{"k:v"}) | |
738 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{}, []*t
askCandidate{t1}) | |
739 assert.Equal(t, 0, len(rv)) | |
740 | |
741 b1 := makeSwarmingBot("bot1", []string{"k:v"}) | |
742 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{}) | |
743 assert.Equal(t, 0, len(rv)) | |
744 | |
745 // Single match. | |
746 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{t1}) | |
747 testutils.AssertDeepEqual(t, []*taskCandidate{t1}, rv) | |
748 | |
749 // No match. | |
750 t1.TaskSpec.Dimensions[0] = "k:v2" | |
751 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{t1}) | |
752 assert.Equal(t, 0, len(rv)) | |
753 | |
754 // Add a task candidate to match b1. | |
755 t1 = makeTaskCandidate("task1", []string{"k:v2"}) | |
756 t2 := makeTaskCandidate("task2", []string{"k:v"}) | |
757 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{t1, t2}) | |
758 testutils.AssertDeepEqual(t, []*taskCandidate{t2}, rv) | |
759 | |
760 // Switch the task order. | |
761 t1 = makeTaskCandidate("task1", []string{"k:v2"}) | |
762 t2 = makeTaskCandidate("task2", []string{"k:v"}) | |
763 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{t2, t1}) | |
764 testutils.AssertDeepEqual(t, []*taskCandidate{t2}, rv) | |
765 | |
766 // Make both tasks match the bot, ensure that we pick the first one. | |
767 t1 = makeTaskCandidate("task1", []string{"k:v"}) | |
768 t2 = makeTaskCandidate("task2", []string{"k:v"}) | |
769 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{t1, t2}) | |
770 testutils.AssertDeepEqual(t, []*taskCandidate{t1}, rv) | |
771 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1}, []
*taskCandidate{t2, t1}) | |
772 testutils.AssertDeepEqual(t, []*taskCandidate{t2}, rv) | |
773 | |
774 // Multiple dimensions. Ensure that different permutations of the bots | |
775 // and tasks lists give us the expected results. | |
776 dims := []string{"k:v", "k2:v2", "k3:v3"} | |
777 b1 = makeSwarmingBot("bot1", dims) | |
778 b2 := makeSwarmingBot("bot2", t1.TaskSpec.Dimensions) | |
779 t1 = makeTaskCandidate("task1", []string{"k:v"}) | |
780 t2 = makeTaskCandidate("task2", dims) | |
781 // In the first two cases, the task with fewer dimensions has the | |
782 // higher priority. It gets the bot with more dimensions because it | |
783 // is first in sorted order. The second task does not get scheduled | |
784 // because there is no bot available which can run it. | |
785 // TODO(borenet): Use a more optimal solution to avoid this case. | |
786 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1, b2}
, []*taskCandidate{t1, t2}) | |
787 testutils.AssertDeepEqual(t, []*taskCandidate{t1}, rv) | |
788 t1 = makeTaskCandidate("task1", []string{"k:v"}) | |
789 t2 = makeTaskCandidate("task2", dims) | |
790 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b2, b1}
, []*taskCandidate{t1, t2}) | |
791 testutils.AssertDeepEqual(t, []*taskCandidate{t1}, rv) | |
792 // In these two cases, the task with more dimensions has the higher | |
793 // priority. Both tasks get scheduled. | |
794 t1 = makeTaskCandidate("task1", []string{"k:v"}) | |
795 t2 = makeTaskCandidate("task2", dims) | |
796 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1, b2}
, []*taskCandidate{t2, t1}) | |
797 testutils.AssertDeepEqual(t, []*taskCandidate{t2, t1}, rv) | |
798 t1 = makeTaskCandidate("task1", []string{"k:v"}) | |
799 t2 = makeTaskCandidate("task2", dims) | |
800 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b2, b1}
, []*taskCandidate{t2, t1}) | |
801 testutils.AssertDeepEqual(t, []*taskCandidate{t2, t1}, rv) | |
802 | |
803 // Matching dimensions. More bots than tasks. | |
804 b2 = makeSwarmingBot("bot2", dims) | |
805 b3 := makeSwarmingBot("bot3", dims) | |
806 t1 = makeTaskCandidate("task1", dims) | |
807 t2 = makeTaskCandidate("task2", dims) | |
808 t3 := makeTaskCandidate("task3", dims) | |
809 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1, b2,
b3}, []*taskCandidate{t1, t2}) | |
810 testutils.AssertDeepEqual(t, []*taskCandidate{t1, t2}, rv) | |
811 | |
812 // More tasks than bots. | |
813 t1 = makeTaskCandidate("task1", dims) | |
814 t2 = makeTaskCandidate("task2", dims) | |
815 t3 = makeTaskCandidate("task3", dims) | |
816 rv = getCandidatesToSchedule([]*swarming_api.SwarmingRpcsBotInfo{b1, b2}
, []*taskCandidate{t1, t2, t3}) | |
817 testutils.AssertDeepEqual(t, []*taskCandidate{t1, t2}, rv) | |
818 } | |
819 | |
820 func makeBot(id string, dims map[string]string) *swarming_api.SwarmingRpcsBotInf
o { | |
821 dimensions := make([]*swarming_api.SwarmingRpcsStringListPair, 0, len(di
ms)) | |
822 for k, v := range dims { | |
823 dimensions = append(dimensions, &swarming_api.SwarmingRpcsString
ListPair{ | |
824 Key: k, | |
825 Value: []string{v}, | |
826 }) | |
827 } | |
828 return &swarming_api.SwarmingRpcsBotInfo{ | |
829 BotId: id, | |
830 Dimensions: dimensions, | |
831 } | |
832 } | |
833 | |
834 func TestSchedulingE2E(t *testing.T) { | |
835 testutils.SkipIfShort(t) | |
836 | |
837 // Setup. | |
838 tr := util.NewTempRepo() | |
839 defer tr.Cleanup() | |
840 d := db.NewInMemoryDB() | |
841 cache, err := db.NewTaskCache(d, time.Hour) | |
842 assert.NoError(t, err) | |
843 | |
844 // The test repo has two commits. The first commit adds a tasks.cfg file | |
845 // with two task specs: a build task and a test task, the test task | |
846 // depending on the build task. The second commit adds a perf task spec, | |
847 // which also depends on the build task. Therefore, there are five total | |
848 // possible tasks we could run: | |
849 // | |
850 // Build@c1, Test@c1, Build@c2, Test@c2, Perf@c2 | |
851 // | |
852 c1 := "c06ac6093d3029dffe997e9d85e8e61fee5f87b9" | |
853 c2 := "0f87799ac791b8d8573e93694d05b05a65e09668" | |
854 | |
855 repoName := "skia.git" | |
856 | |
857 isolateClient, err := isolate.NewClient(tr.Dir) | |
858 assert.NoError(t, err) | |
859 isolateClient.ServerUrl = isolate.FAKE_SERVER_URL | |
860 swarmingClient := swarming.NewTestClient() | |
861 s, err := NewTaskScheduler(d, cache, time.Duration(math.MaxInt64), tr.Di
r, []string{repoName}, isolateClient, swarmingClient) | |
862 | |
863 // Start testing. No free bots, so we get a full queue with nothing | |
864 // scheduled. | |
865 assert.NoError(t, s.MainLoop()) | |
866 tasks, err := cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
867 assert.NoError(t, err) | |
868 expect := map[string]map[string]*db.Task{ | |
869 c1: map[string]*db.Task{}, | |
870 c2: map[string]*db.Task{}, | |
871 } | |
872 testutils.AssertDeepEqual(t, expect, tasks) | |
873 | |
874 // A bot is free but doesn't have all of the right dimensions to run a t
ask. | |
875 bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | |
876 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1}) | |
877 assert.NoError(t, s.MainLoop()) | |
878 tasks, err = cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
879 assert.NoError(t, err) | |
880 expect = map[string]map[string]*db.Task{ | |
881 c1: map[string]*db.Task{}, | |
882 c2: map[string]*db.Task{}, | |
883 } | |
884 testutils.AssertDeepEqual(t, expect, tasks) | |
885 assert.Equal(t, 2, len(s.queue)) | |
886 | |
887 // One bot free, schedule a task, ensure it's not in the queue. | |
888 bot1.Dimensions = append(bot1.Dimensions, &swarming_api.SwarmingRpcsStri
ngListPair{ | |
889 Key: "os", | |
890 Value: []string{"Ubuntu"}, | |
891 }) | |
892 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1}) | |
893 assert.NoError(t, s.MainLoop()) | |
894 tasks, err = cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
895 assert.NoError(t, err) | |
896 var t1 *db.Task | |
897 for _, v := range tasks { | |
898 for _, t := range v { | |
899 t1 = t | |
900 break | |
901 } | |
902 } | |
903 assert.NotNil(t, t1) | |
904 assert.Equal(t, 1, len(s.queue)) | |
905 | |
906 // The task is complete. | |
907 t1.Status = db.TASK_STATUS_SUCCESS | |
908 t1.Finished = time.Now() | |
909 t1.IsolatedOutput = "abc123" | |
910 assert.NoError(t, d.PutTask(t1)) | |
911 swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata
{ | |
912 makeSwarmingRpcsTaskRequestMetadata(t, t1), | |
913 }) | |
914 | |
915 // No bots free. Ensure that the queue is correct. | |
916 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{}) | |
917 assert.NoError(t, s.MainLoop()) | |
918 tasks, err = cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
919 assert.NoError(t, err) | |
920 // The tests don't use any time-based score scaling, because the commits | |
921 // in the test repo have fixed timestamps and would eventually result in | |
922 // zero scores. The side effect is that we don't know which of c1 or c2 | |
923 // will be chosen because they end up with the same score. | |
924 expectLen := 2 // One remaining build task, plus one test task. | |
925 if t1.Revision == c2 { | |
926 expectLen = 3 // c2 adds a perf task. | |
927 } | |
928 assert.Equal(t, expectLen, len(s.queue)) | |
929 | |
930 // More bots than tasks free, ensure the queue is correct. | |
931 bot2 := makeBot("bot2", map[string]string{ | |
932 "pool": "Skia", | |
933 "os": "Android", | |
934 "device_type": "grouper", | |
935 }) | |
936 bot3 := makeBot("bot3", map[string]string{ | |
937 "pool": "Skia", | |
938 "os": "Android", | |
939 "device_type": "grouper", | |
940 }) | |
941 bot4 := makeBot("bot4", map[string]string{ | |
942 "pool": "Skia", | |
943 "os": "Ubuntu", | |
944 }) | |
945 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2,
bot3, bot4}) | |
946 assert.NoError(t, s.MainLoop()) | |
947 _, err = cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
948 assert.NoError(t, err) | |
949 assert.Equal(t, 0, len(s.queue)) | |
950 | |
951 // Second compile task finished. | |
952 var t2 *db.Task | |
953 var t3 *db.Task | |
954 var t4 *db.Task | |
955 tasks, err = cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
956 assert.NoError(t, err) | |
957 for _, v := range tasks { | |
958 for _, task := range v { | |
959 if task.Name == t1.Name { | |
960 if (t1.Revision == c1 && task.Revision == c2) ||
(t1.Revision == c2 && task.Revision == c1) { | |
961 t2 = task | |
962 } | |
963 } else { | |
964 if t3 == nil { | |
965 t3 = task | |
966 } else { | |
967 t4 = task | |
968 } | |
969 } | |
970 } | |
971 } | |
972 assert.NotNil(t, t2) | |
973 assert.NotNil(t, t3) | |
974 t2.Status = db.TASK_STATUS_SUCCESS | |
975 t2.Finished = time.Now() | |
976 t2.IsolatedOutput = "abc123" | |
977 | |
978 // No new bots free; ensure that the newly-available tasks are in the qu
eue. | |
979 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{}) | |
980 mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{ | |
981 makeSwarmingRpcsTaskRequestMetadata(t, t2), | |
982 makeSwarmingRpcsTaskRequestMetadata(t, t3), | |
983 } | |
984 if t4 != nil { | |
985 mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadat
a(t, t4)) | |
986 } | |
987 swarmingClient.MockTasks(mockTasks) | |
988 assert.NoError(t, s.MainLoop()) | |
989 expectLen = 1 // Test task from c1 | |
990 if t2.Revision == c2 { | |
991 expectLen = 2 // Test and perf tasks from c2 | |
992 } | |
993 assert.Equal(t, expectLen, len(s.queue)) | |
994 | |
995 // Finish the other tasks. | |
996 t3, err = cache.GetTask(t3.Id) | |
997 assert.NoError(t, err) | |
998 t3.Status = db.TASK_STATUS_SUCCESS | |
999 t3.Finished = time.Now() | |
1000 t3.IsolatedOutput = "abc123" | |
1001 if t4 != nil { | |
1002 t4, err = cache.GetTask(t4.Id) | |
1003 assert.NoError(t, err) | |
1004 t4.Status = db.TASK_STATUS_SUCCESS | |
1005 t4.Finished = time.Now() | |
1006 t4.IsolatedOutput = "abc123" | |
1007 } | |
1008 | |
1009 // Ensure that we finally run all of the tasks and insert into the DB. | |
1010 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2,
bot3, bot4}) | |
1011 mockTasks = []*swarming_api.SwarmingRpcsTaskRequestMetadata{ | |
1012 makeSwarmingRpcsTaskRequestMetadata(t, t3), | |
1013 } | |
1014 if t4 != nil { | |
1015 mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadat
a(t, t4)) | |
1016 } | |
1017 assert.NoError(t, s.MainLoop()) | |
1018 tasks, err = cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
1019 assert.NoError(t, err) | |
1020 assert.Equal(t, 2, len(tasks[c1])) | |
1021 assert.Equal(t, 3, len(tasks[c2])) | |
1022 assert.Equal(t, 0, len(s.queue)) | |
1023 | |
1024 // Mark everything as finished. Ensure that the queue still ends up empt
y. | |
1025 tasksList := []*db.Task{} | |
1026 for _, v := range tasks { | |
1027 for _, task := range v { | |
1028 if task.Status != db.TASK_STATUS_SUCCESS { | |
1029 task.Status = db.TASK_STATUS_SUCCESS | |
1030 task.Finished = time.Now() | |
1031 task.IsolatedOutput = "abc123" | |
1032 tasksList = append(tasksList, task) | |
1033 } | |
1034 } | |
1035 } | |
1036 mockTasks = make([]*swarming_api.SwarmingRpcsTaskRequestMetadata, 0, len
(tasksList)) | |
1037 for _, task := range tasksList { | |
1038 mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadat
a(t, task)) | |
1039 } | |
1040 swarmingClient.MockTasks(mockTasks) | |
1041 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2,
bot3, bot4}) | |
1042 assert.NoError(t, s.MainLoop()) | |
1043 assert.Equal(t, 0, len(s.queue)) | |
1044 } | |
1045 | |
1046 func makeDummyCommits(t *testing.T, repoDir string, numCommits int) { | |
1047 _, err := exec.RunCwd(repoDir, "git", "config", "user.email", "test@skia
.org") | |
1048 assert.NoError(t, err) | |
1049 _, err = exec.RunCwd(repoDir, "git", "config", "user.name", "Skia Tester
") | |
1050 assert.NoError(t, err) | |
1051 _, err = exec.RunCwd(repoDir, "git", "checkout", "master") | |
1052 assert.NoError(t, err) | |
1053 dummyFile := path.Join(repoDir, "dummyfile.txt") | |
1054 for i := 0; i < numCommits; i++ { | |
1055 title := fmt.Sprintf("Dummy #%d", i) | |
1056 assert.NoError(t, ioutil.WriteFile(dummyFile, []byte(title), os.
ModePerm)) | |
1057 _, err = exec.RunCwd(repoDir, "git", "add", dummyFile) | |
1058 assert.NoError(t, err) | |
1059 _, err = exec.RunCwd(repoDir, "git", "commit", "-m", title) | |
1060 assert.NoError(t, err) | |
1061 _, err = exec.RunCwd(repoDir, "git", "push", "origin", "master") | |
1062 assert.NoError(t, err) | |
1063 } | |
1064 } | |
1065 | |
1066 func TestSchedulerStealingFrom(t *testing.T) { | |
1067 testutils.SkipIfShort(t) | |
1068 | |
1069 // Setup. | |
1070 tr := util.NewTempRepo() | |
1071 d := db.NewInMemoryDB() | |
1072 cache, err := db.NewTaskCache(d, time.Hour) | |
1073 assert.NoError(t, err) | |
1074 | |
1075 // The test repo has two commits. The first commit adds a tasks.cfg file | |
1076 // with two task specs: a build task and a test task, the test task | |
1077 // depending on the build task. The second commit adds a perf task spec, | |
1078 // which also depends on the build task. Therefore, there are five total | |
1079 // possible tasks we could run: | |
1080 // | |
1081 // Build@c1, Test@c1, Build@c2, Test@c2, Perf@c2 | |
1082 // | |
1083 c1 := "c06ac6093d3029dffe997e9d85e8e61fee5f87b9" | |
1084 c2 := "0f87799ac791b8d8573e93694d05b05a65e09668" | |
1085 buildTask := "Build-Ubuntu-GCC-Arm7-Release-Android" | |
1086 repoName := "skia.git" | |
1087 repoDir := path.Join(tr.Dir, repoName) | |
1088 | |
1089 repos := gitinfo.NewRepoMap(tr.Dir) | |
1090 repo, err := repos.Repo(repoName) | |
1091 assert.NoError(t, err) | |
1092 isolateClient, err := isolate.NewClient(tr.Dir) | |
1093 assert.NoError(t, err) | |
1094 isolateClient.ServerUrl = isolate.FAKE_SERVER_URL | |
1095 swarmingClient := swarming.NewTestClient() | |
1096 s, err := NewTaskScheduler(d, cache, time.Duration(math.MaxInt64), tr.Di
r, []string{"skia.git"}, isolateClient, swarmingClient) | |
1097 assert.NoError(t, err) | |
1098 | |
1099 // Run both available compile tasks. | |
1100 bot1 := makeBot("bot1", map[string]string{"pool": "Skia", "os": "Ubuntu"
}) | |
1101 bot2 := makeBot("bot2", map[string]string{"pool": "Skia", "os": "Ubuntu"
}) | |
1102 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2}) | |
1103 assert.NoError(t, s.MainLoop()) | |
1104 tasks, err := cache.GetTasksForCommits(repoName, []string{c1, c2}) | |
1105 assert.NoError(t, err) | |
1106 assert.Equal(t, 1, len(tasks[c1])) | |
1107 assert.Equal(t, 1, len(tasks[c2])) | |
1108 tasksList := []*db.Task{} | |
1109 for _, v := range tasks { | |
1110 for _, task := range v { | |
1111 if task.Status != db.TASK_STATUS_SUCCESS { | |
1112 task.Status = db.TASK_STATUS_SUCCESS | |
1113 task.Finished = time.Now() | |
1114 task.IsolatedOutput = "abc123" | |
1115 tasksList = append(tasksList, task) | |
1116 } | |
1117 } | |
1118 } | |
1119 assert.NoError(t, d.PutTasks(tasksList)) | |
1120 assert.NoError(t, cache.Update()) | |
1121 | |
1122 // Add some commits. | |
1123 makeDummyCommits(t, repoDir, 10) | |
1124 commits, err := repo.RevList("HEAD") | |
1125 assert.NoError(t, err) | |
1126 | |
1127 // Run one task. Ensure that it's at tip-of-tree. | |
1128 head, err := repo.FullHash("HEAD") | |
1129 assert.NoError(t, err) | |
1130 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1}) | |
1131 assert.NoError(t, s.MainLoop()) | |
1132 tasks, err = cache.GetTasksForCommits(repoName, commits) | |
1133 assert.NoError(t, err) | |
1134 assert.Equal(t, 1, len(tasks[head])) | |
1135 task := tasks[head][buildTask] | |
1136 assert.Equal(t, head, task.Revision) | |
1137 expect := commits[:len(commits)-2] | |
1138 sort.Strings(expect) | |
1139 sort.Strings(task.Commits) | |
1140 testutils.AssertDeepEqual(t, expect, task.Commits) | |
1141 | |
1142 task.Status = db.TASK_STATUS_SUCCESS | |
1143 task.Finished = time.Now() | |
1144 task.IsolatedOutput = "abc123" | |
1145 assert.NoError(t, d.PutTask(task)) | |
1146 assert.NoError(t, cache.Update()) | |
1147 | |
1148 oldTasksByCommit := tasks | |
1149 | |
1150 // Run backfills, ensuring that each one steals the right set of commits | |
1151 // from previous builds, until all of the build task candidates have run
. | |
1152 for i := 0; i < 9; i++ { | |
1153 // Now, run another task. The new task should bisect the old one
. | |
1154 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1
}) | |
1155 assert.NoError(t, s.MainLoop()) | |
1156 tasks, err = cache.GetTasksForCommits(repoName, commits) | |
1157 assert.NoError(t, err) | |
1158 var newTask *db.Task | |
1159 for _, v := range tasks { | |
1160 for _, task := range v { | |
1161 if task.Status == db.TASK_STATUS_PENDING { | |
1162 assert.True(t, newTask == nil || task.Id
== newTask.Id) | |
1163 newTask = task | |
1164 } | |
1165 } | |
1166 } | |
1167 assert.NotNil(t, newTask) | |
1168 | |
1169 oldTask := oldTasksByCommit[newTask.Revision][newTask.Name] | |
1170 assert.NotNil(t, oldTask) | |
1171 assert.True(t, util.In(newTask.Revision, oldTask.Commits)) | |
1172 | |
1173 // Find the updated old task. | |
1174 updatedOldTask, err := cache.GetTask(oldTask.Id) | |
1175 assert.NoError(t, err) | |
1176 assert.NotNil(t, updatedOldTask) | |
1177 | |
1178 // Ensure that the blamelists are correct. | |
1179 old := util.NewStringSet(oldTask.Commits) | |
1180 new := util.NewStringSet(newTask.Commits) | |
1181 updatedOld := util.NewStringSet(updatedOldTask.Commits) | |
1182 | |
1183 testutils.AssertDeepEqual(t, old, new.Union(updatedOld)) | |
1184 assert.Equal(t, 0, len(new.Intersect(updatedOld))) | |
1185 // Finish the new task. | |
1186 newTask.Status = db.TASK_STATUS_SUCCESS | |
1187 newTask.Finished = time.Now() | |
1188 newTask.IsolatedOutput = "abc123" | |
1189 assert.NoError(t, d.PutTask(newTask)) | |
1190 assert.NoError(t, cache.Update()) | |
1191 oldTasksByCommit = tasks | |
1192 | |
1193 } | |
1194 | |
1195 // Ensure that we're really done. | |
1196 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1}) | |
1197 assert.NoError(t, s.MainLoop()) | |
1198 tasks, err = cache.GetTasksForCommits(repoName, commits) | |
1199 assert.NoError(t, err) | |
1200 var newTask *db.Task | |
1201 for _, v := range tasks { | |
1202 for _, task := range v { | |
1203 if task.Status == db.TASK_STATUS_PENDING { | |
1204 assert.True(t, newTask == nil || task.Id == newT
ask.Id) | |
1205 newTask = task | |
1206 } | |
1207 } | |
1208 } | |
1209 assert.Nil(t, newTask) | |
1210 } | |
1211 | |
1212 func TestMultipleCandidatesBackfillingEachOther(t *testing.T) { | |
1213 testutils.SkipIfShort(t) | |
1214 | |
1215 workdir, err := ioutil.TempDir("", "") | |
1216 assert.NoError(t, err) | |
1217 defer testutils.RemoveAll(t, workdir) | |
1218 | |
1219 run := func(dir string, cmd ...string) { | |
1220 _, err := exec.RunCwd(dir, cmd...) | |
1221 assert.NoError(t, err) | |
1222 } | |
1223 | |
1224 addFile := func(repoDir, subPath, contents string) { | |
1225 assert.NoError(t, ioutil.WriteFile(path.Join(repoDir, subPath),
[]byte(contents), os.ModePerm)) | |
1226 run(repoDir, "git", "add", subPath) | |
1227 } | |
1228 | |
1229 repoName := "skia.git" | |
1230 repoDir := path.Join(workdir, repoName) | |
1231 | |
1232 assert.NoError(t, ioutil.WriteFile(path.Join(workdir, ".gclient"), []byt
e("dummy"), os.ModePerm)) | |
1233 | |
1234 assert.NoError(t, os.Mkdir(path.Join(workdir, repoName), os.ModePerm)) | |
1235 run(repoDir, "git", "init") | |
1236 run(repoDir, "git", "remote", "add", "origin", ".") | |
1237 | |
1238 infraBotsSubDir := path.Join("infra", "bots") | |
1239 infraBotsDir := path.Join(repoDir, infraBotsSubDir) | |
1240 assert.NoError(t, os.MkdirAll(infraBotsDir, os.ModePerm)) | |
1241 | |
1242 addFile(repoDir, "somefile.txt", "dummy3") | |
1243 addFile(repoDir, path.Join(infraBotsSubDir, "dummy.isolate"), `{ | |
1244 'variables': { | |
1245 'command': [ | |
1246 'python', 'recipes.py', 'run', | |
1247 ], | |
1248 'files': [ | |
1249 '../../somefile.txt', | |
1250 ], | |
1251 }, | |
1252 }`) | |
1253 | |
1254 // Create a single task in the config. | |
1255 taskName := "dummytask" | |
1256 cfg := &TasksCfg{ | |
1257 Tasks: map[string]*TaskSpec{ | |
1258 taskName: &TaskSpec{ | |
1259 CipdPackages: []*CipdPackage{}, | |
1260 Dependencies: []string{}, | |
1261 Dimensions: []string{"pool:Skia"}, | |
1262 Isolate: "dummy.isolate", | |
1263 Priority: 1.0, | |
1264 }, | |
1265 }, | |
1266 } | |
1267 f, err := os.Create(path.Join(repoDir, TASKS_CFG_FILE)) | |
1268 assert.NoError(t, err) | |
1269 assert.NoError(t, json.NewEncoder(f).Encode(&cfg)) | |
1270 assert.NoError(t, f.Close()) | |
1271 run(repoDir, "git", "add", TASKS_CFG_FILE) | |
1272 run(repoDir, "git", "commit", "-m", "Add more tasks!") | |
1273 run(repoDir, "git", "push", "origin", "master") | |
1274 run(repoDir, "git", "branch", "-u", "origin/master") | |
1275 | |
1276 // Setup the scheduler. | |
1277 repos := gitinfo.NewRepoMap(workdir) | |
1278 repo, err := repos.Repo(repoName) | |
1279 assert.NoError(t, err) | |
1280 d := db.NewInMemoryDB() | |
1281 cache, err := db.NewTaskCache(d, time.Hour) | |
1282 assert.NoError(t, err) | |
1283 isolateClient, err := isolate.NewClient(workdir) | |
1284 assert.NoError(t, err) | |
1285 isolateClient.ServerUrl = isolate.FAKE_SERVER_URL | |
1286 swarmingClient := swarming.NewTestClient() | |
1287 s, err := NewTaskScheduler(d, cache, time.Duration(math.MaxInt64), workd
ir, []string{repoName}, isolateClient, swarmingClient) | |
1288 assert.NoError(t, err) | |
1289 | |
1290 mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{} | |
1291 mock := func(task *db.Task) { | |
1292 mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadat
a(t, task)) | |
1293 swarmingClient.MockTasks(mockTasks) | |
1294 } | |
1295 | |
1296 // Cycle once. | |
1297 bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | |
1298 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1}) | |
1299 assert.NoError(t, s.MainLoop()) | |
1300 assert.Equal(t, 0, len(s.queue)) | |
1301 head, err := repo.FullHash("HEAD") | |
1302 assert.NoError(t, err) | |
1303 tasks, err := cache.GetTasksForCommits(repoName, []string{head}) | |
1304 assert.NoError(t, err) | |
1305 assert.Equal(t, 1, len(tasks[head])) | |
1306 mock(tasks[head][taskName]) | |
1307 | |
1308 // Add some commits to the repo. | |
1309 makeDummyCommits(t, repoDir, 8) | |
1310 commits, err := repo.RevList(fmt.Sprintf("%s..HEAD", head)) | |
1311 assert.Nil(t, err) | |
1312 assert.Equal(t, 8, len(commits)) | |
1313 | |
1314 // Trigger builds simultaneously. | |
1315 bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) | |
1316 bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) | |
1317 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2,
bot3}) | |
1318 assert.NoError(t, s.MainLoop()) | |
1319 assert.Equal(t, 5, len(s.queue)) | |
1320 tasks, err = cache.GetTasksForCommits(repoName, commits) | |
1321 assert.NoError(t, err) | |
1322 | |
1323 // If we're queueing correctly, we should've triggered tasks at | |
1324 // commits[0], commits[4], and either commits[2] or commits[6]. | |
1325 var t1, t2, t3 *db.Task | |
1326 for _, byName := range tasks { | |
1327 for _, task := range byName { | |
1328 if task.Revision == commits[0] { | |
1329 t1 = task | |
1330 } else if task.Revision == commits[4] { | |
1331 t2 = task | |
1332 } else if task.Revision == commits[2] || task.Revision =
= commits[6] { | |
1333 t3 = task | |
1334 } else { | |
1335 assert.FailNow(t, fmt.Sprintf("Task has unknown
revision: %v", task)) | |
1336 } | |
1337 } | |
1338 } | |
1339 assert.NotNil(t, t1) | |
1340 assert.NotNil(t, t2) | |
1341 assert.NotNil(t, t3) | |
1342 mock(t1) | |
1343 mock(t2) | |
1344 mock(t3) | |
1345 | |
1346 // Ensure that we got the blamelists right. | |
1347 mkCopy := func(orig []string) []string { | |
1348 rv := make([]string, len(orig)) | |
1349 copy(rv, orig) | |
1350 return rv | |
1351 } | |
1352 var expect1, expect2, expect3 []string | |
1353 if t3.Revision == commits[2] { | |
1354 expect1 = mkCopy(commits[:2]) | |
1355 expect2 = mkCopy(commits[4:]) | |
1356 expect3 = mkCopy(commits[2:4]) | |
1357 } else { | |
1358 expect1 = mkCopy(commits[:4]) | |
1359 expect2 = mkCopy(commits[4:6]) | |
1360 expect3 = mkCopy(commits[6:]) | |
1361 } | |
1362 sort.Strings(expect1) | |
1363 sort.Strings(expect2) | |
1364 sort.Strings(expect3) | |
1365 sort.Strings(t1.Commits) | |
1366 sort.Strings(t2.Commits) | |
1367 sort.Strings(t3.Commits) | |
1368 testutils.AssertDeepEqual(t, expect1, t1.Commits) | |
1369 testutils.AssertDeepEqual(t, expect2, t2.Commits) | |
1370 testutils.AssertDeepEqual(t, expect3, t3.Commits) | |
1371 | |
1372 // Just for good measure, check the task at the head of the queue. | |
1373 expectIdx := 2 | |
1374 if t3.Revision == commits[expectIdx] { | |
1375 expectIdx = 6 | |
1376 } | |
1377 assert.Equal(t, commits[expectIdx], s.queue[0].Revision) | |
1378 | |
1379 // Run again with 5 bots to check the case where we bisect the same | |
1380 // task twice. | |
1381 bot4 := makeBot("bot4", map[string]string{"pool": "Skia"}) | |
1382 bot5 := makeBot("bot5", map[string]string{"pool": "Skia"}) | |
1383 swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2,
bot3, bot4, bot5}) | |
1384 assert.NoError(t, s.MainLoop()) | |
1385 assert.Equal(t, 0, len(s.queue)) | |
1386 tasks, err = cache.GetTasksForCommits(repoName, commits) | |
1387 assert.NoError(t, err) | |
1388 for _, byName := range tasks { | |
1389 for _, task := range byName { | |
1390 assert.Equal(t, 1, len(task.Commits)) | |
1391 } | |
1392 } | |
1393 } | |
OLD | NEW |