Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(634)

Side by Side Diff: build_scheduler/go/db/task_test.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « build_scheduler/go/db/task.go ('k') | build_scheduler/go/db/testutil.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package db
2
3 import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "sort"
8 "testing"
9 "time"
10
11 swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
12 assert "github.com/stretchr/testify/require"
13
14 "go.skia.org/infra/go/swarming"
15 "go.skia.org/infra/go/testutils"
16 )
17
18 // Test that Task.UpdateFromSwarming returns an error when the input data is
19 // invalid.
20 func TestUpdateFromSwarmingInvalid(t *testing.T) {
21 now := time.Now().UTC().Round(time.Microsecond)
22 task := &Task{
23 Id: "A",
24 Name: "A",
25 Repo: "A",
26 Revision: "A",
27 Created: now,
28 Commits: []string{"A", "B"},
29 }
30 copy := task.Copy()
31
32 testError := func(s *swarming_api.SwarmingRpcsTaskResult, msg string) {
33 changed, err := task.UpdateFromSwarming(s)
34 assert.False(t, changed)
35 assert.Error(t, err)
36 assert.Contains(t, err.Error(), msg)
37 }
38
39 testError(nil, "Missing TaskResult")
40
41 testError(&swarming_api.SwarmingRpcsTaskResult{
42 CreatedTs: now.Format(swarming.TIMESTAMP_FORMAT),
43 State: SWARMING_STATE_COMPLETED,
44 Tags: []string{"invalid"},
45 }, "Invalid Swarming task tag")
46
47 testError(&swarming_api.SwarmingRpcsTaskResult{
48 CreatedTs: "20160817T142302.543490",
49 State: SWARMING_STATE_COMPLETED,
50 }, "Unable to parse task creation time")
51
52 testError(&swarming_api.SwarmingRpcsTaskResult{
53 CreatedTs: now.Format(swarming.TIMESTAMP_FORMAT),
54 State: SWARMING_STATE_COMPLETED,
55 StartedTs: "20160817T142302.543490",
56 }, "Unable to parse StartedTs")
57
58 testError(&swarming_api.SwarmingRpcsTaskResult{
59 CreatedTs: now.Format(swarming.TIMESTAMP_FORMAT),
60 State: SWARMING_STATE_COMPLETED,
61 CompletedTs: "20160817T142302.543490",
62 }, "Unable to parse CompletedTs")
63
64 testError(&swarming_api.SwarmingRpcsTaskResult{
65 CreatedTs: now.Format(swarming.TIMESTAMP_FORMAT),
66 State: SWARMING_STATE_EXPIRED,
67 AbandonedTs: "20160817T142302.543490",
68 }, "Unable to parse AbandonedTs")
69
70 // Unchanged.
71 testutils.AssertDeepEqual(t, task, copy)
72 }
73
74 // Test that Task.UpdateFromSwarming returns an error when the task "identity"
75 // fields do not match.
76 func TestUpdateFromSwarmingMismatched(t *testing.T) {
77 now := time.Now().UTC().Round(time.Microsecond)
78 task := &Task{
79 Id: "A",
80 Name: "A",
81 Repo: "A",
82 Revision: "A",
83 Created: now,
84 Commits: []string{"A", "B"},
85 SwarmingTaskId: "A",
86 }
87 copy := task.Copy()
88
89 testError := func(s *swarming_api.SwarmingRpcsTaskResult, msg string) {
90 changed, err := task.UpdateFromSwarming(s)
91 assert.False(t, changed)
92 assert.Error(t, err)
93 assert.Contains(t, err.Error(), msg)
94 }
95
96 s := &swarming_api.SwarmingRpcsTaskResult{
97 TaskId: "A",
98 CreatedTs: now.Format(swarming.TIMESTAMP_FORMAT),
99 Failure: false,
100 State: SWARMING_STATE_COMPLETED,
101 Tags: []string{
102 fmt.Sprintf("%s:B", SWARMING_TAG_ID),
103 fmt.Sprintf("%s:A", SWARMING_TAG_NAME),
104 fmt.Sprintf("%s:A", SWARMING_TAG_REPO),
105 fmt.Sprintf("%s:A", SWARMING_TAG_REVISION),
106 },
107 }
108 testError(s, "Id does not match")
109
110 s.Tags[0] = fmt.Sprintf("%s:A", SWARMING_TAG_ID)
111 s.Tags[1] = fmt.Sprintf("%s:B", SWARMING_TAG_NAME)
112 testError(s, "Name does not match")
113
114 s.Tags[1] = fmt.Sprintf("%s:A", SWARMING_TAG_NAME)
115 s.Tags[2] = fmt.Sprintf("%s:B", SWARMING_TAG_REPO)
116 testError(s, "Repo does not match")
117
118 s.Tags[2] = fmt.Sprintf("%s:A", SWARMING_TAG_REPO)
119 s.Tags[3] = fmt.Sprintf("%s:B", SWARMING_TAG_REVISION)
120 testError(s, "Revision does not match")
121
122 s.Tags[3] = fmt.Sprintf("%s:A", SWARMING_TAG_REVISION)
123 s.CreatedTs = now.Add(time.Hour).Format(swarming.TIMESTAMP_FORMAT)
124 testError(s, "Creation time has changed")
125
126 s.CreatedTs = now.Format(swarming.TIMESTAMP_FORMAT)
127 s.TaskId = "D"
128 testError(s, "Swarming task ID does not match")
129
130 // Unchanged.
131 testutils.AssertDeepEqual(t, task, copy)
132 }
133
134 // Test that Task.UpdateFromSwarming sets the expected fields in an empty Task.
135 func TestUpdateFromSwarmingInit(t *testing.T) {
136 now := time.Now().UTC().Round(time.Microsecond)
137 task1 := &Task{}
138 s := &swarming_api.SwarmingRpcsTaskResult{
139 TaskId: "E",
140 // Include both AbandonedTs and CompletedTs to test that Complet edTs takes
141 // precedence.
142 AbandonedTs: now.Add(-1 * time.Minute).Format(swarming.TIMESTAMP _FORMAT),
143 CreatedTs: now.Add(-3 * time.Hour).Format(swarming.TIMESTAMP_F ORMAT),
144 CompletedTs: now.Add(-2 * time.Minute).Format(swarming.TIMESTAMP _FORMAT),
145 Failure: false,
146 StartedTs: now.Add(-time.Hour).Format(swarming.TIMESTAMP_FORMA T),
147 State: SWARMING_STATE_COMPLETED,
148 Tags: []string{
149 fmt.Sprintf("%s:A", SWARMING_TAG_ID),
150 fmt.Sprintf("%s:B", SWARMING_TAG_NAME),
151 fmt.Sprintf("%s:C", SWARMING_TAG_REPO),
152 fmt.Sprintf("%s:D", SWARMING_TAG_REVISION),
153 },
154 OutputsRef: &swarming_api.SwarmingRpcsFilesRef{
155 Isolated: "F",
156 },
157 }
158 changed1, err1 := task1.UpdateFromSwarming(s)
159 assert.NoError(t, err1)
160 assert.True(t, changed1)
161 testutils.AssertDeepEqual(t, task1, &Task{
162 Id: "A",
163 Name: "B",
164 Repo: "C",
165 Revision: "D",
166 Created: now.Add(-3 * time.Hour),
167 Commits: nil,
168 Started: now.Add(-time.Hour),
169 Finished: now.Add(-2 * time.Minute),
170 Status: TASK_STATUS_SUCCESS,
171 SwarmingTaskId: "E",
172 IsolatedOutput: "F",
173 })
174
175 // Repeat to get Finished from AbandonedTs.
176 task2 := &Task{}
177 s.CompletedTs = ""
178 s.State = SWARMING_STATE_EXPIRED
179 changed2, err2 := task2.UpdateFromSwarming(s)
180 assert.NoError(t, err2)
181 assert.True(t, changed2)
182 testutils.AssertDeepEqual(t, task2, &Task{
183 Id: "A",
184 Name: "B",
185 Repo: "C",
186 Revision: "D",
187 Created: now.Add(-3 * time.Hour),
188 Commits: nil,
189 Started: now.Add(-time.Hour),
190 Finished: now.Add(-time.Minute),
191 Status: TASK_STATUS_MISHAP,
192 SwarmingTaskId: "E",
193 IsolatedOutput: "F",
194 })
195 }
196
197 // Test that Task.UpdateFromSwarming updates the expected fields in an existing
198 // Task.
199 func TestUpdateFromSwarmingUpdate(t *testing.T) {
200 now := time.Now().UTC().Round(time.Microsecond)
201 task := &Task{
202 Id: "A",
203 Name: "B",
204 Repo: "C",
205 Revision: "D",
206 Created: now.Add(-3 * time.Hour),
207 Commits: []string{"D", "Z"},
208 Started: now.Add(-2 * time.Hour),
209 Finished: now.Add(-1 * time.Hour),
210 Status: TASK_STATUS_SUCCESS,
211 SwarmingTaskId: "E",
212 IsolatedOutput: "F",
213 }
214 s := &swarming_api.SwarmingRpcsTaskResult{
215 TaskId: "E",
216 // Include both AbandonedTs and CompletedTs to test that Complet edTs takes
217 // precedence.
218 AbandonedTs: now.Add(-90 * time.Second).Format(swarming.TIMESTAM P_FORMAT),
219 CreatedTs: now.Add(-3 * time.Hour).Format(swarming.TIMESTAMP_F ORMAT),
220 CompletedTs: now.Add(-1 * time.Minute).Format(swarming.TIMESTAMP _FORMAT),
221 Failure: true,
222 StartedTs: now.Add(-2 * time.Minute).Format(swarming.TIMESTAMP _FORMAT),
223 State: SWARMING_STATE_COMPLETED,
224 Tags: []string{
225 fmt.Sprintf("%s:A", SWARMING_TAG_ID),
226 fmt.Sprintf("%s:B", SWARMING_TAG_NAME),
227 fmt.Sprintf("%s:C", SWARMING_TAG_REPO),
228 fmt.Sprintf("%s:D", SWARMING_TAG_REVISION),
229 },
230 OutputsRef: &swarming_api.SwarmingRpcsFilesRef{
231 Isolated: "G",
232 },
233 }
234 changed, err := task.UpdateFromSwarming(s)
235 assert.NoError(t, err)
236 assert.True(t, changed)
237 testutils.AssertDeepEqual(t, task, &Task{
238 Id: "A",
239 Name: "B",
240 Repo: "C",
241 Revision: "D",
242 Created: now.Add(-3 * time.Hour),
243 Commits: []string{"D", "Z"},
244 Started: now.Add(-2 * time.Minute),
245 Finished: now.Add(-1 * time.Minute),
246 Status: TASK_STATUS_FAILURE,
247 SwarmingTaskId: "E",
248 IsolatedOutput: "G",
249 })
250
251 // Make an unrelated change, no change to Task.
252 s.ModifiedTs = now.Format(swarming.TIMESTAMP_FORMAT)
253 changed, err = task.UpdateFromSwarming(s)
254 assert.NoError(t, err)
255 assert.False(t, changed)
256 testutils.AssertDeepEqual(t, task, &Task{
257 Id: "A",
258 Name: "B",
259 Repo: "C",
260 Revision: "D",
261 Created: now.Add(-3 * time.Hour),
262 Commits: []string{"D", "Z"},
263 Started: now.Add(-2 * time.Minute),
264 Finished: now.Add(-1 * time.Minute),
265 Status: TASK_STATUS_FAILURE,
266 SwarmingTaskId: "E",
267 IsolatedOutput: "G",
268 })
269
270 // Modify so that we get Finished from AbandonedTs.
271 s.CompletedTs = ""
272 s.State = SWARMING_STATE_EXPIRED
273 changed, err = task.UpdateFromSwarming(s)
274 assert.NoError(t, err)
275 assert.True(t, changed)
276 testutils.AssertDeepEqual(t, task, &Task{
277 Id: "A",
278 Name: "B",
279 Repo: "C",
280 Revision: "D",
281 Created: now.Add(-3 * time.Hour),
282 Commits: []string{"D", "Z"},
283 Started: now.Add(-2 * time.Minute),
284 Finished: now.Add(-90 * time.Second),
285 Status: TASK_STATUS_MISHAP,
286 SwarmingTaskId: "E",
287 IsolatedOutput: "G",
288 })
289 }
290
291 // Test that Task.UpdateFromSwarming updates the Status field correctly.
292 func TestUpdateFromSwarmingUpdateStatus(t *testing.T) {
293 now := time.Now().UTC().Round(time.Microsecond)
294
295 testUpdateStatus := func(s *swarming_api.SwarmingRpcsTaskResult, newStat us TaskStatus) {
296 task := &Task{
297 Id: "A",
298 Name: "B",
299 Repo: "C",
300 Revision: "D",
301 Created: now.Add(-3 * time.Hour),
302 Commits: []string{"D", "Z"},
303 Status: TASK_STATUS_SUCCESS,
304 SwarmingTaskId: "E",
305 }
306 changed, err := task.UpdateFromSwarming(s)
307 assert.NoError(t, err)
308 assert.True(t, changed)
309 testutils.AssertDeepEqual(t, task, &Task{
310 Id: "A",
311 Name: "B",
312 Repo: "C",
313 Revision: "D",
314 Created: now.Add(-3 * time.Hour),
315 Commits: []string{"D", "Z"},
316 Status: newStatus,
317 SwarmingTaskId: "E",
318 })
319 }
320
321 s := &swarming_api.SwarmingRpcsTaskResult{
322 TaskId: "E",
323 CreatedTs: now.Add(-3 * time.Hour).Format(swarming.TIMESTAMP_FOR MAT),
324 Failure: false,
325 State: SWARMING_STATE_PENDING,
326 Tags: []string{
327 fmt.Sprintf("%s:A", SWARMING_TAG_ID),
328 fmt.Sprintf("%s:B", SWARMING_TAG_NAME),
329 fmt.Sprintf("%s:C", SWARMING_TAG_REPO),
330 fmt.Sprintf("%s:D", SWARMING_TAG_REVISION),
331 },
332 OutputsRef: nil,
333 }
334
335 testUpdateStatus(s, TASK_STATUS_PENDING)
336
337 s.State = SWARMING_STATE_RUNNING
338 testUpdateStatus(s, TASK_STATUS_RUNNING)
339
340 for _, state := range []string{SWARMING_STATE_BOT_DIED, SWARMING_STATE_C ANCELED, SWARMING_STATE_EXPIRED, SWARMING_STATE_TIMED_OUT} {
341 s.State = state
342 testUpdateStatus(s, TASK_STATUS_MISHAP)
343 }
344
345 s.State = SWARMING_STATE_COMPLETED
346 s.Failure = true
347 testUpdateStatus(s, TASK_STATUS_FAILURE)
348 }
349
350 func TestUpdateDBFromSwarmingTask(t *testing.T) {
351 db := NewInMemoryDB()
352 defer testutils.AssertCloses(t, db)
353
354 // Create task, initialize from swarming, and save.
355 now := time.Now().UTC().Round(time.Microsecond)
356 task := &Task{
357 Name: "B",
358 Repo: "C",
359 Revision: "D",
360 Commits: []string{"D", "Z"},
361 Status: TASK_STATUS_PENDING,
362 }
363 assert.NoError(t, db.AssignId(task))
364
365 s := &swarming_api.SwarmingRpcsTaskResult{
366 TaskId: "E", // This is the Swarming TaskId.
367 CreatedTs: now.Add(time.Second).Format(swarming.TIMESTAMP_FORMAT ),
368 State: SWARMING_STATE_PENDING,
369 Tags: []string{
370 fmt.Sprintf("%s:%s", SWARMING_TAG_ID, task.Id),
371 fmt.Sprintf("%s:B", SWARMING_TAG_NAME),
372 fmt.Sprintf("%s:C", SWARMING_TAG_REPO),
373 fmt.Sprintf("%s:D", SWARMING_TAG_REVISION),
374 },
375 }
376 modified, err := task.UpdateFromSwarming(s)
377 assert.NoError(t, err)
378 assert.True(t, modified)
379 assert.NoError(t, db.PutTask(task))
380
381 // Get update from Swarming.
382 s.StartedTs = now.Add(time.Minute).Format(swarming.TIMESTAMP_FORMAT)
383 s.CompletedTs = now.Add(2 * time.Minute).Format(swarming.TIMESTAMP_FORMA T)
384 s.State = SWARMING_STATE_COMPLETED
385 s.Failure = true
386 s.OutputsRef = &swarming_api.SwarmingRpcsFilesRef{
387 Isolated: "G",
388 }
389
390 assert.NoError(t, UpdateDBFromSwarmingTask(db, s))
391
392 updatedTask, err := db.GetTaskById(task.Id)
393 assert.NoError(t, err)
394 testutils.AssertDeepEqual(t, updatedTask, &Task{
395 Id: task.Id,
396 Name: "B",
397 Repo: "C",
398 Revision: "D",
399 Created: now.Add(time.Second),
400 Commits: []string{"D", "Z"},
401 Started: now.Add(time.Minute),
402 Finished: now.Add(2 * time.Minute),
403 Status: TASK_STATUS_FAILURE,
404 SwarmingTaskId: "E",
405 IsolatedOutput: "G",
406 // Use value from updatedTask so they are deep-equal.
407 DbModified: updatedTask.DbModified,
408 })
409
410 lastDbModified := updatedTask.DbModified
411
412 // Make an unrelated change; assert no change to Task.
413 s.ModifiedTs = now.Format(swarming.TIMESTAMP_FORMAT)
414
415 assert.NoError(t, UpdateDBFromSwarmingTask(db, s))
416 updatedTask, err = db.GetTaskById(task.Id)
417 assert.NoError(t, err)
418 assert.True(t, lastDbModified.Equal(updatedTask.DbModified))
419 }
420
421 // Test that sort.Sort(TaskSlice(...)) works correctly.
422 func TestSort(t *testing.T) {
423 tasks := []*Task{}
424 addTask := func(ts time.Time) {
425 task := &Task{
426 Created: ts,
427 }
428 tasks = append(tasks, task)
429 }
430
431 // Add tasks with various creation timestamps.
432 addTask(time.Date(2008, time.August, 8, 8, 8, 8, 8, time.UTC)) // 0
433 addTask(time.Date(1776, time.July, 4, 13, 0, 0, 0, time.UTC)) // 1
434 addTask(time.Date(2016, time.December, 31, 23, 59, 59, 999999999, time.U TC)) // 2
435 addTask(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) // 3
436
437 // Manually sort.
438 expected := []*Task{tasks[1], tasks[3], tasks[0], tasks[2]}
439
440 sort.Sort(TaskSlice(tasks))
441
442 testutils.AssertDeepEqual(t, expected, tasks)
443 }
444
445 func TestTaskEncoder(t *testing.T) {
446 // TODO(benjaminwagner): Is there any way to cause an error?
447 e := TaskEncoder{}
448 expectedTasks := map[*Task][]byte{}
449 for i := 0; i < 25; i++ {
450 task := &Task{}
451 task.Id = fmt.Sprintf("Id-%d", i)
452 task.Name = "Bingo-was-his-name-o"
453 task.Commits = []string{fmt.Sprintf("a%d", i), fmt.Sprintf("b%d" , i+1)}
454 var buf bytes.Buffer
455 err := gob.NewEncoder(&buf).Encode(task)
456 assert.NoError(t, err)
457 expectedTasks[task] = buf.Bytes()
458 assert.True(t, e.Process(task))
459 }
460
461 actualTasks := map[*Task][]byte{}
462 for task, serialized, err := e.Next(); task != nil; task, serialized, er r = e.Next() {
463 assert.NoError(t, err)
464 actualTasks[task] = serialized
465 }
466 testutils.AssertDeepEqual(t, expectedTasks, actualTasks)
467 }
468
469 func TestTaskEncoderNoTasks(t *testing.T) {
470 e := TaskEncoder{}
471 task, serialized, err := e.Next()
472 assert.NoError(t, err)
473 assert.Nil(t, task)
474 assert.Nil(t, serialized)
475 }
476
477 func TestTaskDecoder(t *testing.T) {
478 d := TaskDecoder{}
479 expectedTasks := map[string]*Task{}
480 for i := 0; i < 250; i++ {
481 task := &Task{}
482 task.Id = fmt.Sprintf("Id-%d", i)
483 task.Name = "Bingo-was-his-name-o"
484 task.Commits = []string{fmt.Sprintf("a%d", i), fmt.Sprintf("b%d" , i+1)}
485 var buf bytes.Buffer
486 err := gob.NewEncoder(&buf).Encode(task)
487 assert.NoError(t, err)
488 expectedTasks[task.Id] = task
489 assert.True(t, d.Process(buf.Bytes()))
490 }
491
492 actualTasks := map[string]*Task{}
493 result, err := d.Result()
494 assert.NoError(t, err)
495 assert.Equal(t, len(expectedTasks), len(result))
496 for _, task := range result {
497 actualTasks[task.Id] = task
498 }
499 testutils.AssertDeepEqual(t, expectedTasks, actualTasks)
500 }
501
502 func TestTaskDecoderNoTasks(t *testing.T) {
503 d := TaskDecoder{}
504 result, err := d.Result()
505 assert.NoError(t, err)
506 assert.Equal(t, 0, len(result))
507 }
508
509 func TestTaskDecoderError(t *testing.T) {
510 task := &Task{}
511 task.Id = "Id"
512 var buf bytes.Buffer
513 err := gob.NewEncoder(&buf).Encode(task)
514 assert.NoError(t, err)
515 serialized := buf.Bytes()
516 invalid := append([]byte("Hi Mom!"), serialized...)
517
518 d := TaskDecoder{}
519 // Process should return true before it encounters an invalid result.
520 assert.True(t, d.Process(serialized))
521 assert.True(t, d.Process(serialized))
522 // Process may return true or false after encountering an invalid value.
523 _ = d.Process(invalid)
524 for i := 0; i < 250; i++ {
525 _ = d.Process(serialized)
526 }
527
528 // Result should return error.
529 result, err := d.Result()
530 assert.Error(t, err)
531 assert.Equal(t, 0, len(result))
532 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/task.go ('k') | build_scheduler/go/db/testutil.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698