| Index: milo/appengine/swarming/build.go
|
| diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go
|
| index be8b356f72ba9ab267cf81f2e6bbac006b725c3d..e34a90a323c52e722fea067065c14c3ae088daea 100644
|
| --- a/milo/appengine/swarming/build.go
|
| +++ b/milo/appengine/swarming/build.go
|
| @@ -6,22 +6,20 @@ package swarming
|
|
|
| import (
|
| "bytes"
|
| - "encoding/json"
|
| "fmt"
|
| - "io/ioutil"
|
| "net/http"
|
| "net/url"
|
| - "path/filepath"
|
| "strings"
|
| - "sync"
|
| "time"
|
|
|
| "golang.org/x/net/context"
|
|
|
| swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/proto/google"
|
| miloProto "github.com/luci/luci-go/common/proto/milo"
|
| + "github.com/luci/luci-go/common/sync/parallel"
|
| "github.com/luci/luci-go/logdog/client/annotee"
|
| "github.com/luci/luci-go/logdog/common/types"
|
| "github.com/luci/luci-go/milo/api/resp"
|
| @@ -29,6 +27,10 @@ import (
|
| "github.com/luci/luci-go/server/auth"
|
| )
|
|
|
| +// errNotMiloJob is returned if a Swarming task is fetched that does not self-
|
| +// identify as a Milo job.
|
| +var errNotMiloJob = errors.New("Not a Milo Job")
|
| +
|
| // SwarmingTimeLayout is time layout used by swarming.
|
| const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999"
|
|
|
| @@ -64,93 +66,115 @@ func getSwarmingClient(c context.Context, server string) (*swarming.Service, err
|
| return sc, nil
|
| }
|
|
|
| -func getDebugTaskOutput(taskID string) (string, error) {
|
| - // Read the debug file instead.
|
| +// swarmingService is an interface that fetches data from Swarming.
|
| +//
|
| +// In production, this is fetched from a Swarming server. For testing, this can
|
| +// be replaced with a mock.
|
| +type swarmingService interface {
|
| + getHost() string
|
| + getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error)
|
| + getTaskOutput(c context.Context, taskID string) (string, error)
|
| +}
|
|
|
| - // ../swarming below assumes that
|
| - // - this code is not executed by tests outside of this dir
|
| - // - this dir is a sibling of frontend dir
|
| - logFilename := filepath.Join("..", "swarming", "testdata", taskID)
|
| - b, err := ioutil.ReadFile(logFilename)
|
| - if err != nil {
|
| - return "", err
|
| - }
|
| - return string(b), nil
|
| +type prodSwarmingService struct {
|
| + host string
|
| + client *swarming.Service
|
| }
|
|
|
| -func getTaskOutput(sc *swarming.Service, taskID string) (string, error) {
|
| - res, err := sc.Task.Stdout(taskID).Do()
|
| +func newProdService(c context.Context, server string) (*prodSwarmingService, error) {
|
| + client, err := getSwarmingClient(c, server)
|
| if err != nil {
|
| - return "", err
|
| + return nil, err
|
| }
|
| - return res.Output, nil
|
| + return &prodSwarmingService{
|
| + host: server,
|
| + client: client,
|
| + }, nil
|
| }
|
|
|
| -func getDebugSwarmingResult(
|
| - taskID string) (*swarming.SwarmingRpcsTaskResult, error) {
|
| +func (svc *prodSwarmingService) getHost() string { return svc.host }
|
| +
|
| +func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) {
|
| + return svc.client.Task.Result(taskID).Context(c).Do()
|
| +}
|
|
|
| - // ../swarming below assumes that
|
| - // - this code is not executed by tests outside of this dir
|
| - // - this dir is a sibling of frontend dir
|
| - logFilename := filepath.Join("..", "swarming", "testdata", taskID)
|
| - swarmFilename := fmt.Sprintf("%s.swarm", logFilename)
|
| - s, err := ioutil.ReadFile(swarmFilename)
|
| +func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) {
|
| + stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do()
|
| if err != nil {
|
| - return nil, err
|
| - }
|
| - sr := &swarming.SwarmingRpcsTaskResult{}
|
| - if err := json.Unmarshal(s, sr); err != nil {
|
| - return nil, err
|
| + return "", err
|
| }
|
| - return sr, nil
|
| + return stdout.Output, nil
|
| }
|
|
|
| -func getSwarming(c context.Context, server string, taskID string) (
|
| - *swarming.SwarmingRpcsTaskResult, string, error) {
|
| +type swarmingFetchParams struct {
|
| + fetchRes bool
|
| + fetchLog bool
|
| +}
|
|
|
| - var log string
|
| - var sr *swarming.SwarmingRpcsTaskResult
|
| - var errLog, errRes error
|
| +type swarmingFetchResult struct {
|
| + res *swarming.SwarmingRpcsTaskResult
|
| + log string
|
| +}
|
|
|
| - // Detour: Return debugging results, useful for development.
|
| - if server == "debug" {
|
| - sr, errRes = getDebugSwarmingResult(taskID)
|
| - log, errLog = getDebugTaskOutput(taskID)
|
| - } else {
|
| - sc, err := getSwarmingClient(c, server)
|
| - if err != nil {
|
| - return nil, "", err
|
| +// swarmingFetch fetches (in parallel) the components that it is configured to
|
| +// fetch.
|
| +//
|
| +// After fetching, an ACL check is performed to confirm that the user is
|
| +// permitted to view the resulting data. If this check fails, get returns
|
| +// errNotMiloJob.
|
| +//
|
| +// TODO(hinoka): Make this ACL check something more specific than the
|
| +// resence of the "allow_milo" dimension.
|
| +func swarmingFetch(c context.Context, svc swarmingService, taskID string, req swarmingFetchParams) (
|
| + *swarmingFetchResult, error) {
|
| +
|
| + // logErr is managed separately from other fetch errors, since in some
|
| + // situations it's acceptable to not have a log stream.
|
| + var logErr error
|
| + var fr swarmingFetchResult
|
| +
|
| + err := parallel.FanOutIn(func(workC chan<- func() error) {
|
| + if req.fetchRes {
|
| + workC <- func() (err error) {
|
| + fr.res, err = svc.getSwarmingResult(c, taskID)
|
| + return
|
| + }
|
| }
|
|
|
| - var wg sync.WaitGroup
|
| - wg.Add(2) // Getting log and result can happen concurrently. Wait for both.
|
| -
|
| - go func() {
|
| - defer wg.Done()
|
| - log, errLog = getTaskOutput(sc, taskID)
|
| - }()
|
| - go func() {
|
| - defer wg.Done()
|
| - sr, errRes = sc.Task.Result(taskID).Do()
|
| - }()
|
| - wg.Wait()
|
| + if req.fetchLog {
|
| + workC <- func() error {
|
| + fr.log, logErr = svc.getTaskOutput(c, taskID)
|
| + return nil
|
| + }
|
| + }
|
| + })
|
| + if err != nil {
|
| + return nil, err
|
| }
|
|
|
| - if errRes != nil {
|
| - // Swarming result errors take priority.
|
| - return sr, log, errRes
|
| + // Current ACL implementation: error if this is not a Milo job.
|
| + switch {
|
| + case req.fetchRes:
|
| + if !isMiloJob(fr.res.Tags) {
|
| + return nil, errNotMiloJob
|
| + }
|
| + default:
|
| + // No metadata to decide if this is a Milo job, so assume that it is not.
|
| + return nil, errNotMiloJob
|
| }
|
|
|
| - switch sr.State {
|
| - case TaskCompleted, TaskRunning, TaskCanceled:
|
| - default:
|
| - // Ignore log errors if the task might be pending, timed out, expired, etc.
|
| - if errLog != nil {
|
| - errLog = nil
|
| - log = ""
|
| + if req.fetchRes && logErr != nil {
|
| + switch fr.res.State {
|
| + case TaskCompleted, TaskRunning, TaskCanceled:
|
| + default:
|
| + // Ignore log errors if the task might be pending, timed out, expired, etc.
|
| + if err != nil {
|
| + fr.log = ""
|
| + logErr = nil
|
| + }
|
| }
|
| }
|
| - return sr, log, errLog
|
| + return &fr, logErr
|
| }
|
|
|
| func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
|
| @@ -403,24 +427,17 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams,
|
| return c.ToLogDogStreams()
|
| }
|
|
|
| -func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*resp.MiloBuild, error) {
|
| +func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
|
| // Fetch the data from Swarming
|
| - sr, body, err := getSwarming(c, server, taskID)
|
| + fetchParams := swarmingFetchParams{
|
| + fetchRes: true,
|
| + fetchLog: true,
|
| + }
|
| + fr, err := swarmingFetch(c, svc, taskID, fetchParams)
|
| if err != nil {
|
| return nil, err
|
| }
|
|
|
| - allowMilo := false
|
| - for _, t := range sr.Tags {
|
| - if t == "allow_milo:1" {
|
| - allowMilo = true
|
| - break
|
| - }
|
| - }
|
| - if !allowMilo {
|
| - return nil, fmt.Errorf("Not A Milo Job")
|
| - }
|
| -
|
| var build resp.MiloBuild
|
| var s *miloProto.Step
|
| var lds *logdog.Streams
|
| @@ -428,8 +445,9 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
|
| // Decode the data using annotee. The logdog stream returned here is assumed
|
| // to be consistent, which is why the following block of code are not
|
| // expected to ever err out.
|
| - if body != "" {
|
| - lds, err = streamsFromAnnotatedLog(c, body)
|
| + if fr.log != "" {
|
| + var err error
|
| + lds, err = streamsFromAnnotatedLog(c, fr.log)
|
| if err != nil {
|
| build.Components = []*resp.BuildComponent{{
|
| Type: resp.Summary,
|
| @@ -438,7 +456,7 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
|
| Status: resp.InfraFailure,
|
| SubLink: []*resp.Link{{
|
| Label: "swarming task",
|
| - URL: taskPageURL(server, taskID),
|
| + URL: taskPageURL(svc.getHost(), taskID),
|
| }},
|
| }}
|
| }
|
| @@ -450,18 +468,27 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
|
| s = &miloProto.Step{}
|
| }
|
|
|
| - if err := addTaskToMiloStep(c, server, sr, s); err != nil {
|
| + if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
|
| return nil, err
|
| }
|
| logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
|
|
|
| - if err := addTaskToBuild(c, server, sr, &build); err != nil {
|
| + if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
|
| return nil, err
|
| }
|
|
|
| return &build, nil
|
| }
|
|
|
| +func isMiloJob(tags []string) bool {
|
| + for _, t := range tags {
|
| + if t == "allow_milo:1" {
|
| + return true
|
| + }
|
| + }
|
| + return false
|
| +}
|
| +
|
| // taskPageURL returns a URL to a human-consumable page of a swarming task.
|
| // Supports server aliases.
|
| func taskPageURL(swarmingHostname, taskID string) string {
|
|
|