Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(600)

Side by Side Diff: milo/appengine/swarming/build.go

Issue 2675493003: milo: Use service interface for swarming. (Closed)
Patch Set: Fix error text. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/json"
10 "fmt" 9 "fmt"
11 "io/ioutil"
12 "net/http" 10 "net/http"
13 "net/url" 11 "net/url"
14 "path/filepath"
15 "strings" 12 "strings"
16 "sync"
17 "time" 13 "time"
18 14
19 "golang.org/x/net/context" 15 "golang.org/x/net/context"
20 16
21 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1"
18 "github.com/luci/luci-go/common/errors"
22 "github.com/luci/luci-go/common/logging" 19 "github.com/luci/luci-go/common/logging"
23 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
24 miloProto "github.com/luci/luci-go/common/proto/milo" 21 miloProto "github.com/luci/luci-go/common/proto/milo"
22 "github.com/luci/luci-go/common/sync/parallel"
25 "github.com/luci/luci-go/logdog/client/annotee" 23 "github.com/luci/luci-go/logdog/client/annotee"
26 "github.com/luci/luci-go/logdog/common/types" 24 "github.com/luci/luci-go/logdog/common/types"
27 "github.com/luci/luci-go/milo/api/resp" 25 "github.com/luci/luci-go/milo/api/resp"
28 "github.com/luci/luci-go/milo/appengine/logdog" 26 "github.com/luci/luci-go/milo/appengine/logdog"
29 "github.com/luci/luci-go/server/auth" 27 "github.com/luci/luci-go/server/auth"
30 ) 28 )
31 29
30 // errNotMiloJob is returned if a Swarming task is fetched that does not self-
31 // identify as a Milo job.
32 var errNotMiloJob = errors.New("Not a Milo Job")
33
32 // SwarmingTimeLayout is time layout used by swarming. 34 // SwarmingTimeLayout is time layout used by swarming.
33 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" 35 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999"
34 36
35 // Swarming task states.. 37 // Swarming task states..
36 const ( 38 const (
37 // TaskRunning means task is running. 39 // TaskRunning means task is running.
38 TaskRunning = "RUNNING" 40 TaskRunning = "RUNNING"
39 // TaskPending means task didn't start yet. 41 // TaskPending means task didn't start yet.
40 TaskPending = "PENDING" 42 TaskPending = "PENDING"
41 // TaskExpired means task expired and did not start. 43 // TaskExpired means task expired and did not start.
(...skipping 15 matching lines...) Expand all
57 return nil, err 59 return nil, err
58 } 60 }
59 sc, err := swarming.New(&http.Client{Transport: t}) 61 sc, err := swarming.New(&http.Client{Transport: t})
60 if err != nil { 62 if err != nil {
61 return nil, err 63 return nil, err
62 } 64 }
63 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", server) 65 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", server)
64 return sc, nil 66 return sc, nil
65 } 67 }
66 68
67 func getDebugTaskOutput(taskID string) (string, error) { 69 // swarmingService is an interface that fetches data from Swarming.
68 » // Read the debug file instead. 70 //
71 // In production, this is fetched from a Swarming server. For testing, this can
72 // be replaced with a mock.
73 type swarmingService interface {
74 » getHost() string
75 » getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingR pcsTaskResult, error)
76 » getTaskOutput(c context.Context, taskID string) (string, error)
77 }
69 78
70 » // ../swarming below assumes that 79 type prodSwarmingService struct {
71 » // - this code is not executed by tests outside of this dir 80 » host string
72 » // - this dir is a sibling of frontend dir 81 » client *swarming.Service
73 » logFilename := filepath.Join("..", "swarming", "testdata", taskID) 82 }
74 » b, err := ioutil.ReadFile(logFilename) 83
84 func newProdService(c context.Context, server string) (*prodSwarmingService, err or) {
85 » client, err := getSwarmingClient(c, server)
86 » if err != nil {
87 » » return nil, err
88 » }
89 » return &prodSwarmingService{
90 » » host: server,
91 » » client: client,
92 » }, nil
93 }
94
95 func (svc *prodSwarmingService) getHost() string { return svc.host }
96
97 func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID stri ng) (*swarming.SwarmingRpcsTaskResult, error) {
98 » return svc.client.Task.Result(taskID).Context(c).Do()
99 }
100
101 func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) {
102 » stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do()
75 if err != nil { 103 if err != nil {
76 return "", err 104 return "", err
77 } 105 }
78 » return string(b), nil 106 » return stdout.Output, nil
79 } 107 }
80 108
81 func getTaskOutput(sc *swarming.Service, taskID string) (string, error) { 109 type swarmingFetch struct {
hinoka 2017/02/03 20:50:46 How about splitting this into swarmingFetchRequest
82 » res, err := sc.Task.Stdout(taskID).Do() 110 » fetchRes bool
83 » if err != nil { 111 » res *swarming.SwarmingRpcsTaskResult
84 » » return "", err 112
85 » } 113 » fetchLog bool
86 » return res.Output, nil 114 » log string
115 » logErr error
87 } 116 }
88 117
89 func getDebugSwarmingResult( 118 // get fetches (in parallel) the components that it is configured to fetch.
90 » taskID string) (*swarming.SwarmingRpcsTaskResult, error) { 119 //
91 120 // After fetching, get performs an ACL check to confirm that the user is
92 » // ../swarming below assumes that 121 // permitted to view the resulting data. If this check fails, get returns
93 » // - this code is not executed by tests outside of this dir 122 // errNotMiloJob.
94 » // - this dir is a sibling of frontend dir 123 func (f *swarmingFetch) get(c context.Context, svc swarmingService, taskID strin g) error {
95 » logFilename := filepath.Join("..", "swarming", "testdata", taskID) 124 » err := parallel.FanOutIn(func(workC chan<- func() error) {
96 » swarmFilename := fmt.Sprintf("%s.swarm", logFilename) 125 » » if f.fetchRes {
97 » s, err := ioutil.ReadFile(swarmFilename) 126 » » » workC <- func() (err error) {
98 » if err != nil { 127 » » » » f.res, err = svc.getSwarmingResult(c, taskID)
99 » » return nil, err 128 » » » » return
100 » } 129 » » » }
101 » sr := &swarming.SwarmingRpcsTaskResult{}
102 » if err := json.Unmarshal(s, sr); err != nil {
103 » » return nil, err
104 » }
105 » return sr, nil
106 }
107
108 func getSwarming(c context.Context, server string, taskID string) (
109 » *swarming.SwarmingRpcsTaskResult, string, error) {
110
111 » var log string
112 » var sr *swarming.SwarmingRpcsTaskResult
113 » var errLog, errRes error
114
115 » // Detour: Return debugging results, useful for development.
116 » if server == "debug" {
117 » » sr, errRes = getDebugSwarmingResult(taskID)
118 » » log, errLog = getDebugTaskOutput(taskID)
119 » } else {
120 » » sc, err := getSwarmingClient(c, server)
121 » » if err != nil {
122 » » » return nil, "", err
123 } 130 }
124 131
125 » » var wg sync.WaitGroup 132 » » if f.fetchLog {
126 » » wg.Add(2) // Getting log and result can happen concurrently. Wa it for both. 133 » » » workC <- func() error {
127 134 » » » » f.log, f.logErr = svc.getTaskOutput(c, taskID)
128 » » go func() { 135 » » » » return nil
129 » » » defer wg.Done() 136 » » » }
130 » » » log, errLog = getTaskOutput(sc, taskID) 137 » » }
131 » » }() 138 » })
132 » » go func() { 139 » if err != nil {
133 » » » defer wg.Done() 140 » » return err
134 » » » sr, errRes = sc.Task.Result(taskID).Do()
135 » » }()
136 » » wg.Wait()
137 } 141 }
138 142
139 » if errRes != nil { 143 » // Current ACL implementation: error if this is not a Milo job.
140 » » // Swarming result errors take priority. 144 » switch {
141 » » return sr, log, errRes 145 » case f.fetchRes:
146 » » if !isMiloJob(f.res.Tags) {
147 » » » return errNotMiloJob
148 » » }
149 » default:
150 » » // No metadata to decide if this is a Milo job, so assume that i t is not.
151 » » return errNotMiloJob
142 } 152 }
143 153
144 » switch sr.State { 154 » if f.fetchRes && f.logErr != nil {
145 » case TaskCompleted, TaskRunning, TaskCanceled: 155 » » switch f.res.State {
146 » default: 156 » » case TaskCompleted, TaskRunning, TaskCanceled:
147 » » // Ignore log errors if the task might be pending, timed out, e xpired, etc. 157 » » default:
148 » » if errLog != nil { 158 » » » // Ignore log errors if the task might be pending, time d out, expired, etc.
149 » » » errLog = nil 159 » » » if err != nil {
150 » » » log = "" 160 » » » » f.log = ""
161 » » » » f.logErr = nil
162 » » » }
151 } 163 }
152 } 164 }
153 » return sr, log, errLog 165 » return f.logErr
154 } 166 }
155 167
156 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { 168 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup {
157 props := &resp.PropertyGroup{GroupName: "Swarming"} 169 props := &resp.PropertyGroup{GroupName: "Swarming"}
158 if len(sr.CostsUsd) == 1 { 170 if len(sr.CostsUsd) == 1 {
159 props.Property = append(props.Property, &resp.Property{ 171 props.Property = append(props.Property, &resp.Property{
160 Key: "Cost of job (USD)", 172 Key: "Cost of job (USD)",
161 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), 173 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]),
162 }) 174 })
163 } 175 }
(...skipping 232 matching lines...) Expand 10 before | Expand all | Expand 10 after
396 } 408 }
397 // If this ever has more than one stream then memoryClient needs to beco me 409 // If this ever has more than one stream then memoryClient needs to beco me
398 // goroutine safe 410 // goroutine safe
399 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { 411 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil {
400 return nil, err 412 return nil, err
401 } 413 }
402 p.Finish() 414 p.Finish()
403 return c.ToLogDogStreams() 415 return c.ToLogDogStreams()
404 } 416 }
405 417
406 func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*res p.MiloBuild, error) { 418 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) {
407 // Fetch the data from Swarming 419 // Fetch the data from Swarming
408 » sr, body, err := getSwarming(c, server, taskID) 420 » fetch := swarmingFetch{
409 » if err != nil { 421 » » fetchRes: true,
422 » » fetchLog: true,
423 » }
424 » if err := fetch.get(c, svc, taskID); err != nil {
410 return nil, err 425 return nil, err
411 } 426 }
412 427
413 allowMilo := false
414 for _, t := range sr.Tags {
415 if t == "allow_milo:1" {
416 allowMilo = true
417 break
418 }
419 }
420 if !allowMilo {
421 return nil, fmt.Errorf("Not A Milo Job")
422 }
423
424 var build resp.MiloBuild 428 var build resp.MiloBuild
425 var s *miloProto.Step 429 var s *miloProto.Step
426 var lds *logdog.Streams 430 var lds *logdog.Streams
427 431
428 // Decode the data using annotee. The logdog stream returned here is ass umed 432 // Decode the data using annotee. The logdog stream returned here is ass umed
429 // to be consistent, which is why the following block of code are not 433 // to be consistent, which is why the following block of code are not
430 // expected to ever err out. 434 // expected to ever err out.
431 » if body != "" { 435 » if fetch.log != "" {
432 » » lds, err = streamsFromAnnotatedLog(c, body) 436 » » var err error
437 » » lds, err = streamsFromAnnotatedLog(c, fetch.log)
433 if err != nil { 438 if err != nil {
434 build.Components = []*resp.BuildComponent{{ 439 build.Components = []*resp.BuildComponent{{
435 Type: resp.Summary, 440 Type: resp.Summary,
436 Label: "Milo annotation parser", 441 Label: "Milo annotation parser",
437 Text: []string{err.Error()}, 442 Text: []string{err.Error()},
438 Status: resp.InfraFailure, 443 Status: resp.InfraFailure,
439 SubLink: []*resp.Link{{ 444 SubLink: []*resp.Link{{
440 Label: "swarming task", 445 Label: "swarming task",
441 » » » » » URL: taskPageURL(server, taskID), 446 » » » » » URL: taskPageURL(svc.getHost(), taskID ),
442 }}, 447 }},
443 }} 448 }}
444 } 449 }
445 } 450 }
446 451
447 if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { 452 if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil {
448 s = lds.MainStream.Data 453 s = lds.MainStream.Data
449 } else { 454 } else {
450 s = &miloProto.Step{} 455 s = &miloProto.Step{}
451 } 456 }
452 457
453 » if err := addTaskToMiloStep(c, server, sr, s); err != nil { 458 » if err := addTaskToMiloStep(c, svc.getHost(), fetch.res, s); err != nil {
454 return nil, err 459 return nil, err
455 } 460 }
456 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) 461 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build)
457 462
458 » if err := addTaskToBuild(c, server, sr, &build); err != nil { 463 » if err := addTaskToBuild(c, svc.getHost(), fetch.res, &build); err != ni l {
459 return nil, err 464 return nil, err
460 } 465 }
461 466
462 return &build, nil 467 return &build, nil
463 } 468 }
464 469
470 func isMiloJob(tags []string) bool {
471 for _, t := range tags {
472 if t == "allow_milo:1" {
473 return true
474 }
475 }
476 return false
477 }
478
465 // taskPageURL returns a URL to a human-consumable page of a swarming task. 479 // taskPageURL returns a URL to a human-consumable page of a swarming task.
466 // Supports server aliases. 480 // Supports server aliases.
467 func taskPageURL(swarmingHostname, taskID string) string { 481 func taskPageURL(swarmingHostname, taskID string) string {
468 return fmt.Sprintf("https://%s/user/task/%s", swarmingHostname, taskID) 482 return fmt.Sprintf("https://%s/user/task/%s", swarmingHostname, taskID)
469 } 483 }
470 484
471 // botPageURL returns a URL to a human-consumable page of a swarming bot. 485 // botPageURL returns a URL to a human-consumable page of a swarming bot.
472 // Supports server aliases. 486 // Supports server aliases.
473 func botPageURL(swarmingHostname, botID string) string { 487 func botPageURL(swarmingHostname, botID string) string {
474 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, bot ID) 488 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, bot ID)
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
508 case *miloProto.Link_Url: 522 case *miloProto.Link_Url:
509 return &resp.Link{ 523 return &resp.Link{
510 Label: l.Label, 524 Label: l.Label,
511 URL: t.Url, 525 URL: t.Url,
512 } 526 }
513 527
514 default: 528 default:
515 return nil 529 return nil
516 } 530 }
517 } 531 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698