Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 "strings" | 12 "strings" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 | 16 |
| 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 18 "github.com/luci/luci-go/common/errors" | 18 "github.com/luci/luci-go/common/errors" |
| 19 "github.com/luci/luci-go/common/logging" | 19 "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 miloProto "github.com/luci/luci-go/common/proto/milo" | 21 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | 22 "github.com/luci/luci-go/common/sync/parallel" |
| 23 "github.com/luci/luci-go/logdog/client/annotee" | 23 "github.com/luci/luci-go/logdog/client/annotee" |
| 24 "github.com/luci/luci-go/logdog/client/coordinator" | |
| 24 "github.com/luci/luci-go/logdog/common/types" | 25 "github.com/luci/luci-go/logdog/common/types" |
| 25 "github.com/luci/luci-go/milo/api/resp" | 26 "github.com/luci/luci-go/milo/api/resp" |
| 26 "github.com/luci/luci-go/milo/appengine/logdog" | 27 "github.com/luci/luci-go/milo/appengine/logdog" |
| 27 "github.com/luci/luci-go/server/auth" | 28 "github.com/luci/luci-go/server/auth" |
| 28 ) | 29 ) |
| 29 | 30 |
| 30 // errNotMiloJob is returned if a Swarming task is fetched that does not self- | 31 // errNotMiloJob is returned if a Swarming task is fetched that does not self- |
| 31 // identify as a Milo job. | 32 // identify as a Milo job. |
| 32 var errNotMiloJob = errors.New("Not a Milo Job") | 33 var errNotMiloJob = errors.New("Not a Milo Job") |
| 33 | 34 |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 108 } | 109 } |
| 109 | 110 |
| 110 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str ing) (*swarming.SwarmingRpcsTaskRequest, error) { | 111 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str ing) (*swarming.SwarmingRpcsTaskRequest, error) { |
| 111 return svc.client.Task.Request(taskID).Context(c).Do() | 112 return svc.client.Task.Request(taskID).Context(c).Do() |
| 112 } | 113 } |
| 113 | 114 |
| 114 type swarmingFetchParams struct { | 115 type swarmingFetchParams struct { |
| 115 fetchReq bool | 116 fetchReq bool |
| 116 fetchRes bool | 117 fetchRes bool |
| 117 fetchLog bool | 118 fetchLog bool |
| 119 | |
| 120 // taskTagCallback, if not nil, is a callback that will be invoked during | |
| 121 // the swarmingFetch of the fetch result (if enabled). It will contain the | |
|
hinoka
2017/02/16 02:08:21
invoked after fetching the swarm fetchRes call?
dnj
2017/02/16 02:20:51
Done.
| |
| 122 // mapped result tags. | |
| 123 // | |
| 124 // If taskTagCallback returns true, any pending log fetch will be cancel led | |
| 125 // without error. | |
| 126 taskTagCallback func(map[string]string) bool | |
|
hinoka
2017/02/16 02:08:21
This is funky, but I don't have any better ideas :
dnj
2017/02/16 02:20:51
Acknowledged.
| |
| 118 } | 127 } |
| 119 | 128 |
| 120 type swarmingFetchResult struct { | 129 type swarmingFetchResult struct { |
| 121 req *swarming.SwarmingRpcsTaskRequest | 130 req *swarming.SwarmingRpcsTaskRequest |
| 122 res *swarming.SwarmingRpcsTaskResult | 131 res *swarming.SwarmingRpcsTaskResult |
| 132 | |
| 133 // log is the log data content. If no log data was fetched, this will be empty. | |
| 134 // If the log fetch was cancelled, this is undefined. | |
| 123 log string | 135 log string |
| 124 } | 136 } |
| 125 | 137 |
| 126 // swarmingFetch fetches (in parallel) the components that it is configured to | 138 // swarmingFetch fetches (in parallel) the components that it is configured to |
| 127 // fetch. | 139 // fetch. |
| 128 // | 140 // |
| 129 // After fetching, an ACL check is performed to confirm that the user is | 141 // After fetching, an ACL check is performed to confirm that the user is |
| 130 // permitted to view the resulting data. If this check fails, get returns | 142 // permitted to view the resulting data. If this check fails, get returns |
| 131 // errNotMiloJob. | 143 // errNotMiloJob. |
| 132 // | 144 // |
| 133 // TODO(hinoka): Make this ACL check something more specific than the | 145 // TODO(hinoka): Make this ACL check something more specific than the |
| 134 // presence of the "allow_milo" dimension. | 146 // presence of the "allow_milo" dimension. |
| 135 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) ( | 147 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) ( |
| 136 *swarmingFetchResult, error) { | 148 *swarmingFetchResult, error) { |
| 137 | 149 |
| 138 // logErr is managed separately from other fetch errors, since in some | 150 // logErr is managed separately from other fetch errors, since in some |
| 139 // situations it's acceptable to not have a log stream. | 151 // situations it's acceptable to not have a log stream. |
| 140 var logErr error | 152 var logErr error |
| 141 var fr swarmingFetchResult | 153 var fr swarmingFetchResult |
| 154 var resTags map[string]string | |
| 155 | |
| 156 // Special Context to enable the cancellation of log fetching. | |
| 157 logsCancelled := false | |
| 158 logCtx, cancelLogs := context.WithCancel(c) | |
| 159 defer cancelLogs() | |
| 142 | 160 |
| 143 err := parallel.FanOutIn(func(workC chan<- func() error) { | 161 err := parallel.FanOutIn(func(workC chan<- func() error) { |
| 144 if req.fetchReq { | 162 if req.fetchReq { |
| 145 workC <- func() (err error) { | 163 workC <- func() (err error) { |
| 146 fr.req, err = svc.getSwarmingRequest(c, taskID) | 164 fr.req, err = svc.getSwarmingRequest(c, taskID) |
| 147 return | 165 return |
| 148 } | 166 } |
| 149 } | 167 } |
| 150 | 168 |
| 151 if req.fetchRes { | 169 if req.fetchRes { |
| 152 workC <- func() (err error) { | 170 workC <- func() (err error) { |
| 153 » » » » fr.res, err = svc.getSwarmingResult(c, taskID) | 171 » » » » if fr.res, err = svc.getSwarmingResult(c, taskID ); err == nil { |
| 172 » » » » » resTags = swarmingTags(fr.res.Tags) | |
| 173 » » » » » if req.taskTagCallback != nil && req.tas kTagCallback(resTags) { | |
| 174 » » » » » » logsCancelled = true | |
| 175 » » » » » » cancelLogs() | |
| 176 » » » » » } | |
| 177 » » » » } | |
| 154 return | 178 return |
| 155 } | 179 } |
| 156 } | 180 } |
| 157 | 181 |
| 158 if req.fetchLog { | 182 if req.fetchLog { |
| 159 workC <- func() error { | 183 workC <- func() error { |
| 160 » » » » fr.log, logErr = svc.getTaskOutput(c, taskID) | 184 » » » » // Note: we're using the log Context here so we can cancel log fetch |
| 185 » » » » // explicitly. | |
| 186 » » » » fr.log, logErr = svc.getTaskOutput(logCtx, taskI D) | |
| 161 return nil | 187 return nil |
| 162 } | 188 } |
| 163 } | 189 } |
| 164 }) | 190 }) |
| 165 if err != nil { | 191 if err != nil { |
| 166 return nil, err | 192 return nil, err |
| 167 } | 193 } |
| 168 | 194 |
| 169 // Current ACL implementation: error if this is not a Milo job. | 195 // Current ACL implementation: error if this is not a Milo job. |
| 170 switch { | 196 switch { |
| 171 case req.fetchReq: | 197 case req.fetchReq: |
| 172 if !isMiloJob(fr.req.Tags) { | 198 if !isMiloJob(fr.req.Tags) { |
| 173 return nil, errNotMiloJob | 199 return nil, errNotMiloJob |
| 174 } | 200 } |
| 201 | |
| 175 case req.fetchRes: | 202 case req.fetchRes: |
| 176 if !isMiloJob(fr.res.Tags) { | 203 if !isMiloJob(fr.res.Tags) { |
| 177 return nil, errNotMiloJob | 204 return nil, errNotMiloJob |
| 178 } | 205 } |
| 206 | |
| 179 default: | 207 default: |
| 180 // No metadata to decide if this is a Milo job, so assume that it is not. | 208 // No metadata to decide if this is a Milo job, so assume that it is not. |
| 181 return nil, errNotMiloJob | 209 return nil, errNotMiloJob |
| 182 } | 210 } |
| 183 | 211 |
| 184 if req.fetchRes && logErr != nil { | 212 if req.fetchRes && logErr != nil { |
| 185 switch fr.res.State { | 213 switch fr.res.State { |
| 186 case TaskCompleted, TaskRunning, TaskCanceled: | 214 case TaskCompleted, TaskRunning, TaskCanceled: |
| 187 default: | 215 default: |
| 188 // Ignore log errors if the task might be pending, timed out, expired, etc. | 216 // Ignore log errors if the task might be pending, timed out, expired, etc. |
| 189 if err != nil { | 217 if err != nil { |
| 190 fr.log = "" | 218 fr.log = "" |
| 191 logErr = nil | 219 logErr = nil |
| 192 } | 220 } |
| 193 } | 221 } |
| 194 } | 222 } |
| 223 | |
| 224 // If we explicitly cancelled logs, everything is OK. | |
| 225 if logErr == context.Canceled && logsCancelled { | |
| 226 logErr = nil | |
| 227 } | |
| 195 return &fr, logErr | 228 return &fr, logErr |
| 196 } | 229 } |
| 197 | 230 |
| 198 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { | 231 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { |
| 199 props := &resp.PropertyGroup{GroupName: "Swarming"} | 232 props := &resp.PropertyGroup{GroupName: "Swarming"} |
| 200 if len(sr.CostsUsd) == 1 { | 233 if len(sr.CostsUsd) == 1 { |
| 201 props.Property = append(props.Property, &resp.Property{ | 234 props.Property = append(props.Property, &resp.Property{ |
| 202 Key: "Cost of job (USD)", | 235 Key: "Cost of job (USD)", |
| 203 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), | 236 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), |
| 204 }) | 237 }) |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 223 Key: parts[0], | 256 Key: parts[0], |
| 224 } | 257 } |
| 225 if len(parts) == 2 { | 258 if len(parts) == 2 { |
| 226 p.Value = parts[1] | 259 p.Value = parts[1] |
| 227 } | 260 } |
| 228 props.Property = append(props.Property, p) | 261 props.Property = append(props.Property, p) |
| 229 } | 262 } |
| 230 return props | 263 return props |
| 231 } | 264 } |
| 232 | 265 |
| 233 // tagValue returns a value of the first tag matching the tag key. If not found | |
| 234 // returns "". | |
| 235 func tagValue(tags []string, key string) string { | |
| 236 prefix := key + ":" | |
| 237 for _, t := range tags { | |
| 238 if strings.HasPrefix(t, prefix) { | |
| 239 return strings.TrimPrefix(t, prefix) | |
| 240 } | |
| 241 } | |
| 242 return "" | |
| 243 } | |
| 244 | |
| 245 // addBuilderLink adds a link to the buildbucket builder view. | 266 // addBuilderLink adds a link to the buildbucket builder view. |
| 246 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s tring, sr *swarming.SwarmingRpcsTaskResult) { | 267 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s tring, sr *swarming.SwarmingRpcsTaskResult) { |
| 247 » bbHost := tagValue(sr.Tags, "buildbucket_hostname") | 268 » tags := swarmingTags(sr.Tags) |
| 248 » bucket := tagValue(sr.Tags, "buildbucket_bucket") | 269 |
| 249 » builder := tagValue(sr.Tags, "builder") | 270 » bbHost := tags["buildbucket_hostname"] |
| 271 » bucket := tags["buildbucket_bucket"] | |
| 272 » builder := tags["builder"] | |
| 250 if bucket == "" { | 273 if bucket == "" { |
| 251 logging.Errorf( | 274 logging.Errorf( |
| 252 c, "Could not extract buidlbucket bucket from task %s", | 275 c, "Could not extract buidlbucket bucket from task %s", |
| 253 taskPageURL(swarmingHostname, sr.TaskId)) | 276 taskPageURL(swarmingHostname, sr.TaskId)) |
| 254 } | 277 } |
| 255 if builder == "" { | 278 if builder == "" { |
| 256 logging.Errorf( | 279 logging.Errorf( |
| 257 c, "Could not extract builder name from task %s", | 280 c, "Could not extract builder name from task %s", |
| 258 taskPageURL(swarmingHostname, sr.TaskId)) | 281 taskPageURL(swarmingHostname, sr.TaskId)) |
| 259 } | 282 } |
| (...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 438 } | 461 } |
| 439 // If this ever has more than one stream then memoryClient needs to beco me | 462 // If this ever has more than one stream then memoryClient needs to beco me |
| 440 // goroutine safe | 463 // goroutine safe |
| 441 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { | 464 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { |
| 442 return nil, err | 465 return nil, err |
| 443 } | 466 } |
| 444 p.Finish() | 467 p.Finish() |
| 445 return c.ToLogDogStreams() | 468 return c.ToLogDogStreams() |
| 446 } | 469 } |
| 447 | 470 |
| 448 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { | 471 type buildLoader struct { |
|
hinoka
2017/02/16 02:08:21
Comment on why this exists (testing)
dnj
2017/02/16 02:20:51
Done.
| |
| 472 » // logDogClientFunc returns a coordinator Client instance for the supplied | |
| 473 » // parameters. | |
| 474 » // | |
| 475 » // If nil, a production client will be generated. | |
| 476 » logDogClientFunc func(c context.Context, host string) (*coordinator.Clie nt, error) | |
| 477 } | |
| 478 | |
| 479 func (bl *buildLoader) newAnnotationStream(c context.Context, stream *types.Stre am) (*logdog.AnnotationStream, error) { | |
|
hinoka
2017/02/16 02:08:21
newEmptyAnnotationStream might be more appropriate
dnj
2017/02/16 02:20:51
Done.
| |
| 480 » fn := bl.logDogClientFunc | |
| 481 » if fn == nil { | |
| 482 » » fn = logdog.NewClient | |
| 483 » } | |
| 484 » client, err := fn(c, stream.Host) | |
| 485 » if err != nil { | |
| 486 » » return nil, errors.Annotate(err).Reason("failed to create LogDog client").Err() | |
| 487 » } | |
| 488 | |
| 489 » as := logdog.AnnotationStream{ | |
| 490 » » Client: client, | |
| 491 » » Project: stream.Project, | |
| 492 » » Path: stream.Path, | |
| 493 » } | |
| 494 » if err := as.Normalize(); err != nil { | |
| 495 » » return nil, errors.Annotate(err).Reason("failed to normalize ann otation stream parameters").Err() | |
| 496 » } | |
| 497 | |
| 498 » return &as, nil | |
| 499 } | |
| 500 | |
| 501 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { | |
| 449 // Fetch the data from Swarming | 502 // Fetch the data from Swarming |
| 503 var logDogStream *types.Stream | |
| 504 | |
| 450 fetchParams := swarmingFetchParams{ | 505 fetchParams := swarmingFetchParams{ |
| 451 fetchRes: true, | 506 fetchRes: true, |
| 452 fetchLog: true, | 507 fetchLog: true, |
| 508 | |
| 509 // Cancel if LogDog annotation stream parameters are present in the tag set. | |
|
hinoka
2017/02/16 02:08:21
stream
dnj
2017/02/16 02:20:51
Done.
| |
| 510 taskTagCallback: func(tags map[string]string) (cancelLogs bool) { | |
| 511 var err error | |
| 512 if logDogStream, err = resolveLogDogStreamFromTags(tags) ; err != nil { | |
| 513 logging.WithError(err).Debugf(c, "Not using LogD og annotation stream.") | |
| 514 return false | |
| 515 } | |
| 516 return true | |
| 517 }, | |
| 453 } | 518 } |
| 454 fr, err := swarmingFetch(c, svc, taskID, fetchParams) | 519 fr, err := swarmingFetch(c, svc, taskID, fetchParams) |
| 455 if err != nil { | 520 if err != nil { |
| 456 return nil, err | 521 return nil, err |
| 457 } | 522 } |
| 458 | 523 |
| 459 var build resp.MiloBuild | 524 var build resp.MiloBuild |
| 460 var s *miloProto.Step | 525 var s *miloProto.Step |
| 461 var lds *logdog.Streams | 526 var lds *logdog.Streams |
| 462 | 527 |
| 463 » // Decode the data using annotee. The logdog stream returned here is ass umed | 528 » // If the LogDog stream is available, load the step from that. |
|
hinoka
2017/02/16 02:08:21
Add to comment: Explanation on why we're preferrin
dnj
2017/02/16 02:20:51
Done.
| |
| 464 » // to be consistent, which is why the following block of code are not | 529 » switch { |
| 465 » // expected to ever err out. | 530 » case logDogStream != nil: |
| 466 » if fr.log != "" { | 531 » » as, err := bl.newAnnotationStream(c, logDogStream) |
| 532 » » if err != nil { | |
| 533 » » » return nil, errors.Annotate(err).Reason("failed to creat e LogDog annotation stream").Err() | |
| 534 » » } | |
| 535 | |
| 536 » » if s, err = as.Load(c); err != nil { | |
|
hinoka
2017/02/16 02:08:21
Fetch seems like a more appropriate name
dnj
2017/02/16 02:20:51
Done.
| |
| 537 » » » return nil, errors.Annotate(err).Reason("failed to load LogDog annotation stream").Err() | |
| 538 » » } | |
| 539 | |
| 540 » case fr.log != "": | |
| 541 » » // Decode the data using annotee. The logdog stream returned her e is assumed | |
| 542 » » // to be consistent, which is why the following block of code ar e not | |
| 543 » » // expected to ever err out. | |
| 467 var err error | 544 var err error |
| 468 lds, err = streamsFromAnnotatedLog(c, fr.log) | 545 lds, err = streamsFromAnnotatedLog(c, fr.log) |
| 469 if err != nil { | 546 if err != nil { |
| 470 build.Components = []*resp.BuildComponent{{ | 547 build.Components = []*resp.BuildComponent{{ |
| 471 Type: resp.Summary, | 548 Type: resp.Summary, |
| 472 Label: "Milo annotation parser", | 549 Label: "Milo annotation parser", |
| 473 Text: []string{err.Error()}, | 550 Text: []string{err.Error()}, |
| 474 Status: resp.InfraFailure, | 551 Status: resp.InfraFailure, |
| 475 SubLink: []*resp.Link{{ | 552 SubLink: []*resp.Link{{ |
| 476 Label: "swarming task", | 553 Label: "swarming task", |
| 477 URL: taskPageURL(svc.getHost(), taskID ), | 554 URL: taskPageURL(svc.getHost(), taskID ), |
| 478 }}, | 555 }}, |
| 479 }} | 556 }} |
| 480 } | 557 } |
| 481 } | |
| 482 | 558 |
| 483 » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { | 559 » » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { |
| 484 » » s = lds.MainStream.Data | 560 » » » s = lds.MainStream.Data |
| 485 » } else { | 561 » » } |
| 562 | |
| 563 » default: | |
| 486 s = &miloProto.Step{} | 564 s = &miloProto.Step{} |
| 487 } | 565 } |
| 488 | 566 |
| 489 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { | 567 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { |
| 490 return nil, err | 568 return nil, err |
| 491 } | 569 } |
| 492 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) | 570 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) |
| 493 | 571 |
| 494 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { | 572 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { |
| 495 return nil, err | 573 return nil, err |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 553 case *miloProto.Link_Url: | 631 case *miloProto.Link_Url: |
| 554 return &resp.Link{ | 632 return &resp.Link{ |
| 555 Label: l.Label, | 633 Label: l.Label, |
| 556 URL: t.Url, | 634 URL: t.Url, |
| 557 } | 635 } |
| 558 | 636 |
| 559 default: | 637 default: |
| 560 return nil | 638 return nil |
| 561 } | 639 } |
| 562 } | 640 } |
| 641 | |
| 642 func swarmingTags(v []string) map[string]string { | |
| 643 res := make(map[string]string, len(v)) | |
| 644 for _, tag := range v { | |
| 645 var value string | |
| 646 parts := strings.SplitN(tag, ":", 2) | |
| 647 if len(parts) == 2 { | |
| 648 value = parts[1] | |
| 649 } | |
| 650 res[parts[0]] = value | |
| 651 } | |
| 652 return res | |
| 653 } | |
| OLD | NEW |