Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(488)

Unified Diff: build_scheduler/go/db/task.go

Issue 2246933002: Add Task DB implementation using a local BoltDB. (Closed) Base URL: https://skia.googlesource.com/buildbot@taskdb-impl-track
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: build_scheduler/go/db/task.go
diff --git a/build_scheduler/go/db/task.go b/build_scheduler/go/db/task.go
index 09ad700e2f86a7a480f7486da7bdee55980b1f55..46cc6046909715540dc1cc901c8a2acc8cbd6f4b 100644
--- a/build_scheduler/go/db/task.go
+++ b/build_scheduler/go/db/task.go
@@ -56,7 +56,8 @@ type Task struct {
// Created is the creation timestamp.
Created time.Time
- // Id is a generated unique identifier for this Task instance. Must be URL-safe.
+ // Id is a generated unique identifier for this Task instance. Must be
+ // URL-safe.
Id string
// IsolatedOutput is the isolated hash of any outputs produced by this
@@ -149,7 +150,8 @@ func (t *Task) UpdateFromSwarming(s *swarming.SwarmingRpcsTaskRequestMetadata) e
t.Status = TASK_STATUS_PENDING
case SWARMING_STATE_COMPLETED:
if s.TaskResult.Failure {
- // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on ExitCode?
+ // TODO(benjaminwagner): Choose FAILURE or MISHAP depending on
+ // ExitCode?
t.Status = TASK_STATUS_FAILURE
} else {
t.Status = TASK_STATUS_SUCCESS
@@ -189,6 +191,8 @@ func (t *Task) Copy() *Task {
return &rv
}
+// TaskSlice implements sort.Interface. To sort tasks []*Task, use
+// sort.Sort(TaskSlice(tasks)).
type TaskSlice []*Task
func (s TaskSlice) Len() int { return len(s) }
@@ -200,3 +204,85 @@ func (s TaskSlice) Less(i, j int) bool {
func (s TaskSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
+
+// TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
+// concurrent use.
+// TODO(benjaminwagner): Encode in parallel.
+type TaskEncoder struct {
+ err error
+ tasks []*Task
+ result [][]byte
+}
+
+// Process encodes the Task into a byte slice that will be returned from Next()
+// (in arbitrary order). Returns false if Next is certain to return an error.
+// Caller must ensure t does not change until after the first call to Next().
+// May not be called after calling Next().
+func (e *TaskEncoder) Process(t *Task) bool {
+ if e.err != nil {
+ return false
+ }
+ var buf bytes.Buffer
+ if err := gob.NewEncoder(&buf).Encode(t); err != nil {
+ e.err = err
+ e.tasks = nil
+ e.result = nil
+ return false
+ }
+ e.tasks = append(e.tasks, t)
+ e.result = append(e.result, buf.Bytes())
+ return true
+}
+
+// Next returns one of the Tasks provided to Process (in arbitrary order) and
+// its serialized bytes. If any tasks remain, returns the task, the serialized
+// bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
+// error is encountered, returns nil, nil, error.
+func (e *TaskEncoder) Next() (*Task, []byte, error) {
+ if e.err != nil {
+ return nil, nil, e.err
+ }
+ if len(e.tasks) == 0 {
+ return nil, nil, nil
+ }
+ t := e.tasks[0]
+ e.tasks = e.tasks[1:]
+ serialized := e.result[0]
+ e.result = e.result[1:]
+ return t, serialized, nil
+}
+
+// TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
+// concurrent use.
+// TODO(benjaminwagner): Decode in parallel.
+type TaskDecoder struct {
+ err error
+ result []*Task
+}
+
+// Process decodes the byte slice into a Task and includes it in Result() (in
+// arbitrary order). Returns false if Result is certain to return an error.
+// Caller must ensure b does not change until after Result() returns.
+func (d *TaskDecoder) Process(b []byte) bool {
+ if d.err != nil {
+ return false
+ }
+ var t Task
+ if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
+ d.err = err
+ d.result = nil
+ return false
+ }
+ d.result = append(d.result, &t)
+ return true
+}
+
+// Result returns all decoded Tasks provided to Process (in arbitrary order), or
+// any error encountered.
+func (d *TaskDecoder) Result() ([]*Task, error) {
+ // Allow TaskDecoder to be used without initialization.
+ if d.err == nil && d.result == nil {
+ return []*Task{}, nil
+ }
+ return d.result, d.err
+}

Powered by Google App Engine
This is Rietveld 408576698