Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(678)

Side by Side Diff: build_scheduler/go/db/task.go

Issue 2246933002: Add Task DB implementation using a local BoltDB. (Closed) Base URL: https://skia.googlesource.com/buildbot@taskdb-impl-track
Patch Set: Fix bad merge. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 package db 1 package db
2 2
3 import ( 3 import (
4 "bytes" 4 "bytes"
5 "encoding/gob" 5 "encoding/gob"
6 "fmt" 6 "fmt"
7 "time" 7 "time"
8 8
9 "go.skia.org/infra/go/swarming" 9 "go.skia.org/infra/go/swarming"
10 "go.skia.org/infra/go/util" 10 "go.skia.org/infra/go/util"
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
52 // that can not be executed on Swarming, but can be added to the DB and 52 // that can not be executed on Swarming, but can be added to the DB and
53 // displayed as if it were a real TaskSpec. 53 // displayed as if it were a real TaskSpec.
54 type Task struct { 54 type Task struct {
55 // Commits are the commits which were tested in this Task. The list may 55 // Commits are the commits which were tested in this Task. The list may
56 // change due to backfilling/bisecting. 56 // change due to backfilling/bisecting.
57 Commits []string 57 Commits []string
58 58
59 // Created is the creation timestamp. 59 // Created is the creation timestamp.
60 Created time.Time 60 Created time.Time
61 61
62 » // Id is a generated unique identifier for this Task instance. Must be U RL-safe. 62 » // Id is a generated unique identifier for this Task instance. Must be
63 » // URL-safe.
63 Id string 64 Id string
64 65
65 // IsolatedOutput is the isolated hash of any outputs produced by this 66 // IsolatedOutput is the isolated hash of any outputs produced by this
66 // Task. Filled in when the task is completed. 67 // Task. Filled in when the task is completed.
67 IsolatedOutput string 68 IsolatedOutput string
68 69
69 // Name is a human-friendly descriptive name for this Task. All Tasks 70 // Name is a human-friendly descriptive name for this Task. All Tasks
70 // generated from the same TaskSpec have the same name. 71 // generated from the same TaskSpec have the same name.
71 Name string 72 Name string
72 73
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 148
148 switch s.TaskResult.State { 149 switch s.TaskResult.State {
149 case SWARMING_STATE_BOT_DIED, SWARMING_STATE_CANCELED, SWARMING_STATE_EX PIRED, SWARMING_STATE_TIMED_OUT: 150 case SWARMING_STATE_BOT_DIED, SWARMING_STATE_CANCELED, SWARMING_STATE_EX PIRED, SWARMING_STATE_TIMED_OUT:
150 t.Status = TASK_STATUS_MISHAP 151 t.Status = TASK_STATUS_MISHAP
151 case SWARMING_STATE_PENDING: 152 case SWARMING_STATE_PENDING:
152 t.Status = TASK_STATUS_PENDING 153 t.Status = TASK_STATUS_PENDING
153 case SWARMING_STATE_RUNNING: 154 case SWARMING_STATE_RUNNING:
154 t.Status = TASK_STATUS_RUNNING 155 t.Status = TASK_STATUS_RUNNING
155 case SWARMING_STATE_COMPLETED: 156 case SWARMING_STATE_COMPLETED:
156 if s.TaskResult.Failure { 157 if s.TaskResult.Failure {
157 » » » // TODO(benjaminwagner): Choose FAILURE or MISHAP depend ing on ExitCode? 158 » » » // TODO(benjaminwagner): Choose FAILURE or MISHAP depend ing on
159 » » » // ExitCode?
158 t.Status = TASK_STATUS_FAILURE 160 t.Status = TASK_STATUS_FAILURE
159 } else { 161 } else {
160 t.Status = TASK_STATUS_SUCCESS 162 t.Status = TASK_STATUS_SUCCESS
161 } 163 }
162 default: 164 default:
163 return fmt.Errorf("Unknown Swarming State %v in %v", s.TaskResul t.State, s) 165 return fmt.Errorf("Unknown Swarming State %v in %v", s.TaskResul t.State, s)
164 } 166 }
165 167
166 if s.TaskResult.OutputsRef == nil { 168 if s.TaskResult.OutputsRef == nil {
167 t.IsolatedOutput = "" 169 t.IsolatedOutput = ""
(...skipping 19 matching lines...) Expand all
187 if err := gob.NewEncoder(&buf).Encode(t); err != nil { 189 if err := gob.NewEncoder(&buf).Encode(t); err != nil {
188 glog.Fatal(err) 190 glog.Fatal(err)
189 } 191 }
190 var rv Task 192 var rv Task
191 if err := gob.NewDecoder(&buf).Decode(&rv); err != nil { 193 if err := gob.NewDecoder(&buf).Decode(&rv); err != nil {
192 glog.Fatal(err) 194 glog.Fatal(err)
193 } 195 }
194 return &rv 196 return &rv
195 } 197 }
196 198
199 // TaskSlice implements sort.Interface. To sort tasks []*Task, use
200 // sort.Sort(TaskSlice(tasks)).
197 type TaskSlice []*Task 201 type TaskSlice []*Task
198 202
199 func (s TaskSlice) Len() int { return len(s) } 203 func (s TaskSlice) Len() int { return len(s) }
200 204
201 func (s TaskSlice) Less(i, j int) bool { 205 func (s TaskSlice) Less(i, j int) bool {
202 return s[i].Created.Before(s[j].Created) 206 return s[i].Created.Before(s[j].Created)
203 } 207 }
204 208
205 func (s TaskSlice) Swap(i, j int) { 209 func (s TaskSlice) Swap(i, j int) {
206 s[i], s[j] = s[j], s[i] 210 s[i], s[j] = s[j], s[i]
207 } 211 }
212
213 // TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
214 // concurrent use.
215 // TODO(benjaminwagner): Encode in parallel.
216 type TaskEncoder struct {
217 err error
218 tasks []*Task
219 result [][]byte
220 }
221
222 // Process encodes the Task into a byte slice that will be returned from Next()
223 // (in arbitrary order). Returns false if Next is certain to return an error.
224 // Caller must ensure t does not change until after the first call to Next().
225 // May not be called after calling Next().
226 func (e *TaskEncoder) Process(t *Task) bool {
227 if e.err != nil {
228 return false
229 }
230 var buf bytes.Buffer
231 if err := gob.NewEncoder(&buf).Encode(t); err != nil {
232 e.err = err
233 e.tasks = nil
234 e.result = nil
235 return false
236 }
237 e.tasks = append(e.tasks, t)
238 e.result = append(e.result, buf.Bytes())
239 return true
240 }
241
242 // Next returns one of the Tasks provided to Process (in arbitrary order) and
243 // its serialized bytes. If any tasks remain, returns the task, the serialized
244 // bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
245 // error is encountered, returns nil, nil, error.
246 func (e *TaskEncoder) Next() (*Task, []byte, error) {
247 if e.err != nil {
248 return nil, nil, e.err
249 }
250 if len(e.tasks) == 0 {
251 return nil, nil, nil
252 }
253 t := e.tasks[0]
254 e.tasks = e.tasks[1:]
255 serialized := e.result[0]
256 e.result = e.result[1:]
257 return t, serialized, nil
258 }
259
260 // TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
261 // concurrent use.
262 // TODO(benjaminwagner): Decode in parallel.
263 type TaskDecoder struct {
264 err error
265 result []*Task
266 }
267
268 // Process decodes the byte slice into a Task and includes it in Result() (in
269 // arbitrary order). Returns false if Result is certain to return an error.
270 // Caller must ensure b does not change until after Result() returns.
271 func (d *TaskDecoder) Process(b []byte) bool {
272 if d.err != nil {
273 return false
274 }
275 var t Task
276 if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
277 d.err = err
278 d.result = nil
279 return false
280 }
281 d.result = append(d.result, &t)
282 return true
283 }
284
285 // Result returns all decoded Tasks provided to Process (in arbitrary order), or
286 // any error encountered.
287 func (d *TaskDecoder) Result() ([]*Task, error) {
288 // Allow TaskDecoder to be used without initialization.
289 if d.err == nil && d.result == nil {
290 return []*Task{}, nil
291 }
292 return d.result, d.err
293 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698