Chromium Code Reviews| Index: milo/appengine/swarming/build.go |
| diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go |
| index be8b356f72ba9ab267cf81f2e6bbac006b725c3d..7c41c8d89694e610ea5782c0bd661f9d40810223 100644 |
| --- a/milo/appengine/swarming/build.go |
| +++ b/milo/appengine/swarming/build.go |
| @@ -6,22 +6,20 @@ package swarming |
| import ( |
| "bytes" |
| - "encoding/json" |
| "fmt" |
| - "io/ioutil" |
| "net/http" |
| "net/url" |
| - "path/filepath" |
| "strings" |
| - "sync" |
| "time" |
| "golang.org/x/net/context" |
| swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| + "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/proto/google" |
| miloProto "github.com/luci/luci-go/common/proto/milo" |
| + "github.com/luci/luci-go/common/sync/parallel" |
| "github.com/luci/luci-go/logdog/client/annotee" |
| "github.com/luci/luci-go/logdog/common/types" |
| "github.com/luci/luci-go/milo/api/resp" |
| @@ -29,6 +27,10 @@ import ( |
| "github.com/luci/luci-go/server/auth" |
| ) |
| +// errNotMiloJob is returned if a Swarming task is fetched that does not self- |
| +// identify as a Milo job. |
| +var errNotMiloJob = errors.New("No a Milo Job") |
|
hinoka
2017/02/02 22:36:51
No -> Not
dnj
2017/02/03 02:22:03
Done.
|
| + |
| // SwarmingTimeLayout is time layout used by swarming. |
| const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" |
| @@ -64,93 +66,103 @@ func getSwarmingClient(c context.Context, server string) (*swarming.Service, err |
| return sc, nil |
| } |
| -func getDebugTaskOutput(taskID string) (string, error) { |
| - // Read the debug file instead. |
| +// swarmingService is an interface that fetches data from Swarming. |
| +// |
| +// In production, this is fetched from a Swarming server. For testing, this can |
| +// be replaced with a mock. |
| +type swarmingService interface { |
| + getHost() string |
| + getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) |
| + getTaskOutput(c context.Context, taskID string) (string, error) |
| +} |
| - // ../swarming below assumes that |
| - // - this code is not executed by tests outside of this dir |
| - // - this dir is a sibling of frontend dir |
| - logFilename := filepath.Join("..", "swarming", "testdata", taskID) |
| - b, err := ioutil.ReadFile(logFilename) |
| - if err != nil { |
| - return "", err |
| - } |
| - return string(b), nil |
| +type prodSwarmingService struct { |
| + host string |
| + client *swarming.Service |
| } |
| -func getTaskOutput(sc *swarming.Service, taskID string) (string, error) { |
| - res, err := sc.Task.Stdout(taskID).Do() |
| +func newProdService(c context.Context, server string) (*prodSwarmingService, error) { |
| + client, err := getSwarmingClient(c, server) |
| if err != nil { |
| - return "", err |
| + return nil, err |
| } |
| - return res.Output, nil |
| + return &prodSwarmingService{ |
| + host: server, |
| + client: client, |
| + }, nil |
| } |
| -func getDebugSwarmingResult( |
| - taskID string) (*swarming.SwarmingRpcsTaskResult, error) { |
| +func (svc *prodSwarmingService) getHost() string { return svc.host } |
| + |
| +func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) { |
| + return svc.client.Task.Result(taskID).Context(c).Do() |
| +} |
| - // ../swarming below assumes that |
| - // - this code is not executed by tests outside of this dir |
| - // - this dir is a sibling of frontend dir |
| - logFilename := filepath.Join("..", "swarming", "testdata", taskID) |
| - swarmFilename := fmt.Sprintf("%s.swarm", logFilename) |
| - s, err := ioutil.ReadFile(swarmFilename) |
| +func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) { |
| + stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do() |
| if err != nil { |
| - return nil, err |
| - } |
| - sr := &swarming.SwarmingRpcsTaskResult{} |
| - if err := json.Unmarshal(s, sr); err != nil { |
| - return nil, err |
| + return "", err |
| } |
| - return sr, nil |
| + return stdout.Output, nil |
| } |
| -func getSwarming(c context.Context, server string, taskID string) ( |
| - *swarming.SwarmingRpcsTaskResult, string, error) { |
| +type swarmingFetch struct { |
|
hinoka
2017/02/03 01:02:23
Can this just be a function? Not a struct? There
dnj
2017/02/03 01:04:56
There will be (see the final CL in the chain).
|
| + fetchRes bool |
| + res *swarming.SwarmingRpcsTaskResult |
| - var log string |
| - var sr *swarming.SwarmingRpcsTaskResult |
| - var errLog, errRes error |
| + fetchLog bool |
| + log string |
| + logErr error |
| +} |
| - // Detour: Return debugging results, useful for development. |
| - if server == "debug" { |
| - sr, errRes = getDebugSwarmingResult(taskID) |
| - log, errLog = getDebugTaskOutput(taskID) |
| - } else { |
| - sc, err := getSwarmingClient(c, server) |
| - if err != nil { |
| - return nil, "", err |
| +// get fetches (in parallel) the components that it is configured to fetch. |
| +// |
| +// After fetching, get performs an ACL check to confirm that the user is |
| +// permitted to view the resulting data. If this check fails, get returns |
| +// errNotMiloJob. |
| +func (f *swarmingFetch) get(c context.Context, svc swarmingService, taskID string) error { |
| + err := parallel.FanOutIn(func(workC chan<- func() error) { |
| + if f.fetchRes { |
| + workC <- func() (err error) { |
| + f.res, err = svc.getSwarmingResult(c, taskID) |
| + return |
| + } |
| } |
| - var wg sync.WaitGroup |
| - wg.Add(2) // Getting log and result can happen concurrently. Wait for both. |
| - |
| - go func() { |
| - defer wg.Done() |
| - log, errLog = getTaskOutput(sc, taskID) |
| - }() |
| - go func() { |
| - defer wg.Done() |
| - sr, errRes = sc.Task.Result(taskID).Do() |
| - }() |
| - wg.Wait() |
| + if f.fetchLog { |
| + workC <- func() error { |
| + f.log, f.logErr = svc.getTaskOutput(c, taskID) |
| + return nil |
| + } |
| + } |
| + }) |
| + if err != nil { |
|
hinoka
2017/02/03 01:02:23
If both fail the error is non-deterministic, I'd l
dnj
2017/02/03 01:04:56
This is currently the case. Note that the log one
|
| + return err |
| } |
| - if errRes != nil { |
| - // Swarming result errors take priority. |
| - return sr, log, errRes |
| + // Current ACL implementation: error if this is not a Milo job. |
| + switch { |
| + case f.fetchRes: |
| + if !isMiloJob(f.res.Tags) { |
| + return errNotMiloJob |
| + } |
| + default: |
| + // No metadata to decide if this is a Milo job, so assume that it is not. |
| + return errNotMiloJob |
| } |
| - switch sr.State { |
| - case TaskCompleted, TaskRunning, TaskCanceled: |
| - default: |
| - // Ignore log errors if the task might be pending, timed out, expired, etc. |
| - if errLog != nil { |
| - errLog = nil |
| - log = "" |
| + if f.fetchRes && f.logErr != nil { |
| + switch f.res.State { |
| + case TaskCompleted, TaskRunning, TaskCanceled: |
| + default: |
| + // Ignore log errors if the task might be pending, timed out, expired, etc. |
| + if err != nil { |
| + f.log = "" |
| + f.logErr = nil |
| + } |
| } |
| } |
| - return sr, log, errLog |
| + return f.logErr |
| } |
| func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { |
| @@ -403,22 +415,14 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams, |
| return c.ToLogDogStreams() |
| } |
| -func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*resp.MiloBuild, error) { |
| +func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { |
| // Fetch the data from Swarming |
| - sr, body, err := getSwarming(c, server, taskID) |
| - if err != nil { |
| - return nil, err |
| - } |
| - |
| - allowMilo := false |
| - for _, t := range sr.Tags { |
| - if t == "allow_milo:1" { |
| - allowMilo = true |
| - break |
| - } |
| + fetch := swarmingFetch{ |
| + fetchRes: true, |
| + fetchLog: true, |
| } |
| - if !allowMilo { |
| - return nil, fmt.Errorf("Not A Milo Job") |
| + if err := fetch.get(c, svc, taskID); err != nil { |
| + return nil, err |
| } |
| var build resp.MiloBuild |
| @@ -428,8 +432,9 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res |
| // Decode the data using annotee. The logdog stream returned here is assumed |
| // to be consistent, which is why the following block of code are not |
| // expected to ever err out. |
| - if body != "" { |
| - lds, err = streamsFromAnnotatedLog(c, body) |
| + if fetch.log != "" { |
| + var err error |
| + lds, err = streamsFromAnnotatedLog(c, fetch.log) |
| if err != nil { |
| build.Components = []*resp.BuildComponent{{ |
| Type: resp.Summary, |
| @@ -438,7 +443,7 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res |
| Status: resp.InfraFailure, |
| SubLink: []*resp.Link{{ |
| Label: "swarming task", |
| - URL: taskPageURL(server, taskID), |
| + URL: taskPageURL(svc.getHost(), taskID), |
| }}, |
| }} |
| } |
| @@ -450,18 +455,27 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res |
| s = &miloProto.Step{} |
| } |
| - if err := addTaskToMiloStep(c, server, sr, s); err != nil { |
| + if err := addTaskToMiloStep(c, svc.getHost(), fetch.res, s); err != nil { |
| return nil, err |
| } |
| logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) |
| - if err := addTaskToBuild(c, server, sr, &build); err != nil { |
| + if err := addTaskToBuild(c, svc.getHost(), fetch.res, &build); err != nil { |
| return nil, err |
| } |
| return &build, nil |
| } |
| +func isMiloJob(tags []string) bool { |
| + for _, t := range tags { |
| + if t == "allow_milo:1" { |
| + return true |
| + } |
| + } |
| + return false |
| +} |
| + |
| // taskPageURL returns a URL to a human-consumable page of a swarming task. |
| // Supports server aliases. |
| func taskPageURL(swarmingHostname, taskID string) string { |