OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package swarming | 5 package swarming |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "encoding/base64" | 9 "encoding/base64" |
10 "encoding/json" | 10 "encoding/json" |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
103 // can only happen with nil client | 103 // can only happen with nil client |
104 panic(err) | 104 panic(err) |
105 } | 105 } |
106 svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/" | 106 svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/" |
107 return svc | 107 return svc |
108 } | 108 } |
109 | 109 |
110 func parseParams(desc *dm.Quest_Desc) (ret *sv1.Parameters, err error) { | 110 func parseParams(desc *dm.Quest_Desc) (ret *sv1.Parameters, err error) { |
111 ret = &sv1.Parameters{} | 111 ret = &sv1.Parameters{} |
112 if err = jsonpb.UnmarshalString(desc.DistributorParameters, ret); err !=
nil { | 112 if err = jsonpb.UnmarshalString(desc.DistributorParameters, ret); err !=
nil { |
113 » » err = errors.Annotate(err). | 113 » » err = errors.Annotate(err, "unmarshalling DistributorParameters"
). |
114 » » » Reason("unmarshalling DistributorParameters"). | |
115 InternalReason("These paramaeters were already validated
?"). | 114 InternalReason("These paramaeters were already validated
?"). |
116 Err() | 115 Err() |
117 return | 116 return |
118 } | 117 } |
119 if err = ret.Normalize(); err != nil { | 118 if err = ret.Normalize(); err != nil { |
120 » » err = errors.Annotate(err). | 119 » » err = errors.Annotate(err, "normalizing DistributorParameters"). |
121 » » » Reason("normalizing DistributorParameters"). | |
122 InternalReason("These paramaeters were already normalize
d successfully once?"). | 120 InternalReason("These paramaeters were already normalize
d successfully once?"). |
123 Err() | 121 Err() |
124 return | 122 return |
125 } | 123 } |
126 return | 124 return |
127 } | 125 } |
128 | 126 |
129 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { | 127 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { |
130 id := auth.Id | 128 id := auth.Id |
131 | 129 |
132 params, err := parseParams(desc) | 130 params, err := parseParams(desc) |
133 if err != nil { | 131 if err != nil { |
134 return | 132 return |
135 } | 133 } |
136 | 134 |
137 prevParsed := (*sv1.Result)(nil) | 135 prevParsed := (*sv1.Result)(nil) |
138 if prev != nil { | 136 if prev != nil { |
139 prevParsed = &sv1.Result{} | 137 prevParsed = &sv1.Result{} |
140 if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err !=
nil { | 138 if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err !=
nil { |
141 » » » err = errors.Annotate(err).Reason("parsing previous resu
lt").Err() | 139 » » » err = errors.Annotate(err, "parsing previous result").Er
r() |
142 return | 140 return |
143 } | 141 } |
144 } | 142 } |
145 | 143 |
146 isoCtx, _ := context.WithTimeout(d, 30*time.Second) | 144 isoCtx, _ := context.WithTimeout(d, 30*time.Second) |
147 iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, prev, params) | 145 iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, prev, params) |
148 if err != nil { | 146 if err != nil { |
149 » » err = errors.Annotate(err).Reason("prepping Isolated").Err() | 147 » » err = errors.Annotate(err, "prepping Isolated").Err() |
150 return | 148 return |
151 } | 149 } |
152 | 150 |
153 secretBytesRaw := &bytes.Buffer{} | 151 secretBytesRaw := &bytes.Buffer{} |
154 marshaller := &jsonpb.Marshaler{OrigName: true} | 152 marshaller := &jsonpb.Marshaler{OrigName: true} |
155 if err = marshaller.Marshal(secretBytesRaw, auth); err != nil { | 153 if err = marshaller.Marshal(secretBytesRaw, auth); err != nil { |
156 return | 154 return |
157 } | 155 } |
158 secretBytes := base64.StdEncoding.EncodeToString(secretBytesRaw.Bytes()) | 156 secretBytes := base64.StdEncoding.EncodeToString(secretBytesRaw.Bytes()) |
159 | 157 |
160 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) | 158 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) |
161 if prevParsed != nil { | 159 if prevParsed != nil { |
162 cipdInput = prevParsed.CipdPins.ToCipdInput() | 160 cipdInput = prevParsed.CipdPins.ToCipdInput() |
163 } else { | 161 } else { |
164 cipdInput = params.Job.Inputs.Cipd.ToCipdInput() | 162 cipdInput = params.Job.Inputs.Cipd.ToCipdInput() |
165 } | 163 } |
166 | 164 |
167 topic, token, err := d.cfg.PrepareTopic(d, auth.Id) | 165 topic, token, err := d.cfg.PrepareTopic(d, auth.Id) |
168 if err != nil { | 166 if err != nil { |
169 » » err = errors.Annotate(err).Reason("preparing topic").Err() | 167 » » err = errors.Annotate(err, "preparing topic").Err() |
170 return | 168 return |
171 } | 169 } |
172 | 170 |
173 prefix := params.Meta.NamePrefix | 171 prefix := params.Meta.NamePrefix |
174 if len(prefix) > 0 { | 172 if len(prefix) > 0 { |
175 prefix += " / " | 173 prefix += " / " |
176 } | 174 } |
177 | 175 |
178 dims := make(map[string]string, len(params.Scheduling.Dimensions)) | 176 dims := make(map[string]string, len(params.Scheduling.Dimensions)) |
179 for k, v := range params.Scheduling.Dimensions { | 177 for k, v := range params.Scheduling.Dimensions { |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
215 }, | 213 }, |
216 | 214 |
217 PubsubTopic: topic.String(), | 215 PubsubTopic: topic.String(), |
218 PubsubAuthToken: token, | 216 PubsubAuthToken: token, |
219 | 217 |
220 Tags: tags, | 218 Tags: tags, |
221 }).Context(rpcCtx).Do() | 219 }).Context(rpcCtx).Do() |
222 return | 220 return |
223 }, retry.LogCallback(d, "swarm.Tasks.New")) | 221 }, retry.LogCallback(d, "swarm.Tasks.New")) |
224 if err != nil { | 222 if err != nil { |
225 » » err = errors.Annotate(err).Reason("calling swarm.Tasks.New").Err
() | 223 » » err = errors.Annotate(err, "calling swarm.Tasks.New").Err() |
226 return | 224 return |
227 } | 225 } |
228 | 226 |
229 tok = distributor.Token(rslt.TaskId) | 227 tok = distributor.Token(rslt.TaskId) |
230 return | 228 return |
231 } | 229 } |
232 | 230 |
233 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { | 231 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { |
234 return retry.Retry(d, retry.Default, func() (err error) { | 232 return retry.Retry(d, retry.Default, func() (err error) { |
235 ctx, _ := context.WithTimeout(d, 10*time.Second) | 233 ctx, _ := context.WithTimeout(d, 10*time.Second) |
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
355 return d.GetStatus(q, dat.TaskID) | 353 return d.GetStatus(q, dat.TaskID) |
356 } | 354 } |
357 | 355 |
358 func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notifi
cation, error) { | 356 func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notifi
cation, error) { |
359 return nil, nil | 357 return nil, nil |
360 } | 358 } |
361 | 359 |
362 func (*swarmingDist) Validate(payload string) error { | 360 func (*swarmingDist) Validate(payload string) error { |
363 msg := &sv1.Parameters{} | 361 msg := &sv1.Parameters{} |
364 if err := jsonpb.UnmarshalString(payload, msg); err != nil { | 362 if err := jsonpb.UnmarshalString(payload, msg); err != nil { |
365 » » return errors.Annotate(err).Reason("unmarshal").D("payload", pay
load).Err() | 363 » » return errors.Annotate(err, "unmarshal").InternalReason("payload
(%v)", payload).Err() |
366 } | 364 } |
367 » return errors.Annotate(msg.Normalize()).Reason("normalize").D("payload",
payload).Err() | 365 » return errors.Annotate(msg.Normalize(), "normalize").InternalReason("pay
load(%v)", payload).Err() |
368 } | 366 } |
369 | 367 |
370 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ | 368 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ |
371 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil | 369 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil |
372 } | 370 } |
373 | 371 |
374 // AddFactory adds this distributor implementation into the distributor | 372 // AddFactory adds this distributor implementation into the distributor |
375 // Registry. | 373 // Registry. |
376 func AddFactory(m distributor.FactoryMap) { | 374 func AddFactory(m distributor.FactoryMap) { |
377 m[(*sv1.Config)(nil)] = factory | 375 m[(*sv1.Config)(nil)] = factory |
378 } | 376 } |
OLD | NEW |