Index: build_scheduler/go/db/task.go |
diff --git a/build_scheduler/go/db/task.go b/build_scheduler/go/db/task.go |
deleted file mode 100644 |
index dddc8c176d27c24c695db04502ee5113f608b3a6..0000000000000000000000000000000000000000 |
--- a/build_scheduler/go/db/task.go |
+++ /dev/null |
@@ -1,488 +0,0 @@ |
-package db |
- |
-import ( |
- "bytes" |
- "encoding/gob" |
- "errors" |
- "fmt" |
- "reflect" |
- "sync" |
- "time" |
- |
- "go.skia.org/infra/go/swarming" |
- "go.skia.org/infra/go/util" |
- |
- swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
- "github.com/skia-dev/glog" |
-) |
- |
-const ( |
- // Swarming task states. |
- SWARMING_STATE_BOT_DIED = "BOT_DIED" |
- SWARMING_STATE_CANCELED = "CANCELED" |
- SWARMING_STATE_COMPLETED = "COMPLETED" |
- SWARMING_STATE_EXPIRED = "EXPIRED" |
- SWARMING_STATE_PENDING = "PENDING" |
- SWARMING_STATE_RUNNING = "RUNNING" |
- SWARMING_STATE_TIMED_OUT = "TIMED_OUT" |
- |
- // Swarming tags added by Build Scheduler. |
- SWARMING_TAG_ALLOW_MILO = "allow_milo" |
- SWARMING_TAG_ID = "scheduler_id" |
- SWARMING_TAG_NAME = "name" |
- SWARMING_TAG_PRIORITY = "priority" |
- SWARMING_TAG_REPO = "repo" |
- SWARMING_TAG_REVISION = "revision" |
-) |
- |
-type TaskStatus string |
- |
-const ( |
- // TASK_STATUS_PENDING indicates the task has not started. It is the empty |
- // string so that it is the zero value of TaskStatus. |
- TASK_STATUS_PENDING TaskStatus = "" |
- // TASK_STATUS_RUNNING indicates the task is in progress. |
- TASK_STATUS_RUNNING TaskStatus = "RUNNING" |
- // TASK_STATUS_SUCCESS indicates the task completed successfully. |
- TASK_STATUS_SUCCESS TaskStatus = "SUCCESS" |
- // TASK_STATUS_FAILURE indicates the task completed with failures. |
- TASK_STATUS_FAILURE TaskStatus = "FAILURE" |
- // TASK_STATUS_MISHAP indicates the task exited early with an error, died |
- // while in progress, was manually canceled, expired while waiting on the |
- // queue, or timed out before completing. |
- TASK_STATUS_MISHAP TaskStatus = "MISHAP" |
-) |
- |
-// Task describes a Swarming task generated from a TaskSpec, or a "fake" task |
-// that can not be executed on Swarming, but can be added to the DB and |
-// displayed as if it were a real TaskSpec. |
-// |
-// Task is stored as a GOB, so changes must maintain backwards compatibility. |
-// See gob package documentation for details, but generally: |
-// - Ensure new fields can be initialized with their zero value. |
-// - Do not change the type of any existing field. |
-// - Leave removed fields commented out to ensure the field name is not |
-// reused. |
-type Task struct { |
- // Commits are the commits which were tested in this Task. The list may |
- // change due to backfilling/bisecting. |
- Commits []string |
- |
- // Created is the creation timestamp. |
- Created time.Time |
- |
- // DbModified is the time of the last successful call to DB.PutTask/s for this |
- // Task, or zero if the task is new. It is not related to the ModifiedTs time |
- // of the associated Swarming task. |
- DbModified time.Time |
- |
- // Finished is the time the task stopped running or expired from the queue, or |
- // zero if the task is pending or running. |
- Finished time.Time |
- |
- // Id is a generated unique identifier for this Task instance. Must be |
- // URL-safe. |
- Id string |
- |
- // IsolatedOutput is the isolated hash of any outputs produced by this Task. |
- // Filled in when the task is completed. We assume the isolate server is |
- // isolate.ISOLATE_SERVER_URL and the namespace is isolate.DEFAULT_NAMESPACE. |
- // This field will not be set if the Task does not correspond to a Swarming |
- // task. |
- IsolatedOutput string |
- |
- // Name is a human-friendly descriptive name for this Task. All Tasks |
- // generated from the same TaskSpec have the same name. |
- Name string |
- |
- // Repo is the repository of the commit at which this task ran. |
- Repo string |
- |
- // Revision is the commit at which this task ran. |
- Revision string |
- |
- // Started is the time the task started running, or zero if the task is |
- // pending, or the same as Finished if the task never ran. |
- Started time.Time |
- |
- // Status is the current task status, default TASK_STATUS_PENDING. |
- Status TaskStatus |
- |
- // SwarmingTaskId is the Swarming task ID. This field will not be set if the |
- // Task does not correspond to a Swarming task. |
- SwarmingTaskId string |
-} |
- |
-// UpdateFromSwarming sets or initializes t from data in s. If any changes were |
-// made to t, returns true. |
-// |
-// If empty, sets t.Id, t.Name, t.Repo, and t.Revision from s's tags named |
-// SWARMING_TAG_ID, SWARMING_TAG_NAME, SWARMING_TAG_REPO, and |
-// SWARMING_TAG_REVISION, sets t.Created from s.CreatedTs, and sets |
-// t.SwarmingTaskId from s.TaskId. If these fields are non-empty, returns an |
-// error if they do not match. |
-// |
-// Always sets t.Status, t.Started, t.Finished, and t.IsolatedOutput based on s. |
-func (orig *Task) UpdateFromSwarming(s *swarming_api.SwarmingRpcsTaskResult) (bool, error) { |
- if s == nil { |
- return false, fmt.Errorf("Missing TaskResult. %v", s) |
- } |
- tags, err := swarming.TagValues(s) |
- if err != nil { |
- return false, err |
- } |
- |
- copy := orig.Copy() |
- if !reflect.DeepEqual(orig, copy) { |
- glog.Fatalf("Task.Copy is broken; original and copy differ:\n%#v\n%#v", orig, copy) |
- } |
- |
- // "Identity" fields stored in tags. |
- checkOrSetFromTag := func(tagName string, field *string, fieldName string) error { |
- if tagValue, ok := tags[tagName]; ok { |
- if *field == "" { |
- *field = tagValue |
- } else if *field != tagValue { |
- return fmt.Errorf("%s does not match for task %s. Was %s, now %s. %v %v", fieldName, orig.Id, *field, tagValue, orig, s) |
- } |
- } |
- return nil |
- } |
- if err := checkOrSetFromTag(SWARMING_TAG_ID, ©.Id, "Id"); err != nil { |
- return false, err |
- } |
- if err := checkOrSetFromTag(SWARMING_TAG_NAME, ©.Name, "Name"); err != nil { |
- return false, err |
- } |
- if err := checkOrSetFromTag(SWARMING_TAG_REPO, ©.Repo, "Repo"); err != nil { |
- return false, err |
- } |
- if err := checkOrSetFromTag(SWARMING_TAG_REVISION, ©.Revision, "Revision"); err != nil { |
- return false, err |
- } |
- |
- // CreatedTs should always be present. |
- if sCreated, err := swarming.ParseTimestamp(s.CreatedTs); err == nil { |
- if util.TimeIsZero(copy.Created) { |
- copy.Created = sCreated |
- } else if copy.Created != sCreated { |
- return false, fmt.Errorf("Creation time has changed for task %s. Was %s, now %s. %v", orig.Id, orig.Created, sCreated, orig) |
- } |
- } else { |
- return false, fmt.Errorf("Unable to parse task creation time for task %s. %v %v", orig.Id, err, orig) |
- } |
- |
- // Swarming TaskId. |
- if copy.SwarmingTaskId == "" { |
- copy.SwarmingTaskId = s.TaskId |
- } else if copy.SwarmingTaskId != s.TaskId { |
- return false, fmt.Errorf("Swarming task ID does not match for task %s. Was %s, now %s. %v", orig.Id, orig.SwarmingTaskId, s.TaskId, orig) |
- } |
- |
- // Status. |
- switch s.State { |
- case SWARMING_STATE_BOT_DIED, SWARMING_STATE_CANCELED, SWARMING_STATE_EXPIRED, SWARMING_STATE_TIMED_OUT: |
- copy.Status = TASK_STATUS_MISHAP |
- case SWARMING_STATE_PENDING: |
- copy.Status = TASK_STATUS_PENDING |
- case SWARMING_STATE_RUNNING: |
- copy.Status = TASK_STATUS_RUNNING |
- case SWARMING_STATE_COMPLETED: |
- if s.Failure { |
- // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on ExitCode? |
- copy.Status = TASK_STATUS_FAILURE |
- } else { |
- copy.Status = TASK_STATUS_SUCCESS |
- } |
- default: |
- return false, fmt.Errorf("Unknown Swarming State %v in %v", s.State, s) |
- } |
- |
- // Isolated output. |
- if s.OutputsRef == nil { |
- copy.IsolatedOutput = "" |
- } else { |
- copy.IsolatedOutput = s.OutputsRef.Isolated |
- } |
- |
- // Timestamps. |
- maybeUpdateTime := func(newTimeStr string, field *time.Time, name string) error { |
- if newTimeStr == "" { |
- return nil |
- } |
- newTime, err := swarming.ParseTimestamp(newTimeStr) |
- if err != nil { |
- return fmt.Errorf("Unable to parse %s for task %s. %v %v", name, orig.Id, err, s) |
- } |
- *field = newTime |
- return nil |
- } |
- |
- if err := maybeUpdateTime(s.StartedTs, ©.Started, "StartedTs"); err != nil { |
- return false, err |
- } |
- if err := maybeUpdateTime(s.CompletedTs, ©.Finished, "CompletedTs"); err != nil { |
- return false, err |
- } |
- if s.CompletedTs == "" && copy.Status == TASK_STATUS_MISHAP { |
- if err := maybeUpdateTime(s.AbandonedTs, ©.Finished, "AbandonedTs"); err != nil { |
- return false, err |
- } |
- } |
- if copy.Done() && util.TimeIsZero(copy.Started) { |
- copy.Started = copy.Finished |
- } |
- |
- // TODO(benjaminwagner): SwarmingRpcsTaskResult has a ModifiedTs field that we |
- // could use to detect modifications. Unfortunately, it seems that while the |
- // task is running, ModifiedTs gets updated every 30 seconds, regardless of |
- // whether any other data actually changed. Maybe we could still use it for |
- // pending or completed tasks. |
- if !reflect.DeepEqual(orig, copy) { |
- *orig = *copy |
- return true, nil |
- } |
- return false, nil |
-} |
- |
-var errNotModified = errors.New("Task not modified") |
- |
-// UpdateDBFromSwarmingTask updates a task in db from data in s. |
-func UpdateDBFromSwarmingTask(db DB, s *swarming_api.SwarmingRpcsTaskResult) error { |
- id, err := swarming.GetTagValue(s, SWARMING_TAG_ID) |
- if err != nil { |
- return err |
- } |
- _, err = UpdateTaskWithRetries(db, id, func(task *Task) error { |
- modified, err := task.UpdateFromSwarming(s) |
- if err != nil { |
- return err |
- } |
- if !modified { |
- return errNotModified |
- } |
- return nil |
- }) |
- if err == errNotModified { |
- return nil |
- } else { |
- return err |
- } |
-} |
- |
-func (t *Task) Done() bool { |
- return t.Status != TASK_STATUS_PENDING && t.Status != TASK_STATUS_RUNNING |
-} |
- |
-func (t *Task) Success() bool { |
- return t.Status == TASK_STATUS_SUCCESS |
-} |
- |
-func (t *Task) Copy() *Task { |
- var commits []string |
- if t.Commits != nil { |
- commits = make([]string, len(t.Commits)) |
- copy(commits, t.Commits) |
- } |
- return &Task{ |
- Commits: commits, |
- Created: t.Created, |
- DbModified: t.DbModified, |
- Finished: t.Finished, |
- Id: t.Id, |
- IsolatedOutput: t.IsolatedOutput, |
- Name: t.Name, |
- Repo: t.Repo, |
- Revision: t.Revision, |
- Started: t.Started, |
- Status: t.Status, |
- SwarmingTaskId: t.SwarmingTaskId, |
- } |
-} |
- |
-// TaskSlice implements sort.Interface. To sort tasks []*Task, use |
-// sort.Sort(TaskSlice(tasks)). |
-type TaskSlice []*Task |
- |
-func (s TaskSlice) Len() int { return len(s) } |
- |
-func (s TaskSlice) Less(i, j int) bool { |
- return s[i].Created.Before(s[j].Created) |
-} |
- |
-func (s TaskSlice) Swap(i, j int) { |
- s[i], s[j] = s[j], s[i] |
-} |
- |
-// TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for |
-// concurrent use. |
-// TODO(benjaminwagner): Encode in parallel. |
-type TaskEncoder struct { |
- err error |
- tasks []*Task |
- result [][]byte |
-} |
- |
-// Process encodes the Task into a byte slice that will be returned from Next() |
-// (in arbitrary order). Returns false if Next is certain to return an error. |
-// Caller must ensure t does not change until after the first call to Next(). |
-// May not be called after calling Next(). |
-func (e *TaskEncoder) Process(t *Task) bool { |
- if e.err != nil { |
- return false |
- } |
- var buf bytes.Buffer |
- if err := gob.NewEncoder(&buf).Encode(t); err != nil { |
- e.err = err |
- e.tasks = nil |
- e.result = nil |
- return false |
- } |
- e.tasks = append(e.tasks, t) |
- e.result = append(e.result, buf.Bytes()) |
- return true |
-} |
- |
-// Next returns one of the Tasks provided to Process (in arbitrary order) and |
-// its serialized bytes. If any tasks remain, returns the task, the serialized |
-// bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an |
-// error is encountered, returns nil, nil, error. |
-func (e *TaskEncoder) Next() (*Task, []byte, error) { |
- if e.err != nil { |
- return nil, nil, e.err |
- } |
- if len(e.tasks) == 0 { |
- return nil, nil, nil |
- } |
- t := e.tasks[0] |
- e.tasks = e.tasks[1:] |
- serialized := e.result[0] |
- e.result = e.result[1:] |
- return t, serialized, nil |
-} |
- |
-// TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for |
-// concurrent use. |
-type TaskDecoder struct { |
- // input contains the incoming byte slices. Process() sends on this channel, |
- // decode() receives from it, and Result() closes it. |
- input chan []byte |
- // output contains decoded Tasks. decode() sends on this channel, collect() |
- // receives from it, and run() closes it when all decode() goroutines have |
- // finished. |
- output chan *Task |
- // result contains the return value of Result(). collect() sends a single |
- // value on this channel and closes it. Result() receives from it. |
- result chan []*Task |
- // errors contains the first error from any goroutine. It's a channel in case |
- // multiple goroutines experience an error at the same time. |
- errors chan error |
-} |
- |
-const kNumDecoderGoroutines = 10 |
- |
-// init initializes d if it has not been initialized. May not be called concurrently. |
-func (d *TaskDecoder) init() { |
- if d.input == nil { |
- d.input = make(chan []byte, kNumDecoderGoroutines*2) |
- d.output = make(chan *Task, kNumDecoderGoroutines) |
- d.result = make(chan []*Task, 1) |
- d.errors = make(chan error, kNumDecoderGoroutines) |
- go d.run() |
- go d.collect() |
- } |
-} |
- |
-// run starts the decode goroutines and closes d.output when they finish. |
-func (d *TaskDecoder) run() { |
- // Start decoders. |
- wg := sync.WaitGroup{} |
- for i := 0; i < kNumDecoderGoroutines; i++ { |
- wg.Add(1) |
- go d.decode(&wg) |
- } |
- // Wait for decoders to exit. |
- wg.Wait() |
- // Drain d.input in the case that errors were encountered, to avoid deadlock. |
- for _ = range d.input { |
- } |
- close(d.output) |
-} |
- |
-// decode receives from d.input and sends to d.output until d.input is closed or |
-// d.errors is non-empty. Decrements wg when done. |
-func (d *TaskDecoder) decode(wg *sync.WaitGroup) { |
- for b := range d.input { |
- var t Task |
- if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil { |
- d.errors <- err |
- break |
- } |
- d.output <- &t |
- if len(d.errors) > 0 { |
- break |
- } |
- } |
- wg.Done() |
-} |
- |
-// collect receives from d.output until it is closed, then sends on d.result. |
-func (d *TaskDecoder) collect() { |
- result := []*Task{} |
- for t := range d.output { |
- result = append(result, t) |
- } |
- d.result <- result |
- close(d.result) |
-} |
- |
-// Process decodes the byte slice into a Task and includes it in Result() (in |
-// arbitrary order). Returns false if Result is certain to return an error. |
-// Caller must ensure b does not change until after Result() returns. |
-func (d *TaskDecoder) Process(b []byte) bool { |
- d.init() |
- d.input <- b |
- return len(d.errors) == 0 |
-} |
- |
-// Result returns all decoded Tasks provided to Process (in arbitrary order), or |
-// any error encountered. |
-func (d *TaskDecoder) Result() ([]*Task, error) { |
- // Allow TaskDecoder to be used without initialization. |
- if d.result == nil { |
- return []*Task{}, nil |
- } |
- close(d.input) |
- select { |
- case err := <-d.errors: |
- return nil, err |
- case result := <-d.result: |
- return result, nil |
- } |
-} |
- |
-// TagsForTask returns the tags which should be set for a Task. |
-func TagsForTask(name, id string, priority float64, repo, revision string, dimensions map[string]string) []string { |
- tags := map[string]string{ |
- SWARMING_TAG_ALLOW_MILO: "1", |
- SWARMING_TAG_NAME: name, |
- SWARMING_TAG_ID: id, |
- SWARMING_TAG_PRIORITY: fmt.Sprintf("%f", priority), |
- SWARMING_TAG_REPO: repo, |
- SWARMING_TAG_REVISION: revision, |
- } |
- |
- for k, v := range dimensions { |
- if _, ok := tags[k]; !ok { |
- tags[k] = v |
- } else { |
- glog.Warningf("Duplicate dimension/tag %q.", k) |
- } |
- } |
- |
- tagsList := make([]string, 0, len(tags)) |
- for k, v := range tags { |
- tagsList = append(tagsList, fmt.Sprintf("%s:%s", k, v)) |
- } |
- return tagsList |
-} |