Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/json" | |
| 10 "fmt" | 9 "fmt" |
| 11 "io/ioutil" | |
| 12 "net/http" | 10 "net/http" |
| 13 "net/url" | 11 "net/url" |
| 14 "path/filepath" | |
| 15 "strings" | 12 "strings" |
| 16 "sync" | |
| 17 "time" | 13 "time" |
| 18 | 14 |
| 19 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 20 | 16 |
| 21 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 18 "github.com/luci/luci-go/common/errors" | |
| 22 "github.com/luci/luci-go/common/logging" | 19 "github.com/luci/luci-go/common/logging" |
| 23 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 24 miloProto "github.com/luci/luci-go/common/proto/milo" | 21 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | |
| 25 "github.com/luci/luci-go/logdog/client/annotee" | 23 "github.com/luci/luci-go/logdog/client/annotee" |
| 26 "github.com/luci/luci-go/logdog/common/types" | 24 "github.com/luci/luci-go/logdog/common/types" |
| 27 "github.com/luci/luci-go/milo/api/resp" | 25 "github.com/luci/luci-go/milo/api/resp" |
| 28 "github.com/luci/luci-go/milo/appengine/logdog" | 26 "github.com/luci/luci-go/milo/appengine/logdog" |
| 29 "github.com/luci/luci-go/server/auth" | 27 "github.com/luci/luci-go/server/auth" |
| 30 ) | 28 ) |
| 31 | 29 |
| 30 // errNotMiloJob is returned if a Swarming task is fetched that does not self- | |
| 31 // identify as a Milo job. | |
| 32 var errNotMiloJob = errors.New("No a Milo Job") | |
|
hinoka
2017/02/02 22:36:51
No -> Not
dnj
2017/02/03 02:22:03
Done.
| |
| 33 | |
| 32 // SwarmingTimeLayout is time layout used by swarming. | 34 // SwarmingTimeLayout is time layout used by swarming. |
| 33 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" | 35 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" |
| 34 | 36 |
| 35 // Swarming task states.. | 37 // Swarming task states.. |
| 36 const ( | 38 const ( |
| 37 // TaskRunning means task is running. | 39 // TaskRunning means task is running. |
| 38 TaskRunning = "RUNNING" | 40 TaskRunning = "RUNNING" |
| 39 // TaskPending means task didn't start yet. | 41 // TaskPending means task didn't start yet. |
| 40 TaskPending = "PENDING" | 42 TaskPending = "PENDING" |
| 41 // TaskExpired means task expired and did not start. | 43 // TaskExpired means task expired and did not start. |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 57 return nil, err | 59 return nil, err |
| 58 } | 60 } |
| 59 sc, err := swarming.New(&http.Client{Transport: t}) | 61 sc, err := swarming.New(&http.Client{Transport: t}) |
| 60 if err != nil { | 62 if err != nil { |
| 61 return nil, err | 63 return nil, err |
| 62 } | 64 } |
| 63 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", server) | 65 sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", server) |
| 64 return sc, nil | 66 return sc, nil |
| 65 } | 67 } |
| 66 | 68 |
| 67 func getDebugTaskOutput(taskID string) (string, error) { | 69 // swarmingService is an interface that fetches data from Swarming. |
| 68 » // Read the debug file instead. | 70 // |
| 71 // In production, this is fetched from a Swarming server. For testing, this can | |
| 72 // be replaced with a mock. | |
| 73 type swarmingService interface { | |
| 74 » getHost() string | |
| 75 » getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) | |
| 76 » getTaskOutput(c context.Context, taskID string) (string, error) | |
| 77 } | |
| 69 | 78 |
| 70 » // ../swarming below assumes that | 79 type prodSwarmingService struct { |
| 71 » // - this code is not executed by tests outside of this dir | 80 » host string |
| 72 » // - this dir is a sibling of frontend dir | 81 » client *swarming.Service |
| 73 » logFilename := filepath.Join("..", "swarming", "testdata", taskID) | 82 } |
| 74 » b, err := ioutil.ReadFile(logFilename) | 83 |
| 84 func newProdService(c context.Context, server string) (*prodSwarmingService, error) { | |
| 85 » client, err := getSwarmingClient(c, server) | |
| 86 » if err != nil { | |
| 87 » » return nil, err | |
| 88 » } | |
| 89 » return &prodSwarmingService{ | |
| 90 » » host: server, | |
| 91 » » client: client, | |
| 92 » }, nil | |
| 93 } | |
| 94 | |
| 95 func (svc *prodSwarmingService) getHost() string { return svc.host } | |
| 96 | |
| 97 func (svc *prodSwarmingService) getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingRpcsTaskResult, error) { | |
| 98 » return svc.client.Task.Result(taskID).Context(c).Do() | |
| 99 } | |
| 100 | |
| 101 func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string) (string, error) { | |
| 102 » stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do() | |
| 75 if err != nil { | 103 if err != nil { |
| 76 return "", err | 104 return "", err |
| 77 } | 105 } |
| 78 » return string(b), nil | 106 » return stdout.Output, nil |
| 79 } | 107 } |
| 80 | 108 |
| 81 func getTaskOutput(sc *swarming.Service, taskID string) (string, error) { | 109 type swarmingFetch struct { |
|
hinoka
2017/02/03 01:02:23
Can this just be a function? Not a struct? There
dnj
2017/02/03 01:04:56
There will be (see the final CL in the chain).
| |
| 82 » res, err := sc.Task.Stdout(taskID).Do() | 110 » fetchRes bool |
| 83 » if err != nil { | 111 » res *swarming.SwarmingRpcsTaskResult |
| 84 » » return "", err | 112 |
| 85 » } | 113 » fetchLog bool |
| 86 » return res.Output, nil | 114 » log string |
| 115 » logErr error | |
| 87 } | 116 } |
| 88 | 117 |
| 89 func getDebugSwarmingResult( | 118 // get fetches (in parallel) the components that it is configured to fetch. |
| 90 » taskID string) (*swarming.SwarmingRpcsTaskResult, error) { | 119 // |
| 91 | 120 // After fetching, get performs an ACL check to confirm that the user is |
| 92 » // ../swarming below assumes that | 121 // permitted to view the resulting data. If this check fails, get returns |
| 93 » // - this code is not executed by tests outside of this dir | 122 // errNotMiloJob. |
| 94 » // - this dir is a sibling of frontend dir | 123 func (f *swarmingFetch) get(c context.Context, svc swarmingService, taskID string) error { |
| 95 » logFilename := filepath.Join("..", "swarming", "testdata", taskID) | 124 » err := parallel.FanOutIn(func(workC chan<- func() error) { |
| 96 » swarmFilename := fmt.Sprintf("%s.swarm", logFilename) | 125 » » if f.fetchRes { |
| 97 » s, err := ioutil.ReadFile(swarmFilename) | 126 » » » workC <- func() (err error) { |
| 98 » if err != nil { | 127 » » » » f.res, err = svc.getSwarmingResult(c, taskID) |
| 99 » » return nil, err | 128 » » » » return |
| 100 » } | 129 » » » } |
| 101 » sr := &swarming.SwarmingRpcsTaskResult{} | |
| 102 » if err := json.Unmarshal(s, sr); err != nil { | |
| 103 » » return nil, err | |
| 104 » } | |
| 105 » return sr, nil | |
| 106 } | |
| 107 | |
| 108 func getSwarming(c context.Context, server string, taskID string) ( | |
| 109 » *swarming.SwarmingRpcsTaskResult, string, error) { | |
| 110 | |
| 111 » var log string | |
| 112 » var sr *swarming.SwarmingRpcsTaskResult | |
| 113 » var errLog, errRes error | |
| 114 | |
| 115 » // Detour: Return debugging results, useful for development. | |
| 116 » if server == "debug" { | |
| 117 » » sr, errRes = getDebugSwarmingResult(taskID) | |
| 118 » » log, errLog = getDebugTaskOutput(taskID) | |
| 119 » } else { | |
| 120 » » sc, err := getSwarmingClient(c, server) | |
| 121 » » if err != nil { | |
| 122 » » » return nil, "", err | |
| 123 } | 130 } |
| 124 | 131 |
| 125 » » var wg sync.WaitGroup | 132 » » if f.fetchLog { |
| 126 » » wg.Add(2) // Getting log and result can happen concurrently. Wait for both. | 133 » » » workC <- func() error { |
| 127 | 134 » » » » f.log, f.logErr = svc.getTaskOutput(c, taskID) |
| 128 » » go func() { | 135 » » » » return nil |
| 129 » » » defer wg.Done() | 136 » » » } |
| 130 » » » log, errLog = getTaskOutput(sc, taskID) | 137 » » } |
| 131 » » }() | 138 » }) |
| 132 » » go func() { | 139 » if err != nil { |
|
hinoka
2017/02/03 01:02:23
If both fail the error is non-deterministic, I'd l
dnj
2017/02/03 01:04:56
This is currently the case. Note that the log one
| |
| 133 » » » defer wg.Done() | 140 » » return err |
| 134 » » » sr, errRes = sc.Task.Result(taskID).Do() | |
| 135 » » }() | |
| 136 » » wg.Wait() | |
| 137 } | 141 } |
| 138 | 142 |
| 139 » if errRes != nil { | 143 » // Current ACL implementation: error if this is not a Milo job. |
| 140 » » // Swarming result errors take priority. | 144 » switch { |
| 141 » » return sr, log, errRes | 145 » case f.fetchRes: |
| 146 » » if !isMiloJob(f.res.Tags) { | |
| 147 » » » return errNotMiloJob | |
| 148 » » } | |
| 149 » default: | |
| 150 » » // No metadata to decide if this is a Milo job, so assume that it is not. | |
| 151 » » return errNotMiloJob | |
| 142 } | 152 } |
| 143 | 153 |
| 144 » switch sr.State { | 154 » if f.fetchRes && f.logErr != nil { |
| 145 » case TaskCompleted, TaskRunning, TaskCanceled: | 155 » » switch f.res.State { |
| 146 » default: | 156 » » case TaskCompleted, TaskRunning, TaskCanceled: |
| 147 » » // Ignore log errors if the task might be pending, timed out, expired, etc. | 157 » » default: |
| 148 » » if errLog != nil { | 158 » » » // Ignore log errors if the task might be pending, timed out, expired, etc. |
| 149 » » » errLog = nil | 159 » » » if err != nil { |
| 150 » » » log = "" | 160 » » » » f.log = "" |
| 161 » » » » f.logErr = nil | |
| 162 » » » } | |
| 151 } | 163 } |
| 152 } | 164 } |
| 153 » return sr, log, errLog | 165 » return f.logErr |
| 154 } | 166 } |
| 155 | 167 |
| 156 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { | 168 func taskProperties(sr *swarming.SwarmingRpcsTaskResult) *resp.PropertyGroup { |
| 157 props := &resp.PropertyGroup{GroupName: "Swarming"} | 169 props := &resp.PropertyGroup{GroupName: "Swarming"} |
| 158 if len(sr.CostsUsd) == 1 { | 170 if len(sr.CostsUsd) == 1 { |
| 159 props.Property = append(props.Property, &resp.Property{ | 171 props.Property = append(props.Property, &resp.Property{ |
| 160 Key: "Cost of job (USD)", | 172 Key: "Cost of job (USD)", |
| 161 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), | 173 Value: fmt.Sprintf("$%.2f", sr.CostsUsd[0]), |
| 162 }) | 174 }) |
| 163 } | 175 } |
| (...skipping 232 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 396 } | 408 } |
| 397 // If this ever has more than one stream then memoryClient needs to become | 409 // If this ever has more than one stream then memoryClient needs to become |
| 398 // goroutine safe | 410 // goroutine safe |
| 399 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { | 411 if err := p.RunStreams([]*annotee.Stream{&is}); err != nil { |
| 400 return nil, err | 412 return nil, err |
| 401 } | 413 } |
| 402 p.Finish() | 414 p.Finish() |
| 403 return c.ToLogDogStreams() | 415 return c.ToLogDogStreams() |
| 404 } | 416 } |
| 405 | 417 |
| 406 func swarmingBuildImpl(c context.Context, linkBase, server, taskID string) (*resp.MiloBuild, error) { | 418 func swarmingBuildImpl(c context.Context, svc swarmingService, linkBase, taskID string) (*resp.MiloBuild, error) { |
| 407 // Fetch the data from Swarming | 419 // Fetch the data from Swarming |
| 408 » sr, body, err := getSwarming(c, server, taskID) | 420 » fetch := swarmingFetch{ |
| 409 » if err != nil { | 421 » » fetchRes: true, |
| 422 » » fetchLog: true, | |
| 423 » } | |
| 424 » if err := fetch.get(c, svc, taskID); err != nil { | |
| 410 return nil, err | 425 return nil, err |
| 411 } | 426 } |
| 412 | 427 |
| 413 allowMilo := false | |
| 414 for _, t := range sr.Tags { | |
| 415 if t == "allow_milo:1" { | |
| 416 allowMilo = true | |
| 417 break | |
| 418 } | |
| 419 } | |
| 420 if !allowMilo { | |
| 421 return nil, fmt.Errorf("Not A Milo Job") | |
| 422 } | |
| 423 | |
| 424 var build resp.MiloBuild | 428 var build resp.MiloBuild |
| 425 var s *miloProto.Step | 429 var s *miloProto.Step |
| 426 var lds *logdog.Streams | 430 var lds *logdog.Streams |
| 427 | 431 |
| 428 // Decode the data using annotee. The logdog stream returned here is assumed | 432 // Decode the data using annotee. The logdog stream returned here is assumed |
| 429 // to be consistent, which is why the following block of code are not | 433 // to be consistent, which is why the following block of code are not |
| 430 // expected to ever err out. | 434 // expected to ever err out. |
| 431 » if body != "" { | 435 » if fetch.log != "" { |
| 432 » » lds, err = streamsFromAnnotatedLog(c, body) | 436 » » var err error |
| 437 » » lds, err = streamsFromAnnotatedLog(c, fetch.log) | |
| 433 if err != nil { | 438 if err != nil { |
| 434 build.Components = []*resp.BuildComponent{{ | 439 build.Components = []*resp.BuildComponent{{ |
| 435 Type: resp.Summary, | 440 Type: resp.Summary, |
| 436 Label: "Milo annotation parser", | 441 Label: "Milo annotation parser", |
| 437 Text: []string{err.Error()}, | 442 Text: []string{err.Error()}, |
| 438 Status: resp.InfraFailure, | 443 Status: resp.InfraFailure, |
| 439 SubLink: []*resp.Link{{ | 444 SubLink: []*resp.Link{{ |
| 440 Label: "swarming task", | 445 Label: "swarming task", |
| 441 » » » » » URL: taskPageURL(server, taskID), | 446 » » » » » URL: taskPageURL(svc.getHost(), taskID), |
| 442 }}, | 447 }}, |
| 443 }} | 448 }} |
| 444 } | 449 } |
| 445 } | 450 } |
| 446 | 451 |
| 447 if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { | 452 if lds != nil && lds.MainStream != nil && lds.MainStream.Data != nil { |
| 448 s = lds.MainStream.Data | 453 s = lds.MainStream.Data |
| 449 } else { | 454 } else { |
| 450 s = &miloProto.Step{} | 455 s = &miloProto.Step{} |
| 451 } | 456 } |
| 452 | 457 |
| 453 » if err := addTaskToMiloStep(c, server, sr, s); err != nil { | 458 » if err := addTaskToMiloStep(c, svc.getHost(), fetch.res, s); err != nil { |
| 454 return nil, err | 459 return nil, err |
| 455 } | 460 } |
| 456 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) | 461 logdog.AddLogDogToBuild(c, swarmingURLBuilder(linkBase), s, &build) |
| 457 | 462 |
| 458 » if err := addTaskToBuild(c, server, sr, &build); err != nil { | 463 » if err := addTaskToBuild(c, svc.getHost(), fetch.res, &build); err != nil { |
| 459 return nil, err | 464 return nil, err |
| 460 } | 465 } |
| 461 | 466 |
| 462 return &build, nil | 467 return &build, nil |
| 463 } | 468 } |
| 464 | 469 |
| 470 func isMiloJob(tags []string) bool { | |
| 471 for _, t := range tags { | |
| 472 if t == "allow_milo:1" { | |
| 473 return true | |
| 474 } | |
| 475 } | |
| 476 return false | |
| 477 } | |
| 478 | |
| 465 // taskPageURL returns a URL to a human-consumable page of a swarming task. | 479 // taskPageURL returns a URL to a human-consumable page of a swarming task. |
| 466 // Supports server aliases. | 480 // Supports server aliases. |
| 467 func taskPageURL(swarmingHostname, taskID string) string { | 481 func taskPageURL(swarmingHostname, taskID string) string { |
| 468 return fmt.Sprintf("https://%s/user/task/%s", swarmingHostname, taskID) | 482 return fmt.Sprintf("https://%s/user/task/%s", swarmingHostname, taskID) |
| 469 } | 483 } |
| 470 | 484 |
| 471 // botPageURL returns a URL to a human-consumable page of a swarming bot. | 485 // botPageURL returns a URL to a human-consumable page of a swarming bot. |
| 472 // Supports server aliases. | 486 // Supports server aliases. |
| 473 func botPageURL(swarmingHostname, botID string) string { | 487 func botPageURL(swarmingHostname, botID string) string { |
| 474 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, botID) | 488 return fmt.Sprintf("https://%s/restricted/bot/%s", swarmingHostname, botID) |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 508 case *miloProto.Link_Url: | 522 case *miloProto.Link_Url: |
| 509 return &resp.Link{ | 523 return &resp.Link{ |
| 510 Label: l.Label, | 524 Label: l.Label, |
| 511 URL: t.Url, | 525 URL: t.Url, |
| 512 } | 526 } |
| 513 | 527 |
| 514 default: | 528 default: |
| 515 return nil | 529 return nil |
| 516 } | 530 } |
| 517 } | 531 } |
| OLD | NEW |