| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package swarming implements tasks that run Swarming jobs. | 5 // Package swarming implements tasks that run Swarming jobs. |
| 6 package swarming | 6 package swarming |
| 7 | 7 |
| 8 import ( | 8 import ( |
| 9 "encoding/json" | 9 "encoding/json" |
| 10 "fmt" | 10 "fmt" |
| 11 "net/url" | 11 "net/url" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 "google.golang.org/api/pubsub/v1" | 16 "google.golang.org/api/pubsub/v1" |
| 17 | 17 |
| 18 "github.com/luci/gae/service/info" | 18 "github.com/luci/gae/service/info" |
| 19 "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 19 "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 20 "github.com/luci/luci-go/common/errors" | 20 "github.com/luci/luci-go/common/errors" |
| 21 "github.com/luci/luci-go/common/retry" |
| 21 "github.com/luci/luci-go/scheduler/appengine/messages" | 22 "github.com/luci/luci-go/scheduler/appengine/messages" |
| 22 "github.com/luci/luci-go/scheduler/appengine/task" | 23 "github.com/luci/luci-go/scheduler/appengine/task" |
| 23 "github.com/luci/luci-go/scheduler/appengine/task/utils" | 24 "github.com/luci/luci-go/scheduler/appengine/task/utils" |
| 24 ) | 25 ) |
| 25 | 26 |
| 26 const ( | 27 const ( |
| 27 statusCheckTimerName = "check-swarming-task-status" | 28 statusCheckTimerName = "check-swarming-task-status" |
| 28 statusCheckTimerInterval = time.Minute | 29 statusCheckTimerInterval = time.Minute |
| 29 ) | 30 ) |
| 30 | 31 |
| (...skipping 318 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 349 // invocation status based on it. | 350 // invocation status based on it. |
| 350 // | 351 // |
| 351 // Called on PubSub notifications and also periodically by timer. | 352 // Called on PubSub notifications and also periodically by timer. |
| 352 func (m TaskManager) checkTaskStatus(c context.Context, ctl task.Controller) err
or { | 353 func (m TaskManager) checkTaskStatus(c context.Context, ctl task.Controller) err
or { |
| 353 switch status := ctl.State().Status; { | 354 switch status := ctl.State().Status; { |
| 354 // This can happen if Swarming manages to send PubSub message before | 355 // This can happen if Swarming manages to send PubSub message before |
| 355 // LaunchTask finishes. Do not touch State or DebugLog to avoid collisio
n with | 356 // LaunchTask finishes. Do not touch State or DebugLog to avoid collisio
n with |
| 356 // still running LaunchTask when saving the invocation, it will only mak
e the | 357 // still running LaunchTask when saving the invocation, it will only mak
e the |
| 357 // matters worse. | 358 // matters worse. |
| 358 case status == task.StatusStarting: | 359 case status == task.StatusStarting: |
| 359 » » return errors.WrapTransient(errors.New("invocation is still star
ting, try again later")) | 360 » » return errors.New("invocation is still starting, try again later
", retry.Tag) |
| 360 case status != task.StatusRunning: | 361 case status != task.StatusRunning: |
| 361 return fmt.Errorf("unexpected invocation status %q, expecting %q
", status, task.StatusRunning) | 362 return fmt.Errorf("unexpected invocation status %q, expecting %q
", status, task.StatusRunning) |
| 362 } | 363 } |
| 363 | 364 |
| 364 // Grab task ID from the blob generated in LaunchTask. | 365 // Grab task ID from the blob generated in LaunchTask. |
| 365 taskData := taskData{} | 366 taskData := taskData{} |
| 366 if err := json.Unmarshal(ctl.State().TaskData, &taskData); err != nil { | 367 if err := json.Unmarshal(ctl.State().TaskData, &taskData); err != nil { |
| 367 ctl.State().Status = task.StatusFailed | 368 ctl.State().Status = task.StatusFailed |
| 368 return fmt.Errorf("could not parse TaskData - %s", err) | 369 return fmt.Errorf("could not parse TaskData - %s", err) |
| 369 } | 370 } |
| 370 | 371 |
| 371 // Fetch task result from Swarming. | 372 // Fetch task result from Swarming. |
| 372 service, err := m.createSwarmingService(c, ctl) | 373 service, err := m.createSwarmingService(c, ctl) |
| 373 if err != nil { | 374 if err != nil { |
| 374 return err | 375 return err |
| 375 } | 376 } |
| 376 resp, err := service.Task.Result(taskData.SwarmingTaskID).Do() | 377 resp, err := service.Task.Result(taskData.SwarmingTaskID).Do() |
| 377 if err != nil { | 378 if err != nil { |
| 378 ctl.DebugLog("Failed to fetch task results - %s", err) | 379 ctl.DebugLog("Failed to fetch task results - %s", err) |
| 379 err = utils.WrapAPIError(err) | 380 err = utils.WrapAPIError(err) |
| 380 » » if !errors.IsTransient(err) { | 381 » » if !retry.Tag.In(err) { |
| 381 ctl.State().Status = task.StatusFailed | 382 ctl.State().Status = task.StatusFailed |
| 382 } | 383 } |
| 383 return err | 384 return err |
| 384 } | 385 } |
| 385 | 386 |
| 386 blob, err := json.MarshalIndent(resp, "", " ") | 387 blob, err := json.MarshalIndent(resp, "", " ") |
| 387 if err != nil { | 388 if err != nil { |
| 388 return err | 389 return err |
| 389 } | 390 } |
| 390 m.handleTaskResult(c, ctl, resp) | 391 m.handleTaskResult(c, ctl, resp) |
| (...skipping 14 matching lines...) Expand all Loading... |
| 405 r.State, r.Failure, r.InternalFailure) | 406 r.State, r.Failure, r.InternalFailure) |
| 406 switch { | 407 switch { |
| 407 case r.State == "PENDING" || r.State == "RUNNING": | 408 case r.State == "PENDING" || r.State == "RUNNING": |
| 408 return // do nothing | 409 return // do nothing |
| 409 case r.State == "COMPLETED" && !(r.Failure || r.InternalFailure): | 410 case r.State == "COMPLETED" && !(r.Failure || r.InternalFailure): |
| 410 ctl.State().Status = task.StatusSucceeded | 411 ctl.State().Status = task.StatusSucceeded |
| 411 default: | 412 default: |
| 412 ctl.State().Status = task.StatusFailed | 413 ctl.State().Status = task.StatusFailed |
| 413 } | 414 } |
| 414 } | 415 } |
| OLD | NEW |