Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: dm/appengine/distributor/swarming/v1/distributor.go

Issue 2338153003: Add snapshotting for CIPD packages and dimensions to DM. (Closed)
Patch Set: Remove cleanup noise Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "encoding/json" 8 "encoding/json"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
(...skipping 27 matching lines...) Expand all
38 38
39 var _ distributor.D = (*swarmingDist)(nil) 39 var _ distributor.D = (*swarmingDist)(nil)
40 40
41 var swarmBotLookup = map[string]dm.AbnormalFinish_Status{ 41 var swarmBotLookup = map[string]dm.AbnormalFinish_Status{
42 "BOT_DIED": dm.AbnormalFinish_CRASHED, 42 "BOT_DIED": dm.AbnormalFinish_CRASHED,
43 "CANCELED": dm.AbnormalFinish_CANCELLED, 43 "CANCELED": dm.AbnormalFinish_CANCELLED,
44 "EXPIRED": dm.AbnormalFinish_EXPIRED, 44 "EXPIRED": dm.AbnormalFinish_EXPIRED,
45 "TIMED_OUT": dm.AbnormalFinish_TIMED_OUT, 45 "TIMED_OUT": dm.AbnormalFinish_TIMED_OUT,
46 } 46 }
47 47
48 func cipdPackageFromSwarm(pkg *swarm.SwarmingRpcsCipdPackage) *sv1.CipdPackage {
49 return &sv1.CipdPackage{Name: pkg.PackageName, Version: pkg.Version}
50 }
51
52 func cipdSpecFromSwarm(pkgs *swarm.SwarmingRpcsCipdPins) *sv1.CipdSpec {
53 ret := &sv1.CipdSpec{
54 Client: cipdPackageFromSwarm(pkgs.ClientPackage),
55 ByPath: map[string]*sv1.CipdSpec_CipdPackages{},
56 }
57 for _, pkg := range pkgs.Packages {
58 pkgs, ok := ret.ByPath[pkg.Path]
59 if !ok {
60 pkgs = &sv1.CipdSpec_CipdPackages{}
61 ret.ByPath[pkg.Path] = pkgs
62 }
63 pkgs.Pkg = append(pkgs.Pkg, cipdPackageFromSwarm(pkg))
64 }
65 return ret
66 }
67
48 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { 68 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair {
49 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) 69 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m))
50 for key, value := range m { 70 for key, value := range m {
51 ret = append(ret, &swarm.SwarmingRpcsStringPair{ 71 ret = append(ret, &swarm.SwarmingRpcsStringPair{
52 Key: key, Value: value}) 72 Key: key, Value: value})
53 } 73 }
54 return ret 74 return ret
55 } 75 }
56 76
57 func httpClients(c context.Context) (anonC, authC *http.Client) { 77 func httpClients(c context.Context) (anonC, authC *http.Client) {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
100 } 120 }
101 121
102 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { 122 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d m.JsonResult) (tok distributor.Token, _ time.Duration, err error) {
103 id := auth.Id 123 id := auth.Id
104 124
105 params, err := parseParams(desc) 125 params, err := parseParams(desc)
106 if err != nil { 126 if err != nil {
107 return 127 return
108 } 128 }
109 129
130 prevParsed := (*sv1.Result)(nil)
131 if prev != nil {
132 prevParsed = &sv1.Result{}
133 if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err != nil {
134 err = errors.Annotate(err).Reason("parsing previous resu lt").Err()
135 return
136 }
137 }
138
110 isoCtx, _ := context.WithTimeout(d, 30*time.Second) 139 isoCtx, _ := context.WithTimeout(d, 30*time.Second)
111 iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, pa rams) 140 iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, pa rams)
112 if err != nil { 141 if err != nil {
113 err = errors.Annotate(err).Reason("prepping Isolated").Err() 142 err = errors.Annotate(err).Reason("prepping Isolated").Err()
114 return 143 return
115 } 144 }
116 145
146 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
147 if prev != nil {
Vadim Sh. 2016/09/26 23:16:06 nit: prevParsed != nil (and below)
iannucci 2016/09/27 01:23:56 ah yeah. done
148 cipdInput = prevParsed.CipdPins.ToCipdInput()
149 } else {
150 cipdInput = params.Job.Inputs.Cipd.ToCipdInput()
151 }
iannucci 2016/09/26 23:15:17 this is where the pinned cipd inputs are applied
152
117 topic, token, err := d.cfg.PrepareTopic(d, auth.Id) 153 topic, token, err := d.cfg.PrepareTopic(d, auth.Id)
118 if err != nil { 154 if err != nil {
119 err = errors.Annotate(err).Reason("preparing topic").Err() 155 err = errors.Annotate(err).Reason("preparing topic").Err()
120 return 156 return
121 } 157 }
122 158
123 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
124 if len(params.Job.Inputs.Packages) > 0 {
125 cipdInput := &swarm.SwarmingRpcsCipdInput{
126 Server: params.Job.Inputs.CipdServer,
127 }
128 for _, pkg := range params.Job.Inputs.Packages {
129 cipdInput.Packages = append(cipdInput.Packages, &swarm.S warmingRpcsCipdPackage{
130 PackageName: pkg.Name,
131 Path: pkg.Path,
132 Version: pkg.Version,
133 })
134 }
135 }
136
137 dims := []*swarm.SwarmingRpcsStringPair(nil)
138 for key, value := range params.Scheduling.Dimensions {
139 dims = append(dims, &swarm.SwarmingRpcsStringPair{Key: key, Valu e: value})
140 }
141
142 prefix := params.Meta.NamePrefix 159 prefix := params.Meta.NamePrefix
143 if len(prefix) > 0 { 160 if len(prefix) > 0 {
144 prefix += " / " 161 prefix += " / "
145 } 162 }
146 163
164 dims := make(map[string]string, len(params.Scheduling.Dimensions))
165 for k, v := range params.Scheduling.Dimensions {
166 dims[k] = v
167 }
168 if prev != nil {
169 for k, v := range prevParsed.SnapshotDimensions {
Vadim Sh. 2016/09/26 23:16:06 I'm not 100% sure it is what we want all the time.
iannucci 2016/09/27 01:23:56 yeah me either, but it's a start. I think even if
Vadim Sh. 2016/09/27 03:07:11 No, it's fine as is.
170 dims[k] = v
171 }
172 }
iannucci 2016/09/26 23:15:17 this is where the pinned dimensions are applied
173
147 tags := []string{ 174 tags := []string{
148 "requestor:DM", 175 "requestor:DM",
149 "requestor:" + info.TrimmedAppID(d), 176 "requestor:" + info.TrimmedAppID(d),
150 "requestor:swarming_v1", 177 "requestor:swarming_v1",
151 fmt.Sprintf("quest:%s", id.Quest), 178 fmt.Sprintf("quest:%s", id.Quest),
152 fmt.Sprintf("attempt:%s|%d", id.Quest, id.Attempt), 179 fmt.Sprintf("attempt:%s|%d", id.Quest, id.Attempt),
153 fmt.Sprintf("execution:%s|%d|%d", id.Quest, id.Attempt, id.Id), 180 fmt.Sprintf("execution:%s|%d|%d", id.Quest, id.Attempt, id.Id),
154 } 181 }
155 182
156 rslt := (*swarm.SwarmingRpcsTaskRequestMetadata)(nil) 183 rslt := (*swarm.SwarmingRpcsTaskRequestMetadata)(nil)
157 err = retry.Retry(d, retry.Default, func() (err error) { 184 err = retry.Retry(d, retry.Default, func() (err error) {
158 rpcCtx, _ := context.WithTimeout(d, 10*time.Second) 185 rpcCtx, _ := context.WithTimeout(d, 10*time.Second)
159 rslt, err = newSwarmClient(rpcCtx, d.sCfg).Tasks.New(&swarm.Swar mingRpcsNewTaskRequest{ 186 rslt, err = newSwarmClient(rpcCtx, d.sCfg).Tasks.New(&swarm.Swar mingRpcsNewTaskRequest{
160 ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration( ).Seconds()), 187 ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration( ).Seconds()),
161 Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Que st, id.Attempt, id.Id), 188 Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Que st, id.Attempt, id.Id),
162 189
163 // Priority is already pre-Normalize()'d
164 Priority: int64(params.Scheduling.Priority), 190 Priority: int64(params.Scheduling.Priority),
165 191
166 Properties: &swarm.SwarmingRpcsTaskProperties{ 192 Properties: &swarm.SwarmingRpcsTaskProperties{
167 CipdInput: cipdInput, 193 CipdInput: cipdInput,
168 » » » » Dimensions: toSwarmMap(params.Scheduli ng.Dimensions), 194 » » » » Dimensions: toSwarmMap(dims),
169 Env: toSwarmMap(params.Job.Env) , 195 Env: toSwarmMap(params.Job.Env) ,
170 ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.R un.Duration().Seconds()), 196 ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.R un.Duration().Seconds()),
171 GracePeriodSecs: int64(desc.Meta.Timeouts.S top.Duration().Seconds()), 197 GracePeriodSecs: int64(desc.Meta.Timeouts.S top.Duration().Seconds()),
172 IoTimeoutSecs: int64(params.Scheduling.Io Timeout.Duration().Seconds()), 198 IoTimeoutSecs: int64(params.Scheduling.Io Timeout.Duration().Seconds()),
173 InputsRef: iso, 199 InputsRef: iso,
174 }, 200 },
175 201
176 PubsubTopic: topic.String(), 202 PubsubTopic: topic.String(),
177 PubsubAuthToken: token, 203 PubsubAuthToken: token,
178 204
(...skipping 11 matching lines...) Expand all
190 } 216 }
191 217
192 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { 218 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error {
193 return retry.Retry(d, retry.Default, func() (err error) { 219 return retry.Retry(d, retry.Default, func() (err error) {
194 ctx, _ := context.WithTimeout(d, 10*time.Second) 220 ctx, _ := context.WithTimeout(d, 10*time.Second)
195 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co ntext(ctx).Do() 221 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co ntext(ctx).Do()
196 return 222 return
197 }, retry.LogCallback(d, "swarm.Task.Cancel")) 223 }, retry.LogCallback(d, "swarm.Task.Cancel"))
198 } 224 }
199 225
200 func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R esult, error) { 226 func snapshotDimensions(p *sv1.Parameters, dims []*swarm.SwarmingRpcsStringListP air) map[string]string {
227 » if len(p.Scheduling.SnapshotDimensions) == 0 {
228 » » return nil
229 » }
230 » allDims := map[string]string{}
231 » for _, dim := range dims {
232 » » allDims[dim.Key] = dim.Value[len(dim.Value)-1]
Vadim Sh. 2016/09/26 23:16:06 I don't think we guarantee the order of dimensions
233 » }
234
235 » ret := make(map[string]string, len(p.Scheduling.SnapshotDimensions))
236 » for _, k := range p.Scheduling.SnapshotDimensions {
237 » » if v, ok := allDims[k]; ok {
238 » » » ret[k] = v
239 » » }
240 » }
241 » return ret
242 }
243
244 func (d *swarmingDist) GetStatus(q *dm.Quest_Desc, tok distributor.Token) (*dm.R esult, error) {
201 rslt := (*swarm.SwarmingRpcsTaskResult)(nil) 245 rslt := (*swarm.SwarmingRpcsTaskResult)(nil)
202 246
203 err := retry.Retry(d, retry.Default, func() (err error) { 247 err := retry.Retry(d, retry.Default, func() (err error) {
204 ctx, _ := context.WithTimeout(d, 10*time.Second) 248 ctx, _ := context.WithTimeout(d, 10*time.Second)
205 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok)) .Context(ctx).Do() 249 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok)) .Context(ctx).Do()
206 return 250 return
207 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok))) 251 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok)))
208 if err != nil { 252 if err != nil {
209 if gerr := err.(*googleapi.Error); gerr != nil { 253 if gerr := err.(*googleapi.Error); gerr != nil {
210 if gerr.Code == http.StatusNotFound { 254 if gerr.Code == http.StatusNotFound {
(...skipping 19 matching lines...) Expand all
230 ret.AbnormalFinish = &dm.AbnormalFinish{ 274 ret.AbnormalFinish = &dm.AbnormalFinish{
231 Status: dm.AbnormalFinish_CRASHED, 275 Status: dm.AbnormalFinish_CRASHED,
232 Reason: fmt.Sprintf("swarming: COMPLETED/Interna lFailure(%d)", rslt.ExitCode), 276 Reason: fmt.Sprintf("swarming: COMPLETED/Interna lFailure(%d)", rslt.ExitCode),
233 } 277 }
234 break 278 break
235 } 279 }
236 280
237 retData := &sv1.Result{ 281 retData := &sv1.Result{
238 ExitCode: rslt.ExitCode, 282 ExitCode: rslt.ExitCode,
239 } 283 }
284 if rslt.CipdPins != nil {
285 retData.CipdPins = cipdSpecFromSwarm(rslt.CipdPins)
286 }
287 params, err := parseParams(q)
288 if err != nil {
289 return nil, err
290 }
291 retData.SnapshotDimensions = snapshotDimensions(params, rslt.Bot Dimensions)
iannucci 2016/09/26 23:15:17 this is where the pinning happens
292
240 if ref := rslt.OutputsRef; ref != nil { 293 if ref := rslt.OutputsRef; ref != nil {
241 retData.IsolatedOutdir = &sv1.IsolatedRef{ 294 retData.IsolatedOutdir = &sv1.IsolatedRef{
242 Id: ref.Isolated, Server: ref.Isolatedserver} 295 Id: ref.Isolated, Server: ref.Isolatedserver}
243 } 296 }
244 data, err := (&jsonpb.Marshaler{OrigName: true}).MarshalToString (retData) 297 data, err := (&jsonpb.Marshaler{OrigName: true}).MarshalToString (retData)
245 if err != nil { 298 if err != nil {
246 panic(err) 299 panic(err)
247 } 300 }
248 301
249 ret.Data = &dm.JsonResult{ 302 ret.Data = &dm.JsonResult{
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
301 354
302 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error) { 355 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error) {
303 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil 356 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil
304 } 357 }
305 358
306 // AddFactory adds this distributor implementation into the distributor 359 // AddFactory adds this distributor implementation into the distributor
307 // Registry. 360 // Registry.
308 func AddFactory(m distributor.FactoryMap) { 361 func AddFactory(m distributor.FactoryMap) {
309 m[(*sv1.Config)(nil)] = factory 362 m[(*sv1.Config)(nil)] = factory
310 } 363 }
OLDNEW
« dm/api/distributor/swarming/v1/result.proto ('K') | « dm/api/distributor/swarming/v1/result.pb.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698