| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/json" | 8 "encoding/json" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| (...skipping 27 matching lines...) Expand all Loading... |
| 38 | 38 |
| 39 var _ distributor.D = (*swarmingDist)(nil) | 39 var _ distributor.D = (*swarmingDist)(nil) |
| 40 | 40 |
| 41 var swarmBotLookup = map[string]dm.AbnormalFinish_Status{ | 41 var swarmBotLookup = map[string]dm.AbnormalFinish_Status{ |
| 42 "BOT_DIED": dm.AbnormalFinish_CRASHED, | 42 "BOT_DIED": dm.AbnormalFinish_CRASHED, |
| 43 "CANCELED": dm.AbnormalFinish_CANCELLED, | 43 "CANCELED": dm.AbnormalFinish_CANCELLED, |
| 44 "EXPIRED": dm.AbnormalFinish_EXPIRED, | 44 "EXPIRED": dm.AbnormalFinish_EXPIRED, |
| 45 "TIMED_OUT": dm.AbnormalFinish_TIMED_OUT, | 45 "TIMED_OUT": dm.AbnormalFinish_TIMED_OUT, |
| 46 } | 46 } |
| 47 | 47 |
| 48 func cipdPackageFromSwarm(pkg *swarm.SwarmingRpcsCipdPackage) *sv1.CipdPackage { |
| 49 return &sv1.CipdPackage{Name: pkg.PackageName, Version: pkg.Version} |
| 50 } |
| 51 |
| 52 func cipdSpecFromSwarm(pkgs *swarm.SwarmingRpcsCipdPins) *sv1.CipdSpec { |
| 53 ret := &sv1.CipdSpec{ |
| 54 Client: cipdPackageFromSwarm(pkgs.ClientPackage), |
| 55 ByPath: map[string]*sv1.CipdSpec_CipdPackages{}, |
| 56 } |
| 57 for _, pkg := range pkgs.Packages { |
| 58 pkgs, ok := ret.ByPath[pkg.Path] |
| 59 if !ok { |
| 60 pkgs = &sv1.CipdSpec_CipdPackages{} |
| 61 ret.ByPath[pkg.Path] = pkgs |
| 62 } |
| 63 pkgs.Pkg = append(pkgs.Pkg, cipdPackageFromSwarm(pkg)) |
| 64 } |
| 65 return ret |
| 66 } |
| 67 |
| 48 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { | 68 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { |
| 49 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) | 69 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) |
| 50 for key, value := range m { | 70 for key, value := range m { |
| 51 ret = append(ret, &swarm.SwarmingRpcsStringPair{ | 71 ret = append(ret, &swarm.SwarmingRpcsStringPair{ |
| 52 Key: key, Value: value}) | 72 Key: key, Value: value}) |
| 53 } | 73 } |
| 54 return ret | 74 return ret |
| 55 } | 75 } |
| 56 | 76 |
| 57 func httpClients(c context.Context) (anonC, authC *http.Client) { | 77 func httpClients(c context.Context) (anonC, authC *http.Client) { |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 100 } | 120 } |
| 101 | 121 |
| 102 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { | 122 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { |
| 103 id := auth.Id | 123 id := auth.Id |
| 104 | 124 |
| 105 params, err := parseParams(desc) | 125 params, err := parseParams(desc) |
| 106 if err != nil { | 126 if err != nil { |
| 107 return | 127 return |
| 108 } | 128 } |
| 109 | 129 |
| 130 prevParsed := (*sv1.Result)(nil) |
| 131 if prev != nil { |
| 132 prevParsed = &sv1.Result{} |
| 133 if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err !=
nil { |
| 134 err = errors.Annotate(err).Reason("parsing previous resu
lt").Err() |
| 135 return |
| 136 } |
| 137 } |
| 138 |
| 110 isoCtx, _ := context.WithTimeout(d, 30*time.Second) | 139 isoCtx, _ := context.WithTimeout(d, 30*time.Second) |
| 111 iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, pa
rams) | 140 iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, pa
rams) |
| 112 if err != nil { | 141 if err != nil { |
| 113 err = errors.Annotate(err).Reason("prepping Isolated").Err() | 142 err = errors.Annotate(err).Reason("prepping Isolated").Err() |
| 114 return | 143 return |
| 115 } | 144 } |
| 116 | 145 |
| 146 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) |
| 147 if prevParsed != nil { |
| 148 cipdInput = prevParsed.CipdPins.ToCipdInput() |
| 149 } else { |
| 150 cipdInput = params.Job.Inputs.Cipd.ToCipdInput() |
| 151 } |
| 152 |
| 117 topic, token, err := d.cfg.PrepareTopic(d, auth.Id) | 153 topic, token, err := d.cfg.PrepareTopic(d, auth.Id) |
| 118 if err != nil { | 154 if err != nil { |
| 119 err = errors.Annotate(err).Reason("preparing topic").Err() | 155 err = errors.Annotate(err).Reason("preparing topic").Err() |
| 120 return | 156 return |
| 121 } | 157 } |
| 122 | 158 |
| 123 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) | |
| 124 if len(params.Job.Inputs.Packages) > 0 { | |
| 125 cipdInput := &swarm.SwarmingRpcsCipdInput{ | |
| 126 Server: params.Job.Inputs.CipdServer, | |
| 127 } | |
| 128 for _, pkg := range params.Job.Inputs.Packages { | |
| 129 cipdInput.Packages = append(cipdInput.Packages, &swarm.S
warmingRpcsCipdPackage{ | |
| 130 PackageName: pkg.Name, | |
| 131 Path: pkg.Path, | |
| 132 Version: pkg.Version, | |
| 133 }) | |
| 134 } | |
| 135 } | |
| 136 | |
| 137 dims := []*swarm.SwarmingRpcsStringPair(nil) | |
| 138 for key, value := range params.Scheduling.Dimensions { | |
| 139 dims = append(dims, &swarm.SwarmingRpcsStringPair{Key: key, Valu
e: value}) | |
| 140 } | |
| 141 | |
| 142 prefix := params.Meta.NamePrefix | 159 prefix := params.Meta.NamePrefix |
| 143 if len(prefix) > 0 { | 160 if len(prefix) > 0 { |
| 144 prefix += " / " | 161 prefix += " / " |
| 145 } | 162 } |
| 146 | 163 |
| 164 dims := make(map[string]string, len(params.Scheduling.Dimensions)) |
| 165 for k, v := range params.Scheduling.Dimensions { |
| 166 dims[k] = v |
| 167 } |
| 168 if prevParsed != nil { |
| 169 for k, v := range prevParsed.SnapshotDimensions { |
| 170 dims[k] = v |
| 171 } |
| 172 } |
| 173 |
| 147 tags := []string{ | 174 tags := []string{ |
| 148 "requestor:DM", | 175 "requestor:DM", |
| 149 "requestor:" + info.TrimmedAppID(d), | 176 "requestor:" + info.TrimmedAppID(d), |
| 150 "requestor:swarming_v1", | 177 "requestor:swarming_v1", |
| 151 fmt.Sprintf("quest:%s", id.Quest), | 178 fmt.Sprintf("quest:%s", id.Quest), |
| 152 fmt.Sprintf("attempt:%s|%d", id.Quest, id.Attempt), | 179 fmt.Sprintf("attempt:%s|%d", id.Quest, id.Attempt), |
| 153 fmt.Sprintf("execution:%s|%d|%d", id.Quest, id.Attempt, id.Id), | 180 fmt.Sprintf("execution:%s|%d|%d", id.Quest, id.Attempt, id.Id), |
| 154 } | 181 } |
| 155 | 182 |
| 156 rslt := (*swarm.SwarmingRpcsTaskRequestMetadata)(nil) | 183 rslt := (*swarm.SwarmingRpcsTaskRequestMetadata)(nil) |
| 157 err = retry.Retry(d, retry.Default, func() (err error) { | 184 err = retry.Retry(d, retry.Default, func() (err error) { |
| 158 rpcCtx, _ := context.WithTimeout(d, 10*time.Second) | 185 rpcCtx, _ := context.WithTimeout(d, 10*time.Second) |
| 159 rslt, err = newSwarmClient(rpcCtx, d.sCfg).Tasks.New(&swarm.Swar
mingRpcsNewTaskRequest{ | 186 rslt, err = newSwarmClient(rpcCtx, d.sCfg).Tasks.New(&swarm.Swar
mingRpcsNewTaskRequest{ |
| 160 ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration(
).Seconds()), | 187 ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration(
).Seconds()), |
| 161 Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Que
st, id.Attempt, id.Id), | 188 Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Que
st, id.Attempt, id.Id), |
| 162 | 189 |
| 163 // Priority is already pre-Normalize()'d | |
| 164 Priority: int64(params.Scheduling.Priority), | 190 Priority: int64(params.Scheduling.Priority), |
| 165 | 191 |
| 166 Properties: &swarm.SwarmingRpcsTaskProperties{ | 192 Properties: &swarm.SwarmingRpcsTaskProperties{ |
| 167 CipdInput: cipdInput, | 193 CipdInput: cipdInput, |
| 168 » » » » Dimensions: toSwarmMap(params.Scheduli
ng.Dimensions), | 194 » » » » Dimensions: toSwarmMap(dims), |
| 169 Env: toSwarmMap(params.Job.Env)
, | 195 Env: toSwarmMap(params.Job.Env)
, |
| 170 ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.R
un.Duration().Seconds()), | 196 ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.R
un.Duration().Seconds()), |
| 171 GracePeriodSecs: int64(desc.Meta.Timeouts.S
top.Duration().Seconds()), | 197 GracePeriodSecs: int64(desc.Meta.Timeouts.S
top.Duration().Seconds()), |
| 172 IoTimeoutSecs: int64(params.Scheduling.Io
Timeout.Duration().Seconds()), | 198 IoTimeoutSecs: int64(params.Scheduling.Io
Timeout.Duration().Seconds()), |
| 173 InputsRef: iso, | 199 InputsRef: iso, |
| 174 }, | 200 }, |
| 175 | 201 |
| 176 PubsubTopic: topic.String(), | 202 PubsubTopic: topic.String(), |
| 177 PubsubAuthToken: token, | 203 PubsubAuthToken: token, |
| 178 | 204 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 190 } | 216 } |
| 191 | 217 |
| 192 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { | 218 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { |
| 193 return retry.Retry(d, retry.Default, func() (err error) { | 219 return retry.Retry(d, retry.Default, func() (err error) { |
| 194 ctx, _ := context.WithTimeout(d, 10*time.Second) | 220 ctx, _ := context.WithTimeout(d, 10*time.Second) |
| 195 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co
ntext(ctx).Do() | 221 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co
ntext(ctx).Do() |
| 196 return | 222 return |
| 197 }, retry.LogCallback(d, "swarm.Task.Cancel")) | 223 }, retry.LogCallback(d, "swarm.Task.Cancel")) |
| 198 } | 224 } |
| 199 | 225 |
| 200 func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R
esult, error) { | 226 func snapshotDimensions(p *sv1.Parameters, dims []*swarm.SwarmingRpcsStringListP
air) map[string]string { |
| 227 » if len(p.Scheduling.SnapshotDimensions) == 0 { |
| 228 » » return nil |
| 229 » } |
| 230 » allDims := map[string]string{} |
| 231 » for _, dim := range dims { |
| 232 » » allDims[dim.Key] = dim.Value[len(dim.Value)-1] |
| 233 » } |
| 234 |
| 235 » ret := make(map[string]string, len(p.Scheduling.SnapshotDimensions)) |
| 236 » for _, k := range p.Scheduling.SnapshotDimensions { |
| 237 » » if v, ok := allDims[k]; ok { |
| 238 » » » ret[k] = v |
| 239 » » } |
| 240 » } |
| 241 » return ret |
| 242 } |
| 243 |
| 244 func (d *swarmingDist) GetStatus(q *dm.Quest_Desc, tok distributor.Token) (*dm.R
esult, error) { |
| 201 rslt := (*swarm.SwarmingRpcsTaskResult)(nil) | 245 rslt := (*swarm.SwarmingRpcsTaskResult)(nil) |
| 202 | 246 |
| 203 err := retry.Retry(d, retry.Default, func() (err error) { | 247 err := retry.Retry(d, retry.Default, func() (err error) { |
| 204 ctx, _ := context.WithTimeout(d, 10*time.Second) | 248 ctx, _ := context.WithTimeout(d, 10*time.Second) |
| 205 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok))
.Context(ctx).Do() | 249 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok))
.Context(ctx).Do() |
| 206 return | 250 return |
| 207 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok))) | 251 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok))) |
| 208 if err != nil { | 252 if err != nil { |
| 209 if gerr := err.(*googleapi.Error); gerr != nil { | 253 if gerr := err.(*googleapi.Error); gerr != nil { |
| 210 if gerr.Code == http.StatusNotFound { | 254 if gerr.Code == http.StatusNotFound { |
| (...skipping 19 matching lines...) Expand all Loading... |
| 230 ret.AbnormalFinish = &dm.AbnormalFinish{ | 274 ret.AbnormalFinish = &dm.AbnormalFinish{ |
| 231 Status: dm.AbnormalFinish_CRASHED, | 275 Status: dm.AbnormalFinish_CRASHED, |
| 232 Reason: fmt.Sprintf("swarming: COMPLETED/Interna
lFailure(%d)", rslt.ExitCode), | 276 Reason: fmt.Sprintf("swarming: COMPLETED/Interna
lFailure(%d)", rslt.ExitCode), |
| 233 } | 277 } |
| 234 break | 278 break |
| 235 } | 279 } |
| 236 | 280 |
| 237 retData := &sv1.Result{ | 281 retData := &sv1.Result{ |
| 238 ExitCode: rslt.ExitCode, | 282 ExitCode: rslt.ExitCode, |
| 239 } | 283 } |
| 284 if rslt.CipdPins != nil { |
| 285 retData.CipdPins = cipdSpecFromSwarm(rslt.CipdPins) |
| 286 } |
| 287 params, err := parseParams(q) |
| 288 if err != nil { |
| 289 return nil, err |
| 290 } |
| 291 retData.SnapshotDimensions = snapshotDimensions(params, rslt.Bot
Dimensions) |
| 292 |
| 240 if ref := rslt.OutputsRef; ref != nil { | 293 if ref := rslt.OutputsRef; ref != nil { |
| 241 retData.IsolatedOutdir = &sv1.IsolatedRef{ | 294 retData.IsolatedOutdir = &sv1.IsolatedRef{ |
| 242 Id: ref.Isolated, Server: ref.Isolatedserver} | 295 Id: ref.Isolated, Server: ref.Isolatedserver} |
| 243 } | 296 } |
| 244 data, err := (&jsonpb.Marshaler{OrigName: true}).MarshalToString
(retData) | 297 data, err := (&jsonpb.Marshaler{OrigName: true}).MarshalToString
(retData) |
| 245 if err != nil { | 298 if err != nil { |
| 246 panic(err) | 299 panic(err) |
| 247 } | 300 } |
| 248 | 301 |
| 249 ret.Data = &dm.JsonResult{ | 302 ret.Data = &dm.JsonResult{ |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 301 | 354 |
| 302 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ | 355 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ |
| 303 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil | 356 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil |
| 304 } | 357 } |
| 305 | 358 |
| 306 // AddFactory adds this distributor implementation into the distributor | 359 // AddFactory adds this distributor implementation into the distributor |
| 307 // Registry. | 360 // Registry. |
| 308 func AddFactory(m distributor.FactoryMap) { | 361 func AddFactory(m distributor.FactoryMap) { |
| 309 m[(*sv1.Config)(nil)] = factory | 362 m[(*sv1.Config)(nil)] = factory |
| 310 } | 363 } |
| OLD | NEW |