| Index: build_scheduler/go/db/task.go
|
| diff --git a/build_scheduler/go/db/task.go b/build_scheduler/go/db/task.go
|
| index c7ceef60667f18b688da8d591b941aef4197c59a..79305ee3cb2ded096e6d2ac9ba3d623e41ae465c 100644
|
| --- a/build_scheduler/go/db/task.go
|
| +++ b/build_scheduler/go/db/task.go
|
| @@ -59,7 +59,8 @@ type Task struct {
|
| // Created is the creation timestamp.
|
| Created time.Time
|
|
|
| - // Id is a generated unique identifier for this Task instance. Must be URL-safe.
|
| + // Id is a generated unique identifier for this Task instance. Must be
|
| + // URL-safe.
|
| Id string
|
|
|
| // IsolatedOutput is the isolated hash of any outputs produced by this
|
| @@ -154,7 +155,8 @@ func (t *Task) UpdateFromSwarming(s *swarming_api.SwarmingRpcsTaskRequestMetadat
|
| t.Status = TASK_STATUS_RUNNING
|
| case SWARMING_STATE_COMPLETED:
|
| if s.TaskResult.Failure {
|
| - // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on ExitCode?
|
| + // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on
|
| + // ExitCode?
|
| t.Status = TASK_STATUS_FAILURE
|
| } else {
|
| t.Status = TASK_STATUS_SUCCESS
|
| @@ -194,6 +196,8 @@ func (t *Task) Copy() *Task {
|
| return &rv
|
| }
|
|
|
| +// TaskSlice implements sort.Interface. To sort tasks []*Task, use
|
| +// sort.Sort(TaskSlice(tasks)).
|
| type TaskSlice []*Task
|
|
|
| func (s TaskSlice) Len() int { return len(s) }
|
| @@ -205,3 +209,85 @@ func (s TaskSlice) Less(i, j int) bool {
|
| func (s TaskSlice) Swap(i, j int) {
|
| s[i], s[j] = s[j], s[i]
|
| }
|
| +
|
| +// TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
|
| +// concurrent use.
|
| +// TODO(benjaminwagner): Encode in parallel.
|
| +type TaskEncoder struct {
|
| + err error
|
| + tasks []*Task
|
| + result [][]byte
|
| +}
|
| +
|
| +// Process encodes the Task into a byte slice that will be returned from Next()
|
| +// (in arbitrary order). Returns false if Next is certain to return an error.
|
| +// Caller must ensure t does not change until after the first call to Next().
|
| +// May not be called after calling Next().
|
| +func (e *TaskEncoder) Process(t *Task) bool {
|
| + if e.err != nil {
|
| + return false
|
| + }
|
| + var buf bytes.Buffer
|
| + if err := gob.NewEncoder(&buf).Encode(t); err != nil {
|
| + e.err = err
|
| + e.tasks = nil
|
| + e.result = nil
|
| + return false
|
| + }
|
| + e.tasks = append(e.tasks, t)
|
| + e.result = append(e.result, buf.Bytes())
|
| + return true
|
| +}
|
| +
|
| +// Next returns one of the Tasks provided to Process (in arbitrary order) and
|
| +// its serialized bytes. If any tasks remain, returns the task, the serialized
|
| +// bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
|
| +// error is encountered, returns nil, nil, error.
|
| +func (e *TaskEncoder) Next() (*Task, []byte, error) {
|
| + if e.err != nil {
|
| + return nil, nil, e.err
|
| + }
|
| + if len(e.tasks) == 0 {
|
| + return nil, nil, nil
|
| + }
|
| + t := e.tasks[0]
|
| + e.tasks = e.tasks[1:]
|
| + serialized := e.result[0]
|
| + e.result = e.result[1:]
|
| + return t, serialized, nil
|
| +}
|
| +
|
| +// TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
|
| +// concurrent use.
|
| +// TODO(benjaminwagner): Decode in parallel.
|
| +type TaskDecoder struct {
|
| + err error
|
| + result []*Task
|
| +}
|
| +
|
| +// Process decodes the byte slice into a Task and includes it in Result() (in
|
| +// arbitrary order). Returns false if Result is certain to return an error.
|
| +// Caller must ensure b does not change until after Result() returns.
|
| +func (d *TaskDecoder) Process(b []byte) bool {
|
| + if d.err != nil {
|
| + return false
|
| + }
|
| + var t Task
|
| + if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
|
| + d.err = err
|
| + d.result = nil
|
| + return false
|
| + }
|
| + d.result = append(d.result, &t)
|
| + return true
|
| +}
|
| +
|
| +// Result returns all decoded Tasks provided to Process (in arbitrary order), or
|
| +// any error encountered.
|
| +func (d *TaskDecoder) Result() ([]*Task, error) {
|
| + // Allow TaskDecoder to be used without initialization.
|
| + if d.err == nil && d.result == nil {
|
| + return []*Task{}, nil
|
| + }
|
| + return d.result, d.err
|
| +}
|
|
|