Chromium Code Reviews| Index: milo/appengine/swarming/build.go |
| diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go |
| index df4ba5e6e3dbe8a18c8eec9bca0c9f9574648ea5..cde6bbe0f16f3dda8a98a641aab20b83b8838b8a 100644 |
| --- a/milo/appengine/swarming/build.go |
| +++ b/milo/appengine/swarming/build.go |
| @@ -21,6 +21,7 @@ import ( |
| miloProto "github.com/luci/luci-go/common/proto/milo" |
| "github.com/luci/luci-go/common/sync/parallel" |
| "github.com/luci/luci-go/logdog/client/annotee" |
| + "github.com/luci/luci-go/logdog/client/coordinator" |
| "github.com/luci/luci-go/logdog/common/types" |
| "github.com/luci/luci-go/milo/api/resp" |
| "github.com/luci/luci-go/milo/appengine/logdog" |
| @@ -115,11 +116,22 @@ type swarmingFetchParams struct { |
| fetchReq bool |
| fetchRes bool |
| fetchLog bool |
| + |
| + // taskTagCallback, if not nil, is a callback that will be invoked during |
| + // the swarmingFetch of the fetch result (if enabled). It will contain the |
|
hinoka
2017/02/16 02:08:21
invoked after fetching the swarm fetchRes call?
dnj
2017/02/16 02:20:51
Done.
|
| + // mapped result tags. |
| + // |
| + // If taskTagCallback returns true, any pending log fetch will be cancelled |
| + // without error. |
| + taskTagCallback func(map[string]string) bool |
|
hinoka
2017/02/16 02:08:21
This is funky, but I don't have any better ideas :
dnj
2017/02/16 02:20:51
Acknowledged.
|
| } |
| type swarmingFetchResult struct { |
| req *swarming.SwarmingRpcsTaskRequest |
| res *swarming.SwarmingRpcsTaskResult |
| + |
| + // log is the log data content. If no log data was fetched, this will empty. |
| + // If the log fetch was cancelled, this is undefined. |
| log string |
| } |
| @@ -139,6 +151,12 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw |
| // situations it's acceptable to not have a log stream. |
| var logErr error |
| var fr swarmingFetchResult |
| + var resTags map[string]string |
| + |
| + // Special Context to enable the cancellation of log fetching. |
| + logsCancelled := false |
| + logCtx, cancelLogs := context.WithCancel(c) |
| + defer cancelLogs() |
| err := parallel.FanOutIn(func(workC chan<- func() error) { |
| if req.fetchReq { |
| @@ -150,14 +168,22 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw |
| if req.fetchRes { |
| workC <- func() (err error) { |
| - fr.res, err = svc.getSwarmingResult(c, taskID) |
| + if fr.res, err = svc.getSwarmingResult(c, taskID); err == nil { |
| + resTags = swarmingTags(fr.res.Tags) |
| + if req.taskTagCallback != nil && req.taskTagCallback(resTags) { |
| + logsCancelled = true |
| + cancelLogs() |
| + } |
| + } |
| return |
| } |
| } |
| if req.fetchLog { |
| workC <- func() error { |
| - fr.log, logErr = svc.getTaskOutput(c, taskID) |
| + // Note: we're using the log Context here so we can cancel log fetch |
| + // explicitly. |
| + fr.log, logErr = svc.getTaskOutput(logCtx, taskID) |
| return nil |
| } |
| } |
| @@ -172,10 +198,12 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw |
| if !isMiloJob(fr.req.Tags) { |
| return nil, errNotMiloJob |
| } |
| + |
| case req.fetchRes: |
| if !isMiloJob(fr.res.Tags) { |
| return nil, errNotMiloJob |
| } |
| + |
| default: |
| // No metadata to decide if this is a Milo job, so assume that it is not. |
| return nil, errNotMiloJob |
| @@ -192,6 +220,11 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw |
| } |
| } |
| } |
| + |
| + // If we explicitly cancelled logs, everything is OK. |
| + if logErr == context.Canceled && logsCancelled { |
| + logErr = nil |
| + } |
| return &fr, logErr |
| } |
| @@ -230,23 +263,13 @@ func tagsToProperties(tags []string) *resp.PropertyGroup { |
| return props |
| } |
| -// tagValue returns a value of the first tag matching the tag key. If not found |
| -// returns "". |
| -func tagValue(tags []string, key string) string { |
| - prefix := key + ":" |
| - for _, t := range tags { |
| - if strings.HasPrefix(t, prefix) { |
| - return strings.TrimPrefix(t, prefix) |
| - } |
| - } |
| - return "" |
| -} |
| - |
| // addBuilderLink adds a link to the buildbucket builder view. |
| func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname string, sr *swarming.SwarmingRpcsTaskResult) { |
| - bbHost := tagValue(sr.Tags, "buildbucket_hostname") |
| - bucket := tagValue(sr.Tags, "buildbucket_bucket") |
| - builder := tagValue(sr.Tags, "builder") |
| + tags := swarmingTags(sr.Tags) |
| + |
| + bbHost := tags["buildbucket_hostname"] |
| + bucket := tags["buildbucket_bucket"] |
| + builder := tags["builder"] |
| if bucket == "" { |
| logging.Errorf( |
| c, "Could not extract buidlbucket bucket from task %s", |
| @@ -445,11 +468,53 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams, |
| return c.ToLogDogStreams() |
| } |
| -func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { |
| +type buildLoader struct { |
|
hinoka
2017/02/16 02:08:21
Comment on why this exists (testing)
dnj
2017/02/16 02:20:51
Done.
|
| + // logdogClientFunc returns a coordinator Client instance for the supplied |
| + // parameters. |
| + // |
| + // If nil, a production client will be generated. |
| + logDogClientFunc func(c context.Context, host string) (*coordinator.Client, error) |
| +} |
| + |
| +func (bl *buildLoader) newAnnotationStream(c context.Context, stream *types.Stream) (*logdog.AnnotationStream, error) { |
|
hinoka
2017/02/16 02:08:21
newEmptyAnnotationStream might be more appropriate
dnj
2017/02/16 02:20:51
Done.
|
| + fn := bl.logDogClientFunc |
| + if fn == nil { |
| + fn = logdog.NewClient |
| + } |
| + client, err := fn(c, stream.Host) |
| + if err != nil { |
| + return nil, errors.Annotate(err).Reason("failed to create LogDog client").Err() |
| + } |
| + |
| + as := logdog.AnnotationStream{ |
| + Client: client, |
| + Project: stream.Project, |
| + Path: stream.Path, |
| + } |
| + if err := as.Normalize(); err != nil { |
| + return nil, errors.Annotate(err).Reason("failed to normalize annotation stream parameters").Err() |
| + } |
| + |
| + return &as, nil |
| +} |
| + |
| +func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { |
| // Fetch the data from Swarming |
| + var logDogStream *types.Stream |
| + |
| fetchParams := swarmingFetchParams{ |
| fetchRes: true, |
| fetchLog: true, |
| + |
| + // Cancel if LogDog annotation strem parameters are present in the tag set. |
|
hinoka
2017/02/16 02:08:21
stream
dnj
2017/02/16 02:20:51
Done.
|
| + taskTagCallback: func(tags map[string]string) (cancelLogs bool) { |
| + var err error |
| + if logDogStream, err = resolveLogDogStreamFromTags(tags); err != nil { |
| + logging.WithError(err).Debugf(c, "Not using LogDog annotation stream.") |
| + return false |
| + } |
| + return true |
| + }, |
| } |
| fr, err := swarmingFetch(c, svc, taskID, fetchParams) |
| if err != nil { |
| @@ -460,10 +525,22 @@ func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID |
| var s *miloProto.Step |
| var lds *logdog.Streams |
| - // Decode the data using annotee. The logdog stream returned here is assumed |
| - // to be consistent, which is why the following block of code are not |
| - // expected to ever err out. |
| - if fr.log != "" { |
| + // If the LogDog stream is available, load the step from that. |
|
hinoka
2017/02/16 02:08:21
Add to comment: Explanation on why we're preferrin
dnj
2017/02/16 02:20:51
Done.
|
| + switch { |
| + case logDogStream != nil: |
| + as, err := bl.newAnnotationStream(c, logDogStream) |
| + if err != nil { |
| + return nil, errors.Annotate(err).Reason("failed to create LogDog annotation stream").Err() |
| + } |
| + |
| + if s, err = as.Load(c); err != nil { |
|
hinoka
2017/02/16 02:08:21
Fetch seems like a more appropriate name
dnj
2017/02/16 02:20:51
Done.
|
| + return nil, errors.Annotate(err).Reason("failed to load LogDog annotation stream").Err() |
| + } |
| + |
| + case fr.log != "": |
| + // Decode the data using annotee. The logdog stream returned here is assumed |
| + // to be consistent, which is why the following block of code are not |
| + // expected to ever err out. |
| var err error |
| lds, err = streamsFromAnnotatedLog(c, fr.log) |
| if err != nil { |
| @@ -478,11 +555,12 @@ func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID |
| }}, |
| }} |
| } |
| - } |
| - if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { |
| - s = lds.MainStream.Data |
| - } else { |
| + if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { |
| + s = lds.MainStream.Data |
| + } |
| + |
| + default: |
| s = &miloProto.Step{} |
| } |
| @@ -560,3 +638,16 @@ func (b swarmingURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { |
| return nil |
| } |
| } |
| + |
| +func swarmingTags(v []string) map[string]string { |
| + res := make(map[string]string, len(v)) |
| + for _, tag := range v { |
| + var value string |
| + parts := strings.SplitN(tag, ":", 2) |
| + if len(parts) == 2 { |
| + value = parts[1] |
| + } |
| + res[parts[0]] = value |
| + } |
| + return res |
| +} |