Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1311)

Unified Diff: milo/appengine/swarming/build.go

Issue 2695383002: milo: Enable Swarming LogDog log loading. (Closed)
Patch Set: Comments, fix links. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « milo/appengine/logdog/http.go ('k') | milo/appengine/swarming/build_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: milo/appengine/swarming/build.go
diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go
index df4ba5e6e3dbe8a18c8eec9bca0c9f9574648ea5..7133159ca76969fc9c32505a09a0ddfb0751d738 100644
--- a/milo/appengine/swarming/build.go
+++ b/milo/appengine/swarming/build.go
@@ -21,6 +21,7 @@ import (
miloProto "github.com/luci/luci-go/common/proto/milo"
"github.com/luci/luci-go/common/sync/parallel"
"github.com/luci/luci-go/logdog/client/annotee"
+ "github.com/luci/luci-go/logdog/client/coordinator"
"github.com/luci/luci-go/logdog/common/types"
"github.com/luci/luci-go/milo/api/resp"
"github.com/luci/luci-go/milo/appengine/logdog"
@@ -115,11 +116,22 @@ type swarmingFetchParams struct {
fetchReq bool
fetchRes bool
fetchLog bool
+
+ // taskTagCallback, if not nil, is a callback that will be invoked after
+ // fetching the result, if fetchRes is true. It will be passed a key/value map
+ // of the Swarming result's tags.
+ //
+ // If taskTagCallback returns true, any pending log fetch will be cancelled
+ // without error.
+ taskTagCallback func(map[string]string) bool
}
type swarmingFetchResult struct {
req *swarming.SwarmingRpcsTaskRequest
res *swarming.SwarmingRpcsTaskResult
+
+ // log is the log data content. If no log data was fetched, this will empty.
+ // If the log fetch was cancelled, this is undefined.
log string
}
@@ -139,6 +151,12 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
// situations it's acceptable to not have a log stream.
var logErr error
var fr swarmingFetchResult
+ var resTags map[string]string
+
+ // Special Context to enable the cancellation of log fetching.
+ logsCancelled := false
+ logCtx, cancelLogs := context.WithCancel(c)
+ defer cancelLogs()
err := parallel.FanOutIn(func(workC chan<- func() error) {
if req.fetchReq {
@@ -150,14 +168,22 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
if req.fetchRes {
workC <- func() (err error) {
- fr.res, err = svc.getSwarmingResult(c, taskID)
+ if fr.res, err = svc.getSwarmingResult(c, taskID); err == nil {
+ resTags = swarmingTags(fr.res.Tags)
+ if req.taskTagCallback != nil && req.taskTagCallback(resTags) {
+ logsCancelled = true
+ cancelLogs()
+ }
+ }
return
}
}
if req.fetchLog {
workC <- func() error {
- fr.log, logErr = svc.getTaskOutput(c, taskID)
+ // Note: we're using the log Context here so we can cancel log fetch
+ // explicitly.
+ fr.log, logErr = svc.getTaskOutput(logCtx, taskID)
return nil
}
}
@@ -172,10 +198,12 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
if !isMiloJob(fr.req.Tags) {
return nil, errNotMiloJob
}
+
case req.fetchRes:
if !isMiloJob(fr.res.Tags) {
return nil, errNotMiloJob
}
+
default:
// No metadata to decide if this is a Milo job, so assume that it is not.
return nil, errNotMiloJob
@@ -192,6 +220,11 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
}
}
}
+
+ // If we explicitly cancelled logs, everything is OK.
+ if logErr == context.Canceled && logsCancelled {
+ logErr = nil
+ }
return &fr, logErr
}
@@ -230,23 +263,13 @@ func tagsToProperties(tags []string) *resp.PropertyGroup {
return props
}
-// tagValue returns a value of the first tag matching the tag key. If not found
-// returns "".
-func tagValue(tags []string, key string) string {
- prefix := key + ":"
- for _, t := range tags {
- if strings.HasPrefix(t, prefix) {
- return strings.TrimPrefix(t, prefix)
- }
- }
- return ""
-}
-
// addBuilderLink adds a link to the buildbucket builder view.
func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname string, sr *swarming.SwarmingRpcsTaskResult) {
- bbHost := tagValue(sr.Tags, "buildbucket_hostname")
- bucket := tagValue(sr.Tags, "buildbucket_bucket")
- builder := tagValue(sr.Tags, "builder")
+ tags := swarmingTags(sr.Tags)
+
+ bbHost := tags["buildbucket_hostname"]
+ bucket := tags["buildbucket_bucket"]
+ builder := tags["builder"]
if bucket == "" {
logging.Errorf(
c, "Could not extract buidlbucket bucket from task %s",
@@ -445,11 +468,59 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams,
return c.ToLogDogStreams()
}
-func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
+// buildLoader represents the ability to load a Milo build from a Swarming task.
+//
+// It exists so that the internal build loading functionality can be stubbed out
+// for testing.
+type buildLoader struct {
+ // logdogClientFunc returns a coordinator Client instance for the supplied
+ // parameters.
+ //
+ // If nil, a production client will be generated.
+ logDogClientFunc func(c context.Context, host string) (*coordinator.Client, error)
+}
+
+func (bl *buildLoader) newEmptyAnnotationStream(c context.Context, addr *types.StreamAddr) (
+ *logdog.AnnotationStream, error) {
+
+ fn := bl.logDogClientFunc
+ if fn == nil {
+ fn = logdog.NewClient
+ }
+ client, err := fn(c, addr.Host)
+ if err != nil {
+ return nil, errors.Annotate(err).Reason("failed to create LogDog client").Err()
+ }
+
+ as := logdog.AnnotationStream{
+ Client: client,
+ Project: addr.Project,
+ Path: addr.Path,
+ }
+ if err := as.Normalize(); err != nil {
+ return nil, errors.Annotate(err).Reason("failed to normalize annotation stream parameters").Err()
+ }
+
+ return &as, nil
+}
+
+func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
// Fetch the data from Swarming
+ var logDogStreamAddr *types.StreamAddr
+
fetchParams := swarmingFetchParams{
fetchRes: true,
fetchLog: true,
+
+ // Cancel if LogDog annotation stream parameters are present in the tag set.
+ taskTagCallback: func(tags map[string]string) (cancelLogs bool) {
+ var err error
+ if logDogStreamAddr, err = resolveLogDogStreamAddrFromTags(tags); err != nil {
+ logging.WithError(err).Debugf(c, "Not using LogDog annotation stream.")
+ return false
+ }
+ return true
+ },
}
fr, err := swarmingFetch(c, svc, taskID, fetchParams)
if err != nil {
@@ -459,11 +530,36 @@ func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID
var build resp.MiloBuild
var s *miloProto.Step
var lds *logdog.Streams
+ var ub logdog.URLBuilder
+
+ // Load the build from the available data.
+ //
+ // If the Swarming task explicitly specifies its log location, we prefer that.
+ // As a fallback, we will try and parse the Swarming task's output for
+ // annotations.
+ switch {
+ case logDogStreamAddr != nil:
+ // If the LogDog stream is available, load the step from that.
+ as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr)
+ if err != nil {
+ return nil, errors.Annotate(err).Reason("failed to create LogDog annotation stream").Err()
+ }
+
+ if s, err = as.Fetch(c); err != nil {
+ return nil, errors.Annotate(err).Reason("failed to load LogDog annotation stream").Err()
+ }
+
+ prefix, _ := logDogStreamAddr.Path.Split()
+ ub = &logdog.ViewerURLBuilder{
+ Host: logDogStreamAddr.Host,
+ Prefix: prefix,
+ Project: logDogStreamAddr.Project,
+ }
- // Decode the data using annotee. The logdog stream returned here is assumed
- // to be consistent, which is why the following block of code are not
- // expected to ever err out.
- if fr.log != "" {
+ case fr.log != "":
+ // Decode the data using annotee. The logdog stream returned here is assumed
+ // to be consistent, which is why the following block of code are not
+ // expected to ever err out.
var err error
lds, err = streamsFromAnnotatedLog(c, fr.log)
if err != nil {
@@ -478,18 +574,21 @@ func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID
}},
}}
}
- }
- if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
- s = lds.MainStream.Data
- } else {
+ if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
+ s = lds.MainStream.Data
+ }
+ ub = swarmingURLBuilder(linkBase)
+
+ default:
s = &miloProto.Step{}
+ ub = swarmingURLBuilder(linkBase)
}
if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
return nil, err
}
- logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
+ logdog.AddLogDogToBuild(c, ub, s, &build)
if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
return nil, err
@@ -560,3 +659,16 @@ func (b swarmingURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
return nil
}
}
+
+func swarmingTags(v []string) map[string]string {
+ res := make(map[string]string, len(v))
+ for _, tag := range v {
+ var value string
+ parts := strings.SplitN(tag, ":", 2)
+ if len(parts) == 2 {
+ value = parts[1]
+ }
+ res[parts[0]] = value
+ }
+ return res
+}
« no previous file with comments | « milo/appengine/logdog/http.go ('k') | milo/appengine/swarming/build_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698