| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 "strings" | 12 "strings" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 | 16 |
| 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 18 "github.com/luci/luci-go/common/errors" | 18 "github.com/luci/luci-go/common/errors" |
| 19 "github.com/luci/luci-go/common/logging" | 19 "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 miloProto "github.com/luci/luci-go/common/proto/milo" | 21 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | 22 "github.com/luci/luci-go/common/sync/parallel" |
| 23 "github.com/luci/luci-go/logdog/client/annotee" | 23 "github.com/luci/luci-go/logdog/client/annotee" |
| 24 "github.com/luci/luci-go/logdog/client/coordinator" |
| 24 "github.com/luci/luci-go/logdog/common/types" | 25 "github.com/luci/luci-go/logdog/common/types" |
| 25 "github.com/luci/luci-go/milo/api/resp" | 26 "github.com/luci/luci-go/milo/api/resp" |
| 26 "github.com/luci/luci-go/milo/appengine/logdog" | 27 "github.com/luci/luci-go/milo/appengine/logdog" |
| 27 "github.com/luci/luci-go/server/auth" | 28 "github.com/luci/luci-go/server/auth" |
| 28 ) | 29 ) |
| 29 | 30 |
| 30 // errNotMiloJob is returned if a Swarming task is fetched that does not self- | 31 // errNotMiloJob is returned if a Swarming task is fetched that does not self- |
| 31 // identify as a Milo job. | 32 // identify as a Milo job. |
| 32 var errNotMiloJob = errors.New("Not a Milo Job") | 33 var errNotMiloJob = errors.New("Not a Milo Job") |
| 33 | 34 |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 108 } | 109 } |
| 109 | 110 |
| 110 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str
ing) (*swarming.SwarmingRpcsTaskRequest, error) { | 111 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str
ing) (*swarming.SwarmingRpcsTaskRequest, error) { |
| 111 return svc.client.Task.Request(taskID).Context(c).Do() | 112 return svc.client.Task.Request(taskID).Context(c).Do() |
| 112 } | 113 } |
| 113 | 114 |
| 114 type swarmingFetchParams struct { | 115 type swarmingFetchParams struct { |
| 115 fetchReq bool | 116 fetchReq bool |
| 116 fetchRes bool | 117 fetchRes bool |
| 117 fetchLog bool | 118 fetchLog bool |
| 119 |
| 120 // taskTagCallback, if not nil, is a callback that will be invoked after |
| 121 // fetching the result, if fetchRes is true. It will be passed a key/val
ue map |
| 122 // of the Swarming result's tags. |
| 123 // |
| 124 // If taskTagCallback returns true, any pending log fetch will be cancel
led |
| 125 // without error. |
| 126 taskTagCallback func(map[string]string) bool |
| 118 } | 127 } |
| 119 | 128 |
| 120 type swarmingFetchResult struct { | 129 type swarmingFetchResult struct { |
| 121 req *swarming.SwarmingRpcsTaskRequest | 130 req *swarming.SwarmingRpcsTaskRequest |
| 122 res *swarming.SwarmingRpcsTaskResult | 131 res *swarming.SwarmingRpcsTaskResult |
| 132 |
| 133 // log is the log data content. If no log data was fetched, this will be empty. |
| 134 // If the log fetch was cancelled, this is undefined. |
| 123 log string | 135 log string |
| 124 } | 136 } |
| 125 | 137 |
| 126 // swarmingFetch fetches (in parallel) the components that it is configured to | 138 // swarmingFetch fetches (in parallel) the components that it is configured to |
| 127 // fetch. | 139 // fetch. |
| 128 // | 140 // |
| 129 // After fetching, an ACL check is performed to confirm that the user is | 141 // After fetching, an ACL check is performed to confirm that the user is |
| 130 // permitted to view the resulting data. If this check fails, get returns | 142 // permitted to view the resulting data. If this check fails, get returns |
| 131 // errNotMiloJob. | 143 // errNotMiloJob. |
| 132 // | 144 // |
| 133 // TODO(hinoka): Make this ACL check something more specific than the | 145 // TODO(hinoka): Make this ACL check something more specific than the |
| 134 // presence of the "allow_milo" dimension. | 146 // presence of the "allow_milo" dimension. |
| 135 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( | 147 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( |
| 136 *swarmingFetchResult, error) { | 148 *swarmingFetchResult, error) { |
| 137 | 149 |
| 138 // logErr is managed separately from other fetch errors, since in some | 150 // logErr is managed separately from other fetch errors, since in some |
| 139 // situations it's acceptable to not have a log stream. | 151 // situations it's acceptable to not have a log stream. |
| 140 var logErr error | 152 var logErr error |
| 141 var fr swarmingFetchResult | 153 var fr swarmingFetchResult |
| 154 var resTags map[string]string |
| 155 |
| 156 // Special Context to enable the cancellation of log fetching. |
| 157 logsCancelled := false |
| 158 logCtx, cancelLogs := context.WithCancel(c) |
| 159 defer cancelLogs() |
| 142 | 160 |
| 143 err := parallel.FanOutIn(func(workC chan<- func() error) { | 161 err := parallel.FanOutIn(func(workC chan<- func() error) { |
| 144 if req.fetchReq { | 162 if req.fetchReq { |
| 145 workC <- func() (err error) { | 163 workC <- func() (err error) { |
| 146 fr.req, err = svc.getSwarmingRequest(c, taskID) | 164 fr.req, err = svc.getSwarmingRequest(c, taskID) |
| 147 return | 165 return |
| 148 } | 166 } |
| 149 } | 167 } |
| 150 | 168 |
| 151 if req.fetchRes { | 169 if req.fetchRes { |
| 152 workC <- func() (err error) { | 170 workC <- func() (err error) { |
| 153 » » » » fr.res, err = svc.getSwarmingResult(c, taskID) | 171 » » » » if fr.res, err = svc.getSwarmingResult(c, taskID
); err == nil { |
| 172 » » » » » resTags = swarmingTags(fr.res.Tags) |
| 173 » » » » » if req.taskTagCallback != nil && req.tas
kTagCallback(resTags) { |
| 174 » » » » » » logsCancelled = true |
| 175 » » » » » » cancelLogs() |
| 176 » » » » » } |
| 177 » » » » } |
| 154 return | 178 return |
| 155 } | 179 } |
| 156 } | 180 } |
| 157 | 181 |
| 158 if req.fetchLog { | 182 if req.fetchLog { |
| 159 workC <- func() error { | 183 workC <- func() error { |
| 160 » » » » fr.log, logErr = svc.getTaskOutput(c, taskID) | 184 » » » » // Note: we're using the log Context here so we
can cancel log fetch |
| 185 » » » » // explicitly. |
| 186 » » » » fr.log, logErr = svc.getTaskOutput(logCtx, taskI
D) |
| 161 return nil | 187 return nil |
| 162 } | 188 } |
| 163 } | 189 } |
| 164 }) | 190 }) |
| 165 if err != nil { | 191 if err != nil { |
| 166 return nil, err | 192 return nil, err |
| 167 } | 193 } |
| 168 | 194 |
| 169 // Current ACL implementation: error if this is not a Milo job. | 195 // Current ACL implementation: error if this is not a Milo job. |
| 170 switch { | 196 switch { |
| 171 case req.fetchReq: | 197 case req.fetchReq: |
| 172 if !isMiloJob(fr.req.Tags) { | 198 if !isMiloJob(fr.req.Tags) { |
| 173 return nil, errNotMiloJob | 199 return nil, errNotMiloJob |
| 174 } | 200 } |
| 201 |
| 175 case req.fetchRes: | 202 case req.fetchRes: |
| 176 if !isMiloJob(fr.res.Tags) { | 203 if !isMiloJob(fr.res.Tags) { |
| 177 return nil, errNotMiloJob | 204 return nil, errNotMiloJob |
| 178 } | 205 } |
| 206 |
| 179 default: | 207 default: |
| 180 // No metadata to decide if this is a Milo job, so assume that i
t is not. | 208 // No metadata to decide if this is a Milo job, so assume that i
t is not. |
| 181 return nil, errNotMiloJob | 209 return nil, errNotMiloJob |
| 182 } | 210 } |
| 183 | 211 |
| 184 if req.fetchRes && logErr != nil { | 212 if req.fetchRes && logErr != nil { |
| 185 switch fr.res.State { | 213 switch fr.res.State { |
| 186 case TaskCompleted, TaskRunning, TaskCanceled: | 214 case TaskCompleted, TaskRunning, TaskCanceled: |
| 187 default: | 215 default: |
| 188 // Ignore log errors if the task might be pending, time
d out, expired, etc. | 216 // Ignore log errors if the task might be pending, time
d out, expired, etc. |
| 189 if err != nil { | 217 if err != nil { |
| 190 fr.log = "" | 218 fr.log = "" |
| 191 logErr = nil | 219 logErr = nil |
| 192 } | 220 } |
| 193 } | 221 } |
| 194 } | 222 } |
| 223 |
| 224 // If we explicitly cancelled logs, everything is OK. |
| 225 if logErr == context.Canceled && logsCancelled { |
| 226 logErr = nil |
| 227 } |
| 195 return &fr, logErr | 228 return &fr, logErr |
| 196 } | 229 } |
| 197 | 230 |
| 198 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { | 231 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { |
| 199 props := &resp.PropertyGroup{GroupName: "Swarming"} | 232 props := &resp.PropertyGroup{GroupName: "Swarming"} |
| 200 if len(sr.CostsUsd) == 1 { | 233 if len(sr.CostsUsd) == 1 { |
| 201 props.Property = append(props.Property, &resp.Property{ | 234 props.Property = append(props.Property, &resp.Property{ |
| 202 Key: "Cost of job (USD)", | 235 Key: "Cost of job (USD)", |
| 203 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), | 236 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), |
| 204 }) | 237 }) |
| (...skipping 18 matching lines...) Expand all Loading... |
| 223 Key: parts[0], | 256 Key: parts[0], |
| 224 } | 257 } |
| 225 if len(parts) == 2 { | 258 if len(parts) == 2 { |
| 226 p.Value = parts[1] | 259 p.Value = parts[1] |
| 227 } | 260 } |
| 228 props.Property = append(props.Property, p) | 261 props.Property = append(props.Property, p) |
| 229 } | 262 } |
| 230 return props | 263 return props |
| 231 } | 264 } |
| 232 | 265 |
| 233 // tagValue returns a value of the first tag matching the tag key. If not found | |
| 234 // returns "". | |
| 235 func tagValue(tags []string, key string) string { | |
| 236 prefix := key + ":" | |
| 237 for _, t := range tags { | |
| 238 if strings.HasPrefix(t, prefix) { | |
| 239 return strings.TrimPrefix(t, prefix) | |
| 240 } | |
| 241 } | |
| 242 return "" | |
| 243 } | |
| 244 | |
| 245 // addBuilderLink adds a link to the buildbucket builder view. | 266 // addBuilderLink adds a link to the buildbucket builder view. |
| 246 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s
tring, sr *swarming.SwarmingRpcsTaskResult) { | 267 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s
tring, sr *swarming.SwarmingRpcsTaskResult) { |
| 247 » bbHost := tagValue(sr.Tags, "buildbucket_hostname") | 268 » tags := swarmingTags(sr.Tags) |
| 248 » bucket := tagValue(sr.Tags, "buildbucket_bucket") | 269 |
| 249 » builder := tagValue(sr.Tags, "builder") | 270 » bbHost := tags["buildbucket_hostname"] |
| 271 » bucket := tags["buildbucket_bucket"] |
| 272 » builder := tags["builder"] |
| 250 if bucket == "" { | 273 if bucket == "" { |
| 251 logging.Errorf( | 274 logging.Errorf( |
| 252 c, "Could not extract buidlbucket bucket from task %s", | 275 c, "Could not extract buidlbucket bucket from task %s", |
| 253 taskPageURL(swarmingHostname, sr.TaskId)) | 276 taskPageURL(swarmingHostname, sr.TaskId)) |
| 254 } | 277 } |
| 255 if builder == "" { | 278 if builder == "" { |
| 256 logging.Errorf( | 279 logging.Errorf( |
| 257 c, "Could not extract builder name from task %s", | 280 c, "Could not extract builder name from task %s", |
| 258 taskPageURL(swarmingHostname, sr.TaskId)) | 281 taskPageURL(swarmingHostname, sr.TaskId)) |
| 259 } | 282 } |
| (...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 438 } | 461 } |
| 439 // If this ever has more than one stream then memoryClient needs to beco
me | 462 // If this ever has more than one stream then memoryClient needs to beco
me |
| 440 // goroutine safe | 463 // goroutine safe |
| 441 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { | 464 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { |
| 442 return nil, err | 465 return nil, err |
| 443 } | 466 } |
| 444 p.Finish() | 467 p.Finish() |
| 445 return c.ToLogDogStreams() | 468 return c.ToLogDogStreams() |
| 446 } | 469 } |
| 447 | 470 |
| 448 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID
string) (*resp.MiloBuild, error) { | 471 // buildLoader represents the ability to load a Milo build from a Swarming task. |
| 472 // |
| 473 // It exists so that the internal build loading functionality can be stubbed out |
| 474 // for testing. |
| 475 type buildLoader struct { |
| 476 » // logDogClientFunc returns a coordinator Client instance for the suppli
ed |
| 477 » // parameters. |
| 478 » // |
| 479 » // If nil, a production client will be generated. |
| 480 » logDogClientFunc func(c context.Context, host string) (*coordinator.Clie
nt, error) |
| 481 } |
| 482 |
| 483 func (bl *buildLoader) newEmptyAnnotationStream(c context.Context, addr *types.S
treamAddr) ( |
| 484 » *logdog.AnnotationStream, error) { |
| 485 |
| 486 » fn := bl.logDogClientFunc |
| 487 » if fn == nil { |
| 488 » » fn = logdog.NewClient |
| 489 » } |
| 490 » client, err := fn(c, addr.Host) |
| 491 » if err != nil { |
| 492 » » return nil, errors.Annotate(err).Reason("failed to create LogDog
client").Err() |
| 493 » } |
| 494 |
| 495 » as := logdog.AnnotationStream{ |
| 496 » » Client: client, |
| 497 » » Project: addr.Project, |
| 498 » » Path: addr.Path, |
| 499 » } |
| 500 » if err := as.Normalize(); err != nil { |
| 501 » » return nil, errors.Annotate(err).Reason("failed to normalize ann
otation stream parameters").Err() |
| 502 » } |
| 503 |
| 504 » return &as, nil |
| 505 } |
| 506 |
| 507 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService,
linkBase, taskID string) (*resp.MiloBuild, error) { |
| 449 // Fetch the data from Swarming | 508 // Fetch the data from Swarming |
| 509 var logDogStreamAddr *types.StreamAddr |
| 510 |
| 450 fetchParams := swarmingFetchParams{ | 511 fetchParams := swarmingFetchParams{ |
| 451 fetchRes: true, | 512 fetchRes: true, |
| 452 fetchLog: true, | 513 fetchLog: true, |
| 514 |
| 515 // Cancel if LogDog annotation stream parameters are present in
the tag set. |
| 516 taskTagCallback: func(tags map[string]string) (cancelLogs bool)
{ |
| 517 var err error |
| 518 if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa
gs(tags); err != nil { |
| 519 logging.WithError(err).Debugf(c, "Not using LogD
og annotation stream.") |
| 520 return false |
| 521 } |
| 522 return true |
| 523 }, |
| 453 } | 524 } |
| 454 fr, err := swarmingFetch(c, svc, taskID, fetchParams) | 525 fr, err := swarmingFetch(c, svc, taskID, fetchParams) |
| 455 if err != nil { | 526 if err != nil { |
| 456 return nil, err | 527 return nil, err |
| 457 } | 528 } |
| 458 | 529 |
| 459 var build resp.MiloBuild | 530 var build resp.MiloBuild |
| 460 var s *miloProto.Step | 531 var s *miloProto.Step |
| 461 var lds *logdog.Streams | 532 var lds *logdog.Streams |
| 533 var ub logdog.URLBuilder |
| 462 | 534 |
| 463 » // Decode the data using annotee. The logdog stream returned here is ass
umed | 535 » // Load the build from the available data. |
| 464 » // to be consistent, which is why the following block of code are not | 536 » // |
| 465 » // expected to ever err out. | 537 » // If the Swarming task explicitly specifies its log location, we prefer
that. |
| 466 » if fr.log != "" { | 538 » // As a fallback, we will try and parse the Swarming task's output for |
| 539 » // annotations. |
| 540 » switch { |
| 541 » case logDogStreamAddr != nil: |
| 542 » » // If the LogDog stream is available, load the step from that. |
| 543 » » as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr) |
| 544 » » if err != nil { |
| 545 » » » return nil, errors.Annotate(err).Reason("failed to creat
e LogDog annotation stream").Err() |
| 546 » » } |
| 547 |
| 548 » » if s, err = as.Fetch(c); err != nil { |
| 549 » » » return nil, errors.Annotate(err).Reason("failed to load
LogDog annotation stream").Err() |
| 550 » » } |
| 551 |
| 552 » » prefix, _ := logDogStreamAddr.Path.Split() |
| 553 » » ub = &logdog.ViewerURLBuilder{ |
| 554 » » » Host: logDogStreamAddr.Host, |
| 555 » » » Prefix: prefix, |
| 556 » » » Project: logDogStreamAddr.Project, |
| 557 » » } |
| 558 |
| 559 » case fr.log != "": |
| 560 » » // Decode the data using annotee. The logdog stream returned her
e is assumed |
| 561 » » // to be consistent, which is why the following block of code ar
e not |
| 562 » » // expected to ever err out. |
| 467 var err error | 563 var err error |
| 468 lds, err = streamsFromAnnotatedLog(c, fr.log) | 564 lds, err = streamsFromAnnotatedLog(c, fr.log) |
| 469 if err != nil { | 565 if err != nil { |
| 470 build.Components = []*resp.BuildComponent{{ | 566 build.Components = []*resp.BuildComponent{{ |
| 471 Type: resp.Summary, | 567 Type: resp.Summary, |
| 472 Label: "Milo annotation parser", | 568 Label: "Milo annotation parser", |
| 473 Text: []string{err.Error()}, | 569 Text: []string{err.Error()}, |
| 474 Status: resp.InfraFailure, | 570 Status: resp.InfraFailure, |
| 475 SubLink: []*resp.Link{{ | 571 SubLink: []*resp.Link{{ |
| 476 Label: "swarming task", | 572 Label: "swarming task", |
| 477 URL: taskPageURL(svc.getHost(), taskID
), | 573 URL: taskPageURL(svc.getHost(), taskID
), |
| 478 }}, | 574 }}, |
| 479 }} | 575 }} |
| 480 } | 576 } |
| 481 } | |
| 482 | 577 |
| 483 » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { | 578 » » if lds != nil && lds.MainStream != nil && lds.MainStream.Data !=
nil { |
| 484 » » s = lds.MainStream.Data | 579 » » » s = lds.MainStream.Data |
| 485 » } else { | 580 » » } |
| 581 » » ub = swarmingURLBuilder(linkBase) |
| 582 |
| 583 » default: |
| 486 s = &miloProto.Step{} | 584 s = &miloProto.Step{} |
| 585 ub = swarmingURLBuilder(linkBase) |
| 487 } | 586 } |
| 488 | 587 |
| 489 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { | 588 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { |
| 490 return nil, err | 589 return nil, err |
| 491 } | 590 } |
| 492 » logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) | 591 » logdog.AddLogDogToBuild(c, ub, s, &build) |
| 493 | 592 |
| 494 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { | 593 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { |
| 495 return nil, err | 594 return nil, err |
| 496 } | 595 } |
| 497 | 596 |
| 498 return &build, nil | 597 return &build, nil |
| 499 } | 598 } |
| 500 | 599 |
| 501 func isMiloJob(tags []string) bool { | 600 func isMiloJob(tags []string) bool { |
| 502 for _, t := range tags { | 601 for _, t := range tags { |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 553 case *miloProto.Link_Url: | 652 case *miloProto.Link_Url: |
| 554 return &resp.Link{ | 653 return &resp.Link{ |
| 555 Label: l.Label, | 654 Label: l.Label, |
| 556 URL: t.Url, | 655 URL: t.Url, |
| 557 } | 656 } |
| 558 | 657 |
| 559 default: | 658 default: |
| 560 return nil | 659 return nil |
| 561 } | 660 } |
| 562 } | 661 } |
| 662 |
| 663 func swarmingTags(v []string) map[string]string { |
| 664 res := make(map[string]string, len(v)) |
| 665 for _, tag := range v { |
| 666 var value string |
| 667 parts := strings.SplitN(tag, ":", 2) |
| 668 if len(parts) == 2 { |
| 669 value = parts[1] |
| 670 } |
| 671 res[parts[0]] = value |
| 672 } |
| 673 return res |
| 674 } |
| OLD | NEW |