| OLD | NEW |
| (Empty) |
| 1 package db | |
| 2 | |
import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"sync"
	"time"

	"go.skia.org/infra/go/swarming"
	"go.skia.org/infra/go/util"

	swarming_api "github.com/luci/luci-go/common/api/swarming/swarming/v1"
	"github.com/skia-dev/glog"
)
| 18 | |
// Task states as reported by Swarming.
const (
	SWARMING_STATE_BOT_DIED  = "BOT_DIED"
	SWARMING_STATE_CANCELED  = "CANCELED"
	SWARMING_STATE_COMPLETED = "COMPLETED"
	SWARMING_STATE_EXPIRED   = "EXPIRED"
	SWARMING_STATE_PENDING   = "PENDING"
	SWARMING_STATE_RUNNING   = "RUNNING"
	SWARMING_STATE_TIMED_OUT = "TIMED_OUT"
)

// Names of the tags which the Build Scheduler adds to Swarming tasks.
const (
	SWARMING_TAG_ALLOW_MILO = "allow_milo"
	SWARMING_TAG_ID         = "scheduler_id"
	SWARMING_TAG_NAME       = "name"
	SWARMING_TAG_PRIORITY   = "priority"
	SWARMING_TAG_REPO       = "repo"
	SWARMING_TAG_REVISION   = "revision"
)
| 37 | |
// TaskStatus is the scheduler's own view of where a Task is in its lifecycle.
type TaskStatus string

const (
	// TASK_STATUS_PENDING means the task has not started yet. Its value is the
	// empty string, which makes it the zero value of TaskStatus.
	TASK_STATUS_PENDING = TaskStatus("")
	// TASK_STATUS_RUNNING means the task is currently in progress.
	TASK_STATUS_RUNNING = TaskStatus("RUNNING")
	// TASK_STATUS_SUCCESS means the task completed successfully.
	TASK_STATUS_SUCCESS = TaskStatus("SUCCESS")
	// TASK_STATUS_FAILURE means the task completed, but with failures.
	TASK_STATUS_FAILURE = TaskStatus("FAILURE")
	// TASK_STATUS_MISHAP means the task exited early with an error, died while
	// in progress, was manually canceled, expired while waiting on the queue,
	// or timed out before completing.
	TASK_STATUS_MISHAP = TaskStatus("MISHAP")
)
| 55 | |
| 56 // Task describes a Swarming task generated from a TaskSpec, or a "fake" task | |
| 57 // that can not be executed on Swarming, but can be added to the DB and | |
| 58 // displayed as if it were a real TaskSpec. | |
| 59 // | |
| 60 // Task is stored as a GOB, so changes must maintain backwards compatibility. | |
| 61 // See gob package documentation for details, but generally: | |
| 62 // - Ensure new fields can be initialized with their zero value. | |
| 63 // - Do not change the type of any existing field. | |
| 64 // - Leave removed fields commented out to ensure the field name is not | |
| 65 // reused. | |
| 66 type Task struct { | |
| 67 // Commits are the commits which were tested in this Task. The list may | |
| 68 // change due to backfilling/bisecting. | |
| 69 Commits []string | |
| 70 | |
| 71 // Created is the creation timestamp. | |
| 72 Created time.Time | |
| 73 | |
| 74 // DbModified is the time of the last successful call to DB.PutTask/s fo
r this | |
| 75 // Task, or zero if the task is new. It is not related to the ModifiedTs
time | |
| 76 // of the associated Swarming task. | |
| 77 DbModified time.Time | |
| 78 | |
| 79 // Finished is the time the task stopped running or expired from the que
ue, or | |
| 80 // zero if the task is pending or running. | |
| 81 Finished time.Time | |
| 82 | |
| 83 // Id is a generated unique identifier for this Task instance. Must be | |
| 84 // URL-safe. | |
| 85 Id string | |
| 86 | |
| 87 // IsolatedOutput is the isolated hash of any outputs produced by this T
ask. | |
| 88 // Filled in when the task is completed. We assume the isolate server is | |
| 89 // isolate.ISOLATE_SERVER_URL and the namespace is isolate.DEFAULT_NAMES
PACE. | |
| 90 // This field will not be set if the Task does not correspond to a Swarm
ing | |
| 91 // task. | |
| 92 IsolatedOutput string | |
| 93 | |
| 94 // Name is a human-friendly descriptive name for this Task. All Tasks | |
| 95 // generated from the same TaskSpec have the same name. | |
| 96 Name string | |
| 97 | |
| 98 // Repo is the repository of the commit at which this task ran. | |
| 99 Repo string | |
| 100 | |
| 101 // Revision is the commit at which this task ran. | |
| 102 Revision string | |
| 103 | |
| 104 // Started is the time the task started running, or zero if the task is | |
| 105 // pending, or the same as Finished if the task never ran. | |
| 106 Started time.Time | |
| 107 | |
| 108 // Status is the current task status, default TASK_STATUS_PENDING. | |
| 109 Status TaskStatus | |
| 110 | |
| 111 // SwarmingTaskId is the Swarming task ID. This field will not be set if
the | |
| 112 // Task does not correspond to a Swarming task. | |
| 113 SwarmingTaskId string | |
| 114 } | |
| 115 | |
| 116 // UpdateFromSwarming sets or initializes t from data in s. If any changes were | |
| 117 // made to t, returns true. | |
| 118 // | |
| 119 // If empty, sets t.Id, t.Name, t.Repo, and t.Revision from s's tags named | |
| 120 // SWARMING_TAG_ID, SWARMING_TAG_NAME, SWARMING_TAG_REPO, and | |
| 121 // SWARMING_TAG_REVISION, sets t.Created from s.CreatedTs, and sets | |
| 122 // t.SwarmingTaskId from s.TaskId. If these fields are non-empty, returns an | |
| 123 // error if they do not match. | |
| 124 // | |
| 125 // Always sets t.Status, t.Started, t.Finished, and t.IsolatedOutput based on s. | |
| 126 func (orig *Task) UpdateFromSwarming(s *swarming_api.SwarmingRpcsTaskResult) (bo
ol, error) { | |
| 127 if s == nil { | |
| 128 return false, fmt.Errorf("Missing TaskResult. %v", s) | |
| 129 } | |
| 130 tags, err := swarming.TagValues(s) | |
| 131 if err != nil { | |
| 132 return false, err | |
| 133 } | |
| 134 | |
| 135 copy := orig.Copy() | |
| 136 if !reflect.DeepEqual(orig, copy) { | |
| 137 glog.Fatalf("Task.Copy is broken; original and copy differ:\n%#v
\n%#v", orig, copy) | |
| 138 } | |
| 139 | |
| 140 // "Identity" fields stored in tags. | |
| 141 checkOrSetFromTag := func(tagName string, field *string, fieldName strin
g) error { | |
| 142 if tagValue, ok := tags[tagName]; ok { | |
| 143 if *field == "" { | |
| 144 *field = tagValue | |
| 145 } else if *field != tagValue { | |
| 146 return fmt.Errorf("%s does not match for task %s
. Was %s, now %s. %v %v", fieldName, orig.Id, *field, tagValue, orig, s) | |
| 147 } | |
| 148 } | |
| 149 return nil | |
| 150 } | |
| 151 if err := checkOrSetFromTag(SWARMING_TAG_ID, ©.Id, "Id"); err != nil
{ | |
| 152 return false, err | |
| 153 } | |
| 154 if err := checkOrSetFromTag(SWARMING_TAG_NAME, ©.Name, "Name"); err
!= nil { | |
| 155 return false, err | |
| 156 } | |
| 157 if err := checkOrSetFromTag(SWARMING_TAG_REPO, ©.Repo, "Repo"); err
!= nil { | |
| 158 return false, err | |
| 159 } | |
| 160 if err := checkOrSetFromTag(SWARMING_TAG_REVISION, ©.Revision, "Revi
sion"); err != nil { | |
| 161 return false, err | |
| 162 } | |
| 163 | |
| 164 // CreatedTs should always be present. | |
| 165 if sCreated, err := swarming.ParseTimestamp(s.CreatedTs); err == nil { | |
| 166 if util.TimeIsZero(copy.Created) { | |
| 167 copy.Created = sCreated | |
| 168 } else if copy.Created != sCreated { | |
| 169 return false, fmt.Errorf("Creation time has changed for
task %s. Was %s, now %s. %v", orig.Id, orig.Created, sCreated, orig) | |
| 170 } | |
| 171 } else { | |
| 172 return false, fmt.Errorf("Unable to parse task creation time for
task %s. %v %v", orig.Id, err, orig) | |
| 173 } | |
| 174 | |
| 175 // Swarming TaskId. | |
| 176 if copy.SwarmingTaskId == "" { | |
| 177 copy.SwarmingTaskId = s.TaskId | |
| 178 } else if copy.SwarmingTaskId != s.TaskId { | |
| 179 return false, fmt.Errorf("Swarming task ID does not match for ta
sk %s. Was %s, now %s. %v", orig.Id, orig.SwarmingTaskId, s.TaskId, orig) | |
| 180 } | |
| 181 | |
| 182 // Status. | |
| 183 switch s.State { | |
| 184 case SWARMING_STATE_BOT_DIED, SWARMING_STATE_CANCELED, SWARMING_STATE_EX
PIRED, SWARMING_STATE_TIMED_OUT: | |
| 185 copy.Status = TASK_STATUS_MISHAP | |
| 186 case SWARMING_STATE_PENDING: | |
| 187 copy.Status = TASK_STATUS_PENDING | |
| 188 case SWARMING_STATE_RUNNING: | |
| 189 copy.Status = TASK_STATUS_RUNNING | |
| 190 case SWARMING_STATE_COMPLETED: | |
| 191 if s.Failure { | |
| 192 // TODO(benjaminwagner): Choose FAILURE or MISHAP depend
ing on ExitCode? | |
| 193 copy.Status = TASK_STATUS_FAILURE | |
| 194 } else { | |
| 195 copy.Status = TASK_STATUS_SUCCESS | |
| 196 } | |
| 197 default: | |
| 198 return false, fmt.Errorf("Unknown Swarming State %v in %v", s.St
ate, s) | |
| 199 } | |
| 200 | |
| 201 // Isolated output. | |
| 202 if s.OutputsRef == nil { | |
| 203 copy.IsolatedOutput = "" | |
| 204 } else { | |
| 205 copy.IsolatedOutput = s.OutputsRef.Isolated | |
| 206 } | |
| 207 | |
| 208 // Timestamps. | |
| 209 maybeUpdateTime := func(newTimeStr string, field *time.Time, name string
) error { | |
| 210 if newTimeStr == "" { | |
| 211 return nil | |
| 212 } | |
| 213 newTime, err := swarming.ParseTimestamp(newTimeStr) | |
| 214 if err != nil { | |
| 215 return fmt.Errorf("Unable to parse %s for task %s. %v %v
", name, orig.Id, err, s) | |
| 216 } | |
| 217 *field = newTime | |
| 218 return nil | |
| 219 } | |
| 220 | |
| 221 if err := maybeUpdateTime(s.StartedTs, ©.Started, "StartedTs"); err
!= nil { | |
| 222 return false, err | |
| 223 } | |
| 224 if err := maybeUpdateTime(s.CompletedTs, ©.Finished, "CompletedTs");
err != nil { | |
| 225 return false, err | |
| 226 } | |
| 227 if s.CompletedTs == "" && copy.Status == TASK_STATUS_MISHAP { | |
| 228 if err := maybeUpdateTime(s.AbandonedTs, ©.Finished, "Abando
nedTs"); err != nil { | |
| 229 return false, err | |
| 230 } | |
| 231 } | |
| 232 if copy.Done() && util.TimeIsZero(copy.Started) { | |
| 233 copy.Started = copy.Finished | |
| 234 } | |
| 235 | |
| 236 // TODO(benjaminwagner): SwarmingRpcsTaskResult has a ModifiedTs field t
hat we | |
| 237 // could use to detect modifications. Unfortunately, it seems that while
the | |
| 238 // task is running, ModifiedTs gets updated every 30 seconds, regardless
of | |
| 239 // whether any other data actually changed. Maybe we could still use it
for | |
| 240 // pending or completed tasks. | |
| 241 if !reflect.DeepEqual(orig, copy) { | |
| 242 *orig = *copy | |
| 243 return true, nil | |
| 244 } | |
| 245 return false, nil | |
| 246 } | |
| 247 | |
| 248 var errNotModified = errors.New("Task not modified") | |
| 249 | |
| 250 // UpdateDBFromSwarmingTask updates a task in db from data in s. | |
| 251 func UpdateDBFromSwarmingTask(db DB, s *swarming_api.SwarmingRpcsTaskResult) err
or { | |
| 252 id, err := swarming.GetTagValue(s, SWARMING_TAG_ID) | |
| 253 if err != nil { | |
| 254 return err | |
| 255 } | |
| 256 _, err = UpdateTaskWithRetries(db, id, func(task *Task) error { | |
| 257 modified, err := task.UpdateFromSwarming(s) | |
| 258 if err != nil { | |
| 259 return err | |
| 260 } | |
| 261 if !modified { | |
| 262 return errNotModified | |
| 263 } | |
| 264 return nil | |
| 265 }) | |
| 266 if err == errNotModified { | |
| 267 return nil | |
| 268 } else { | |
| 269 return err | |
| 270 } | |
| 271 } | |
| 272 | |
| 273 func (t *Task) Done() bool { | |
| 274 return t.Status != TASK_STATUS_PENDING && t.Status != TASK_STATUS_RUNNIN
G | |
| 275 } | |
| 276 | |
| 277 func (t *Task) Success() bool { | |
| 278 return t.Status == TASK_STATUS_SUCCESS | |
| 279 } | |
| 280 | |
| 281 func (t *Task) Copy() *Task { | |
| 282 var commits []string | |
| 283 if t.Commits != nil { | |
| 284 commits = make([]string, len(t.Commits)) | |
| 285 copy(commits, t.Commits) | |
| 286 } | |
| 287 return &Task{ | |
| 288 Commits: commits, | |
| 289 Created: t.Created, | |
| 290 DbModified: t.DbModified, | |
| 291 Finished: t.Finished, | |
| 292 Id: t.Id, | |
| 293 IsolatedOutput: t.IsolatedOutput, | |
| 294 Name: t.Name, | |
| 295 Repo: t.Repo, | |
| 296 Revision: t.Revision, | |
| 297 Started: t.Started, | |
| 298 Status: t.Status, | |
| 299 SwarmingTaskId: t.SwarmingTaskId, | |
| 300 } | |
| 301 } | |
| 302 | |
| 303 // TaskSlice implements sort.Interface. To sort tasks []*Task, use | |
| 304 // sort.Sort(TaskSlice(tasks)). | |
| 305 type TaskSlice []*Task | |
| 306 | |
| 307 func (s TaskSlice) Len() int { return len(s) } | |
| 308 | |
| 309 func (s TaskSlice) Less(i, j int) bool { | |
| 310 return s[i].Created.Before(s[j].Created) | |
| 311 } | |
| 312 | |
| 313 func (s TaskSlice) Swap(i, j int) { | |
| 314 s[i], s[j] = s[j], s[i] | |
| 315 } | |
| 316 | |
| 317 // TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for | |
| 318 // concurrent use. | |
| 319 // TODO(benjaminwagner): Encode in parallel. | |
| 320 type TaskEncoder struct { | |
| 321 err error | |
| 322 tasks []*Task | |
| 323 result [][]byte | |
| 324 } | |
| 325 | |
| 326 // Process encodes the Task into a byte slice that will be returned from Next() | |
| 327 // (in arbitrary order). Returns false if Next is certain to return an error. | |
| 328 // Caller must ensure t does not change until after the first call to Next(). | |
| 329 // May not be called after calling Next(). | |
| 330 func (e *TaskEncoder) Process(t *Task) bool { | |
| 331 if e.err != nil { | |
| 332 return false | |
| 333 } | |
| 334 var buf bytes.Buffer | |
| 335 if err := gob.NewEncoder(&buf).Encode(t); err != nil { | |
| 336 e.err = err | |
| 337 e.tasks = nil | |
| 338 e.result = nil | |
| 339 return false | |
| 340 } | |
| 341 e.tasks = append(e.tasks, t) | |
| 342 e.result = append(e.result, buf.Bytes()) | |
| 343 return true | |
| 344 } | |
| 345 | |
| 346 // Next returns one of the Tasks provided to Process (in arbitrary order) and | |
| 347 // its serialized bytes. If any tasks remain, returns the task, the serialized | |
| 348 // bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an | |
| 349 // error is encountered, returns nil, nil, error. | |
| 350 func (e *TaskEncoder) Next() (*Task, []byte, error) { | |
| 351 if e.err != nil { | |
| 352 return nil, nil, e.err | |
| 353 } | |
| 354 if len(e.tasks) == 0 { | |
| 355 return nil, nil, nil | |
| 356 } | |
| 357 t := e.tasks[0] | |
| 358 e.tasks = e.tasks[1:] | |
| 359 serialized := e.result[0] | |
| 360 e.result = e.result[1:] | |
| 361 return t, serialized, nil | |
| 362 } | |
| 363 | |
| 364 // TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for | |
| 365 // concurrent use. | |
| 366 type TaskDecoder struct { | |
| 367 // input contains the incoming byte slices. Process() sends on this chan
nel, | |
| 368 // decode() receives from it, and Result() closes it. | |
| 369 input chan []byte | |
| 370 // output contains decoded Tasks. decode() sends on this channel, collec
t() | |
| 371 // receives from it, and run() closes it when all decode() goroutines ha
ve | |
| 372 // finished. | |
| 373 output chan *Task | |
| 374 // result contains the return value of Result(). collect() sends a singl
e | |
| 375 // value on this channel and closes it. Result() receives from it. | |
| 376 result chan []*Task | |
| 377 // errors contains the first error from any goroutine. It's a channel in
case | |
| 378 // multiple goroutines experience an error at the same time. | |
| 379 errors chan error | |
| 380 } | |
| 381 | |
| 382 const kNumDecoderGoroutines = 10 | |
| 383 | |
| 384 // init initializes d if it has not been initialized. May not be called concurre
ntly. | |
| 385 func (d *TaskDecoder) init() { | |
| 386 if d.input == nil { | |
| 387 d.input = make(chan []byte, kNumDecoderGoroutines*2) | |
| 388 d.output = make(chan *Task, kNumDecoderGoroutines) | |
| 389 d.result = make(chan []*Task, 1) | |
| 390 d.errors = make(chan error, kNumDecoderGoroutines) | |
| 391 go d.run() | |
| 392 go d.collect() | |
| 393 } | |
| 394 } | |
| 395 | |
| 396 // run starts the decode goroutines and closes d.output when they finish. | |
| 397 func (d *TaskDecoder) run() { | |
| 398 // Start decoders. | |
| 399 wg := sync.WaitGroup{} | |
| 400 for i := 0; i < kNumDecoderGoroutines; i++ { | |
| 401 wg.Add(1) | |
| 402 go d.decode(&wg) | |
| 403 } | |
| 404 // Wait for decoders to exit. | |
| 405 wg.Wait() | |
| 406 // Drain d.input in the case that errors were encountered, to avoid dead
lock. | |
| 407 for _ = range d.input { | |
| 408 } | |
| 409 close(d.output) | |
| 410 } | |
| 411 | |
| 412 // decode receives from d.input and sends to d.output until d.input is closed or | |
| 413 // d.errors is non-empty. Decrements wg when done. | |
| 414 func (d *TaskDecoder) decode(wg *sync.WaitGroup) { | |
| 415 for b := range d.input { | |
| 416 var t Task | |
| 417 if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err !=
nil { | |
| 418 d.errors <- err | |
| 419 break | |
| 420 } | |
| 421 d.output <- &t | |
| 422 if len(d.errors) > 0 { | |
| 423 break | |
| 424 } | |
| 425 } | |
| 426 wg.Done() | |
| 427 } | |
| 428 | |
| 429 // collect receives from d.output until it is closed, then sends on d.result. | |
| 430 func (d *TaskDecoder) collect() { | |
| 431 result := []*Task{} | |
| 432 for t := range d.output { | |
| 433 result = append(result, t) | |
| 434 } | |
| 435 d.result <- result | |
| 436 close(d.result) | |
| 437 } | |
| 438 | |
| 439 // Process decodes the byte slice into a Task and includes it in Result() (in | |
| 440 // arbitrary order). Returns false if Result is certain to return an error. | |
| 441 // Caller must ensure b does not change until after Result() returns. | |
| 442 func (d *TaskDecoder) Process(b []byte) bool { | |
| 443 d.init() | |
| 444 d.input <- b | |
| 445 return len(d.errors) == 0 | |
| 446 } | |
| 447 | |
| 448 // Result returns all decoded Tasks provided to Process (in arbitrary order), or | |
| 449 // any error encountered. | |
| 450 func (d *TaskDecoder) Result() ([]*Task, error) { | |
| 451 // Allow TaskDecoder to be used without initialization. | |
| 452 if d.result == nil { | |
| 453 return []*Task{}, nil | |
| 454 } | |
| 455 close(d.input) | |
| 456 select { | |
| 457 case err := <-d.errors: | |
| 458 return nil, err | |
| 459 case result := <-d.result: | |
| 460 return result, nil | |
| 461 } | |
| 462 } | |
| 463 | |
| 464 // TagsForTask returns the tags which should be set for a Task. | |
| 465 func TagsForTask(name, id string, priority float64, repo, revision string, dimen
sions map[string]string) []string { | |
| 466 tags := map[string]string{ | |
| 467 SWARMING_TAG_ALLOW_MILO: "1", | |
| 468 SWARMING_TAG_NAME: name, | |
| 469 SWARMING_TAG_ID: id, | |
| 470 SWARMING_TAG_PRIORITY: fmt.Sprintf("%f", priority), | |
| 471 SWARMING_TAG_REPO: repo, | |
| 472 SWARMING_TAG_REVISION: revision, | |
| 473 } | |
| 474 | |
| 475 for k, v := range dimensions { | |
| 476 if _, ok := tags[k]; !ok { | |
| 477 tags[k] = v | |
| 478 } else { | |
| 479 glog.Warningf("Duplicate dimension/tag %q.", k) | |
| 480 } | |
| 481 } | |
| 482 | |
| 483 tagsList := make([]string, 0, len(tags)) | |
| 484 for k, v := range tags { | |
| 485 tagsList = append(tagsList, fmt.Sprintf("%s:%s", k, v)) | |
| 486 } | |
| 487 return tagsList | |
| 488 } | |
| OLD | NEW |