Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(236)

Side by Side Diff: milo/appengine/swarming/build.go

Issue 2695383002: milo: Enable Swarming LogDog log loading. (Closed)
Patch Set: Comments, fix links. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/appengine/logdog/http.go ('k') | milo/appengine/swarming/build_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
11 "net/url" 11 "net/url"
12 "strings" 12 "strings"
13 "time" 13 "time"
14 14
15 "golang.org/x/net/context" 15 "golang.org/x/net/context"
16 16
17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
18 "github.com/luci/luci-go/common/errors" 18 "github.com/luci/luci-go/common/errors"
19 "github.com/luci/luci-go/common/logging" 19 "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
21 miloProto "github.com/luci/luci-go/common/proto/milo" 21 miloProto "github.com/luci/luci-go/common/proto/milo"
22 "github.com/luci/luci-go/common/sync/parallel" 22 "github.com/luci/luci-go/common/sync/parallel"
23 "github.com/luci/luci-go/logdog/client/annotee" 23 "github.com/luci/luci-go/logdog/client/annotee"
24 "github.com/luci/luci-go/logdog/client/coordinator"
24 "github.com/luci/luci-go/logdog/common/types" 25 "github.com/luci/luci-go/logdog/common/types"
25 "github.com/luci/luci-go/milo/api/resp" 26 "github.com/luci/luci-go/milo/api/resp"
26 "github.com/luci/luci-go/milo/appengine/logdog" 27 "github.com/luci/luci-go/milo/appengine/logdog"
27 "github.com/luci/luci-go/server/auth" 28 "github.com/luci/luci-go/server/auth"
28 ) 29 )
29 30
30 // errNotMiloJob is returned if a Swarming task is fetched that does not self- 31 // errNotMiloJob is returned if a Swarming task is fetched that does not self-
31 // identify as a Milo job. 32 // identify as a Milo job.
32 var errNotMiloJob = errors.New("Not a Milo Job") 33 var errNotMiloJob = errors.New("Not a Milo Job")
33 34
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 } 109 }
109 110
110 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str ing) (*swarming.SwarmingRpcsTaskRequest, error) { 111 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str ing) (*swarming.SwarmingRpcsTaskRequest, error) {
111 return svc.client.Task.Request(taskID).Context(c).Do() 112 return svc.client.Task.Request(taskID).Context(c).Do()
112 } 113 }
113 114
114 type swarmingFetchParams struct { 115 type swarmingFetchParams struct {
115 fetchReq bool 116 fetchReq bool
116 fetchRes bool 117 fetchRes bool
117 fetchLog bool 118 fetchLog bool
119
120 // taskTagCallback, if not nil, is a callback that will be invoked after
121 // fetching the result, if fetchRes is true. It will be passed a key/value map
122 // of the Swarming result's tags.
123 //
124 // If taskTagCallback returns true, any pending log fetch will be cancelled
125 // without error.
126 taskTagCallback func(map[string]string) bool
118 } 127 }
119 128
120 type swarmingFetchResult struct { 129 type swarmingFetchResult struct {
121 req *swarming.SwarmingRpcsTaskRequest 130 req *swarming.SwarmingRpcsTaskRequest
122 res *swarming.SwarmingRpcsTaskResult 131 res *swarming.SwarmingRpcsTaskResult
132
133 // log is the log data content. If no log data was fetched, this will be empty.
134 // If the log fetch was cancelled, this is undefined.
123 log string 135 log string
124 } 136 }
125 137
126 // swarmingFetch fetches (in parallel) the components that it is configured to 138 // swarmingFetch fetches (in parallel) the components that it is configured to
127 // fetch. 139 // fetch.
128 // 140 //
129 // After fetching, an ACL check is performed to confirm that the user is 141 // After fetching, an ACL check is performed to confirm that the user is
130 // permitted to view the resulting data. If this check fails, get returns 142 // permitted to view the resulting data. If this check fails, get returns
131 // errNotMiloJob. 143 // errNotMiloJob.
132 // 144 //
133 // TODO(hinoka): Make this ACL check something more specific than the 145 // TODO(hinoka): Make this ACL check something more specific than the
134 // presence of the "allow_milo" dimension. 146 // presence of the "allow_milo" dimension.
135 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) ( 147 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) (
136 *swarmingFetchResult, error) { 148 *swarmingFetchResult, error) {
137 149
138 // logErr is managed separately from other fetch errors, since in some 150 // logErr is managed separately from other fetch errors, since in some
139 // situations it's acceptable to not have a log stream. 151 // situations it's acceptable to not have a log stream.
140 var logErr error 152 var logErr error
141 var fr swarmingFetchResult 153 var fr swarmingFetchResult
154 var resTags map[string]string
155
156 // Special Context to enable the cancellation of log fetching.
157 logsCancelled := false
158 logCtx, cancelLogs := context.WithCancel(c)
159 defer cancelLogs()
142 160
143 err := parallel.FanOutIn(func(workC chan<- func() error) { 161 err := parallel.FanOutIn(func(workC chan<- func() error) {
144 if req.fetchReq { 162 if req.fetchReq {
145 workC <- func() (err error) { 163 workC <- func() (err error) {
146 fr.req, err = svc.getSwarmingRequest(c, taskID) 164 fr.req, err = svc.getSwarmingRequest(c, taskID)
147 return 165 return
148 } 166 }
149 } 167 }
150 168
151 if req.fetchRes { 169 if req.fetchRes {
152 workC <- func() (err error) { 170 workC <- func() (err error) {
153 » » » » fr.res, err = svc.getSwarmingResult(c, taskID) 171 » » » » if fr.res, err = svc.getSwarmingResult(c, taskID ); err == nil {
172 » » » » » resTags = swarmingTags(fr.res.Tags)
173 » » » » » if req.taskTagCallback != nil && req.tas kTagCallback(resTags) {
174 » » » » » » logsCancelled = true
175 » » » » » » cancelLogs()
176 » » » » » }
177 » » » » }
154 return 178 return
155 } 179 }
156 } 180 }
157 181
158 if req.fetchLog { 182 if req.fetchLog {
159 workC <- func() error { 183 workC <- func() error {
160 » » » » fr.log, logErr = svc.getTaskOutput(c, taskID) 184 » » » » // Note: we're using the log Context here so we can cancel log fetch
185 » » » » // explicitly.
186 » » » » fr.log, logErr = svc.getTaskOutput(logCtx, taskI D)
161 return nil 187 return nil
162 } 188 }
163 } 189 }
164 }) 190 })
165 if err != nil { 191 if err != nil {
166 return nil, err 192 return nil, err
167 } 193 }
168 194
169 // Current ACL implementation: error if this is not a Milo job. 195 // Current ACL implementation: error if this is not a Milo job.
170 switch { 196 switch {
171 case req.fetchReq: 197 case req.fetchReq:
172 if !isMiloJob(fr.req.Tags) { 198 if !isMiloJob(fr.req.Tags) {
173 return nil, errNotMiloJob 199 return nil, errNotMiloJob
174 } 200 }
201
175 case req.fetchRes: 202 case req.fetchRes:
176 if !isMiloJob(fr.res.Tags) { 203 if !isMiloJob(fr.res.Tags) {
177 return nil, errNotMiloJob 204 return nil, errNotMiloJob
178 } 205 }
206
179 default: 207 default:
180 // No metadata to decide if this is a Milo job, so assume that it is not. 208 // No metadata to decide if this is a Milo job, so assume that it is not.
181 return nil, errNotMiloJob 209 return nil, errNotMiloJob
182 } 210 }
183 211
184 if req.fetchRes && logErr != nil { 212 if req.fetchRes && logErr != nil {
185 switch fr.res.State { 213 switch fr.res.State {
186 case TaskCompleted, TaskRunning, TaskCanceled: 214 case TaskCompleted, TaskRunning, TaskCanceled:
187 default: 215 default:
188 // Ignore log errors if the task might be pending, timed out, expired, etc. 216 // Ignore log errors if the task might be pending, timed out, expired, etc.
189 if err != nil { 217 if err != nil {
190 fr.log = "" 218 fr.log = ""
191 logErr = nil 219 logErr = nil
192 } 220 }
193 } 221 }
194 } 222 }
223
224 // If we explicitly cancelled logs, everything is OK.
225 if logErr == context.Canceled && logsCancelled {
226 logErr = nil
227 }
195 return &fr, logErr 228 return &fr, logErr
196 } 229 }
197 230
198 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { 231 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
199 props := &resp.PropertyGroup{GroupName: "Swarming"} 232 props := &resp.PropertyGroup{GroupName: "Swarming"}
200 if len(sr.CostsUsd) == 1 { 233 if len(sr.CostsUsd) == 1 {
201 props.Property = append(props.Property, &resp.Property{ 234 props.Property = append(props.Property, &resp.Property{
202 Key: "Cost of job (USD)", 235 Key: "Cost of job (USD)",
203 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), 236 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]),
204 }) 237 })
(...skipping 18 matching lines...) Expand all
223 Key: parts[0], 256 Key: parts[0],
224 } 257 }
225 if len(parts) == 2 { 258 if len(parts) == 2 {
226 p.Value = parts[1] 259 p.Value = parts[1]
227 } 260 }
228 props.Property = append(props.Property, p) 261 props.Property = append(props.Property, p)
229 } 262 }
230 return props 263 return props
231 } 264 }
232 265
233 // tagValue returns a value of the first tag matching the tag key. If not found
234 // returns "".
235 func tagValue(tags []string, key string) string {
236 prefix := key + ":"
237 for _, t := range tags {
238 if strings.HasPrefix(t, prefix) {
239 return strings.TrimPrefix(t, prefix)
240 }
241 }
242 return ""
243 }
244
245 // addBuilderLink adds a link to the buildbucket builder view. 266 // addBuilderLink adds a link to the buildbucket builder view.
246 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s tring, sr *swarming.SwarmingRpcsTaskResult) { 267 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s tring, sr *swarming.SwarmingRpcsTaskResult) {
247 » bbHost := tagValue(sr.Tags, "buildbucket_hostname") 268 » tags := swarmingTags(sr.Tags)
248 » bucket := tagValue(sr.Tags, "buildbucket_bucket") 269
249 » builder := tagValue(sr.Tags, "builder") 270 » bbHost := tags["buildbucket_hostname"]
271 » bucket := tags["buildbucket_bucket"]
272 » builder := tags["builder"]
250 if bucket == "" { 273 if bucket == "" {
251 logging.Errorf( 274 logging.Errorf(
252 c, "Could not extract buidlbucket bucket from task %s", 275 c, "Could not extract buidlbucket bucket from task %s",
253 taskPageURL(swarmingHostname, sr.TaskId)) 276 taskPageURL(swarmingHostname, sr.TaskId))
254 } 277 }
255 if builder == "" { 278 if builder == "" {
256 logging.Errorf( 279 logging.Errorf(
257 c, "Could not extract builder name from task %s", 280 c, "Could not extract builder name from task %s",
258 taskPageURL(swarmingHostname, sr.TaskId)) 281 taskPageURL(swarmingHostname, sr.TaskId))
259 } 282 }
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after
438 } 461 }
439 // If this ever has more than one stream then memoryClient needs to beco me 462 // If this ever has more than one stream then memoryClient needs to beco me
440 // goroutine safe 463 // goroutine safe
441 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { 464 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil {
442 return nil, err 465 return nil, err
443 } 466 }
444 p.Finish() 467 p.Finish()
445 return c.ToLogDogStreams() 468 return c.ToLogDogStreams()
446 } 469 }
447 470
448 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { 471 // buildLoader represents the ability to load a Milo build from a Swarming task.
472 //
473 // It exists so that the internal build loading functionality can be stubbed out
474 // for testing.
475 type buildLoader struct {
476 » // logDogClientFunc returns a coordinator Client instance for the supplied
477 » // parameters.
478 » //
479 » // If nil, a production client will be generated.
480 » logDogClientFunc func(c context.Context, host string) (*coordinator.Clie nt, error)
481 }
482
483 func (bl *buildLoader) newEmptyAnnotationStream(c context.Context, addr *types.S treamAddr) (
484 » *logdog.AnnotationStream, error) {
485
486 » fn := bl.logDogClientFunc
487 » if fn == nil {
488 » » fn = logdog.NewClient
489 » }
490 » client, err := fn(c, addr.Host)
491 » if err != nil {
492 » » return nil, errors.Annotate(err).Reason("failed to create LogDog client").Err()
493 » }
494
495 » as := logdog.AnnotationStream{
496 » » Client: client,
497 » » Project: addr.Project,
498 » » Path: addr.Path,
499 » }
500 » if err := as.Normalize(); err != nil {
501 » » return nil, errors.Annotate(err).Reason("failed to normalize ann otation stream parameters").Err()
502 » }
503
504 » return &as, nil
505 }
506
507 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
449 // Fetch the data from Swarming 508 // Fetch the data from Swarming
509 var logDogStreamAddr *types.StreamAddr
510
450 fetchParams := swarmingFetchParams{ 511 fetchParams := swarmingFetchParams{
451 fetchRes: true, 512 fetchRes: true,
452 fetchLog: true, 513 fetchLog: true,
514
515 // Cancel if LogDog annotation stream parameters are present in the tag set.
516 taskTagCallback: func(tags map[string]string) (cancelLogs bool) {
517 var err error
518 if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa gs(tags); err != nil {
519 logging.WithError(err).Debugf(c, "Not using LogD og annotation stream.")
520 return false
521 }
522 return true
523 },
453 } 524 }
454 fr, err := swarmingFetch(c, svc, taskID, fetchParams) 525 fr, err := swarmingFetch(c, svc, taskID, fetchParams)
455 if err != nil { 526 if err != nil {
456 return nil, err 527 return nil, err
457 } 528 }
458 529
459 var build resp.MiloBuild 530 var build resp.MiloBuild
460 var s *miloProto.Step 531 var s *miloProto.Step
461 var lds *logdog.Streams 532 var lds *logdog.Streams
533 var ub logdog.URLBuilder
462 534
463 » // Decode the data using annotee. The logdog stream returned here is ass umed 535 » // Load the build from the available data.
464 » // to be consistent, which is why the following block of code are not 536 » //
465 » // expected to ever err out. 537 » // If the Swarming task explicitly specifies its log location, we prefer that.
466 » if fr.log != "" { 538 » // As a fallback, we will try and parse the Swarming task's output for
539 » // annotations.
540 » switch {
541 » case logDogStreamAddr != nil:
542 » » // If the LogDog stream is available, load the step from that.
543 » » as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr)
544 » » if err != nil {
545 » » » return nil, errors.Annotate(err).Reason("failed to creat e LogDog annotation stream").Err()
546 » » }
547
548 » » if s, err = as.Fetch(c); err != nil {
549 » » » return nil, errors.Annotate(err).Reason("failed to load LogDog annotation stream").Err()
550 » » }
551
552 » » prefix, _ := logDogStreamAddr.Path.Split()
553 » » ub = &logdog.ViewerURLBuilder{
554 » » » Host: logDogStreamAddr.Host,
555 » » » Prefix: prefix,
556 » » » Project: logDogStreamAddr.Project,
557 » » }
558
559 » case fr.log != "":
560 » » // Decode the data using annotee. The logdog stream returned here is assumed
561 » » // to be consistent, which is why the following block of code is not
562 » » // expected to ever err out.
467 var err error 563 var err error
468 lds, err = streamsFromAnnotatedLog(c, fr.log) 564 lds, err = streamsFromAnnotatedLog(c, fr.log)
469 if err != nil { 565 if err != nil {
470 build.Components = []*resp.BuildComponent{{ 566 build.Components = []*resp.BuildComponent{{
471 Type: resp.Summary, 567 Type: resp.Summary,
472 Label: "Milo annotation parser", 568 Label: "Milo annotation parser",
473 Text: []string{err.Error()}, 569 Text: []string{err.Error()},
474 Status: resp.InfraFailure, 570 Status: resp.InfraFailure,
475 SubLink: []*resp.Link{{ 571 SubLink: []*resp.Link{{
476 Label: "swarming task", 572 Label: "swarming task",
477 URL: taskPageURL(svc.getHost(), taskID ), 573 URL: taskPageURL(svc.getHost(), taskID ),
478 }}, 574 }},
479 }} 575 }}
480 } 576 }
481 }
482 577
483 » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { 578 » » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
484 » » s = lds.MainStream.Data 579 » » » s = lds.MainStream.Data
485 » } else { 580 » » }
581 » » ub = swarmingURLBuilder(linkBase)
582
583 » default:
486 s = &miloProto.Step{} 584 s = &miloProto.Step{}
585 ub = swarmingURLBuilder(linkBase)
487 } 586 }
488 587
489 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { 588 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
490 return nil, err 589 return nil, err
491 } 590 }
492 » logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) 591 » logdog.AddLogDogToBuild(c, ub, s, &build)
493 592
494 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { 593 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
495 return nil, err 594 return nil, err
496 } 595 }
497 596
498 return &build, nil 597 return &build, nil
499 } 598 }
500 599
501 func isMiloJob(tags []string) bool { 600 func isMiloJob(tags []string) bool {
502 for _, t := range tags { 601 for _, t := range tags {
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
553 case *miloProto.Link_Url: 652 case *miloProto.Link_Url:
554 return &resp.Link{ 653 return &resp.Link{
555 Label: l.Label, 654 Label: l.Label,
556 URL: t.Url, 655 URL: t.Url,
557 } 656 }
558 657
559 default: 658 default:
560 return nil 659 return nil
561 } 660 }
562 } 661 }
662
663 func swarmingTags(v []string) map[string]string {
664 res := make(map[string]string, len(v))
665 for _, tag := range v {
666 var value string
667 parts := strings.SplitN(tag, ":", 2)
668 if len(parts) == 2 {
669 value = parts[1]
670 }
671 res[parts[0]] = value
672 }
673 return res
674 }
OLDNEW
« no previous file with comments | « milo/appengine/logdog/http.go ('k') | milo/appengine/swarming/build_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698