OLD | NEW |
| (Empty) |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package swarming | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "net/http" | |
11 "net/url" | |
12 "strings" | |
13 "time" | |
14 | |
15 "golang.org/x/net/context" | |
16 | |
17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" | |
18 "github.com/luci/luci-go/common/errors" | |
19 "github.com/luci/luci-go/common/logging" | |
20 "github.com/luci/luci-go/common/proto/google" | |
21 miloProto "github.com/luci/luci-go/common/proto/milo" | |
22 "github.com/luci/luci-go/common/sync/parallel" | |
23 "github.com/luci/luci-go/logdog/client/annotee" | |
24 "github.com/luci/luci-go/logdog/client/coordinator" | |
25 "github.com/luci/luci-go/logdog/common/types" | |
26 "github.com/luci/luci-go/milo/api/resp" | |
27 "github.com/luci/luci-go/milo/appengine/common" | |
28 "github.com/luci/luci-go/milo/appengine/common/model" | |
29 "github.com/luci/luci-go/milo/appengine/logdog" | |
30 "github.com/luci/luci-go/server/auth" | |
31 ) | |
32 | |
33 // errNotMiloJob is returned if a Swarming task is fetched that does not self- | |
34 // identify as a Milo job. | |
35 var errNotMiloJob = errors.New("Not a Milo Job or access denied") | |
36 | |
37 // SwarmingTimeLayout is time layout used by swarming. | |
38 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" | |
39 | |
40 // logDogFetchTimeout is the amount of time to wait while fetching a LogDog | |
41 // stream before we time out the fetch. | |
42 const logDogFetchTimeout = 30 * time.Second | |
43 | |
44 // Swarming task states.. | |
45 const ( | |
46 // TaskRunning means task is running. | |
47 TaskRunning = "RUNNING" | |
48 // TaskPending means task didn't start yet. | |
49 TaskPending = "PENDING" | |
50 // TaskExpired means task expired and did not start. | |
51 TaskExpired = "EXPIRED" | |
52 // TaskTimedOut means task started, but took too long. | |
53 TaskTimedOut = "TIMED_OUT" | |
54 // TaskBotDied means task started but bot died. | |
55 TaskBotDied = "BOT_DIED" | |
56 // TaskCanceled means the task was canceled. See CompletedTs to determin
e whether it was started. | |
57 TaskCanceled = "CANCELED" | |
58 // TaskCompleted means task is complete. | |
59 TaskCompleted = "COMPLETED" | |
60 ) | |
61 | |
62 func getSwarmingClient(c context.Context, host string) (*swarming.Service, error
) { | |
63 c, _ = context.WithTimeout(c, 60*time.Second) | |
64 t, err := auth.GetRPCTransport(c, auth.AsSelf) | |
65 if err != nil { | |
66 return nil, err | |
67 } | |
68 sc, err := swarming.New(&http.Client{Transport: t}) | |
69 if err != nil { | |
70 return nil, err | |
71 } | |
72 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", host) | |
73 return sc, nil | |
74 } | |
75 | |
76 // swarmingService is an interface that fetches data from Swarming. | |
77 // | |
78 // In production, this is fetched from a Swarming server. For testing, this can | |
79 // be replaced with a mock. | |
80 type swarmingService interface { | |
81 getHost() string | |
82 getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingR
pcsTaskResult, error) | |
83 getSwarmingRequest(c context.Context, taskID string) (*swarming.Swarming
RpcsTaskRequest, error) | |
84 getTaskOutput(c context.Context, taskID string) (string, error) | |
85 } | |
86 | |
87 type prodSwarmingService struct { | |
88 host string | |
89 client *swarming.Service | |
90 } | |
91 | |
92 func newProdService(c context.Context, host string) (*prodSwarmingService, error
) { | |
93 client, err := getSwarmingClient(c, host) | |
94 if err != nil { | |
95 return nil, err | |
96 } | |
97 return &prodSwarmingService{ | |
98 host: host, | |
99 client: client, | |
100 }, nil | |
101 } | |
102 | |
103 func (svc *prodSwarmingService) getHost() string { return svc.host } | |
104 | |
105 func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID stri
ng) (*swarming.SwarmingRpcsTaskResult, error) { | |
106 return svc.client.Task.Result(taskID).Context(c).Do() | |
107 } | |
108 | |
109 func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string)
(string, error) { | |
110 stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do() | |
111 if err != nil { | |
112 return "", err | |
113 } | |
114 return stdout.Output, nil | |
115 } | |
116 | |
117 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str
ing) (*swarming.SwarmingRpcsTaskRequest, error) { | |
118 return svc.client.Task.Request(taskID).Context(c).Do() | |
119 } | |
120 | |
121 type swarmingFetchParams struct { | |
122 fetchReq bool | |
123 fetchRes bool | |
124 fetchLog bool | |
125 | |
126 // taskResCallback, if not nil, is a callback that will be invoked after | |
127 // fetching the result, if fetchRes is true. It will be passed a key/val
ue map | |
128 // of the Swarming result's tags. | |
129 // | |
130 // If taskResCallback returns true, any pending log fetch will be cancel
led | |
131 // without error. | |
132 taskResCallback func(*swarming.SwarmingRpcsTaskResult) bool | |
133 } | |
134 | |
135 type swarmingFetchResult struct { | |
136 req *swarming.SwarmingRpcsTaskRequest | |
137 res *swarming.SwarmingRpcsTaskResult | |
138 | |
139 // log is the log data content. If no log data was fetched, this will em
pty. | |
140 // If the log fetch was cancelled, this is undefined. | |
141 log string | |
142 } | |
143 | |
144 // swarmingFetch fetches (in parallel) the components that it is configured to | |
145 // fetch. | |
146 // | |
147 // After fetching, an ACL check is performed to confirm that the user is | |
148 // permitted to view the resulting data. If this check fails, get returns | |
149 // errNotMiloJob. | |
150 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( | |
151 *swarmingFetchResult, error) { | |
152 | |
153 // logErr is managed separately from other fetch errors, since in some | |
154 // situations it's acceptable to not have a log stream. | |
155 var logErr error | |
156 var fr swarmingFetchResult | |
157 | |
158 // Special Context to enable the cancellation of log fetching. | |
159 logsCancelled := false | |
160 logCtx, cancelLogs := context.WithCancel(c) | |
161 defer cancelLogs() | |
162 | |
163 err := parallel.FanOutIn(func(workC chan<- func() error) { | |
164 if req.fetchReq { | |
165 workC <- func() (err error) { | |
166 fr.req, err = svc.getSwarmingRequest(c, taskID) | |
167 return | |
168 } | |
169 } | |
170 | |
171 if req.fetchRes { | |
172 workC <- func() (err error) { | |
173 if fr.res, err = svc.getSwarmingResult(c, taskID
); err == nil { | |
174 if req.taskResCallback != nil && req.tas
kResCallback(fr.res) { | |
175 logsCancelled = true | |
176 cancelLogs() | |
177 } | |
178 } | |
179 return | |
180 } | |
181 } | |
182 | |
183 if req.fetchLog { | |
184 workC <- func() error { | |
185 // Note: we're using the log Context here so we
can cancel log fetch | |
186 // explicitly. | |
187 fr.log, logErr = svc.getTaskOutput(logCtx, taskI
D) | |
188 return nil | |
189 } | |
190 } | |
191 }) | |
192 if err != nil { | |
193 return nil, err | |
194 } | |
195 | |
196 // Current ACL implementation: | |
197 // If allow_milo:1 is present, it is a public job. Don't bother with AC
L check. | |
198 // If it is not present, check the luci_project tag, and see if user is
allowed | |
199 // to access said project. | |
200 switch { | |
201 case req.fetchReq: | |
202 if !isAllowed(c, fr.req.Tags) { | |
203 return nil, errNotMiloJob | |
204 } | |
205 | |
206 case req.fetchRes: | |
207 if !isAllowed(c, fr.res.Tags) { | |
208 return nil, errNotMiloJob | |
209 } | |
210 | |
211 default: | |
212 // No metadata to decide if this is a Milo job, so assume that i
t is not. | |
213 return nil, errNotMiloJob | |
214 } | |
215 | |
216 if req.fetchRes && logErr != nil { | |
217 switch fr.res.State { | |
218 case TaskCompleted, TaskRunning, TaskCanceled: | |
219 default: | |
220 // Ignore log errors if the task might be pending, time
d out, expired, etc. | |
221 if err != nil { | |
222 fr.log = "" | |
223 logErr = nil | |
224 } | |
225 } | |
226 } | |
227 | |
228 // If we explicitly cancelled logs, everything is OK. | |
229 if logErr == context.Canceled && logsCancelled { | |
230 logErr = nil | |
231 } | |
232 return &fr, logErr | |
233 } | |
234 | |
235 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { | |
236 props := &resp.PropertyGroup{GroupName: "Swarming"} | |
237 if len(sr.CostsUsd) == 1 { | |
238 props.Property = append(props.Property, &resp.Property{ | |
239 Key: "Cost of job (USD)", | |
240 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), | |
241 }) | |
242 } | |
243 if sr.State == TaskCompleted || sr.State == TaskTimedOut { | |
244 props.Property = append(props.Property, &resp.Property{ | |
245 Key: "Exit Code", | |
246 Value: fmt.Sprintf("%d", sr.ExitCode), | |
247 }) | |
248 } | |
249 return props | |
250 } | |
251 | |
252 func tagsToMap(tags []string) map[string]string { | |
253 result := make(map[string]string, len(tags)) | |
254 for _, t := range tags { | |
255 parts := strings.SplitN(t, ":", 2) | |
256 if len(parts) == 2 { | |
257 result[parts[0]] = parts[1] | |
258 } | |
259 } | |
260 return result | |
261 } | |
262 | |
263 // addBuilderLink adds a link to the buildbucket builder view. | |
264 func addBuilderLink(c context.Context, build *resp.MiloBuild, tags map[string]st
ring) { | |
265 bucket := tags["buildbucket_bucket"] | |
266 builder := tags["builder"] | |
267 if bucket != "" && builder != "" { | |
268 build.Summary.ParentLabel = resp.NewLink( | |
269 builder, fmt.Sprintf("/buildbucket/%s/%s", bucket, build
er)) | |
270 } | |
271 } | |
272 | |
273 // addBanner adds an OS banner derived from "os" swarming tag, if present. | |
274 func addBanner(build *resp.MiloBuild, tags map[string]string) { | |
275 os := tags["os"] | |
276 var ver string | |
277 parts := strings.SplitN(os, "-", 2) | |
278 if len(parts) == 2 { | |
279 os = parts[0] | |
280 ver = parts[1] | |
281 } | |
282 | |
283 var base resp.LogoBase | |
284 switch os { | |
285 case "Ubuntu": | |
286 base = resp.Ubuntu | |
287 case "Windows": | |
288 base = resp.Windows | |
289 case "Mac": | |
290 base = resp.OSX | |
291 case "Android": | |
292 base = resp.Android | |
293 default: | |
294 return | |
295 } | |
296 | |
297 build.Summary.Banner = &resp.LogoBanner{ | |
298 OS: []resp.Logo{{ | |
299 LogoBase: base, | |
300 Subtitle: ver, | |
301 Count: 1, | |
302 }}, | |
303 } | |
304 } | |
305 | |
306 // addTaskToMiloStep augments a Milo Annotation Protobuf with state from the | |
307 // Swarming task. | |
308 func addTaskToMiloStep(c context.Context, server string, sr *swarming.SwarmingRp
csTaskResult, step *miloProto.Step) error { | |
309 step.Link = &miloProto.Link{ | |
310 Label: "Task " + sr.TaskId, | |
311 Value: &miloProto.Link_Url{ | |
312 Url: taskPageURL(server, sr.TaskId), | |
313 }, | |
314 } | |
315 | |
316 switch sr.State { | |
317 case TaskRunning: | |
318 step.Status = miloProto.Status_RUNNING | |
319 | |
320 case TaskPending: | |
321 step.Status = miloProto.Status_PENDING | |
322 | |
323 case TaskExpired, TaskTimedOut, TaskBotDied: | |
324 step.Status = miloProto.Status_FAILURE | |
325 | |
326 switch sr.State { | |
327 case TaskExpired: | |
328 step.FailureDetails = &miloProto.FailureDetails{ | |
329 Type: miloProto.FailureDetails_EXPIRED, | |
330 Text: "Task expired", | |
331 } | |
332 case TaskTimedOut: | |
333 step.FailureDetails = &miloProto.FailureDetails{ | |
334 Type: miloProto.FailureDetails_INFRA, | |
335 Text: "Task timed out", | |
336 } | |
337 case TaskBotDied: | |
338 step.FailureDetails = &miloProto.FailureDetails{ | |
339 Type: miloProto.FailureDetails_INFRA, | |
340 Text: "Bot died", | |
341 } | |
342 } | |
343 | |
344 case TaskCanceled: | |
345 // Cancelled build is user action, so it is not an infra failure
. | |
346 step.Status = miloProto.Status_FAILURE | |
347 step.FailureDetails = &miloProto.FailureDetails{ | |
348 Type: miloProto.FailureDetails_CANCELLED, | |
349 Text: "Task cancelled by user", | |
350 } | |
351 | |
352 case TaskCompleted: | |
353 | |
354 switch { | |
355 case sr.InternalFailure: | |
356 step.Status = miloProto.Status_FAILURE | |
357 step.FailureDetails = &miloProto.FailureDetails{ | |
358 Type: miloProto.FailureDetails_INFRA, | |
359 } | |
360 | |
361 case sr.Failure: | |
362 step.Status = miloProto.Status_FAILURE | |
363 | |
364 default: | |
365 step.Status = miloProto.Status_SUCCESS | |
366 } | |
367 | |
368 default: | |
369 return fmt.Errorf("unknown swarming task state %q", sr.State) | |
370 } | |
371 | |
372 // Compute start and finished times. | |
373 if sr.StartedTs != "" { | |
374 ts, err := time.Parse(SwarmingTimeLayout, sr.StartedTs) | |
375 if err != nil { | |
376 return fmt.Errorf("invalid task StartedTs: %s", err) | |
377 } | |
378 step.Started = google.NewTimestamp(ts) | |
379 } | |
380 if sr.CompletedTs != "" { | |
381 ts, err := time.Parse(SwarmingTimeLayout, sr.CompletedTs) | |
382 if err != nil { | |
383 return fmt.Errorf("invalid task CompletedTs: %s", err) | |
384 } | |
385 step.Ended = google.NewTimestamp(ts) | |
386 } | |
387 | |
388 return nil | |
389 } | |
390 | |
391 func addBuildsetInfo(build *resp.MiloBuild, tags map[string]string) { | |
392 buildset := tags["buildset"] | |
393 if !strings.HasPrefix(buildset, "patch/") { | |
394 // Buildset isn't a patch, ignore. | |
395 return | |
396 } | |
397 | |
398 patchset := strings.TrimLeft(buildset, "patch/") | |
399 // TODO(hinoka): Also support Rietveld patches. | |
400 if strings.HasPrefix(patchset, "gerrit/") { | |
401 gerritPatchset := strings.TrimLeft(patchset, "gerrit/") | |
402 parts := strings.Split(gerritPatchset, "/") | |
403 if len(parts) != 3 { | |
404 // Not a well-formed gerrit patchset. | |
405 return | |
406 } | |
407 if build.SourceStamp == nil { | |
408 build.SourceStamp = &resp.SourceStamp{} | |
409 } | |
410 build.SourceStamp.Changelist = resp.NewLink( | |
411 "Gerrit CL", fmt.Sprintf("https://%s/c/%s/%s", parts[0],
parts[1], parts[2])) | |
412 | |
413 } | |
414 } | |
415 | |
416 func addRecipeLink(build *resp.MiloBuild, tags map[string]string) { | |
417 name := tags["recipe_name"] | |
418 repoURL := tags["recipe_repository"] | |
419 revision := tags["recipe_revision"] | |
420 if name != "" && repoURL != "" { | |
421 if revision == "" { | |
422 revision = "master" | |
423 } | |
424 // Link directly to the revision if it is a gerrit URL, otherwis
e just | |
425 // display it in the name. | |
426 if repoParse, err := url.Parse(repoURL); err == nil && strings.H
asSuffix( | |
427 repoParse.Host, ".googlesource.com") { | |
428 repoURL += "/+/" + revision + "/" | |
429 } else { | |
430 if len(revision) > 8 { | |
431 revision = revision[:8] | |
432 } | |
433 name += " @ " + revision | |
434 } | |
435 build.Summary.Recipe = resp.NewLink(name, repoURL) | |
436 } | |
437 } | |
438 | |
439 func addTaskToBuild(c context.Context, server string, sr *swarming.SwarmingRpcsT
askResult, build *resp.MiloBuild) error { | |
440 build.Summary.Label = sr.TaskId | |
441 build.Summary.Type = resp.Recipe | |
442 build.Summary.Source = resp.NewLink("Task "+sr.TaskId, taskPageURL(serve
r, sr.TaskId)) | |
443 | |
444 // Extract more swarming specific information into the properties. | |
445 if props := taskProperties(sr); len(props.Property) > 0 { | |
446 build.PropertyGroup = append(build.PropertyGroup, props) | |
447 } | |
448 tags := tagsToMap(sr.Tags) | |
449 | |
450 addBuildsetInfo(build, tags) | |
451 addBanner(build, tags) | |
452 addBuilderLink(c, build, tags) | |
453 addRecipeLink(build, tags) | |
454 | |
455 // Add a link to the bot. | |
456 if sr.BotId != "" { | |
457 build.Summary.Bot = resp.NewLink(sr.BotId, botPageURL(server, sr
.BotId)) | |
458 } | |
459 | |
460 return nil | |
461 } | |
462 | |
463 // streamsFromAnnotatedLog takes in an annotated log and returns a fully | |
464 // populated set of logdog streams | |
465 func streamsFromAnnotatedLog(ctx context.Context, log string) (*logdog.Streams,
error) { | |
466 c := &memoryClient{} | |
467 p := annotee.New(ctx, annotee.Options{ | |
468 Client: c, | |
469 MetadataUpdateInterval: -1, // Neverrrrrr send incr updates. | |
470 Offline: true, | |
471 }) | |
472 | |
473 is := annotee.Stream{ | |
474 Reader: bytes.NewBufferString(log), | |
475 Name: types.StreamName("stdout"), | |
476 Annotate: true, | |
477 StripAnnotations: true, | |
478 } | |
479 // If this ever has more than one stream then memoryClient needs to beco
me | |
480 // goroutine safe | |
481 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { | |
482 return nil, err | |
483 } | |
484 p.Finish() | |
485 return c.ToLogDogStreams() | |
486 } | |
487 | |
488 // buildLoader represents the ability to load a Milo build from a Swarming task. | |
489 // | |
490 // It exists so that the internal build loading functionality can be stubbed out | |
491 // for testing. | |
492 type buildLoader struct { | |
493 // logdogClientFunc returns a coordinator Client instance for the suppli
ed | |
494 // parameters. | |
495 // | |
496 // If nil, a production client will be generated. | |
497 logDogClientFunc func(c context.Context, host string) (*coordinator.Clie
nt, error) | |
498 } | |
499 | |
500 func (bl *buildLoader) newEmptyAnnotationStream(c context.Context, addr *types.S
treamAddr) ( | |
501 *logdog.AnnotationStream, error) { | |
502 | |
503 fn := bl.logDogClientFunc | |
504 if fn == nil { | |
505 fn = logdog.NewClient | |
506 } | |
507 client, err := fn(c, addr.Host) | |
508 if err != nil { | |
509 return nil, errors.Annotate(err).Reason("failed to create LogDog
client").Err() | |
510 } | |
511 | |
512 as := logdog.AnnotationStream{ | |
513 Client: client, | |
514 Project: addr.Project, | |
515 Path: addr.Path, | |
516 } | |
517 if err := as.Normalize(); err != nil { | |
518 return nil, errors.Annotate(err).Reason("failed to normalize ann
otation stream parameters").Err() | |
519 } | |
520 | |
521 return &as, nil | |
522 } | |
523 | |
524 // failedToStart is called in the case where logdog-only mode is on but the | |
525 // stream doesn't exist and the swarming job is complete. It modifies the build | |
526 // to add information that would've otherwise been in the annotation stream. | |
527 func failedToStart(c context.Context, build *resp.MiloBuild, res *swarming.Swarm
ingRpcsTaskResult, host string) error { | |
528 var err error | |
529 build.Summary.Status = model.InfraFailure | |
530 build.Summary.Started, err = time.Parse(SwarmingTimeLayout, res.StartedT
s) | |
531 if err != nil { | |
532 return err | |
533 } | |
534 build.Summary.Finished, err = time.Parse(SwarmingTimeLayout, res.Complet
edTs) | |
535 if err != nil { | |
536 return err | |
537 } | |
538 build.Summary.Duration = build.Summary.Finished.Sub(build.Summary.Starte
d) | |
539 infoComp := infoComponent(model.InfraFailure, | |
540 "LogDog stream not found", "Job likely failed to start.") | |
541 infoComp.Started = build.Summary.Started | |
542 infoComp.Finished = build.Summary.Finished | |
543 infoComp.Duration = build.Summary.Duration | |
544 infoComp.Verbosity = resp.Interesting | |
545 build.Components = append(build.Components, infoComp) | |
546 return addTaskToBuild(c, host, res, build) | |
547 } | |
548 | |
549 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService,
linkBase, taskID string) (*resp.MiloBuild, error) { | |
550 // Fetch the data from Swarming | |
551 var logDogStreamAddr *types.StreamAddr | |
552 | |
553 fetchParams := swarmingFetchParams{ | |
554 fetchRes: true, | |
555 fetchLog: true, | |
556 | |
557 // Cancel if LogDog annotation stream parameters are present in
the tag set. | |
558 taskResCallback: func(res *swarming.SwarmingRpcsTaskResult) (can
celLogs bool) { | |
559 // If the build hasn't started yet, then there is no Log
Dog log stream to | |
560 // render. | |
561 switch res.State { | |
562 case TaskPending, TaskExpired: | |
563 return false | |
564 | |
565 case TaskCanceled: | |
566 // If the task wasn't created, then it wasn't st
arted. | |
567 if res.CreatedTs == "" { | |
568 return false | |
569 } | |
570 } | |
571 | |
572 // The task started ... is it using LogDog for logging? | |
573 tags := swarmingTags(res.Tags) | |
574 | |
575 var err error | |
576 if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa
gs(tags, res.TaskId, res.TryNumber); err != nil { | |
577 logging.WithError(err).Debugf(c, "Not using LogD
og annotation stream.") | |
578 return false | |
579 } | |
580 return true | |
581 }, | |
582 } | |
583 fr, err := swarmingFetch(c, svc, taskID, fetchParams) | |
584 if err != nil { | |
585 return nil, err | |
586 } | |
587 | |
588 var build resp.MiloBuild | |
589 var s *miloProto.Step | |
590 var lds *logdog.Streams | |
591 var ub logdog.URLBuilder | |
592 | |
593 // Load the build from the available data. | |
594 // | |
595 // If the Swarming task explicitly specifies its log location, we prefer
that. | |
596 // As a fallback, we will try and parse the Swarming task's output for | |
597 // annotations. | |
598 switch { | |
599 case logDogStreamAddr != nil: | |
600 logging.Infof(c, "Loading build from LogDog stream at: %s", logD
ogStreamAddr) | |
601 | |
602 // If the LogDog stream is available, load the step from that. | |
603 as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr) | |
604 if err != nil { | |
605 return nil, errors.Annotate(err).Reason("failed to creat
e LogDog annotation stream").Err() | |
606 } | |
607 | |
608 prefix, _ := logDogStreamAddr.Path.Split() | |
609 ub = &logdog.ViewerURLBuilder{ | |
610 Host: logDogStreamAddr.Host, | |
611 Prefix: prefix, | |
612 Project: logDogStreamAddr.Project, | |
613 } | |
614 | |
615 if s, err = as.Fetch(c); err != nil { | |
616 switch errors.Unwrap(err) { | |
617 case coordinator.ErrNoSuchStream: | |
618 // The stream was not found. This could be due
to one of two things: | |
619 // 1. The step just started and we're just waiti
ng for the logs | |
620 // to propogage to logdog. | |
621 // 2. The bootsrap on the client failed, and nev
er sent data to logdog. | |
622 // This would be evident because the swarming re
sult would be a failure. | |
623 if fr.res.State == TaskCompleted { | |
624 err = failedToStart(c, &build, fr.res, s
vc.getHost()) | |
625 return &build, err | |
626 } | |
627 logging.WithError(err).Errorf(c, "User cannot ac
cess stream.") | |
628 build.Components = append(build.Components, info
Component(model.Running, | |
629 "Waiting...", "waiting for annotation st
ream")) | |
630 | |
631 case coordinator.ErrNoAccess: | |
632 logging.WithError(err).Errorf(c, "User cannot ac
cess stream.") | |
633 build.Components = append(build.Components, info
Component(model.Failure, | |
634 "No Access", "no access to annotation st
ream")) | |
635 | |
636 default: | |
637 logging.WithError(err).Errorf(c, "Failed to load
LogDog annotation stream.") | |
638 build.Components = append(build.Components, info
Component(model.InfraFailure, | |
639 "Error", "failed to load annotation stre
am")) | |
640 } | |
641 } | |
642 | |
643 case fr.log != "": | |
644 // Decode the data using annotee. The logdog stream returned her
e is assumed | |
645 // to be consistent, which is why the following block of code ar
e not | |
646 // expected to ever err out. | |
647 var err error | |
648 lds, err = streamsFromAnnotatedLog(c, fr.log) | |
649 if err != nil { | |
650 comp := infoComponent(model.InfraFailure, "Milo annotati
on parser", err.Error()) | |
651 comp.SubLink = append(comp.SubLink, resp.LinkSet{ | |
652 resp.NewLink("swarming task", taskPageURL(svc.ge
tHost(), taskID)), | |
653 }) | |
654 build.Components = append(build.Components, comp) | |
655 } | |
656 | |
657 if lds != nil && lds.MainStream != nil && lds.MainStream.Data !=
nil { | |
658 s = lds.MainStream.Data | |
659 } | |
660 ub = swarmingURLBuilder(linkBase) | |
661 | |
662 default: | |
663 s = &miloProto.Step{} | |
664 ub = swarmingURLBuilder(linkBase) | |
665 } | |
666 | |
667 if s != nil { | |
668 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err !=
nil { | |
669 return nil, err | |
670 } | |
671 logdog.AddLogDogToBuild(c, ub, s, &build) | |
672 } | |
673 | |
674 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { | |
675 return nil, err | |
676 } | |
677 | |
678 return &build, nil | |
679 } | |
680 | |
681 func infoComponent(st model.Status, label, text string) *resp.BuildComponent { | |
682 return &resp.BuildComponent{ | |
683 Type: resp.Summary, | |
684 Label: label, | |
685 Text: []string{text}, | |
686 Status: st, | |
687 } | |
688 } | |
689 | |
690 // isAllowed checks if: | |
691 // 1. allow_milo:1 is present. If so, it's a public job. | |
692 // 2. luci_project is present, and if the logged in user has access to that proj
ect. | |
693 func isAllowed(c context.Context, tags []string) bool { | |
694 for _, t := range tags { | |
695 if t == "allow_milo:1" { | |
696 return true | |
697 } | |
698 } | |
699 for _, t := range tags { | |
700 if strings.HasPrefix(t, "luci_project:") { | |
701 sp := strings.SplitN(t, ":", 2) | |
702 if len(sp) != 2 { | |
703 return false | |
704 } | |
705 logging.Debugf(c, "Checking if user has access to %s", s
p[1]) | |
706 // sp[1] is the project ID. | |
707 allowed, err := common.IsAllowed(c, sp[1]) | |
708 if err != nil { | |
709 logging.WithError(err).Errorf(c, "could not perf
orm acl check") | |
710 return false | |
711 } | |
712 return allowed | |
713 } | |
714 } | |
715 return false | |
716 } | |
717 | |
718 // taskPageURL returns a URL to a human-consumable page of a swarming task. | |
719 // Supports server aliases. | |
720 func taskPageURL(swarmingHostname, taskID string) string { | |
721 return fmt.Sprintf("https://%s/task?id=%s&show_raw=1&wide_logs=true", sw
armingHostname, taskID) | |
722 } | |
723 | |
724 // botPageURL returns a URL to a human-consumable page of a swarming bot. | |
725 // Supports server aliases. | |
726 func botPageURL(swarmingHostname, botID string) string { | |
727 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, bot
ID) | |
728 } | |
729 | |
730 // swarmingURLBuilder is a logdog.URLBuilder that builds Milo swarming log | |
731 // links. | |
732 // | |
733 // The string value for this should be the "linkBase" parameter value supplied | |
734 // to swarmingBuildImpl. | |
735 type swarmingURLBuilder string | |
736 | |
737 func (b swarmingURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { | |
738 u, err := url.Parse(string(b)) | |
739 if err != nil { | |
740 return nil | |
741 } | |
742 | |
743 switch t := l.Value.(type) { | |
744 case *miloProto.Link_LogdogStream: | |
745 ls := t.LogdogStream | |
746 | |
747 if u.Path == "" { | |
748 u.Path = ls.Name | |
749 } else { | |
750 u.Path = strings.TrimSuffix(u.Path, "/") + "/" + ls.Name | |
751 } | |
752 link := resp.NewLink(l.Label, u.String()) | |
753 if link.Label == "" { | |
754 link.Label = ls.Name | |
755 } | |
756 return link | |
757 | |
758 case *miloProto.Link_Url: | |
759 return resp.NewLink(l.Label, t.Url) | |
760 | |
761 default: | |
762 return nil | |
763 } | |
764 } | |
765 | |
766 func swarmingTags(v []string) map[string]string { | |
767 res := make(map[string]string, len(v)) | |
768 for _, tag := range v { | |
769 var value string | |
770 parts := strings.SplitN(tag, ":", 2) | |
771 if len(parts) == 2 { | |
772 value = parts[1] | |
773 } | |
774 res[parts[0]] = value | |
775 } | |
776 return res | |
777 } | |
OLD | NEW |