OLD | NEW |
| (Empty) |
1 package db | |
2 | |
3 import ( | |
4 "bytes" | |
5 "encoding/gob" | |
6 "errors" | |
7 "fmt" | |
8 "reflect" | |
9 "sync" | |
10 "time" | |
11 | |
12 "go.skia.org/infra/go/swarming" | |
13 "go.skia.org/infra/go/util" | |
14 | |
15 swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1" | |
16 "github.com/skia-dev/glog" | |
17 ) | |
18 | |
19 const ( | |
20 // Swarming task states. | |
21 SWARMING_STATE_BOT_DIED = "BOT_DIED" | |
22 SWARMING_STATE_CANCELED = "CANCELED" | |
23 SWARMING_STATE_COMPLETED = "COMPLETED" | |
24 SWARMING_STATE_EXPIRED = "EXPIRED" | |
25 SWARMING_STATE_PENDING = "PENDING" | |
26 SWARMING_STATE_RUNNING = "RUNNING" | |
27 SWARMING_STATE_TIMED_OUT = "TIMED_OUT" | |
28 | |
29 // Swarming tags added by Build Scheduler. | |
30 SWARMING_TAG_ALLOW_MILO = "allow_milo" | |
31 SWARMING_TAG_ID = "scheduler_id" | |
32 SWARMING_TAG_NAME = "name" | |
33 SWARMING_TAG_PRIORITY = "priority" | |
34 SWARMING_TAG_REPO = "repo" | |
35 SWARMING_TAG_REVISION = "revision" | |
36 ) | |
37 | |
38 type TaskStatus string | |
39 | |
40 const ( | |
41 // TASK_STATUS_PENDING indicates the task has not started. It is the emp
ty | |
42 // string so that it is the zero value of TaskStatus. | |
43 TASK_STATUS_PENDING TaskStatus = "" | |
44 // TASK_STATUS_RUNNING indicates the task is in progress. | |
45 TASK_STATUS_RUNNING TaskStatus = "RUNNING" | |
46 // TASK_STATUS_SUCCESS indicates the task completed successfully. | |
47 TASK_STATUS_SUCCESS TaskStatus = "SUCCESS" | |
48 // TASK_STATUS_FAILURE indicates the task completed with failures. | |
49 TASK_STATUS_FAILURE TaskStatus = "FAILURE" | |
50 // TASK_STATUS_MISHAP indicates the task exited early with an error, die
d | |
51 // while in progress, was manually canceled, expired while waiting on th
e | |
52 // queue, or timed out before completing. | |
53 TASK_STATUS_MISHAP TaskStatus = "MISHAP" | |
54 ) | |
55 | |
56 // Task describes a Swarming task generated from a TaskSpec, or a "fake" task | |
57 // that can not be executed on Swarming, but can be added to the DB and | |
58 // displayed as if it were a real TaskSpec. | |
59 // | |
60 // Task is stored as a GOB, so changes must maintain backwards compatibility. | |
61 // See gob package documentation for details, but generally: | |
62 // - Ensure new fields can be initialized with their zero value. | |
63 // - Do not change the type of any existing field. | |
64 // - Leave removed fields commented out to ensure the field name is not | |
65 // reused. | |
66 type Task struct { | |
67 // Commits are the commits which were tested in this Task. The list may | |
68 // change due to backfilling/bisecting. | |
69 Commits []string | |
70 | |
71 // Created is the creation timestamp. | |
72 Created time.Time | |
73 | |
74 // DbModified is the time of the last successful call to DB.PutTask/s fo
r this | |
75 // Task, or zero if the task is new. It is not related to the ModifiedTs
time | |
76 // of the associated Swarming task. | |
77 DbModified time.Time | |
78 | |
79 // Finished is the time the task stopped running or expired from the que
ue, or | |
80 // zero if the task is pending or running. | |
81 Finished time.Time | |
82 | |
83 // Id is a generated unique identifier for this Task instance. Must be | |
84 // URL-safe. | |
85 Id string | |
86 | |
87 // IsolatedOutput is the isolated hash of any outputs produced by this T
ask. | |
88 // Filled in when the task is completed. We assume the isolate server is | |
89 // isolate.ISOLATE_SERVER_URL and the namespace is isolate.DEFAULT_NAMES
PACE. | |
90 // This field will not be set if the Task does not correspond to a Swarm
ing | |
91 // task. | |
92 IsolatedOutput string | |
93 | |
94 // Name is a human-friendly descriptive name for this Task. All Tasks | |
95 // generated from the same TaskSpec have the same name. | |
96 Name string | |
97 | |
98 // Repo is the repository of the commit at which this task ran. | |
99 Repo string | |
100 | |
101 // Revision is the commit at which this task ran. | |
102 Revision string | |
103 | |
104 // Started is the time the task started running, or zero if the task is | |
105 // pending, or the same as Finished if the task never ran. | |
106 Started time.Time | |
107 | |
108 // Status is the current task status, default TASK_STATUS_PENDING. | |
109 Status TaskStatus | |
110 | |
111 // SwarmingTaskId is the Swarming task ID. This field will not be set if
the | |
112 // Task does not correspond to a Swarming task. | |
113 SwarmingTaskId string | |
114 } | |
115 | |
116 // UpdateFromSwarming sets or initializes t from data in s. If any changes were | |
117 // made to t, returns true. | |
118 // | |
119 // If empty, sets t.Id, t.Name, t.Repo, and t.Revision from s's tags named | |
120 // SWARMING_TAG_ID, SWARMING_TAG_NAME, SWARMING_TAG_REPO, and | |
121 // SWARMING_TAG_REVISION, sets t.Created from s.CreatedTs, and sets | |
122 // t.SwarmingTaskId from s.TaskId. If these fields are non-empty, returns an | |
123 // error if they do not match. | |
124 // | |
125 // Always sets t.Status, t.Started, t.Finished, and t.IsolatedOutput based on s. | |
126 func (orig *Task) UpdateFromSwarming(s *swarming_api.SwarmingRpcsTaskResult) (bo
ol, error) { | |
127 if s == nil { | |
128 return false, fmt.Errorf("Missing TaskResult. %v", s) | |
129 } | |
130 tags, err := swarming.TagValues(s) | |
131 if err != nil { | |
132 return false, err | |
133 } | |
134 | |
135 copy := orig.Copy() | |
136 if !reflect.DeepEqual(orig, copy) { | |
137 glog.Fatalf("Task.Copy is broken; original and copy differ:\n%#v
\n%#v", orig, copy) | |
138 } | |
139 | |
140 // "Identity" fields stored in tags. | |
141 checkOrSetFromTag := func(tagName string, field *string, fieldName strin
g) error { | |
142 if tagValue, ok := tags[tagName]; ok { | |
143 if *field == "" { | |
144 *field = tagValue | |
145 } else if *field != tagValue { | |
146 return fmt.Errorf("%s does not match for task %s
. Was %s, now %s. %v %v", fieldName, orig.Id, *field, tagValue, orig, s) | |
147 } | |
148 } | |
149 return nil | |
150 } | |
151 if err := checkOrSetFromTag(SWARMING_TAG_ID, ©.Id, "Id"); err != nil
{ | |
152 return false, err | |
153 } | |
154 if err := checkOrSetFromTag(SWARMING_TAG_NAME, ©.Name, "Name"); err
!= nil { | |
155 return false, err | |
156 } | |
157 if err := checkOrSetFromTag(SWARMING_TAG_REPO, ©.Repo, "Repo"); err
!= nil { | |
158 return false, err | |
159 } | |
160 if err := checkOrSetFromTag(SWARMING_TAG_REVISION, ©.Revision, "Revi
sion"); err != nil { | |
161 return false, err | |
162 } | |
163 | |
164 // CreatedTs should always be present. | |
165 if sCreated, err := swarming.ParseTimestamp(s.CreatedTs); err == nil { | |
166 if util.TimeIsZero(copy.Created) { | |
167 copy.Created = sCreated | |
168 } else if copy.Created != sCreated { | |
169 return false, fmt.Errorf("Creation time has changed for
task %s. Was %s, now %s. %v", orig.Id, orig.Created, sCreated, orig) | |
170 } | |
171 } else { | |
172 return false, fmt.Errorf("Unable to parse task creation time for
task %s. %v %v", orig.Id, err, orig) | |
173 } | |
174 | |
175 // Swarming TaskId. | |
176 if copy.SwarmingTaskId == "" { | |
177 copy.SwarmingTaskId = s.TaskId | |
178 } else if copy.SwarmingTaskId != s.TaskId { | |
179 return false, fmt.Errorf("Swarming task ID does not match for ta
sk %s. Was %s, now %s. %v", orig.Id, orig.SwarmingTaskId, s.TaskId, orig) | |
180 } | |
181 | |
182 // Status. | |
183 switch s.State { | |
184 case SWARMING_STATE_BOT_DIED, SWARMING_STATE_CANCELED, SWARMING_STATE_EX
PIRED, SWARMING_STATE_TIMED_OUT: | |
185 copy.Status = TASK_STATUS_MISHAP | |
186 case SWARMING_STATE_PENDING: | |
187 copy.Status = TASK_STATUS_PENDING | |
188 case SWARMING_STATE_RUNNING: | |
189 copy.Status = TASK_STATUS_RUNNING | |
190 case SWARMING_STATE_COMPLETED: | |
191 if s.Failure { | |
192 // TODO(benjaminwagner): Choose FAILURE or MISHAP depend
ing on ExitCode? | |
193 copy.Status = TASK_STATUS_FAILURE | |
194 } else { | |
195 copy.Status = TASK_STATUS_SUCCESS | |
196 } | |
197 default: | |
198 return false, fmt.Errorf("Unknown Swarming State %v in %v", s.St
ate, s) | |
199 } | |
200 | |
201 // Isolated output. | |
202 if s.OutputsRef == nil { | |
203 copy.IsolatedOutput = "" | |
204 } else { | |
205 copy.IsolatedOutput = s.OutputsRef.Isolated | |
206 } | |
207 | |
208 // Timestamps. | |
209 maybeUpdateTime := func(newTimeStr string, field *time.Time, name string
) error { | |
210 if newTimeStr == "" { | |
211 return nil | |
212 } | |
213 newTime, err := swarming.ParseTimestamp(newTimeStr) | |
214 if err != nil { | |
215 return fmt.Errorf("Unable to parse %s for task %s. %v %v
", name, orig.Id, err, s) | |
216 } | |
217 *field = newTime | |
218 return nil | |
219 } | |
220 | |
221 if err := maybeUpdateTime(s.StartedTs, ©.Started, "StartedTs"); err
!= nil { | |
222 return false, err | |
223 } | |
224 if err := maybeUpdateTime(s.CompletedTs, ©.Finished, "CompletedTs");
err != nil { | |
225 return false, err | |
226 } | |
227 if s.CompletedTs == "" && copy.Status == TASK_STATUS_MISHAP { | |
228 if err := maybeUpdateTime(s.AbandonedTs, ©.Finished, "Abando
nedTs"); err != nil { | |
229 return false, err | |
230 } | |
231 } | |
232 if copy.Done() && util.TimeIsZero(copy.Started) { | |
233 copy.Started = copy.Finished | |
234 } | |
235 | |
236 // TODO(benjaminwagner): SwarmingRpcsTaskResult has a ModifiedTs field t
hat we | |
237 // could use to detect modifications. Unfortunately, it seems that while
the | |
238 // task is running, ModifiedTs gets updated every 30 seconds, regardless
of | |
239 // whether any other data actually changed. Maybe we could still use it
for | |
240 // pending or completed tasks. | |
241 if !reflect.DeepEqual(orig, copy) { | |
242 *orig = *copy | |
243 return true, nil | |
244 } | |
245 return false, nil | |
246 } | |
247 | |
248 var errNotModified = errors.New("Task not modified") | |
249 | |
250 // UpdateDBFromSwarmingTask updates a task in db from data in s. | |
251 func UpdateDBFromSwarmingTask(db DB, s *swarming_api.SwarmingRpcsTaskResult) err
or { | |
252 id, err := swarming.GetTagValue(s, SWARMING_TAG_ID) | |
253 if err != nil { | |
254 return err | |
255 } | |
256 _, err = UpdateTaskWithRetries(db, id, func(task *Task) error { | |
257 modified, err := task.UpdateFromSwarming(s) | |
258 if err != nil { | |
259 return err | |
260 } | |
261 if !modified { | |
262 return errNotModified | |
263 } | |
264 return nil | |
265 }) | |
266 if err == errNotModified { | |
267 return nil | |
268 } else { | |
269 return err | |
270 } | |
271 } | |
272 | |
273 func (t *Task) Done() bool { | |
274 return t.Status != TASK_STATUS_PENDING && t.Status != TASK_STATUS_RUNNIN
G | |
275 } | |
276 | |
277 func (t *Task) Success() bool { | |
278 return t.Status == TASK_STATUS_SUCCESS | |
279 } | |
280 | |
281 func (t *Task) Copy() *Task { | |
282 var commits []string | |
283 if t.Commits != nil { | |
284 commits = make([]string, len(t.Commits)) | |
285 copy(commits, t.Commits) | |
286 } | |
287 return &Task{ | |
288 Commits: commits, | |
289 Created: t.Created, | |
290 DbModified: t.DbModified, | |
291 Finished: t.Finished, | |
292 Id: t.Id, | |
293 IsolatedOutput: t.IsolatedOutput, | |
294 Name: t.Name, | |
295 Repo: t.Repo, | |
296 Revision: t.Revision, | |
297 Started: t.Started, | |
298 Status: t.Status, | |
299 SwarmingTaskId: t.SwarmingTaskId, | |
300 } | |
301 } | |
302 | |
303 // TaskSlice implements sort.Interface. To sort tasks []*Task, use | |
304 // sort.Sort(TaskSlice(tasks)). | |
305 type TaskSlice []*Task | |
306 | |
307 func (s TaskSlice) Len() int { return len(s) } | |
308 | |
309 func (s TaskSlice) Less(i, j int) bool { | |
310 return s[i].Created.Before(s[j].Created) | |
311 } | |
312 | |
313 func (s TaskSlice) Swap(i, j int) { | |
314 s[i], s[j] = s[j], s[i] | |
315 } | |
316 | |
317 // TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for | |
318 // concurrent use. | |
319 // TODO(benjaminwagner): Encode in parallel. | |
320 type TaskEncoder struct { | |
321 err error | |
322 tasks []*Task | |
323 result [][]byte | |
324 } | |
325 | |
326 // Process encodes the Task into a byte slice that will be returned from Next() | |
327 // (in arbitrary order). Returns false if Next is certain to return an error. | |
328 // Caller must ensure t does not change until after the first call to Next(). | |
329 // May not be called after calling Next(). | |
330 func (e *TaskEncoder) Process(t *Task) bool { | |
331 if e.err != nil { | |
332 return false | |
333 } | |
334 var buf bytes.Buffer | |
335 if err := gob.NewEncoder(&buf).Encode(t); err != nil { | |
336 e.err = err | |
337 e.tasks = nil | |
338 e.result = nil | |
339 return false | |
340 } | |
341 e.tasks = append(e.tasks, t) | |
342 e.result = append(e.result, buf.Bytes()) | |
343 return true | |
344 } | |
345 | |
346 // Next returns one of the Tasks provided to Process (in arbitrary order) and | |
347 // its serialized bytes. If any tasks remain, returns the task, the serialized | |
348 // bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an | |
349 // error is encountered, returns nil, nil, error. | |
350 func (e *TaskEncoder) Next() (*Task, []byte, error) { | |
351 if e.err != nil { | |
352 return nil, nil, e.err | |
353 } | |
354 if len(e.tasks) == 0 { | |
355 return nil, nil, nil | |
356 } | |
357 t := e.tasks[0] | |
358 e.tasks = e.tasks[1:] | |
359 serialized := e.result[0] | |
360 e.result = e.result[1:] | |
361 return t, serialized, nil | |
362 } | |
363 | |
364 // TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for | |
365 // concurrent use. | |
366 type TaskDecoder struct { | |
367 // input contains the incoming byte slices. Process() sends on this chan
nel, | |
368 // decode() receives from it, and Result() closes it. | |
369 input chan []byte | |
370 // output contains decoded Tasks. decode() sends on this channel, collec
t() | |
371 // receives from it, and run() closes it when all decode() goroutines ha
ve | |
372 // finished. | |
373 output chan *Task | |
374 // result contains the return value of Result(). collect() sends a singl
e | |
375 // value on this channel and closes it. Result() receives from it. | |
376 result chan []*Task | |
377 // errors contains the first error from any goroutine. It's a channel in
case | |
378 // multiple goroutines experience an error at the same time. | |
379 errors chan error | |
380 } | |
381 | |
382 const kNumDecoderGoroutines = 10 | |
383 | |
384 // init initializes d if it has not been initialized. May not be called concurre
ntly. | |
385 func (d *TaskDecoder) init() { | |
386 if d.input == nil { | |
387 d.input = make(chan []byte, kNumDecoderGoroutines*2) | |
388 d.output = make(chan *Task, kNumDecoderGoroutines) | |
389 d.result = make(chan []*Task, 1) | |
390 d.errors = make(chan error, kNumDecoderGoroutines) | |
391 go d.run() | |
392 go d.collect() | |
393 } | |
394 } | |
395 | |
396 // run starts the decode goroutines and closes d.output when they finish. | |
397 func (d *TaskDecoder) run() { | |
398 // Start decoders. | |
399 wg := sync.WaitGroup{} | |
400 for i := 0; i < kNumDecoderGoroutines; i++ { | |
401 wg.Add(1) | |
402 go d.decode(&wg) | |
403 } | |
404 // Wait for decoders to exit. | |
405 wg.Wait() | |
406 // Drain d.input in the case that errors were encountered, to avoid dead
lock. | |
407 for _ = range d.input { | |
408 } | |
409 close(d.output) | |
410 } | |
411 | |
412 // decode receives from d.input and sends to d.output until d.input is closed or | |
413 // d.errors is non-empty. Decrements wg when done. | |
414 func (d *TaskDecoder) decode(wg *sync.WaitGroup) { | |
415 for b := range d.input { | |
416 var t Task | |
417 if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err !=
nil { | |
418 d.errors <- err | |
419 break | |
420 } | |
421 d.output <- &t | |
422 if len(d.errors) > 0 { | |
423 break | |
424 } | |
425 } | |
426 wg.Done() | |
427 } | |
428 | |
429 // collect receives from d.output until it is closed, then sends on d.result. | |
430 func (d *TaskDecoder) collect() { | |
431 result := []*Task{} | |
432 for t := range d.output { | |
433 result = append(result, t) | |
434 } | |
435 d.result <- result | |
436 close(d.result) | |
437 } | |
438 | |
439 // Process decodes the byte slice into a Task and includes it in Result() (in | |
440 // arbitrary order). Returns false if Result is certain to return an error. | |
441 // Caller must ensure b does not change until after Result() returns. | |
442 func (d *TaskDecoder) Process(b []byte) bool { | |
443 d.init() | |
444 d.input <- b | |
445 return len(d.errors) == 0 | |
446 } | |
447 | |
448 // Result returns all decoded Tasks provided to Process (in arbitrary order), or | |
449 // any error encountered. | |
450 func (d *TaskDecoder) Result() ([]*Task, error) { | |
451 // Allow TaskDecoder to be used without initialization. | |
452 if d.result == nil { | |
453 return []*Task{}, nil | |
454 } | |
455 close(d.input) | |
456 select { | |
457 case err := <-d.errors: | |
458 return nil, err | |
459 case result := <-d.result: | |
460 return result, nil | |
461 } | |
462 } | |
463 | |
464 // TagsForTask returns the tags which should be set for a Task. | |
465 func TagsForTask(name, id string, priority float64, repo, revision string, dimen
sions map[string]string) []string { | |
466 tags := map[string]string{ | |
467 SWARMING_TAG_ALLOW_MILO: "1", | |
468 SWARMING_TAG_NAME: name, | |
469 SWARMING_TAG_ID: id, | |
470 SWARMING_TAG_PRIORITY: fmt.Sprintf("%f", priority), | |
471 SWARMING_TAG_REPO: repo, | |
472 SWARMING_TAG_REVISION: revision, | |
473 } | |
474 | |
475 for k, v := range dimensions { | |
476 if _, ok := tags[k]; !ok { | |
477 tags[k] = v | |
478 } else { | |
479 glog.Warningf("Duplicate dimension/tag %q.", k) | |
480 } | |
481 } | |
482 | |
483 tagsList := make([]string, 0, len(tags)) | |
484 for k, v := range tags { | |
485 tagsList = append(tagsList, fmt.Sprintf("%s:%s", k, v)) | |
486 } | |
487 return tagsList | |
488 } | |
OLD | NEW |