Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: milo/appengine/swarming/build.go

Issue 2695383002: milo: Enable Swarming LogDog log loading. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
11 "net/url" 11 "net/url"
12 "strings" 12 "strings"
13 "time" 13 "time"
14 14
15 "golang.org/x/net/context" 15 "golang.org/x/net/context"
16 16
17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
18 "github.com/luci/luci-go/common/errors" 18 "github.com/luci/luci-go/common/errors"
19 "github.com/luci/luci-go/common/logging" 19 "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
21 miloProto "github.com/luci/luci-go/common/proto/milo" 21 miloProto "github.com/luci/luci-go/common/proto/milo"
22 "github.com/luci/luci-go/common/sync/parallel" 22 "github.com/luci/luci-go/common/sync/parallel"
23 "github.com/luci/luci-go/logdog/client/annotee" 23 "github.com/luci/luci-go/logdog/client/annotee"
24 "github.com/luci/luci-go/logdog/client/coordinator"
24 "github.com/luci/luci-go/logdog/common/types" 25 "github.com/luci/luci-go/logdog/common/types"
25 "github.com/luci/luci-go/milo/api/resp" 26 "github.com/luci/luci-go/milo/api/resp"
26 "github.com/luci/luci-go/milo/appengine/logdog" 27 "github.com/luci/luci-go/milo/appengine/logdog"
27 "github.com/luci/luci-go/server/auth" 28 "github.com/luci/luci-go/server/auth"
28 ) 29 )
29 30
30 // errNotMiloJob is returned if a Swarming task is fetched that does not self- 31 // errNotMiloJob is returned if a Swarming task is fetched that does not self-
31 // identify as a Milo job. 32 // identify as a Milo job.
32 var errNotMiloJob = errors.New("Not a Milo Job") 33 var errNotMiloJob = errors.New("Not a Milo Job")
33 34
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 } 109 }
109 110
110 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str ing) (*swarming.SwarmingRpcsTaskRequest, error) { 111 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str ing) (*swarming.SwarmingRpcsTaskRequest, error) {
111 return svc.client.Task.Request(taskID).Context(c).Do() 112 return svc.client.Task.Request(taskID).Context(c).Do()
112 } 113 }
113 114
114 type swarmingFetchParams struct { 115 type swarmingFetchParams struct {
115 fetchReq bool 116 fetchReq bool
116 fetchRes bool 117 fetchRes bool
117 fetchLog bool 118 fetchLog bool
119
120 // taskTagCallback, if not nil, is a callback that will be invoked durin g
121 // the swarmingFetch of the fetch result (if enabled). It will contain t he
hinoka 2017/02/16 02:08:21 invoked after fetching the swarm fetchRes call?
dnj 2017/02/16 02:20:51 Done.
122 // mapped result tags.
123 //
124 // If taskTagCallback returns true, any pending log fetch will be cancel led
125 // without error.
126 taskTagCallback func(map[string]string) bool
hinoka 2017/02/16 02:08:21 This is funky, but I don't have any better ideas :
dnj 2017/02/16 02:20:51 Acknowledged.
118 } 127 }
119 128
120 type swarmingFetchResult struct { 129 type swarmingFetchResult struct {
121 req *swarming.SwarmingRpcsTaskRequest 130 req *swarming.SwarmingRpcsTaskRequest
122 res *swarming.SwarmingRpcsTaskResult 131 res *swarming.SwarmingRpcsTaskResult
132
133 // log is the log data content. If no log data was fetched, this will em pty.
134 // If the log fetch was cancelled, this is undefined.
123 log string 135 log string
124 } 136 }
125 137
126 // swarmingFetch fetches (in parallel) the components that it is configured to 138 // swarmingFetch fetches (in parallel) the components that it is configured to
127 // fetch. 139 // fetch.
128 // 140 //
129 // After fetching, an ACL check is performed to confirm that the user is 141 // After fetching, an ACL check is performed to confirm that the user is
130 // permitted to view the resulting data. If this check fails, get returns 142 // permitted to view the resulting data. If this check fails, get returns
131 // errNotMiloJob. 143 // errNotMiloJob.
132 // 144 //
133 // TODO(hinoka): Make this ACL check something more specific than the 145 // TODO(hinoka): Make this ACL check something more specific than the
134 // resence of the "allow_milo" dimension. 146 // resence of the "allow_milo" dimension.
135 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) ( 147 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) (
136 *swarmingFetchResult, error) { 148 *swarmingFetchResult, error) {
137 149
138 // logErr is managed separately from other fetch errors, since in some 150 // logErr is managed separately from other fetch errors, since in some
139 // situations it's acceptable to not have a log stream. 151 // situations it's acceptable to not have a log stream.
140 var logErr error 152 var logErr error
141 var fr swarmingFetchResult 153 var fr swarmingFetchResult
154 var resTags map[string]string
155
156 // Special Context to enable the cancellation of log fetching.
157 logsCancelled := false
158 logCtx, cancelLogs := context.WithCancel(c)
159 defer cancelLogs()
142 160
143 err := parallel.FanOutIn(func(workC chan<- func() error) { 161 err := parallel.FanOutIn(func(workC chan<- func() error) {
144 if req.fetchReq { 162 if req.fetchReq {
145 workC <- func() (err error) { 163 workC <- func() (err error) {
146 fr.req, err = svc.getSwarmingRequest(c, taskID) 164 fr.req, err = svc.getSwarmingRequest(c, taskID)
147 return 165 return
148 } 166 }
149 } 167 }
150 168
151 if req.fetchRes { 169 if req.fetchRes {
152 workC <- func() (err error) { 170 workC <- func() (err error) {
153 » » » » fr.res, err = svc.getSwarmingResult(c, taskID) 171 » » » » if fr.res, err = svc.getSwarmingResult(c, taskID ); err == nil {
172 » » » » » resTags = swarmingTags(fr.res.Tags)
173 » » » » » if req.taskTagCallback != nil && req.tas kTagCallback(resTags) {
174 » » » » » » logsCancelled = true
175 » » » » » » cancelLogs()
176 » » » » » }
177 » » » » }
154 return 178 return
155 } 179 }
156 } 180 }
157 181
158 if req.fetchLog { 182 if req.fetchLog {
159 workC <- func() error { 183 workC <- func() error {
160 » » » » fr.log, logErr = svc.getTaskOutput(c, taskID) 184 » » » » // Note: we're using the log Context here so we can cancel log fetch
185 » » » » // explicitly.
186 » » » » fr.log, logErr = svc.getTaskOutput(logCtx, taskI D)
161 return nil 187 return nil
162 } 188 }
163 } 189 }
164 }) 190 })
165 if err != nil { 191 if err != nil {
166 return nil, err 192 return nil, err
167 } 193 }
168 194
169 // Current ACL implementation: error if this is not a Milo job. 195 // Current ACL implementation: error if this is not a Milo job.
170 switch { 196 switch {
171 case req.fetchReq: 197 case req.fetchReq:
172 if !isMiloJob(fr.req.Tags) { 198 if !isMiloJob(fr.req.Tags) {
173 return nil, errNotMiloJob 199 return nil, errNotMiloJob
174 } 200 }
201
175 case req.fetchRes: 202 case req.fetchRes:
176 if !isMiloJob(fr.res.Tags) { 203 if !isMiloJob(fr.res.Tags) {
177 return nil, errNotMiloJob 204 return nil, errNotMiloJob
178 } 205 }
206
179 default: 207 default:
180 // No metadata to decide if this is a Milo job, so assume that i t is not. 208 // No metadata to decide if this is a Milo job, so assume that i t is not.
181 return nil, errNotMiloJob 209 return nil, errNotMiloJob
182 } 210 }
183 211
184 if req.fetchRes && logErr != nil { 212 if req.fetchRes && logErr != nil {
185 switch fr.res.State { 213 switch fr.res.State {
186 case TaskCompleted, TaskRunning, TaskCanceled: 214 case TaskCompleted, TaskRunning, TaskCanceled:
187 default: 215 default:
188 // Ignore log errors if the task might be pending, time d out, expired, etc. 216 // Ignore log errors if the task might be pending, time d out, expired, etc.
189 if err != nil { 217 if err != nil {
190 fr.log = "" 218 fr.log = ""
191 logErr = nil 219 logErr = nil
192 } 220 }
193 } 221 }
194 } 222 }
223
224 // If we explicitly cancelled logs, everything is OK.
225 if logErr == context.Canceled && logsCancelled {
226 logErr = nil
227 }
195 return &fr, logErr 228 return &fr, logErr
196 } 229 }
197 230
198 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { 231 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
199 props := &resp.PropertyGroup{GroupName: "Swarming"} 232 props := &resp.PropertyGroup{GroupName: "Swarming"}
200 if len(sr.CostsUsd) == 1 { 233 if len(sr.CostsUsd) == 1 {
201 props.Property = append(props.Property, &resp.Property{ 234 props.Property = append(props.Property, &resp.Property{
202 Key: "Cost of job (USD)", 235 Key: "Cost of job (USD)",
203 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), 236 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]),
204 }) 237 })
(...skipping 18 matching lines...) Expand all
223 Key: parts[0], 256 Key: parts[0],
224 } 257 }
225 if len(parts) == 2 { 258 if len(parts) == 2 {
226 p.Value = parts[1] 259 p.Value = parts[1]
227 } 260 }
228 props.Property = append(props.Property, p) 261 props.Property = append(props.Property, p)
229 } 262 }
230 return props 263 return props
231 } 264 }
232 265
233 // tagValue returns a value of the first tag matching the tag key. If not found
234 // returns "".
235 func tagValue(tags []string, key string) string {
236 prefix := key + ":"
237 for _, t := range tags {
238 if strings.HasPrefix(t, prefix) {
239 return strings.TrimPrefix(t, prefix)
240 }
241 }
242 return ""
243 }
244
245 // addBuilderLink adds a link to the buildbucket builder view. 266 // addBuilderLink adds a link to the buildbucket builder view.
246 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s tring, sr *swarming.SwarmingRpcsTaskResult) { 267 func addBuilderLink(c context.Context, build *resp.MiloBuild, swarmingHostname s tring, sr *swarming.SwarmingRpcsTaskResult) {
247 » bbHost := tagValue(sr.Tags, "buildbucket_hostname") 268 » tags := swarmingTags(sr.Tags)
248 » bucket := tagValue(sr.Tags, "buildbucket_bucket") 269
249 » builder := tagValue(sr.Tags, "builder") 270 » bbHost := tags["buildbucket_hostname"]
271 » bucket := tags["buildbucket_bucket"]
272 » builder := tags["builder"]
250 if bucket == "" { 273 if bucket == "" {
251 logging.Errorf( 274 logging.Errorf(
252 c, "Could not extract buidlbucket bucket from task %s", 275 c, "Could not extract buidlbucket bucket from task %s",
253 taskPageURL(swarmingHostname, sr.TaskId)) 276 taskPageURL(swarmingHostname, sr.TaskId))
254 } 277 }
255 if builder == "" { 278 if builder == "" {
256 logging.Errorf( 279 logging.Errorf(
257 c, "Could not extract builder name from task %s", 280 c, "Could not extract builder name from task %s",
258 taskPageURL(swarmingHostname, sr.TaskId)) 281 taskPageURL(swarmingHostname, sr.TaskId))
259 } 282 }
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after
438 } 461 }
439 // If this ever has more than one stream then memoryClient needs to beco me 462 // If this ever has more than one stream then memoryClient needs to beco me
440 // goroutine safe 463 // goroutine safe
441 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { 464 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil {
442 return nil, err 465 return nil, err
443 } 466 }
444 p.Finish() 467 p.Finish()
445 return c.ToLogDogStreams() 468 return c.ToLogDogStreams()
446 } 469 }
447 470
448 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { 471 type buildLoader struct {
hinoka 2017/02/16 02:08:21 Comment on why this exists (testing)
dnj 2017/02/16 02:20:51 Done.
472 » // logdogClientFunc returns a coordinator Client instance for the suppli ed
473 » // parameters.
474 » //
475 » // If nil, a production client will be generated.
476 » logDogClientFunc func(c context.Context, host string) (*coordinator.Clie nt, error)
477 }
478
479 func (bl *buildLoader) newAnnotationStream(c context.Context, stream *types.Stre am) (*logdog.AnnotationStream, error) {
hinoka 2017/02/16 02:08:21 newEmptyAnnotationStream might be more appropriate
dnj 2017/02/16 02:20:51 Done.
480 » fn := bl.logDogClientFunc
481 » if fn == nil {
482 » » fn = logdog.NewClient
483 » }
484 » client, err := fn(c, stream.Host)
485 » if err != nil {
486 » » return nil, errors.Annotate(err).Reason("failed to create LogDog client").Err()
487 » }
488
489 » as := logdog.AnnotationStream{
490 » » Client: client,
491 » » Project: stream.Project,
492 » » Path: stream.Path,
493 » }
494 » if err := as.Normalize(); err != nil {
495 » » return nil, errors.Annotate(err).Reason("failed to normalize ann otation stream parameters").Err()
496 » }
497
498 » return &as, nil
499 }
500
501 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
449 // Fetch the data from Swarming 502 // Fetch the data from Swarming
503 var logDogStream *types.Stream
504
450 fetchParams := swarmingFetchParams{ 505 fetchParams := swarmingFetchParams{
451 fetchRes: true, 506 fetchRes: true,
452 fetchLog: true, 507 fetchLog: true,
508
509 // Cancel if LogDog annotation strem parameters are present in t he tag set.
hinoka 2017/02/16 02:08:21 stream
dnj 2017/02/16 02:20:51 Done.
510 taskTagCallback: func(tags map[string]string) (cancelLogs bool) {
511 var err error
512 if logDogStream, err = resolveLogDogStreamFromTags(tags) ; err != nil {
513 logging.WithError(err).Debugf(c, "Not using LogD og annotation stream.")
514 return false
515 }
516 return true
517 },
453 } 518 }
454 fr, err := swarmingFetch(c, svc, taskID, fetchParams) 519 fr, err := swarmingFetch(c, svc, taskID, fetchParams)
455 if err != nil { 520 if err != nil {
456 return nil, err 521 return nil, err
457 } 522 }
458 523
459 var build resp.MiloBuild 524 var build resp.MiloBuild
460 var s *miloProto.Step 525 var s *miloProto.Step
461 var lds *logdog.Streams 526 var lds *logdog.Streams
462 527
463 » // Decode the data using annotee. The logdog stream returned here is ass umed 528 » // If the LogDog stream is available, load the step from that.
hinoka 2017/02/16 02:08:21 Add to comment: Explanation on why we're preferrin
dnj 2017/02/16 02:20:51 Done.
464 » // to be consistent, which is why the following block of code are not 529 » switch {
465 » // expected to ever err out. 530 » case logDogStream != nil:
466 » if fr.log != "" { 531 » » as, err := bl.newAnnotationStream(c, logDogStream)
532 » » if err != nil {
533 » » » return nil, errors.Annotate(err).Reason("failed to creat e LogDog annotation stream").Err()
534 » » }
535
536 » » if s, err = as.Load(c); err != nil {
hinoka 2017/02/16 02:08:21 Fetch seems like a more appropriate name
dnj 2017/02/16 02:20:51 Done.
537 » » » return nil, errors.Annotate(err).Reason("failed to load LogDog annotation stream").Err()
538 » » }
539
540 » case fr.log != "":
541 » » // Decode the data using annotee. The logdog stream returned her e is assumed
542 » » // to be consistent, which is why the following block of code ar e not
543 » » // expected to ever err out.
467 var err error 544 var err error
468 lds, err = streamsFromAnnotatedLog(c, fr.log) 545 lds, err = streamsFromAnnotatedLog(c, fr.log)
469 if err != nil { 546 if err != nil {
470 build.Components = []*resp.BuildComponent{{ 547 build.Components = []*resp.BuildComponent{{
471 Type: resp.Summary, 548 Type: resp.Summary,
472 Label: "Milo annotation parser", 549 Label: "Milo annotation parser",
473 Text: []string{err.Error()}, 550 Text: []string{err.Error()},
474 Status: resp.InfraFailure, 551 Status: resp.InfraFailure,
475 SubLink: []*resp.Link{{ 552 SubLink: []*resp.Link{{
476 Label: "swarming task", 553 Label: "swarming task",
477 URL: taskPageURL(svc.getHost(), taskID ), 554 URL: taskPageURL(svc.getHost(), taskID ),
478 }}, 555 }},
479 }} 556 }}
480 } 557 }
481 }
482 558
483 » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { 559 » » if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
484 » » s = lds.MainStream.Data 560 » » » s = lds.MainStream.Data
485 » } else { 561 » » }
562
563 » default:
486 s = &miloProto.Step{} 564 s = &miloProto.Step{}
487 } 565 }
488 566
489 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { 567 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
490 return nil, err 568 return nil, err
491 } 569 }
492 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) 570 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
493 571
494 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { 572 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
495 return nil, err 573 return nil, err
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
553 case *miloProto.Link_Url: 631 case *miloProto.Link_Url:
554 return &resp.Link{ 632 return &resp.Link{
555 Label: l.Label, 633 Label: l.Label,
556 URL: t.Url, 634 URL: t.Url,
557 } 635 }
558 636
559 default: 637 default:
560 return nil 638 return nil
561 } 639 }
562 } 640 }
641
642 func swarmingTags(v []string) map[string]string {
643 res := make(map[string]string, len(v))
644 for _, tag := range v {
645 var value string
646 parts := strings.SplitN(tag, ":", 2)
647 if len(parts) == 2 {
648 value = parts[1]
649 }
650 res[parts[0]] = value
651 }
652 return res
653 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698