Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(21)

Unified Diff: milo/appengine/swarming/build.go

Issue 2675493003: milo: Use service interface for swarming. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: milo/appengine/swarming/build.go
diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go
index be8b356f72ba9ab267cf81f2e6bbac006b725c3d..7c41c8d89694e610ea5782c0bd661f9d40810223 100644
--- a/milo/appengine/swarming/build.go
+++ b/milo/appengine/swarming/build.go
@@ -6,22 +6,20 @@ package swarming
import (
"bytes"
- "encoding/json"
"fmt"
- "io/ioutil"
"net/http"
"net/url"
- "path/filepath"
"strings"
- "sync"
"time"
"golang.org/x/net/context"
swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/google"
miloProto "github.com/luci/luci-go/common/proto/milo"
+ "github.com/luci/luci-go/common/sync/parallel"
"github.com/luci/luci-go/logdog/client/annotee"
"github.com/luci/luci-go/logdog/common/types"
"github.com/luci/luci-go/milo/api/resp"
@@ -29,6 +27,10 @@ import (
"github.com/luci/luci-go/server/auth"
)
+// errNotMiloJob is returned if a Swarming task is fetched that does not self-
+// identify as a Milo job.
+var errNotMiloJob = errors.New("No a Milo Job")
hinoka 2017/02/02 22:36:51 No -> Not
dnj 2017/02/03 02:22:03 Done.
+
// SwarmingTimeLayout is time layout used by swarming.
const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999"
@@ -64,93 +66,103 @@ func getSwarmingClient(c context.Context, server string) (*swarming.Service, err
return sc, nil
}
-func getDebugTaskOutput(taskID string) (string, error) {
- // Read the debug file instead.
+// swarmingService is an interface that fetches data from Swarming.
+//
+// In production, the data is fetched from a Swarming server. For testing, the
+// implementation can be replaced with a mock.
+type swarmingService interface {
+ getHost() string
+ getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error)
+ getTaskOutput(c context.Context, taskID string) (string, error)
+}
- // ../swarming below assumes that
- // - this code is not executed by tests outside of this dir
- // - this dir is a sibling of frontend dir
- logFilename := filepath.Join("..", "swarming", "testdata", taskID)
- b, err := ioutil.ReadFile(logFilename)
- if err != nil {
- return "", err
- }
- return string(b), nil
+type prodSwarmingService struct {
+ host string
+ client *swarming.Service
}
-func getTaskOutput(sc *swarming.Service, taskID string) (string, error) {
- res, err := sc.Task.Stdout(taskID).Do()
+func newProdService(c context.Context, server string) (*prodSwarmingService, error) {
+ client, err := getSwarmingClient(c, server)
if err != nil {
- return "", err
+ return nil, err
}
- return res.Output, nil
+ return &prodSwarmingService{
+ host: server,
+ client: client,
+ }, nil
}
-func getDebugSwarmingResult(
- taskID string) (*swarming.SwarmingRpcsTaskResult, error) {
+func (svc *prodSwarmingService) getHost() string { return svc.host }
+
+func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) {
+ return svc.client.Task.Result(taskID).Context(c).Do()
+}
- // ../swarming below assumes that
- // - this code is not executed by tests outside of this dir
- // - this dir is a sibling of frontend dir
- logFilename := filepath.Join("..", "swarming", "testdata", taskID)
- swarmFilename := fmt.Sprintf("%s.swarm", logFilename)
- s, err := ioutil.ReadFile(swarmFilename)
+func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) {
+ stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do()
if err != nil {
- return nil, err
- }
- sr := &swarming.SwarmingRpcsTaskResult{}
- if err := json.Unmarshal(s, sr); err != nil {
- return nil, err
+ return "", err
}
- return sr, nil
+ return stdout.Output, nil
}
-func getSwarming(c context.Context, server string, taskID string) (
- *swarming.SwarmingRpcsTaskResult, string, error) {
+type swarmingFetch struct {
hinoka 2017/02/03 01:02:23 Can this just be a function? Not a struct? There
dnj 2017/02/03 01:04:56 There will be (see the final CL in the chain).
+ fetchRes bool
+ res *swarming.SwarmingRpcsTaskResult
- var log string
- var sr *swarming.SwarmingRpcsTaskResult
- var errLog, errRes error
+ fetchLog bool
+ log string
+ logErr error
+}
- // Detour: Return debugging results, useful for development.
- if server == "debug" {
- sr, errRes = getDebugSwarmingResult(taskID)
- log, errLog = getDebugTaskOutput(taskID)
- } else {
- sc, err := getSwarmingClient(c, server)
- if err != nil {
- return nil, "", err
+// get fetches (in parallel) the components that it is configured to fetch.
+//
+// After fetching, get performs an ACL check to confirm that the user is
+// permitted to view the resulting data. If this check fails, get returns
+// errNotMiloJob.
+func (f *swarmingFetch) get(c context.Context, svc swarmingService, taskID string) error {
+ err := parallel.FanOutIn(func(workC chan<- func() error) {
+ if f.fetchRes {
+ workC <- func() (err error) {
+ f.res, err = svc.getSwarmingResult(c, taskID)
+ return
+ }
}
- var wg sync.WaitGroup
- wg.Add(2) // Getting log and result can happen concurrently. Wait for both.
-
- go func() {
- defer wg.Done()
- log, errLog = getTaskOutput(sc, taskID)
- }()
- go func() {
- defer wg.Done()
- sr, errRes = sc.Task.Result(taskID).Do()
- }()
- wg.Wait()
+ if f.fetchLog {
+ workC <- func() error {
+ f.log, f.logErr = svc.getTaskOutput(c, taskID)
+ return nil
+ }
+ }
+ })
+ if err != nil {
hinoka 2017/02/03 01:02:23 If both fail the error is non-deterministic, I'd l
dnj 2017/02/03 01:04:56 This is currently the case. Note that the log one
+ return err
}
- if errRes != nil {
- // Swarming result errors take priority.
- return sr, log, errRes
+ // Current ACL implementation: error if this is not a Milo job.
+ switch {
+ case f.fetchRes:
+ if !isMiloJob(f.res.Tags) {
+ return errNotMiloJob
+ }
+ default:
+ // No metadata to decide if this is a Milo job, so assume that it is not.
+ return errNotMiloJob
}
- switch sr.State {
- case TaskCompleted, TaskRunning, TaskCanceled:
- default:
- // Ignore log errors if the task might be pending, timed out, expired, etc.
- if errLog != nil {
- errLog = nil
- log = ""
+ if f.fetchRes && f.logErr != nil {
+ switch f.res.State {
+ case TaskCompleted, TaskRunning, TaskCanceled:
+ default:
+ // Ignore log errors if the task might be pending, timed out, expired, etc.
+ if err != nil {
+ f.log = ""
+ f.logErr = nil
+ }
}
}
- return sr, log, errLog
+ return f.logErr
}
func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
@@ -403,22 +415,14 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams,
return c.ToLogDogStreams()
}
-func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*resp.MiloBuild, error) {
+func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
// Fetch the data from Swarming
- sr, body, err := getSwarming(c, server, taskID)
- if err != nil {
- return nil, err
- }
-
- allowMilo := false
- for _, t := range sr.Tags {
- if t == "allow_milo:1" {
- allowMilo = true
- break
- }
+ fetch := swarmingFetch{
+ fetchRes: true,
+ fetchLog: true,
}
- if !allowMilo {
- return nil, fmt.Errorf("Not A Milo Job")
+ if err := fetch.get(c, svc, taskID); err != nil {
+ return nil, err
}
var build resp.MiloBuild
@@ -428,8 +432,9 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
// Decode the data using annotee. The logdog stream returned here is assumed
// to be consistent, which is why the following block of code is not
// expected to ever err out.
- if body != "" {
- lds, err = streamsFromAnnotatedLog(c, body)
+ if fetch.log != "" {
+ var err error
+ lds, err = streamsFromAnnotatedLog(c, fetch.log)
if err != nil {
build.Components = []*resp.BuildComponent{{
Type: resp.Summary,
@@ -438,7 +443,7 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
Status: resp.InfraFailure,
SubLink: []*resp.Link{{
Label: "swarming task",
- URL: taskPageURL(server, taskID),
+ URL: taskPageURL(svc.getHost(), taskID),
}},
}}
}
@@ -450,18 +455,27 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
s = &miloProto.Step{}
}
- if err := addTaskToMiloStep(c, server, sr, s); err != nil {
+ if err := addTaskToMiloStep(c, svc.getHost(), fetch.res, s); err != nil {
return nil, err
}
logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
- if err := addTaskToBuild(c, server, sr, &build); err != nil {
+ if err := addTaskToBuild(c, svc.getHost(), fetch.res, &build); err != nil {
return nil, err
}
return &build, nil
}
+func isMiloJob(tags []string) bool {
+ for _, t := range tags {
+ if t == "allow_milo:1" {
+ return true
+ }
+ }
+ return false
+}
+
// taskPageURL returns a URL to a human-consumable page of a swarming task.
// Supports server aliases.
func taskPageURL(swarmingHostname, taskID string) string {

Powered by Google App Engine
This is Rietveld 408576698