| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 66 return sc, nil | 66 return sc, nil |
| 67 } | 67 } |
| 68 | 68 |
| 69 // swarmingService is an interface that fetches data from Swarming. | 69 // swarmingService is an interface that fetches data from Swarming. |
| 70 // | 70 // |
| 71 // In production, this is fetched from a Swarming server. For testing, this can | 71 // In production, this is fetched from a Swarming server. For testing, this can |
| 72 // be replaced with a mock. | 72 // be replaced with a mock. |
| 73 type swarmingService interface { | 73 type swarmingService interface { |
| 74 getHost() string | 74 getHost() string |
| 75 getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingR
pcsTaskResult, error) | 75 getSwarmingResult(c context.Context, taskID string) (*swarming.SwarmingR
pcsTaskResult, error) |
| 76 getSwarmingRequest(c context.Context, taskID string) (*swarming.Swarming
RpcsTaskRequest, error) |
| 76 getTaskOutput(c context.Context, taskID string) (string, error) | 77 getTaskOutput(c context.Context, taskID string) (string, error) |
| 77 } | 78 } |
| 78 | 79 |
| 79 type prodSwarmingService struct { | 80 type prodSwarmingService struct { |
| 80 host string | 81 host string |
| 81 client *swarming.Service | 82 client *swarming.Service |
| 82 } | 83 } |
| 83 | 84 |
| 84 func newProdService(c context.Context, server string) (*prodSwarmingService, err
or) { | 85 func newProdService(c context.Context, server string) (*prodSwarmingService, err
or) { |
| 85 client, err := getSwarmingClient(c, server) | 86 client, err := getSwarmingClient(c, server) |
| (...skipping 13 matching lines...) Expand all Loading... |
| 99 } | 100 } |
| 100 | 101 |
| 101 func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string)
(string, error) { | 102 func (svc *prodSwarmingService) getTaskOutput(c context.Context, taskID string)
(string, error) { |
| 102 stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do() | 103 stdout, err := svc.client.Task.Stdout(taskID).Context(c).Do() |
| 103 if err != nil { | 104 if err != nil { |
| 104 return "", err | 105 return "", err |
| 105 } | 106 } |
| 106 return stdout.Output, nil | 107 return stdout.Output, nil |
| 107 } | 108 } |
| 108 | 109 |
| 110 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str
ing) (*swarming.SwarmingRpcsTaskRequest, error) { |
| 111 return svc.client.Task.Request(taskID).Context(c).Do() |
| 112 } |
| 113 |
| 109 type swarmingFetchParams struct { | 114 type swarmingFetchParams struct { |
| 115 fetchReq bool |
| 110 fetchRes bool | 116 fetchRes bool |
| 111 fetchLog bool | 117 fetchLog bool |
| 112 } | 118 } |
| 113 | 119 |
| 114 type swarmingFetchResult struct { | 120 type swarmingFetchResult struct { |
| 121 req *swarming.SwarmingRpcsTaskRequest |
| 115 res *swarming.SwarmingRpcsTaskResult | 122 res *swarming.SwarmingRpcsTaskResult |
| 116 log string | 123 log string |
| 117 } | 124 } |
| 118 | 125 |
| 119 // swarmingFetch fetches (in parallel) the components that it is configured to | 126 // swarmingFetch fetches (in parallel) the components that it is configured to |
| 120 // fetch. | 127 // fetch. |
| 121 // | 128 // |
| 122 // After fetching, an ACL check is performed to confirm that the user is | 129 // After fetching, an ACL check is performed to confirm that the user is |
| 123 // permitted to view the resulting data. If this check fails, get returns | 130 // permitted to view the resulting data. If this check fails, get returns |
| 124 // errNotMiloJob. | 131 // errNotMiloJob. |
| 125 // | 132 // |
| 126 // TODO(hinoka): Make this ACL check something more specific than the | 133 // TODO(hinoka): Make this ACL check something more specific than the |
| 127 // resence of the "allow_milo" dimension. | 134 // resence of the "allow_milo" dimension. |
| 128 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( | 135 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( |
| 129 *swarmingFetchResult, error) { | 136 *swarmingFetchResult, error) { |
| 130 | 137 |
| 131 // logErr is managed separately from other fetch errors, since in some | 138 // logErr is managed separately from other fetch errors, since in some |
| 132 // situations it's acceptable to not have a log stream. | 139 // situations it's acceptable to not have a log stream. |
| 133 var logErr error | 140 var logErr error |
| 134 var fr swarmingFetchResult | 141 var fr swarmingFetchResult |
| 135 | 142 |
| 136 err := parallel.FanOutIn(func(workC chan<- func() error) { | 143 err := parallel.FanOutIn(func(workC chan<- func() error) { |
| 144 if req.fetchReq { |
| 145 workC <- func() (err error) { |
| 146 fr.req, err = svc.getSwarmingRequest(c, taskID) |
| 147 return |
| 148 } |
| 149 } |
| 150 |
| 137 if req.fetchRes { | 151 if req.fetchRes { |
| 138 workC <- func() (err error) { | 152 workC <- func() (err error) { |
| 139 fr.res, err = svc.getSwarmingResult(c, taskID) | 153 fr.res, err = svc.getSwarmingResult(c, taskID) |
| 140 return | 154 return |
| 141 } | 155 } |
| 142 } | 156 } |
| 143 | 157 |
| 144 if req.fetchLog { | 158 if req.fetchLog { |
| 145 workC <- func() error { | 159 workC <- func() error { |
| 146 fr.log, logErr = svc.getTaskOutput(c, taskID) | 160 fr.log, logErr = svc.getTaskOutput(c, taskID) |
| 147 return nil | 161 return nil |
| 148 } | 162 } |
| 149 } | 163 } |
| 150 }) | 164 }) |
| 151 if err != nil { | 165 if err != nil { |
| 152 return nil, err | 166 return nil, err |
| 153 } | 167 } |
| 154 | 168 |
| 155 // Current ACL implementation: error if this is not a Milo job. | 169 // Current ACL implementation: error if this is not a Milo job. |
| 156 switch { | 170 switch { |
| 171 case req.fetchReq: |
| 172 if !isMiloJob(fr.req.Tags) { |
| 173 return nil, errNotMiloJob |
| 174 } |
| 157 case req.fetchRes: | 175 case req.fetchRes: |
| 158 if !isMiloJob(fr.res.Tags) { | 176 if !isMiloJob(fr.res.Tags) { |
| 159 return nil, errNotMiloJob | 177 return nil, errNotMiloJob |
| 160 } | 178 } |
| 161 default: | 179 default: |
| 162 // No metadata to decide if this is a Milo job, so assume that i
t is not. | 180 // No metadata to decide if this is a Milo job, so assume that i
t is not. |
| 163 return nil, errNotMiloJob | 181 return nil, errNotMiloJob |
| 164 } | 182 } |
| 165 | 183 |
| 166 if req.fetchRes && logErr != nil { | 184 if req.fetchRes && logErr != nil { |
| (...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 535 case *miloProto.Link_Url: | 553 case *miloProto.Link_Url: |
| 536 return &resp.Link{ | 554 return &resp.Link{ |
| 537 Label: l.Label, | 555 Label: l.Label, |
| 538 URL: t.Url, | 556 URL: t.Url, |
| 539 } | 557 } |
| 540 | 558 |
| 541 default: | 559 default: |
| 542 return nil | 560 return nil |
| 543 } | 561 } |
| 544 } | 562 } |
| OLD | NEW |