Index: build_scheduler/go/db/task.go |
diff --git a/build_scheduler/go/db/task.go b/build_scheduler/go/db/task.go |
index c7ceef60667f18b688da8d591b941aef4197c59a..79305ee3cb2ded096e6d2ac9ba3d623e41ae465c 100644 |
--- a/build_scheduler/go/db/task.go |
+++ b/build_scheduler/go/db/task.go |
@@ -59,7 +59,8 @@ type Task struct { |
// Created is the creation timestamp. |
Created time.Time |
- // Id is a generated unique identifier for this Task instance. Must be URL-safe. |
+ // Id is a generated unique identifier for this Task instance. Must be |
+ // URL-safe. |
Id string |
// IsolatedOutput is the isolated hash of any outputs produced by this |
@@ -154,7 +155,8 @@ func (t *Task) UpdateFromSwarming(s *swarming_api.SwarmingRpcsTaskRequestMetadat |
t.Status = TASK_STATUS_RUNNING |
case SWARMING_STATE_COMPLETED: |
if s.TaskResult.Failure { |
- // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on ExitCode? |
+ // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on |
+ // ExitCode? |
t.Status = TASK_STATUS_FAILURE |
} else { |
t.Status = TASK_STATUS_SUCCESS |
@@ -194,6 +196,8 @@ func (t *Task) Copy() *Task { |
return &rv |
} |
+// TaskSlice implements sort.Interface. To sort a []*Task named tasks, use |
+// sort.Sort(TaskSlice(tasks)). |
type TaskSlice []*Task |
func (s TaskSlice) Len() int { return len(s) } |
@@ -205,3 +209,85 @@ func (s TaskSlice) Less(i, j int) bool { |
func (s TaskSlice) Swap(i, j int) { |
s[i], s[j] = s[j], s[i] |
} |
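
For reference, a minimal sketch (as an example file in package db) of sorting via the TaskSlice conversion; the Task values and their Created timestamps are purely illustrative, and the resulting order is whatever Less defines:

package db

import (
	"sort"
	"time"
)

// sortTasksExample builds two throwaway Tasks and sorts them using the
// ordering defined by TaskSlice.Less.
func sortTasksExample() []*Task {
	tasks := []*Task{
		{Id: "b", Created: time.Now()},
		{Id: "a", Created: time.Now().Add(-time.Hour)},
	}
	// TaskSlice implements sort.Interface, so a plain conversion is enough;
	// the sort happens in place on the underlying slice.
	sort.Sort(TaskSlice(tasks))
	return tasks
}
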
+ |
+// TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for |
+// concurrent use. |
+// TODO(benjaminwagner): Encode in parallel. |
+type TaskEncoder struct { |
+ err error |
+ tasks []*Task |
+ result [][]byte |
+} |
+ |
+// Process encodes the Task into a byte slice that will be returned from Next() |
+// (in arbitrary order). Returns false if Next is certain to return an error. |
+// Caller must ensure t does not change until after the first call to Next(). |
+// May not be called after calling Next(). |
+func (e *TaskEncoder) Process(t *Task) bool { |
+ if e.err != nil { |
+ return false |
+ } |
+ var buf bytes.Buffer |
+ if err := gob.NewEncoder(&buf).Encode(t); err != nil { |
+ e.err = err |
+ e.tasks = nil |
+ e.result = nil |
+ return false |
+ } |
+ e.tasks = append(e.tasks, t) |
+ e.result = append(e.result, buf.Bytes()) |
+ return true |
+} |
+ |
+// Next returns one of the Tasks provided to Process (in arbitrary order) and |
+// its serialized bytes. If any tasks remain, returns a task, its serialized |
+// bytes, and a nil error. If all tasks have been returned, returns nil, nil, |
+// nil. If an error was encountered, returns nil, nil, and that error. |
+func (e *TaskEncoder) Next() (*Task, []byte, error) { |
+ if e.err != nil { |
+ return nil, nil, e.err |
+ } |
+ if len(e.tasks) == 0 { |
+ return nil, nil, nil |
+ } |
+ t := e.tasks[0] |
+ e.tasks = e.tasks[1:] |
+ serialized := e.result[0] |
+ e.result = e.result[1:] |
+ return t, serialized, nil |
+} |
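
To make the intended call pattern concrete, here is a minimal sketch of driving TaskEncoder (assumed to live in the same package-db example file as above, so no imports are needed): Process every task first, then drain Next until it returns a nil Task or an error. The writeTask callback is a hypothetical stand-in for whatever storage layer consumes the serialized bytes.

// encodeTasksExample serializes tasks with a TaskEncoder and hands each
// (id, bytes) pair to the caller-supplied writeTask callback.
func encodeTasksExample(tasks []*Task, writeTask func(id string, b []byte) error) error {
	var enc TaskEncoder
	for _, t := range tasks {
		// Process returns false once Next is certain to return an error.
		if !enc.Process(t) {
			break
		}
	}
	for {
		t, serialized, err := enc.Next()
		if err != nil {
			return err
		}
		if t == nil {
			// All tasks have been returned and written.
			return nil
		}
		if err := writeTask(t.Id, serialized); err != nil {
			return err
		}
	}
}
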
+ |
+// TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for |
+// concurrent use. |
+// TODO(benjaminwagner): Decode in parallel. |
+type TaskDecoder struct { |
+ err error |
+ result []*Task |
+} |
+ |
+// Process decodes the byte slice into a Task and includes it in Result() (in |
+// arbitrary order). Returns false if Result is certain to return an error. |
+// Caller must ensure b does not change until after Result() returns. |
+func (d *TaskDecoder) Process(b []byte) bool { |
+ if d.err != nil { |
+ return false |
+ } |
+ var t Task |
+ if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil { |
+ d.err = err |
+ d.result = nil |
+ return false |
+ } |
+ d.result = append(d.result, &t) |
+ return true |
+} |
+ |
+// Result returns all Tasks decoded from the byte slices provided to Process |
+// (in arbitrary order), or any error encountered. |
+func (d *TaskDecoder) Result() ([]*Task, error) { |
+ // Allow TaskDecoder to be used without initialization. |
+ if d.err == nil && d.result == nil { |
+ return []*Task{}, nil |
+ } |
+ return d.result, d.err |
+} |
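
And the reverse direction, again as a sketch assumed to sit in the same package-db example file: each stored byte slice is handed to TaskDecoder.Process, and Result returns the decoded Tasks or the first error hit. The serialized argument is assumed to hold bytes previously produced by TaskEncoder.

// decodeTasksExample deserializes previously encoded Tasks with a TaskDecoder.
func decodeTasksExample(serialized [][]byte) ([]*Task, error) {
	var dec TaskDecoder
	for _, b := range serialized {
		// Process returns false once Result is certain to return an error.
		if !dec.Process(b) {
			break
		}
	}
	return dec.Result()
}
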