OLD | NEW |
---|---|
1 package frontend | 1 package frontend |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "encoding/json" | 5 "encoding/json" |
6 "errors" | 6 "errors" |
7 "io" | 7 "io" |
8 "mime/multipart" | 8 "mime/multipart" |
9 "net/http" | 9 "net/http" |
10 "net/url" | |
10 "strconv" | 11 "strconv" |
11 "sync" | 12 "sync" |
13 "time" | |
12 | 14 |
13 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
14 | 16 |
15 "github.com/luci/gae/service/datastore" | 17 "github.com/luci/gae/service/datastore" |
18 "github.com/luci/gae/service/taskqueue" | |
19 "github.com/luci/luci-go/common/logging" | |
16 "github.com/luci/luci-go/server/router" | 20 "github.com/luci/luci-go/server/router" |
17 | 21 |
22 "infra/appengine/test-results/builderstate" | |
18 "infra/appengine/test-results/model" | 23 "infra/appengine/test-results/model" |
19 ) | 24 ) |
20 | 25 |
21 type statusError struct { | 26 type statusError struct { |
22 error | 27 error |
23 code int | 28 code int |
24 } | 29 } |
25 | 30 |
26 // MarshalJSON marshals status error to JSON. | 31 // MarshalJSON marshals status error to JSON. |
27 func (se *statusError) MarshalJSON() ([]byte, error) { | 32 func (se *statusError) MarshalJSON() ([]byte, error) { |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
139 } | 144 } |
140 defer file.Close() | 145 defer file.Close() |
141 | 146 |
142 switch fh.Filename { | 147 switch fh.Filename { |
143 case "incremental_results.json": | 148 case "incremental_results.json": |
144 var incr model.AggregateResult | 149 var incr model.AggregateResult |
145 if err := json.NewDecoder(file).Decode(&incr); err != nil { | 150 if err := json.NewDecoder(file).Decode(&incr); err != nil { |
146 return statusError{err, http.StatusBadRequest} | 151 return statusError{err, http.StatusBadRequest} |
147 } | 152 } |
148 return updateIncremental(c, &incr) | 153 return updateIncremental(c, &incr) |
154 | |
149 case "full_results.json": | 155 case "full_results.json": |
150 » » return updateFullResults(c, file) | 156 » » bn, data, err := extractBuildNumber(file) |
157 » » if err != nil { | |
158 » » » if err == ErrInvalidBuildNumber { | |
159 » » » » return statusError{err, http.StatusBadRequest} | |
160 » » » } | |
161 » » » return statusError{err, http.StatusInternalServerError} | |
162 » » } | |
163 » » if err := updateFullResults(c, data); err != nil { | |
164 » » » return err | |
165 » » } | |
166 | |
167 » » p := GetUploadParams(c) | |
168 » » wg := sync.WaitGroup{} | |
169 | |
170 » » wg.Add(1) | |
171 » » go func() { | |
172 » » » defer wg.Done() | |
173 » » » if err := builderstate.Update(c, p.Master, p.Builder, p. TestType, time.Now().UTC()); err != nil { | |
174 » » » » logging.WithError(err).Errorf(c, "builderstate u pdate") | |
175 » » » } | |
176 » » }() | |
177 | |
178 » » wg.Add(1) | |
179 » » go func() { | |
180 » » » defer wg.Done() | |
181 » » » if err := taskqueue.Get(c).Add( | |
182 » » » » taskqueue.NewPOSTTask("/internal/monitoring/uplo ad", url.Values{ | |
183 » » » » » "master": {p.Master}, | |
184 » » » » » "builder": {p.Builder}, | |
185 » » » » » "build_number": {strconv.Itoa(bn)}, | |
186 » » » » » "test_type": {p.TestType}, | |
187 » » » » }), | |
188 » » » » defaultQueueName, | |
189 » » » ); err != nil { | |
190 » » » » logging.WithError(err).Errorf(c, "taskqueue add: /internal/monitoring/upload") | |
191 » » » } | |
192 » » }() | |
193 | |
194 » » wg.Wait() | |
195 » » return nil | |
Vadim Sh.
2016/08/16 21:27:12
no reporting errors back to the user?
nishanths
2016/08/16 21:29:54
These errors are not critical to the POST request.
Vadim Sh.
2016/08/16 21:34:46
I have no idea how it all works on high level, I j
| |
196 | |
151 default: | 197 default: |
152 return uploadTestFile(c, file, fh.Filename) | 198 return uploadTestFile(c, file, fh.Filename) |
153 } | 199 } |
154 } | 200 } |
155 | 201 |
156 // uploadTestFile creates a new TestFile from the values in context | 202 // ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
157 // and supplied data, and puts it to the datastore. | 203 // to convert the build number value to an int. |
158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | 204 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int") |
205 | |
206 // extractBuildNumber extracts the value of "build_number" key from | |
207 // the supplied JSON encoded data. The returned io.Reader will have | |
208 // the same contents as the supplied io.Reader. | |
209 // | |
210 // The error is ErrInvalidBuildNumber if the build number value | |
211 // could not be converted to an int. | |
212 func extractBuildNumber(data io.Reader) (int, io.Reader, error) { | |
159 var buf bytes.Buffer | 213 var buf bytes.Buffer |
160 tee := io.TeeReader(data, &buf) | 214 tee := io.TeeReader(data, &buf) |
161 | 215 |
162 aux := struct { | 216 aux := struct { |
163 N string `json:"build_number,omitempty"` | 217 N string `json:"build_number,omitempty"` |
164 }{} | 218 }{} |
165 | |
166 dec := json.NewDecoder(tee) | 219 dec := json.NewDecoder(tee) |
167 if err := dec.Decode(&aux); err != nil { | 220 if err := dec.Decode(&aux); err != nil { |
168 » » return statusError{err, http.StatusInternalServerError} | 221 » » return 0, io.MultiReader(&buf, dec.Buffered()), err |
169 } | 222 } |
170 | 223 |
171 » bn := 0 | 224 » var bn int |
172 | |
173 if aux.N != "" { | 225 if aux.N != "" { |
174 n, err := strconv.Atoi(aux.N) | 226 n, err := strconv.Atoi(aux.N) |
175 if err != nil { | 227 if err != nil { |
176 » » » return statusError{errors.New("invalid build_number"), h ttp.StatusBadRequest} | 228 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInval idBuildNumber |
177 } | 229 } |
178 bn = n | 230 bn = n |
179 } | 231 } |
180 | 232 |
233 return bn, io.MultiReader(&buf, dec.Buffered()), nil | |
234 } | |
235 | |
236 // uploadTestFile creates a new TestFile from the values in context | |
237 // and supplied data, and puts it to the datastore. | |
238 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | |
239 bn, data, err := extractBuildNumber(data) | |
240 if err != nil { | |
241 if err == ErrInvalidBuildNumber { | |
242 return statusError{err, http.StatusBadRequest} | |
243 } | |
244 return statusError{err, http.StatusInternalServerError} | |
245 } | |
246 | |
181 p := GetUploadParams(c) | 247 p := GetUploadParams(c) |
182 tf := model.TestFile{ | 248 tf := model.TestFile{ |
183 Master: p.Master, | 249 Master: p.Master, |
184 Builder: p.Builder, | 250 Builder: p.Builder, |
185 TestType: p.TestType, | 251 TestType: p.TestType, |
186 BuildNumber: model.BuildNum(bn), | 252 BuildNumber: model.BuildNum(bn), |
187 Name: filename, | 253 Name: filename, |
188 » » Data: io.MultiReader(&buf, dec.Buffered()), | 254 » » Data: data, |
189 } | 255 } |
190 if err := tf.PutData(c); err != nil { | 256 if err := tf.PutData(c); err != nil { |
191 return statusError{err, http.StatusInternalServerError} | 257 return statusError{err, http.StatusInternalServerError} |
192 } | 258 } |
193 » return nil | 259 |
260 » return datastore.Get(c).Put(&tf) | |
194 } | 261 } |
195 | 262 |
196 // updateFullResults puts the supplied data as "full_results.json" | 263 // updateFullResults puts the supplied data as "full_results.json" |
197 // to the datastore, and updates corresponding "results.json" and | 264 // to the datastore, and updates corresponding "results.json" and |
198 // "results-small.json" files in the datastore. | 265 // "results-small.json" files in the datastore. |
199 // | 266 // |
200 // The supplied data should unmarshal into model.FullResults. | 267 // The supplied data should unmarshal into model.FullResults. |
201 // Otherwise, an error is returned. | 268 // Otherwise, an error is returned. |
202 func updateFullResults(c context.Context, data io.Reader) error { | 269 func updateFullResults(c context.Context, data io.Reader) error { |
203 buf := &bytes.Buffer{} | 270 buf := &bytes.Buffer{} |
204 tee := io.TeeReader(data, buf) | 271 tee := io.TeeReader(data, buf) |
205 dec := json.NewDecoder(tee) | 272 dec := json.NewDecoder(tee) |
206 | 273 |
207 var f model.FullResult | 274 var f model.FullResult |
208 if err := dec.Decode(&f); err != nil { | 275 if err := dec.Decode(&f); err != nil { |
209 return statusError{err, http.StatusBadRequest} | 276 return statusError{err, http.StatusBadRequest} |
210 } | 277 } |
211 | 278 |
212 » wg := sync.WaitGroup{} | 279 » if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_r esults.json"); err != nil { |
213 » errCh := make(chan error, 2) | 280 » » return statusError{err, http.StatusInternalServerError} |
281 » } | |
214 | 282 |
215 » wg.Add(1) | 283 » incr, err := f.AggregateResult() |
216 » go func() { | 284 » if err != nil { |
217 » » defer wg.Done() | 285 » » return statusError{err, http.StatusBadRequest} |
218 » » errCh <- uploadTestFile( | 286 » } |
219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.js on", | 287 » if err := updateIncremental(c, &incr); err != nil { |
220 » » ) | 288 » » return statusError{err, http.StatusInternalServerError} |
221 » }() | 289 » } |
222 | 290 |
223 wg.Add(1) | |
224 go func() { | |
225 defer wg.Done() | |
226 incr, err := f.AggregateResult() | |
227 if err != nil { | |
228 errCh <- statusError{err, http.StatusBadRequest} | |
229 return | |
230 } | |
231 errCh <- updateIncremental(c, &incr) | |
232 }() | |
233 | |
234 wg.Wait() | |
235 close(errCh) | |
236 for err := range errCh { | |
237 if err != nil { | |
238 return err | |
239 } | |
240 } | |
241 return nil | 291 return nil |
242 } | 292 } |
243 | 293 |
244 // updateIncremental gets "results.json" and "results-small.json" | 294 // updateIncremental gets "results.json" and "results-small.json" |
245 // for values in context, merges incr into them, and puts the updated | 295 // for values in context, merges incr into them, and puts the updated |
246 // files to the datastore. | 296 // files to the datastore. |
247 func updateIncremental(c context.Context, incr *model.AggregateResult) error { | 297 func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
248 u := GetUploadParams(c) | 298 u := GetUploadParams(c) |
249 p := model.TestFileParams{ | 299 p := model.TestFileParams{ |
250 Master: u.Master, | 300 Master: u.Master, |
251 Builder: u.Builder, | 301 Builder: u.Builder, |
252 TestType: u.TestType, | 302 TestType: u.TestType, |
253 } | 303 } |
254 | 304 |
255 names := []string{"results.json", "results-small.json"} | 305 names := []string{"results.json", "results-small.json"} |
256 files := make([]struct { | 306 files := make([]struct { |
257 tf *model.TestFile | 307 tf *model.TestFile |
258 aggr *model.AggregateResult | 308 aggr *model.AggregateResult |
259 err error | 309 err error |
260 }, len(names)) | 310 }, len(names)) |
261 | 311 |
262 wg := sync.WaitGroup{} | 312 wg := sync.WaitGroup{} |
263 | 313 |
264 for i, name := range names { | 314 for i, name := range names { |
265 » » i, name := i, name | 315 » » i, name, p := i, name, p |
266 wg.Add(1) | 316 wg.Add(1) |
317 | |
267 go func() { | 318 go func() { |
268 defer wg.Done() | 319 defer wg.Done() |
320 p.Name = name | |
269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) | 321 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
270 if err != nil { | 322 if err != nil { |
271 if _, ok := err.(ErrNoMatches); ok { | 323 if _, ok := err.(ErrNoMatches); ok { |
272 files[i].tf = &model.TestFile{ | 324 files[i].tf = &model.TestFile{ |
273 Master: p.Master, | 325 Master: p.Master, |
274 Builder: p.Builder, | 326 Builder: p.Builder, |
275 TestType: p.TestType, | 327 TestType: p.TestType, |
276 BuildNumber: -1, | 328 BuildNumber: -1, |
277 Name: name, | 329 Name: name, |
278 } | 330 } |
279 » » » » } else { | 331 » » » » » return |
280 » » » » » files[i].err = err | |
281 } | 332 } |
333 files[i].err = err | |
282 return | 334 return |
283 } | 335 } |
284 files[i].tf = tf | |
285 if err := tf.GetData(c); err != nil { | 336 if err := tf.GetData(c); err != nil { |
286 files[i].err = err | 337 files[i].err = err |
287 return | 338 return |
288 } | 339 } |
289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr) ; err != nil { | 340 » » » var a model.AggregateResult |
341 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != ni l { | |
290 files[i].err = err | 342 files[i].err = err |
291 return | 343 return |
292 } | 344 } |
345 files[i].tf = tf | |
346 files[i].aggr = &a | |
293 }() | 347 }() |
294 } | 348 } |
295 | 349 |
296 wg.Wait() | 350 wg.Wait() |
297 for _, file := range files { | 351 for _, file := range files { |
298 if file.err != nil { | 352 if file.err != nil { |
299 return file.err | 353 return file.err |
300 } | 354 } |
301 } | 355 } |
302 | 356 |
303 » wg = sync.WaitGroup{} | 357 » return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
304 » errs := make([]error, len(files)) | 358 » » wg = sync.WaitGroup{} |
359 » » errs := make([]error, len(files)) | |
305 | 360 |
306 return datastore.Get(c).RunInTransaction(func(c context.Context) error { | |
307 for i, file := range files { | 361 for i, file := range files { |
308 i, file := i, file | 362 i, file := i, file |
309 wg.Add(1) | 363 wg.Add(1) |
310 go func() { | 364 go func() { |
311 defer wg.Done() | 365 defer wg.Done() |
312 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) | 366 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) |
313 }() | 367 }() |
314 } | 368 } |
315 | 369 |
316 wg.Wait() | 370 wg.Wait() |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
349 a, err = getFirstTestFile(c, p.Query()) | 403 a, err = getFirstTestFile(c, p.Query()) |
350 if err == nil { | 404 if err == nil { |
351 a.Master = origMaster | 405 a.Master = origMaster |
352 return a, nil | 406 return a, nil |
353 } | 407 } |
354 | 408 |
355 return nil, err | 409 return nil, err |
356 } | 410 } |
357 | 411 |
358 // updateAggregate updates tf with the result of merging incr into | 412 // updateAggregate updates tf with the result of merging incr into |
359 // aggr. | 413 // aggr, and updates tf in datastore. |
360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error { | 414 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error { |
361 if !model.IsAggregateTestFile(tf.Name) { | 415 if !model.IsAggregateTestFile(tf.Name) { |
362 return errors.New("frontend: tf should be an aggregate test file ") | 416 return errors.New("frontend: tf should be an aggregate test file ") |
363 } | 417 } |
364 | 418 |
365 size := model.ResultsSize | 419 size := model.ResultsSize |
366 if tf.Name == "results-small.json" { | 420 if tf.Name == "results-small.json" { |
367 size = model.ResultsSmallSize | 421 size = model.ResultsSmallSize |
368 } | 422 } |
369 | 423 |
(...skipping 19 matching lines...) Expand all Loading... | |
389 b := &bytes.Buffer{} | 443 b := &bytes.Buffer{} |
390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { | 444 if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
391 return statusError{err, http.StatusInternalServerError} | 445 return statusError{err, http.StatusInternalServerError} |
392 } | 446 } |
393 | 447 |
394 tf.Data = b | 448 tf.Data = b |
395 if err := tf.PutData(c); err != nil { | 449 if err := tf.PutData(c); err != nil { |
396 return statusError{err, http.StatusInternalServerError} | 450 return statusError{err, http.StatusInternalServerError} |
397 } | 451 } |
398 | 452 |
453 if err := datastore.Get(c).Put(tf); err != nil { | |
454 return statusError{err, http.StatusInternalServerError} | |
455 } | |
456 if err := deleteKeys(c, tf.OldDataKeys); err != nil { | |
457 logging.Fields{ | |
458 logging.ErrorKey: err, | |
459 "keys": tf.OldDataKeys, | |
460 }.Errorf(c, "upload: failed to delete keys") | |
461 } | |
399 return nil | 462 return nil |
400 } | 463 } |
464 | |
465 func deleteKeys(c context.Context, k []*datastore.Key) error { | |
466 if len(k) == 0 { | |
467 return nil | |
468 } | |
469 | |
470 keys := make([]string, 0, len(k)) | |
471 for _, key := range k { | |
472 keys = append(keys, key.Encode()) | |
473 } | |
474 | |
475 payload, err := json.Marshal(struct { | |
476 Keys []string `json:"keys"` | |
477 }{ | |
478 keys, | |
479 }) | |
480 if err != nil { | |
481 return err | |
482 } | |
483 | |
484 h := make(http.Header) | |
485 h.Set("Content-Type", "application/json") | |
486 | |
487 logging.Fields{ | |
488 "keys": keys, | |
489 }.Infof(c, "deleteKeys: enqueing") | |
490 | |
491 return taskqueue.Get(c).Add(&taskqueue.Task{ | |
492 Path: "/internal/delete", | |
493 Payload: payload, | |
494 Header: h, | |
495 Method: "POST", | |
496 Delay: time.Duration(30) * time.Minute, | |
497 }, deleteKeysQueueName) | |
498 } | |
OLD | NEW |