Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: build_scheduler/go/db/task.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « build_scheduler/go/db/modified_tasks_test.go ('k') | build_scheduler/go/db/task_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package db
2
3 import (
4 "bytes"
5 "encoding/gob"
6 "errors"
7 "fmt"
8 "reflect"
9 "sync"
10 "time"
11
12 "go.skia.org/infra/go/swarming"
13 "go.skia.org/infra/go/util"
14
15 swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
16 "github.com/skia-dev/glog"
17 )
18
19 const (
20 // Swarming task states.
21 SWARMING_STATE_BOT_DIED = "BOT_DIED"
22 SWARMING_STATE_CANCELED = "CANCELED"
23 SWARMING_STATE_COMPLETED = "COMPLETED"
24 SWARMING_STATE_EXPIRED = "EXPIRED"
25 SWARMING_STATE_PENDING = "PENDING"
26 SWARMING_STATE_RUNNING = "RUNNING"
27 SWARMING_STATE_TIMED_OUT = "TIMED_OUT"
28
29 // Swarming tags added by Build Scheduler.
30 SWARMING_TAG_ALLOW_MILO = "allow_milo"
31 SWARMING_TAG_ID = "scheduler_id"
32 SWARMING_TAG_NAME = "name"
33 SWARMING_TAG_PRIORITY = "priority"
34 SWARMING_TAG_REPO = "repo"
35 SWARMING_TAG_REVISION = "revision"
36 )
37
38 type TaskStatus string
39
40 const (
41 // TASK_STATUS_PENDING indicates the task has not started. It is the emp ty
42 // string so that it is the zero value of TaskStatus.
43 TASK_STATUS_PENDING TaskStatus = ""
44 // TASK_STATUS_RUNNING indicates the task is in progress.
45 TASK_STATUS_RUNNING TaskStatus = "RUNNING"
46 // TASK_STATUS_SUCCESS indicates the task completed successfully.
47 TASK_STATUS_SUCCESS TaskStatus = "SUCCESS"
48 // TASK_STATUS_FAILURE indicates the task completed with failures.
49 TASK_STATUS_FAILURE TaskStatus = "FAILURE"
50 // TASK_STATUS_MISHAP indicates the task exited early with an error, die d
51 // while in progress, was manually canceled, expired while waiting on th e
52 // queue, or timed out before completing.
53 TASK_STATUS_MISHAP TaskStatus = "MISHAP"
54 )
55
56 // Task describes a Swarming task generated from a TaskSpec, or a "fake" task
57 // that can not be executed on Swarming, but can be added to the DB and
58 // displayed as if it were a real TaskSpec.
59 //
60 // Task is stored as a GOB, so changes must maintain backwards compatibility.
61 // See gob package documentation for details, but generally:
62 // - Ensure new fields can be initialized with their zero value.
63 // - Do not change the type of any existing field.
64 // - Leave removed fields commented out to ensure the field name is not
65 // reused.
66 type Task struct {
67 // Commits are the commits which were tested in this Task. The list may
68 // change due to backfilling/bisecting.
69 Commits []string
70
71 // Created is the creation timestamp.
72 Created time.Time
73
74 // DbModified is the time of the last successful call to DB.PutTask/s fo r this
75 // Task, or zero if the task is new. It is not related to the ModifiedTs time
76 // of the associated Swarming task.
77 DbModified time.Time
78
79 // Finished is the time the task stopped running or expired from the que ue, or
80 // zero if the task is pending or running.
81 Finished time.Time
82
83 // Id is a generated unique identifier for this Task instance. Must be
84 // URL-safe.
85 Id string
86
87 // IsolatedOutput is the isolated hash of any outputs produced by this T ask.
88 // Filled in when the task is completed. We assume the isolate server is
89 // isolate.ISOLATE_SERVER_URL and the namespace is isolate.DEFAULT_NAMES PACE.
90 // This field will not be set if the Task does not correspond to a Swarm ing
91 // task.
92 IsolatedOutput string
93
94 // Name is a human-friendly descriptive name for this Task. All Tasks
95 // generated from the same TaskSpec have the same name.
96 Name string
97
98 // Repo is the repository of the commit at which this task ran.
99 Repo string
100
101 // Revision is the commit at which this task ran.
102 Revision string
103
104 // Started is the time the task started running, or zero if the task is
105 // pending, or the same as Finished if the task never ran.
106 Started time.Time
107
108 // Status is the current task status, default TASK_STATUS_PENDING.
109 Status TaskStatus
110
111 // SwarmingTaskId is the Swarming task ID. This field will not be set if the
112 // Task does not correspond to a Swarming task.
113 SwarmingTaskId string
114 }
115
116 // UpdateFromSwarming sets or initializes t from data in s. If any changes were
117 // made to t, returns true.
118 //
119 // If empty, sets t.Id, t.Name, t.Repo, and t.Revision from s's tags named
120 // SWARMING_TAG_ID, SWARMING_TAG_NAME, SWARMING_TAG_REPO, and
121 // SWARMING_TAG_REVISION, sets t.Created from s.CreatedTs, and sets
122 // t.SwarmingTaskId from s.TaskId. If these fields are non-empty, returns an
123 // error if they do not match.
124 //
125 // Always sets t.Status, t.Started, t.Finished, and t.IsolatedOutput based on s.
126 func (orig *Task) UpdateFromSwarming(s *swarming_api.SwarmingRpcsTaskResult) (bo ol, error) {
127 if s == nil {
128 return false, fmt.Errorf("Missing TaskResult. %v", s)
129 }
130 tags, err := swarming.TagValues(s)
131 if err != nil {
132 return false, err
133 }
134
135 copy := orig.Copy()
136 if !reflect.DeepEqual(orig, copy) {
137 glog.Fatalf("Task.Copy is broken; original and copy differ:\n%#v \n%#v", orig, copy)
138 }
139
140 // "Identity" fields stored in tags.
141 checkOrSetFromTag := func(tagName string, field *string, fieldName strin g) error {
142 if tagValue, ok := tags[tagName]; ok {
143 if *field == "" {
144 *field = tagValue
145 } else if *field != tagValue {
146 return fmt.Errorf("%s does not match for task %s . Was %s, now %s. %v %v", fieldName, orig.Id, *field, tagValue, orig, s)
147 }
148 }
149 return nil
150 }
151 if err := checkOrSetFromTag(SWARMING_TAG_ID, &copy.Id, "Id"); err != nil {
152 return false, err
153 }
154 if err := checkOrSetFromTag(SWARMING_TAG_NAME, &copy.Name, "Name"); err != nil {
155 return false, err
156 }
157 if err := checkOrSetFromTag(SWARMING_TAG_REPO, &copy.Repo, "Repo"); err != nil {
158 return false, err
159 }
160 if err := checkOrSetFromTag(SWARMING_TAG_REVISION, &copy.Revision, "Revi sion"); err != nil {
161 return false, err
162 }
163
164 // CreatedTs should always be present.
165 if sCreated, err := swarming.ParseTimestamp(s.CreatedTs); err == nil {
166 if util.TimeIsZero(copy.Created) {
167 copy.Created = sCreated
168 } else if copy.Created != sCreated {
169 return false, fmt.Errorf("Creation time has changed for task %s. Was %s, now %s. %v", orig.Id, orig.Created, sCreated, orig)
170 }
171 } else {
172 return false, fmt.Errorf("Unable to parse task creation time for task %s. %v %v", orig.Id, err, orig)
173 }
174
175 // Swarming TaskId.
176 if copy.SwarmingTaskId == "" {
177 copy.SwarmingTaskId = s.TaskId
178 } else if copy.SwarmingTaskId != s.TaskId {
179 return false, fmt.Errorf("Swarming task ID does not match for ta sk %s. Was %s, now %s. %v", orig.Id, orig.SwarmingTaskId, s.TaskId, orig)
180 }
181
182 // Status.
183 switch s.State {
184 case SWARMING_STATE_BOT_DIED, SWARMING_STATE_CANCELED, SWARMING_STATE_EX PIRED, SWARMING_STATE_TIMED_OUT:
185 copy.Status = TASK_STATUS_MISHAP
186 case SWARMING_STATE_PENDING:
187 copy.Status = TASK_STATUS_PENDING
188 case SWARMING_STATE_RUNNING:
189 copy.Status = TASK_STATUS_RUNNING
190 case SWARMING_STATE_COMPLETED:
191 if s.Failure {
192 // TODO(benjaminwagner): Choose FAILURE or MISHAP depend ing on ExitCode?
193 copy.Status = TASK_STATUS_FAILURE
194 } else {
195 copy.Status = TASK_STATUS_SUCCESS
196 }
197 default:
198 return false, fmt.Errorf("Unknown Swarming State %v in %v", s.St ate, s)
199 }
200
201 // Isolated output.
202 if s.OutputsRef == nil {
203 copy.IsolatedOutput = ""
204 } else {
205 copy.IsolatedOutput = s.OutputsRef.Isolated
206 }
207
208 // Timestamps.
209 maybeUpdateTime := func(newTimeStr string, field *time.Time, name string ) error {
210 if newTimeStr == "" {
211 return nil
212 }
213 newTime, err := swarming.ParseTimestamp(newTimeStr)
214 if err != nil {
215 return fmt.Errorf("Unable to parse %s for task %s. %v %v ", name, orig.Id, err, s)
216 }
217 *field = newTime
218 return nil
219 }
220
221 if err := maybeUpdateTime(s.StartedTs, &copy.Started, "StartedTs"); err != nil {
222 return false, err
223 }
224 if err := maybeUpdateTime(s.CompletedTs, &copy.Finished, "CompletedTs"); err != nil {
225 return false, err
226 }
227 if s.CompletedTs == "" && copy.Status == TASK_STATUS_MISHAP {
228 if err := maybeUpdateTime(s.AbandonedTs, &copy.Finished, "Abando nedTs"); err != nil {
229 return false, err
230 }
231 }
232 if copy.Done() && util.TimeIsZero(copy.Started) {
233 copy.Started = copy.Finished
234 }
235
236 // TODO(benjaminwagner): SwarmingRpcsTaskResult has a ModifiedTs field t hat we
237 // could use to detect modifications. Unfortunately, it seems that while the
238 // task is running, ModifiedTs gets updated every 30 seconds, regardless of
239 // whether any other data actually changed. Maybe we could still use it for
240 // pending or completed tasks.
241 if !reflect.DeepEqual(orig, copy) {
242 *orig = *copy
243 return true, nil
244 }
245 return false, nil
246 }
247
248 var errNotModified = errors.New("Task not modified")
249
250 // UpdateDBFromSwarmingTask updates a task in db from data in s.
251 func UpdateDBFromSwarmingTask(db DB, s *swarming_api.SwarmingRpcsTaskResult) err or {
252 id, err := swarming.GetTagValue(s, SWARMING_TAG_ID)
253 if err != nil {
254 return err
255 }
256 _, err = UpdateTaskWithRetries(db, id, func(task *Task) error {
257 modified, err := task.UpdateFromSwarming(s)
258 if err != nil {
259 return err
260 }
261 if !modified {
262 return errNotModified
263 }
264 return nil
265 })
266 if err == errNotModified {
267 return nil
268 } else {
269 return err
270 }
271 }
272
273 func (t *Task) Done() bool {
274 return t.Status != TASK_STATUS_PENDING && t.Status != TASK_STATUS_RUNNIN G
275 }
276
277 func (t *Task) Success() bool {
278 return t.Status == TASK_STATUS_SUCCESS
279 }
280
281 func (t *Task) Copy() *Task {
282 var commits []string
283 if t.Commits != nil {
284 commits = make([]string, len(t.Commits))
285 copy(commits, t.Commits)
286 }
287 return &Task{
288 Commits: commits,
289 Created: t.Created,
290 DbModified: t.DbModified,
291 Finished: t.Finished,
292 Id: t.Id,
293 IsolatedOutput: t.IsolatedOutput,
294 Name: t.Name,
295 Repo: t.Repo,
296 Revision: t.Revision,
297 Started: t.Started,
298 Status: t.Status,
299 SwarmingTaskId: t.SwarmingTaskId,
300 }
301 }
302
303 // TaskSlice implements sort.Interface. To sort tasks []*Task, use
304 // sort.Sort(TaskSlice(tasks)).
305 type TaskSlice []*Task
306
307 func (s TaskSlice) Len() int { return len(s) }
308
309 func (s TaskSlice) Less(i, j int) bool {
310 return s[i].Created.Before(s[j].Created)
311 }
312
313 func (s TaskSlice) Swap(i, j int) {
314 s[i], s[j] = s[j], s[i]
315 }
316
317 // TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
318 // concurrent use.
319 // TODO(benjaminwagner): Encode in parallel.
320 type TaskEncoder struct {
321 err error
322 tasks []*Task
323 result [][]byte
324 }
325
326 // Process encodes the Task into a byte slice that will be returned from Next()
327 // (in arbitrary order). Returns false if Next is certain to return an error.
328 // Caller must ensure t does not change until after the first call to Next().
329 // May not be called after calling Next().
330 func (e *TaskEncoder) Process(t *Task) bool {
331 if e.err != nil {
332 return false
333 }
334 var buf bytes.Buffer
335 if err := gob.NewEncoder(&buf).Encode(t); err != nil {
336 e.err = err
337 e.tasks = nil
338 e.result = nil
339 return false
340 }
341 e.tasks = append(e.tasks, t)
342 e.result = append(e.result, buf.Bytes())
343 return true
344 }
345
346 // Next returns one of the Tasks provided to Process (in arbitrary order) and
347 // its serialized bytes. If any tasks remain, returns the task, the serialized
348 // bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
349 // error is encountered, returns nil, nil, error.
350 func (e *TaskEncoder) Next() (*Task, []byte, error) {
351 if e.err != nil {
352 return nil, nil, e.err
353 }
354 if len(e.tasks) == 0 {
355 return nil, nil, nil
356 }
357 t := e.tasks[0]
358 e.tasks = e.tasks[1:]
359 serialized := e.result[0]
360 e.result = e.result[1:]
361 return t, serialized, nil
362 }
363
364 // TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
365 // concurrent use.
366 type TaskDecoder struct {
367 // input contains the incoming byte slices. Process() sends on this chan nel,
368 // decode() receives from it, and Result() closes it.
369 input chan []byte
370 // output contains decoded Tasks. decode() sends on this channel, collec t()
371 // receives from it, and run() closes it when all decode() goroutines ha ve
372 // finished.
373 output chan *Task
374 // result contains the return value of Result(). collect() sends a singl e
375 // value on this channel and closes it. Result() receives from it.
376 result chan []*Task
377 // errors contains the first error from any goroutine. It's a channel in case
378 // multiple goroutines experience an error at the same time.
379 errors chan error
380 }
381
382 const kNumDecoderGoroutines = 10
383
384 // init initializes d if it has not been initialized. May not be called concurre ntly.
385 func (d *TaskDecoder) init() {
386 if d.input == nil {
387 d.input = make(chan []byte, kNumDecoderGoroutines*2)
388 d.output = make(chan *Task, kNumDecoderGoroutines)
389 d.result = make(chan []*Task, 1)
390 d.errors = make(chan error, kNumDecoderGoroutines)
391 go d.run()
392 go d.collect()
393 }
394 }
395
396 // run starts the decode goroutines and closes d.output when they finish.
397 func (d *TaskDecoder) run() {
398 // Start decoders.
399 wg := sync.WaitGroup{}
400 for i := 0; i < kNumDecoderGoroutines; i++ {
401 wg.Add(1)
402 go d.decode(&wg)
403 }
404 // Wait for decoders to exit.
405 wg.Wait()
406 // Drain d.input in the case that errors were encountered, to avoid dead lock.
407 for _ = range d.input {
408 }
409 close(d.output)
410 }
411
412 // decode receives from d.input and sends to d.output until d.input is closed or
413 // d.errors is non-empty. Decrements wg when done.
414 func (d *TaskDecoder) decode(wg *sync.WaitGroup) {
415 for b := range d.input {
416 var t Task
417 if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
418 d.errors <- err
419 break
420 }
421 d.output <- &t
422 if len(d.errors) > 0 {
423 break
424 }
425 }
426 wg.Done()
427 }
428
429 // collect receives from d.output until it is closed, then sends on d.result.
430 func (d *TaskDecoder) collect() {
431 result := []*Task{}
432 for t := range d.output {
433 result = append(result, t)
434 }
435 d.result <- result
436 close(d.result)
437 }
438
439 // Process decodes the byte slice into a Task and includes it in Result() (in
440 // arbitrary order). Returns false if Result is certain to return an error.
441 // Caller must ensure b does not change until after Result() returns.
442 func (d *TaskDecoder) Process(b []byte) bool {
443 d.init()
444 d.input <- b
445 return len(d.errors) == 0
446 }
447
448 // Result returns all decoded Tasks provided to Process (in arbitrary order), or
449 // any error encountered.
450 func (d *TaskDecoder) Result() ([]*Task, error) {
451 // Allow TaskDecoder to be used without initialization.
452 if d.result == nil {
453 return []*Task{}, nil
454 }
455 close(d.input)
456 select {
457 case err := <-d.errors:
458 return nil, err
459 case result := <-d.result:
460 return result, nil
461 }
462 }
463
464 // TagsForTask returns the tags which should be set for a Task.
465 func TagsForTask(name, id string, priority float64, repo, revision string, dimen sions map[string]string) []string {
466 tags := map[string]string{
467 SWARMING_TAG_ALLOW_MILO: "1",
468 SWARMING_TAG_NAME: name,
469 SWARMING_TAG_ID: id,
470 SWARMING_TAG_PRIORITY: fmt.Sprintf("%f", priority),
471 SWARMING_TAG_REPO: repo,
472 SWARMING_TAG_REVISION: revision,
473 }
474
475 for k, v := range dimensions {
476 if _, ok := tags[k]; !ok {
477 tags[k] = v
478 } else {
479 glog.Warningf("Duplicate dimension/tag %q.", k)
480 }
481 }
482
483 tagsList := make([]string, 0, len(tags))
484 for k, v := range tags {
485 tagsList = append(tagsList, fmt.Sprintf("%s:%s", k, v))
486 }
487 return tagsList
488 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/modified_tasks_test.go ('k') | build_scheduler/go/db/task_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698