Index: build_scheduler/go/db/task.go
diff --git a/build_scheduler/go/db/task.go b/build_scheduler/go/db/task.go
index 09ad700e2f86a7a480f7486da7bdee55980b1f55..46cc6046909715540dc1cc901c8a2acc8cbd6f4b 100644
--- a/build_scheduler/go/db/task.go
+++ b/build_scheduler/go/db/task.go
@@ -56,7 +56,8 @@ type Task struct {
 	// Created is the creation timestamp.
 	Created time.Time
-	// Id is a generated unique identifier for this Task instance. Must be URL-safe.
+	// Id is a generated unique identifier for this Task instance. Must be
+	// URL-safe.
 	Id string
 	// IsolatedOutput is the isolated hash of any outputs produced by this
@@ -149,7 +150,8 @@ func (t *Task) UpdateFromSwarming(s *swarming.SwarmingRpcsTaskRequestMetadata) e
 		t.Status = TASK_STATUS_PENDING
 	case SWARMING_STATE_COMPLETED:
 		if s.TaskResult.Failure {
-			// TODO(benjaminwagner): Choose FAILURE or MISHAP depending on ExitCode?
+			// TODO(benjaminwagner): Choose FAILURE or MISHAP depending on
+			// ExitCode?
 			t.Status = TASK_STATUS_FAILURE
 		} else {
 			t.Status = TASK_STATUS_SUCCESS
@@ -189,6 +191,8 @@ func (t *Task) Copy() *Task {
 	return &rv
 }
 
+// TaskSlice implements sort.Interface. To sort tasks []*Task, use
+// sort.Sort(TaskSlice(tasks)).
 type TaskSlice []*Task
 
 func (s TaskSlice) Len() int { return len(s) }
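
Usage note (not part of the patch): the new comment above documents the idiom for sorting a []*Task. A minimal sketch, assuming "sort" is imported in package db and using a hypothetical helper name; the ordering itself comes from TaskSlice.Less, whose body is outside this hunk:

// sortTasks is a hypothetical wrapper illustrating the documented idiom.
func sortTasks(tasks []*Task) {
	sort.Sort(TaskSlice(tasks))
}
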
@@ -200,3 +204,85 @@ func (s TaskSlice) Less(i, j int) bool {
 func (s TaskSlice) Swap(i, j int) {
 	s[i], s[j] = s[j], s[i]
 }
+
+// TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
+// concurrent use.
+// TODO(benjaminwagner): Encode in parallel.
+type TaskEncoder struct {
+	err    error
+	tasks  []*Task
+	result [][]byte
+}
+
+// Process encodes the Task into a byte slice that will be returned from Next()
+// (in arbitrary order). Returns false if Next is certain to return an error.
+// Caller must ensure t does not change until after the first call to Next().
+// May not be called after calling Next().
+func (e *TaskEncoder) Process(t *Task) bool {
+	if e.err != nil {
+		return false
+	}
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(t); err != nil {
+		e.err = err
+		e.tasks = nil
+		e.result = nil
+		return false
+	}
+	e.tasks = append(e.tasks, t)
+	e.result = append(e.result, buf.Bytes())
+	return true
+}
+
+// Next returns one of the Tasks provided to Process (in arbitrary order) and
+// its serialized bytes. If any tasks remain, returns the task, the serialized
+// bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
+// error is encountered, returns nil, nil, error.
+func (e *TaskEncoder) Next() (*Task, []byte, error) {
+	if e.err != nil {
+		return nil, nil, e.err
+	}
+	if len(e.tasks) == 0 {
+		return nil, nil, nil
+	}
+	t := e.tasks[0]
+	e.tasks = e.tasks[1:]
+	serialized := e.result[0]
+	e.result = e.result[1:]
+	return t, serialized, nil
+}
+
+// TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
+// concurrent use.
+// TODO(benjaminwagner): Decode in parallel.
+type TaskDecoder struct {
+	err    error
+	result []*Task
+}
+
+// Process decodes the byte slice into a Task and includes it in Result() (in
+// arbitrary order). Returns false if Result is certain to return an error.
+// Caller must ensure b does not change until after Result() returns.
+func (d *TaskDecoder) Process(b []byte) bool {
+	if d.err != nil {
+		return false
+	}
+	var t Task
+	if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
+		d.err = err
+		d.result = nil
+		return false
+	}
+	d.result = append(d.result, &t)
+	return true
+}
+
+// Result returns all Tasks decoded from the byte slices provided to Process
+// (in arbitrary order), or any error encountered.
+func (d *TaskDecoder) Result() ([]*Task, error) {
+	// Allow TaskDecoder to be used without initialization.
+	if d.err == nil && d.result == nil {
+		return []*Task{}, nil
+	}
+	return d.result, d.err
+}
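
Usage sketch (not part of the patch): per the comments above, TaskEncoder is driven by repeated Process calls followed by draining Next, and TaskDecoder by Process calls followed by a single Result call. A minimal round-trip in package db, using a hypothetical helper name roundTrip:

// roundTrip is a hypothetical helper showing the Process/Next and
// Process/Result calling pattern; it is not part of this change.
func roundTrip(tasks []*Task) ([]*Task, error) {
	// Encode each Task to GOB bytes.
	e := TaskEncoder{}
	for _, t := range tasks {
		if !e.Process(t) {
			break // Next() below will return the encoding error.
		}
	}
	var blobs [][]byte
	for {
		t, serialized, err := e.Next()
		if err != nil {
			return nil, err
		}
		if t == nil {
			break // All encoded tasks have been drained.
		}
		blobs = append(blobs, serialized)
	}

	// Decode the bytes back into Tasks; order is arbitrary.
	d := TaskDecoder{}
	for _, b := range blobs {
		if !d.Process(b) {
			break // Result() below will return the decoding error.
		}
	}
	return d.Result()
}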