Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(254)

Unified Diff: milo/appengine/swarming/build.go

Issue 2675493003: milo: Use service interface for swarming. (Closed)
Patch Set: Update fetch signature Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: milo/appengine/swarming/build.go
diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go
index be8b356f72ba9ab267cf81f2e6bbac006b725c3d..e34a90a323c52e722fea067065c14c3ae088daea 100644
--- a/milo/appengine/swarming/build.go
+++ b/milo/appengine/swarming/build.go
@@ -6,22 +6,20 @@ package swarming
import (
"bytes"
- "encoding/json"
"fmt"
- "io/ioutil"
"net/http"
"net/url"
- "path/filepath"
"strings"
- "sync"
"time"
"golang.org/x/net/context"
swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/google"
miloProto "github.com/luci/luci-go/common/proto/milo"
+ "github.com/luci/luci-go/common/sync/parallel"
"github.com/luci/luci-go/logdog/client/annotee"
"github.com/luci/luci-go/logdog/common/types"
"github.com/luci/luci-go/milo/api/resp"
@@ -29,6 +27,10 @@ import (
"github.com/luci/luci-go/server/auth"
)
+// errNotMiloJob is returned if a Swarming task is fetched that does not self-
+// identify as a Milo job.
+var errNotMiloJob = errors.New("Not a Milo Job")
+
// SwarmingTimeLayout is time layout used by swarming.
const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999"
@@ -64,93 +66,115 @@ func getSwarmingClient(c context.Context, server string) (*swarming.Service, err
return sc, nil
}
-func getDebugTaskOutput(taskID string) (string, error) {
- // Read the debug file instead.
+// swarmingService is an interface that fetches data from Swarming.
+//
+// In production, this is fetched from a Swarming server. For testing, this can
+// be replaced with a mock.
+type swarmingService interface {
+ getHost() string
+ getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error)
+ getTaskOutput(c context.Context, taskID string) (string, error)
+}
- // ../swarming below assumes that
- // - this code is not executed by tests outside of this dir
- // - this dir is a sibling of frontend dir
- logFilename := filepath.Join("..", "swarming", "testdata", taskID)
- b, err := ioutil.ReadFile(logFilename)
- if err != nil {
- return "", err
- }
- return string(b), nil
+type prodSwarmingService struct {
+ host string
+ client *swarming.Service
}
-func getTaskOutput(sc *swarming.Service, taskID string) (string, error) {
- res, err := sc.Task.Stdout(taskID).Do()
+func newProdService(c context.Context, server string) (*prodSwarmingService, error) {
+ client, err := getSwarmingClient(c, server)
if err != nil {
- return "", err
+ return nil, err
}
- return res.Output, nil
+ return &prodSwarmingService{
+ host: server,
+ client: client,
+ }, nil
}
-func getDebugSwarmingResult(
- taskID string) (*swarming.SwarmingRpcsTaskResult, error) {
+func (svc *prodSwarmingService) getHost() string { return svc.host }
+
+func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) {
+ return svc.client.Task.Result(taskID).Context(c).Do()
+}
- // ../swarming below assumes that
- // - this code is not executed by tests outside of this dir
- // - this dir is a sibling of frontend dir
- logFilename := filepath.Join("..", "swarming", "testdata", taskID)
- swarmFilename := fmt.Sprintf("%s.swarm", logFilename)
- s, err := ioutil.ReadFile(swarmFilename)
+func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) {
+ stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do()
if err != nil {
- return nil, err
- }
- sr := &swarming.SwarmingRpcsTaskResult{}
- if err := json.Unmarshal(s, sr); err != nil {
- return nil, err
+ return "", err
}
- return sr, nil
+ return stdout.Output, nil
}
-func getSwarming(c context.Context, server string, taskID string) (
- *swarming.SwarmingRpcsTaskResult, string, error) {
+type swarmingFetchParams struct {
+ fetchRes bool
+ fetchLog bool
+}
- var log string
- var sr *swarming.SwarmingRpcsTaskResult
- var errLog, errRes error
+type swarmingFetchResult struct {
+ res *swarming.SwarmingRpcsTaskResult
+ log string
+}
- // Detour: Return debugging results, useful for development.
- if server == "debug" {
- sr, errRes = getDebugSwarmingResult(taskID)
- log, errLog = getDebugTaskOutput(taskID)
- } else {
- sc, err := getSwarmingClient(c, server)
- if err != nil {
- return nil, "", err
+// swarmingFetch fetches (in parallel) the components that it is configured to
+// fetch.
+//
+// After fetching, an ACL check is performed to confirm that the user is
+// permitted to view the resulting data. If this check fails, swarmingFetch
+// returns errNotMiloJob.
+//
+// TODO(hinoka): Make this ACL check something more specific than the
+// presence of the "allow_milo" dimension.
+func swarmingFetch(c context.Context, svc swarmingService, taskID string, req swarmingFetchParams) (
+ *swarmingFetchResult, error) {
+
+ // logErr is managed separately from other fetch errors, since in some
+ // situations it's acceptable to not have a log stream.
+ var logErr error
+ var fr swarmingFetchResult
+
+ err := parallel.FanOutIn(func(workC chan<- func() error) {
+ if req.fetchRes {
+ workC <- func() (err error) {
+ fr.res, err = svc.getSwarmingResult(c, taskID)
+ return
+ }
}
- var wg sync.WaitGroup
- wg.Add(2) // Getting log and result can happen concurrently. Wait for both.
-
- go func() {
- defer wg.Done()
- log, errLog = getTaskOutput(sc, taskID)
- }()
- go func() {
- defer wg.Done()
- sr, errRes = sc.Task.Result(taskID).Do()
- }()
- wg.Wait()
+ if req.fetchLog {
+ workC <- func() error {
+ fr.log, logErr = svc.getTaskOutput(c, taskID)
+ return nil
+ }
+ }
+ })
+ if err != nil {
+ return nil, err
}
- if errRes != nil {
- // Swarming result errors take priority.
- return sr, log, errRes
+ // Current ACL implementation: error if this is not a Milo job.
+ switch {
+ case req.fetchRes:
+ if !isMiloJob(fr.res.Tags) {
+ return nil, errNotMiloJob
+ }
+ default:
+ // No metadata to decide if this is a Milo job, so assume that it is not.
+ return nil, errNotMiloJob
}
- switch sr.State {
- case TaskCompleted, TaskRunning, TaskCanceled:
- default:
- // Ignore log errors if the task might be pending, timed out, expired, etc.
- if errLog != nil {
- errLog = nil
- log = ""
+ if req.fetchRes && logErr != nil {
+ switch fr.res.State {
+ case TaskCompleted, TaskRunning, TaskCanceled:
+ default:
+ // Ignore log errors if the task might be pending, timed out, expired, etc.
+ if err != nil {
+ fr.log = ""
+ logErr = nil
+ }
}
}
- return sr, log, errLog
+ return &fr, logErr
}
func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
@@ -403,24 +427,17 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams,
return c.ToLogDogStreams()
}
-func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*resp.MiloBuild, error) {
+func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
// Fetch the data from Swarming
- sr, body, err := getSwarming(c, server, taskID)
+ fetchParams := swarmingFetchParams{
+ fetchRes: true,
+ fetchLog: true,
+ }
+ fr, err := swarmingFetch(c, svc, taskID, fetchParams)
if err != nil {
return nil, err
}
- allowMilo := false
- for _, t := range sr.Tags {
- if t == "allow_milo:1" {
- allowMilo = true
- break
- }
- }
- if !allowMilo {
- return nil, fmt.Errorf("Not A Milo Job")
- }
-
var build resp.MiloBuild
var s *miloProto.Step
var lds *logdog.Streams
@@ -428,8 +445,9 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
// Decode the data using annotee. The logdog stream returned here is assumed
// to be consistent, which is why the following block of code are not
// expected to ever err out.
- if body != "" {
- lds, err = streamsFromAnnotatedLog(c, body)
+ if fr.log != "" {
+ var err error
+ lds, err = streamsFromAnnotatedLog(c, fr.log)
if err != nil {
build.Components = []*resp.BuildComponent{{
Type: resp.Summary,
@@ -438,7 +456,7 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
Status: resp.InfraFailure,
SubLink: []*resp.Link{{
Label: "swarming task",
- URL: taskPageURL(server, taskID),
+ URL: taskPageURL(svc.getHost(), taskID),
}},
}}
}
@@ -450,18 +468,27 @@ func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res
s = &miloProto.Step{}
}
- if err := addTaskToMiloStep(c, server, sr, s); err != nil {
+ if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
return nil, err
}
logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
- if err := addTaskToBuild(c, server, sr, &build); err != nil {
+ if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
return nil, err
}
return &build, nil
}
+func isMiloJob(tags []string) bool {
+ for _, t := range tags {
+ if t == "allow_milo:1" {
+ return true
+ }
+ }
+ return false
+}
+
// taskPageURL returns a URL to a human-consumable page of a swarming task.
// Supports server aliases.
func taskPageURL(swarmingHostname, taskID string) string {

Powered by Google App Engine
This is Rietveld 408576698