| Index: milo/appengine/swarming/build.go
|
| diff --git a/milo/appengine/swarming/build.go b/milo/appengine/swarming/build.go
|
| index df4ba5e6e3dbe8a18c8eec9bca0c9f9574648ea5..7133159ca76969fc9c32505a09a0ddfb0751d738 100644
|
| --- a/milo/appengine/swarming/build.go
|
| +++ b/milo/appengine/swarming/build.go
|
| @@ -21,6 +21,7 @@ import (
|
| miloProto "github.com/luci/luci-go/common/proto/milo"
|
| "github.com/luci/luci-go/common/sync/parallel"
|
| "github.com/luci/luci-go/logdog/client/annotee"
|
| + "github.com/luci/luci-go/logdog/client/coordinator"
|
| "github.com/luci/luci-go/logdog/common/types"
|
| "github.com/luci/luci-go/milo/api/resp"
|
| "github.com/luci/luci-go/milo/appengine/logdog"
|
| @@ -115,11 +116,22 @@ type swarmingFetchParams struct {
|
| fetchReq bool
|
| fetchRes bool
|
| fetchLog bool
|
| +
|
| + // taskTagCallback, if not nil, is a callback that will be invoked after
|
| + // fetching the result, if fetchRes is true. It will be passed a key/value map
|
| + // of the Swarming result's tags.
|
| + //
|
| + // If taskTagCallback returns true, any pending log fetch will be cancelled
|
| + // without error.
|
| + taskTagCallback func(map[string]string) bool
|
| }
|
|
|
| type swarmingFetchResult struct {
|
| req *swarming.SwarmingRpcsTaskRequest
|
| res *swarming.SwarmingRpcsTaskResult
|
| +
|
| + // log is the log data content. If no log data was fetched, this will empty.
|
| + // If the log fetch was cancelled, this is undefined.
|
| log string
|
| }
|
|
|
| @@ -139,6 +151,12 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
|
| // situations it's acceptable to not have a log stream.
|
| var logErr error
|
| var fr swarmingFetchResult
|
| + var resTags map[string]string
|
| +
|
| + // Special Context to enable the cancellation of log fetching.
|
| + logsCancelled := false
|
| + logCtx, cancelLogs := context.WithCancel(c)
|
| + defer cancelLogs()
|
|
|
| err := parallel.FanOutIn(func(workC chan<- func() error) {
|
| if req.fetchReq {
|
| @@ -150,14 +168,22 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
|
|
|
| if req.fetchRes {
|
| workC <- func() (err error) {
|
| - fr.res, err = svc.getSwarmingResult(c, taskID)
|
| + if fr.res, err = svc.getSwarmingResult(c, taskID); err == nil {
|
| + resTags = swarmingTags(fr.res.Tags)
|
| + if req.taskTagCallback != nil && req.taskTagCallback(resTags) {
|
| + logsCancelled = true
|
| + cancelLogs()
|
| + }
|
| + }
|
| return
|
| }
|
| }
|
|
|
| if req.fetchLog {
|
| workC <- func() error {
|
| - fr.log, logErr = svc.getTaskOutput(c, taskID)
|
| + // Note: we're using the log Context here so we can cancel log fetch
|
| + // explicitly.
|
| + fr.log, logErr = svc.getTaskOutput(logCtx, taskID)
|
| return nil
|
| }
|
| }
|
| @@ -172,10 +198,12 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
|
| if !isMiloJob(fr.req.Tags) {
|
| return nil, errNotMiloJob
|
| }
|
| +
|
| case req.fetchRes:
|
| if !isMiloJob(fr.res.Tags) {
|
| return nil, errNotMiloJob
|
| }
|
| +
|
| default:
|
| // No metadata to decide if this is a Milo job, so assume that it is not.
|
| return nil, errNotMiloJob
|
| @@ -192,6 +220,11 @@ func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
|
| }
|
| }
|
| }
|
| +
|
| + // If we explicitly cancelled logs, everything is OK.
|
| + if logErr == context.Canceled && logsCancelled {
|
| + logErr = nil
|
| + }
|
| return &fr, logErr
|
| }
|
|
|
| @@ -230,23 +263,13 @@ func tagsToProperties(tags []string) *resp.PropertyGroup {
|
| return props
|
| }
|
|
|
| -// tagValue returns a value of the first tag matching the tag key. If not found
|
| -// returns "".
|
| -func tagValue(tags []string, key string) string {
|
| - prefix := key + ":"
|
| - for _, t := range tags {
|
| - if strings.HasPrefix(t, prefix) {
|
| - return strings.TrimPrefix(t, prefix)
|
| - }
|
| - }
|
| - return ""
|
| -}
|
| -
|
| // addBuilderLink adds a link to the buildbucket builder view.
|
| func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname string, sr *swarming.SwarmingRpcsTaskResult) {
|
| - bbHost := tagValue(sr.Tags, "buildbucket_hostname")
|
| - bucket := tagValue(sr.Tags, "buildbucket_bucket")
|
| - builder := tagValue(sr.Tags, "builder")
|
| + tags := swarmingTags(sr.Tags)
|
| +
|
| + bbHost := tags["buildbucket_hostname"]
|
| + bucket := tags["buildbucket_bucket"]
|
| + builder := tags["builder"]
|
| if bucket == "" {
|
| logging.Errorf(
|
| c, "Could not extract buidlbucket bucket from task %s",
|
| @@ -445,11 +468,59 @@ func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams,
|
| return c.ToLogDogStreams()
|
| }
|
|
|
| -func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
|
| +// buildLoader represents the ability to load a Milo build from a Swarming task.
|
| +//
|
| +// It exists so that the internal build loading functionality can be stubbed out
|
| +// for testing.
|
| +type buildLoader struct {
|
| + // logdogClientFunc returns a coordinator Client instance for the supplied
|
| + // parameters.
|
| + //
|
| + // If nil, a production client will be generated.
|
| + logDogClientFunc func(c context.Context, host string) (*coordinator.Client, error)
|
| +}
|
| +
|
| +func (bl *buildLoader) newEmptyAnnotationStream(c context.Context, addr *types.StreamAddr) (
|
| + *logdog.AnnotationStream, error) {
|
| +
|
| + fn := bl.logDogClientFunc
|
| + if fn == nil {
|
| + fn = logdog.NewClient
|
| + }
|
| + client, err := fn(c, addr.Host)
|
| + if err != nil {
|
| + return nil, errors.Annotate(err).Reason("failed to create LogDog client").Err()
|
| + }
|
| +
|
| + as := logdog.AnnotationStream{
|
| + Client: client,
|
| + Project: addr.Project,
|
| + Path: addr.Path,
|
| + }
|
| + if err := as.Normalize(); err != nil {
|
| + return nil, errors.Annotate(err).Reason("failed to normalize annotation stream parameters").Err()
|
| + }
|
| +
|
| + return &as, nil
|
| +}
|
| +
|
| +func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
|
| // Fetch the data from Swarming
|
| + var logDogStreamAddr *types.StreamAddr
|
| +
|
| fetchParams := swarmingFetchParams{
|
| fetchRes: true,
|
| fetchLog: true,
|
| +
|
| + // Cancel if LogDog annotation stream parameters are present in the tag set.
|
| + taskTagCallback: func(tags map[string]string) (cancelLogs bool) {
|
| + var err error
|
| + if logDogStreamAddr, err = resolveLogDogStreamAddrFromTags(tags); err != nil {
|
| + logging.WithError(err).Debugf(c, "Not using LogDog annotation stream.")
|
| + return false
|
| + }
|
| + return true
|
| + },
|
| }
|
| fr, err := swarmingFetch(c, svc, taskID, fetchParams)
|
| if err != nil {
|
| @@ -459,11 +530,36 @@ func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID
|
| var build resp.MiloBuild
|
| var s *miloProto.Step
|
| var lds *logdog.Streams
|
| + var ub logdog.URLBuilder
|
| +
|
| + // Load the build from the available data.
|
| + //
|
| + // If the Swarming task explicitly specifies its log location, we prefer that.
|
| + // As a fallback, we will try and parse the Swarming task's output for
|
| + // annotations.
|
| + switch {
|
| + case logDogStreamAddr != nil:
|
| + // If the LogDog stream is available, load the step from that.
|
| + as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr)
|
| + if err != nil {
|
| + return nil, errors.Annotate(err).Reason("failed to create LogDog annotation stream").Err()
|
| + }
|
| +
|
| + if s, err = as.Fetch(c); err != nil {
|
| + return nil, errors.Annotate(err).Reason("failed to load LogDog annotation stream").Err()
|
| + }
|
| +
|
| + prefix, _ := logDogStreamAddr.Path.Split()
|
| + ub = &logdog.ViewerURLBuilder{
|
| + Host: logDogStreamAddr.Host,
|
| + Prefix: prefix,
|
| + Project: logDogStreamAddr.Project,
|
| + }
|
|
|
| - // Decode the data using annotee. The logdog stream returned here is assumed
|
| - // to be consistent, which is why the following block of code are not
|
| - // expected to ever err out.
|
| - if fr.log != "" {
|
| + case fr.log != "":
|
| + // Decode the data using annotee. The logdog stream returned here is assumed
|
| + // to be consistent, which is why the following block of code are not
|
| + // expected to ever err out.
|
| var err error
|
| lds, err = streamsFromAnnotatedLog(c, fr.log)
|
| if err != nil {
|
| @@ -478,18 +574,21 @@ func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID
|
| }},
|
| }}
|
| }
|
| - }
|
|
|
| - if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
|
| - s = lds.MainStream.Data
|
| - } else {
|
| + if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
|
| + s = lds.MainStream.Data
|
| + }
|
| + ub = swarmingURLBuilder(linkBase)
|
| +
|
| + default:
|
| s = &miloProto.Step{}
|
| + ub = swarmingURLBuilder(linkBase)
|
| }
|
|
|
| if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
|
| return nil, err
|
| }
|
| - logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
|
| + logdog.AddLogDogToBuild(c, ub, s, &build)
|
|
|
| if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
|
| return nil, err
|
| @@ -560,3 +659,16 @@ func (b swarmingURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
|
| return nil
|
| }
|
| }
|
| +
|
| +func swarmingTags(v []string) map[string]string {
|
| + res := make(map[string]string, len(v))
|
| + for _, tag := range v {
|
| + var value string
|
| + parts := strings.SplitN(tag, ":", 2)
|
| + if len(parts) == 2 {
|
| + value = parts[1]
|
| + }
|
| + res[parts[0]] = value
|
| + }
|
| + return res
|
| +}
|
|
|