Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(227)

Side by Side Diff: milo/appengine/swarming/build.go

Issue 2675493003: milo: Use service interface for swarming. (Closed)
Patch Set: Update fetch signature Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/json"
10 "fmt" 9 "fmt"
11 "io/ioutil"
12 "net/http" 10 "net/http"
13 "net/url" 11 "net/url"
14 "path/filepath"
15 "strings" 12 "strings"
16 "sync"
17 "time" 13 "time"
18 14
19 "golang.org/x/net/context" 15 "golang.org/x/net/context"
20 16
21 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
18 "github.com/luci/luci-go/common/errors"
22 "github.com/luci/luci-go/common/logging" 19 "github.com/luci/luci-go/common/logging"
23 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
24 miloProto "github.com/luci/luci-go/common/proto/milo" 21 miloProto "github.com/luci/luci-go/common/proto/milo"
22 "github.com/luci/luci-go/common/sync/parallel"
25 "github.com/luci/luci-go/logdog/client/annotee" 23 "github.com/luci/luci-go/logdog/client/annotee"
26 "github.com/luci/luci-go/logdog/common/types" 24 "github.com/luci/luci-go/logdog/common/types"
27 "github.com/luci/luci-go/milo/api/resp" 25 "github.com/luci/luci-go/milo/api/resp"
28 "github.com/luci/luci-go/milo/appengine/logdog" 26 "github.com/luci/luci-go/milo/appengine/logdog"
29 "github.com/luci/luci-go/server/auth" 27 "github.com/luci/luci-go/server/auth"
30 ) 28 )
31 29
30 // errNotMiloJob is returned if a Swarming task is fetched that does not self-
31 // identify as a Milo job.
32 var errNotMiloJob = errors.New("Not a Milo Job")
33
32 // SwarmingTimeLayout is time layout used by swarming. 34 // SwarmingTimeLayout is time layout used by swarming.
33 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" 35 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999"
34 36
35 // Swarming task states.. 37 // Swarming task states..
36 const ( 38 const (
37 // TaskRunning means task is running. 39 // TaskRunning means task is running.
38 TaskRunning = "RUNNING" 40 TaskRunning = "RUNNING"
39 // TaskPending means task didn't start yet. 41 // TaskPending means task didn't start yet.
40 TaskPending = "PENDING" 42 TaskPending = "PENDING"
41 // TaskExpired means task expired and did not start. 43 // TaskExpired means task expired and did not start.
(...skipping 15 matching lines...) Expand all
57 return nil, err 59 return nil, err
58 } 60 }
59 sc, err := swarming.New(&http.Client{Transport: t}) 61 sc, err := swarming.New(&http.Client{Transport: t})
60 if err != nil { 62 if err != nil {
61 return nil, err 63 return nil, err
62 } 64 }
63 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", server) 65 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", server)
64 return sc, nil 66 return sc, nil
65 } 67 }
66 68
67 func getDebugTaskOutput(taskID string) (string, error) { 69 // swarmingService is an interface that fetches data from Swarming.
68 » // Read the debug file instead. 70 //
71 // In production, this is fetched from a Swarming server. For testing, this can
72 // be replaced with a mock.
73 type swarmingService interface {
74 » getHost() string
75 » getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingR pcsTaskResult, error)
76 » getTaskOutput(c context.Context, taskID string) (string, error)
77 }
69 78
70 » // ../swarming below assumes that 79 type prodSwarmingService struct {
71 » // - this code is not executed by tests outside of this dir 80 » host string
72 » // - this dir is a sibling of frontend dir 81 » client *swarming.Service
73 » logFilename := filepath.Join("..", "swarming", "testdata", taskID) 82 }
74 » b, err := ioutil.ReadFile(logFilename) 83
84 func newProdService(c context.Context, server string) (*prodSwarmingService, err or) {
85 » client, err := getSwarmingClient(c, server)
86 » if err != nil {
87 » » return nil, err
88 » }
89 » return &prodSwarmingService{
90 » » host: server,
91 » » client: client,
92 » }, nil
93 }
94
95 func (svc *prodSwarmingService) getHost() string { return svc.host }
96
97 func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID stri ng) (*swarming.SwarmingRpcsTaskResult, error) {
98 » return svc.client.Task.Result(taskID).Context(c).Do()
99 }
100
101 func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) {
102 » stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do()
75 if err != nil { 103 if err != nil {
76 return "", err 104 return "", err
77 } 105 }
78 » return string(b), nil 106 » return stdout.Output, nil
79 } 107 }
80 108
81 func getTaskOutput(sc *swarming.Service, taskID string) (string, error) { 109 type swarmingFetchParams struct {
82 » res, err := sc.Task.Stdout(taskID).Do() 110 » fetchRes bool
83 » if err != nil { 111 » fetchLog bool
84 » » return "", err
85 » }
86 » return res.Output, nil
87 } 112 }
88 113
89 func getDebugSwarmingResult( 114 type swarmingFetchResult struct {
90 » taskID string) (*swarming.SwarmingRpcsTaskResult, error) { 115 » res *swarming.SwarmingRpcsTaskResult
116 » log string
117 }
91 118
92 » // ../swarming below assumes that 119 // swarmingFetch fetches (in parallel) the components that it is configured to
93 » // - this code is not executed by tests outside of this dir 120 // fetch.
94 » // - this dir is a sibling of frontend dir 121 //
95 » logFilename := filepath.Join("..", "swarming", "testdata", taskID) 122 // After fetching, an ACL check is performed to confirm that the user is
96 » swarmFilename := fmt.Sprintf("%s.swarm", logFilename) 123 // permitted to view the resulting data. If this check fails, get returns
97 » s, err := ioutil.ReadFile(swarmFilename) 124 // errNotMiloJob.
125 //
126 // TODO(hinoka): Make this ACL check something more specific than the
127 // resence of the "allow_milo" dimension.
128 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw armingFetchParams) (
129 » *swarmingFetchResult, error) {
130
131 » // logErr is managed separately from other fetch errors, since in some
132 » // situations it's acceptable to not have a log stream.
133 » var logErr error
134 » var fr swarmingFetchResult
135
136 » err := parallel.FanOutIn(func(workC chan<- func() error) {
137 » » if req.fetchRes {
138 » » » workC <- func() (err error) {
139 » » » » fr.res, err = svc.getSwarmingResult(c, taskID)
140 » » » » return
141 » » » }
142 » » }
143
144 » » if req.fetchLog {
145 » » » workC <- func() error {
146 » » » » fr.log, logErr = svc.getTaskOutput(c, taskID)
147 » » » » return nil
148 » » » }
149 » » }
150 » })
98 if err != nil { 151 if err != nil {
99 return nil, err 152 return nil, err
100 } 153 }
101 sr := &swarming.SwarmingRpcsTaskResult{}
102 if err := json.Unmarshal(s, sr); err != nil {
103 return nil, err
104 }
105 return sr, nil
106 }
107 154
108 func getSwarming(c context.Context, server string, taskID string) ( 155 » // Current ACL implementation: error if this is not a Milo job.
109 » *swarming.SwarmingRpcsTaskResult, string, error) { 156 » switch {
110 157 » case req.fetchRes:
111 » var log string 158 » » if !isMiloJob(fr.res.Tags) {
112 » var sr *swarming.SwarmingRpcsTaskResult 159 » » » return nil, errNotMiloJob
113 » var errLog, errRes error
114
115 » // Detour: Return debugging results, useful for development.
116 » if server == "debug" {
117 » » sr, errRes = getDebugSwarmingResult(taskID)
118 » » log, errLog = getDebugTaskOutput(taskID)
119 » } else {
120 » » sc, err := getSwarmingClient(c, server)
121 » » if err != nil {
122 » » » return nil, "", err
123 } 160 }
124 161 » default:
125 » » var wg sync.WaitGroup 162 » » // No metadata to decide if this is a Milo job, so assume that i t is not.
126 » » wg.Add(2) // Getting log and result can happen concurrently. Wa it for both. 163 » » return nil, errNotMiloJob
127
128 » » go func() {
129 » » » defer wg.Done()
130 » » » log, errLog = getTaskOutput(sc, taskID)
131 » » }()
132 » » go func() {
133 » » » defer wg.Done()
134 » » » sr, errRes = sc.Task.Result(taskID).Do()
135 » » }()
136 » » wg.Wait()
137 } 164 }
138 165
139 » if errRes != nil { 166 » if req.fetchRes && logErr != nil {
140 » » // Swarming result errors take priority. 167 » » switch fr.res.State {
141 » » return sr, log, errRes 168 » » case TaskCompleted, TaskRunning, TaskCanceled:
142 » } 169 » » default:
143 170 » » » // Ignore log errors if the task might be pending, time d out, expired, etc.
144 » switch sr.State { 171 » » » if err != nil {
145 » case TaskCompleted, TaskRunning, TaskCanceled: 172 » » » » fr.log = ""
146 » default: 173 » » » » logErr = nil
147 » » // Ignore log errors if the task might be pending, timed out, e xpired, etc. 174 » » » }
148 » » if errLog != nil {
149 » » » errLog = nil
150 » » » log = ""
151 } 175 }
152 } 176 }
153 » return sr, log, errLog 177 » return &fr, logErr
154 } 178 }
155 179
156 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { 180 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
157 props := &resp.PropertyGroup{GroupName: "Swarming"} 181 props := &resp.PropertyGroup{GroupName: "Swarming"}
158 if len(sr.CostsUsd) == 1 { 182 if len(sr.CostsUsd) == 1 {
159 props.Property = append(props.Property, &resp.Property{ 183 props.Property = append(props.Property, &resp.Property{
160 Key: "Cost of job (USD)", 184 Key: "Cost of job (USD)",
161 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), 185 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]),
162 }) 186 })
163 } 187 }
(...skipping 232 matching lines...) Expand 10 before | Expand all | Expand 10 after
396 } 420 }
397 // If this ever has more than one stream then memoryClient needs to beco me 421 // If this ever has more than one stream then memoryClient needs to beco me
398 // goroutine safe 422 // goroutine safe
399 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { 423 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil {
400 return nil, err 424 return nil, err
401 } 425 }
402 p.Finish() 426 p.Finish()
403 return c.ToLogDogStreams() 427 return c.ToLogDogStreams()
404 } 428 }
405 429
406 func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res p.MiloBuild, error) { 430 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
407 // Fetch the data from Swarming 431 // Fetch the data from Swarming
408 » sr, body, err := getSwarming(c, server, taskID) 432 » fetchParams := swarmingFetchParams{
433 » » fetchRes: true,
434 » » fetchLog: true,
435 » }
436 » fr, err := swarmingFetch(c, svc, taskID, fetchParams)
409 if err != nil { 437 if err != nil {
410 return nil, err 438 return nil, err
411 } 439 }
412 440
413 allowMilo := false
414 for _, t := range sr.Tags {
415 if t == "allow_milo:1" {
416 allowMilo = true
417 break
418 }
419 }
420 if !allowMilo {
421 return nil, fmt.Errorf("Not A Milo Job")
422 }
423
424 var build resp.MiloBuild 441 var build resp.MiloBuild
425 var s *miloProto.Step 442 var s *miloProto.Step
426 var lds *logdog.Streams 443 var lds *logdog.Streams
427 444
428 // Decode the data using annotee. The logdog stream returned here is ass umed 445 // Decode the data using annotee. The logdog stream returned here is ass umed
429 // to be consistent, which is why the following block of code are not 446 // to be consistent, which is why the following block of code are not
430 // expected to ever err out. 447 // expected to ever err out.
431 » if body != "" { 448 » if fr.log != "" {
432 » » lds, err = streamsFromAnnotatedLog(c, body) 449 » » var err error
450 » » lds, err = streamsFromAnnotatedLog(c, fr.log)
433 if err != nil { 451 if err != nil {
434 build.Components = []*resp.BuildComponent{{ 452 build.Components = []*resp.BuildComponent{{
435 Type: resp.Summary, 453 Type: resp.Summary,
436 Label: "Milo annotation parser", 454 Label: "Milo annotation parser",
437 Text: []string{err.Error()}, 455 Text: []string{err.Error()},
438 Status: resp.InfraFailure, 456 Status: resp.InfraFailure,
439 SubLink: []*resp.Link{{ 457 SubLink: []*resp.Link{{
440 Label: "swarming task", 458 Label: "swarming task",
441 » » » » » URL: taskPageURL(server, taskID), 459 » » » » » URL: taskPageURL(svc.getHost(), taskID ),
442 }}, 460 }},
443 }} 461 }}
444 } 462 }
445 } 463 }
446 464
447 if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { 465 if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
448 s = lds.MainStream.Data 466 s = lds.MainStream.Data
449 } else { 467 } else {
450 s = &miloProto.Step{} 468 s = &miloProto.Step{}
451 } 469 }
452 470
453 » if err := addTaskToMiloStep(c, server, sr, s); err != nil { 471 » if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil {
454 return nil, err 472 return nil, err
455 } 473 }
456 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) 474 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
457 475
458 » if err := addTaskToBuild(c, server, sr, &build); err != nil { 476 » if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil {
459 return nil, err 477 return nil, err
460 } 478 }
461 479
462 return &build, nil 480 return &build, nil
463 } 481 }
464 482
483 func isMiloJob(tags []string) bool {
484 for _, t := range tags {
485 if t == "allow_milo:1" {
486 return true
487 }
488 }
489 return false
490 }
491
465 // taskPageURL returns a URL to a human-consumable page of a swarming task. 492 // taskPageURL returns a URL to a human-consumable page of a swarming task.
466 // Supports server aliases. 493 // Supports server aliases.
467 func taskPageURL(swarmingHostname, taskID string) string { 494 func taskPageURL(swarmingHostname, taskID string) string {
468 return fmt.Sprintf("https://%s/user/task/%s", swarmingHostname, taskID) 495 return fmt.Sprintf("https://%s/user/task/%s", swarmingHostname, taskID)
469 } 496 }
470 497
471 // botPageURL returns a URL to a human-consumable page of a swarming bot. 498 // botPageURL returns a URL to a human-consumable page of a swarming bot.
472 // Supports server aliases. 499 // Supports server aliases.
473 func botPageURL(swarmingHostname, botID string) string { 500 func botPageURL(swarmingHostname, botID string) string {
474 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, bot ID) 501 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, bot ID)
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
508 case *miloProto.Link_Url: 535 case *miloProto.Link_Url:
509 return &resp.Link{ 536 return &resp.Link{
510 Label: l.Label, 537 Label: l.Label,
511 URL: t.Url, 538 URL: t.Url,
512 } 539 }
513 540
514 default: 541 default:
515 return nil 542 return nil
516 } 543 }
517 } 544 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698