OLD | NEW |
---|---|
1 package frontend | 1 package frontend |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "encoding/json" | 5 "encoding/json" |
6 "errors" | 6 "errors" |
7 "io" | 7 "io" |
8 "mime/multipart" | 8 "mime/multipart" |
9 "net/http" | 9 "net/http" |
10 "net/url" | |
10 "strconv" | 11 "strconv" |
11 "sync" | 12 "sync" |
13 "time" | |
12 | 14 |
13 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
14 | 16 |
15 "github.com/luci/gae/service/datastore" | 17 "github.com/luci/gae/service/datastore" |
18 "github.com/luci/gae/service/taskqueue" | |
19 "github.com/luci/luci-go/common/logging" | |
16 "github.com/luci/luci-go/server/router" | 20 "github.com/luci/luci-go/server/router" |
17 | 21 |
22 "infra/appengine/test-results/builderstate" | |
18 "infra/appengine/test-results/model" | 23 "infra/appengine/test-results/model" |
19 ) | 24 ) |
20 | 25 |
21 type statusError struct { | 26 type statusError struct { |
22 error | 27 error |
23 code int | 28 code int |
24 } | 29 } |
25 | 30 |
26 // MarshalJSON marshals status error to JSON. | 31 // MarshalJSON marshals status error to JSON. |
27 func (se *statusError) MarshalJSON() ([]byte, error) { | 32 func (se *statusError) MarshalJSON() ([]byte, error) { |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
139 } | 144 } |
140 defer file.Close() | 145 defer file.Close() |
141 | 146 |
142 switch fh.Filename { | 147 switch fh.Filename { |
143 case "incremental_results.json": | 148 case "incremental_results.json": |
144 var incr model.AggregateResult | 149 var incr model.AggregateResult |
145 if err := json.NewDecoder(file).Decode(&incr); err != nil { | 150 if err := json.NewDecoder(file).Decode(&incr); err != nil { |
146 return statusError{err, http.StatusBadRequest} | 151 return statusError{err, http.StatusBadRequest} |
147 } | 152 } |
148 return updateIncremental(c, &incr) | 153 return updateIncremental(c, &incr) |
154 | |
149 case "full_results.json": | 155 case "full_results.json": |
150 » » return updateFullResults(c, file) | 156 » » bn, data, err := extractBuildNumber(file) |
157 » » if err != nil { | |
158 » » » if err == ErrInvalidBuildNumber { | |
159 » » » » return statusError{err, http.StatusBadRequest} | |
160 » » » } | |
161 » » » return statusError{err, http.StatusInternalServerError} | |
162 » » } | |
163 » » if err := updateFullResults(c, data); err != nil { | |
164 » » » return err | |
165 » » } | |
166 | |
167 » » p := GetUploadParams(c) | |
168 » » go builderstate.Update(c, p.Master, p.Builder, p.TestType, time. Now().UTC()) | |
Vadim Sh.
2016/08/16 18:56:13
you need to wait for these goroutines to finish so
nishanths
2016/08/16 20:35:13
Done.
| |
169 » » go taskqueue.Get(c).Add( | |
170 » » » taskqueue.NewPOSTTask("/internal/monitoring/upload", url .Values{ | |
Vadim Sh.
2016/08/16 18:56:13
does this handler exist?
nishanths
2016/08/16 20:35:13
It exists in the python implementation.
| |
171 » » » » "master": {p.Master}, | |
172 » » » » "builder": {p.Builder}, | |
173 » » » » "build_number": {strconv.Itoa(bn)}, | |
174 » » » » "test_type": {p.TestType}, | |
175 » » » }), | |
176 » » » defaultQueueName, | |
177 » » ) | |
178 » » return nil | |
179 | |
151 default: | 180 default: |
152 return uploadTestFile(c, file, fh.Filename) | 181 return uploadTestFile(c, file, fh.Filename) |
153 } | 182 } |
154 } | 183 } |
155 | 184 |
156 // uploadTestFile creates a new TestFile from the values in context | 185 // ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
157 // and supplied data, and puts it to the datastore. | 186 // to convert the build number value to an int. |
158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | 187 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int") |
188 | |
189 // extractBuildNumber extracts the value of "build_number" key from | |
190 // the supplied JSON encoded data. The returned io.Reader will have | |
191 // the same contents as the supplied io.Reader. | |
192 // | |
193 // The error is ErrInvalidBuildNumber if the build number value | |
194 // could not be converted to an int. | |
195 func extractBuildNumber(data io.Reader) (int, io.Reader, error) { | |
159 var buf bytes.Buffer | 196 var buf bytes.Buffer |
160 tee := io.TeeReader(data, &buf) | 197 tee := io.TeeReader(data, &buf) |
161 | 198 |
162 aux := struct { | 199 aux := struct { |
163 N string `json:"build_number,omitempty"` | 200 N string `json:"build_number,omitempty"` |
164 }{} | 201 }{} |
165 | |
166 dec := json.NewDecoder(tee) | 202 dec := json.NewDecoder(tee) |
167 if err := dec.Decode(&aux); err != nil { | 203 if err := dec.Decode(&aux); err != nil { |
168 » » return statusError{err, http.StatusInternalServerError} | 204 » » return 0, io.MultiReader(&buf, dec.Buffered()), err |
169 } | 205 } |
170 | 206 |
171 » bn := 0 | 207 » var bn int |
172 | |
173 if aux.N != "" { | 208 if aux.N != "" { |
174 n, err := strconv.Atoi(aux.N) | 209 n, err := strconv.Atoi(aux.N) |
175 if err != nil { | 210 if err != nil { |
176 » » » return statusError{errors.New("invalid build_number"), h ttp.StatusBadRequest} | 211 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInval idBuildNumber |
177 } | 212 } |
178 bn = n | 213 bn = n |
179 } | 214 } |
180 | 215 |
216 return bn, io.MultiReader(&buf, dec.Buffered()), nil | |
217 } | |
218 | |
219 // uploadTestFile creates a new TestFile from the values in context | |
220 // and supplied data, and puts it to the datastore. | |
221 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | |
222 bn, data, err := extractBuildNumber(data) | |
223 if err != nil { | |
224 if err == ErrInvalidBuildNumber { | |
225 return statusError{err, http.StatusBadRequest} | |
226 } | |
227 return statusError{err, http.StatusInternalServerError} | |
228 } | |
229 | |
181 p := GetUploadParams(c) | 230 p := GetUploadParams(c) |
182 tf := model.TestFile{ | 231 tf := model.TestFile{ |
183 Master: p.Master, | 232 Master: p.Master, |
184 Builder: p.Builder, | 233 Builder: p.Builder, |
185 TestType: p.TestType, | 234 TestType: p.TestType, |
186 BuildNumber: model.BuildNum(bn), | 235 BuildNumber: model.BuildNum(bn), |
187 Name: filename, | 236 Name: filename, |
188 » » Data: io.MultiReader(&buf, dec.Buffered()), | 237 » » Data: data, |
189 } | 238 } |
190 if err := tf.PutData(c); err != nil { | 239 if err := tf.PutData(c); err != nil { |
191 return statusError{err, http.StatusInternalServerError} | 240 return statusError{err, http.StatusInternalServerError} |
192 } | 241 } |
193 » return nil | 242 |
243 » return datastore.Get(c).Put(&tf) | |
194 } | 244 } |
195 | 245 |
196 // updateFullResults puts the supplied data as "full_results.json" | 246 // updateFullResults puts the supplied data as "full_results.json" |
197 // to the datastore, and updates corresponding "results.json" and | 247 // to the datastore, and updates corresponding "results.json" and |
198 // "results-small.json" files in the datastore. | 248 // "results-small.json" files in the datastore. |
199 // | 249 // |
200 // The supplied data should unmarshal into model.FullResults. | 250 // The supplied data should unmarshal into model.FullResults. |
201 // Otherwise, an error is returned. | 251 // Otherwise, an error is returned. |
202 func updateFullResults(c context.Context, data io.Reader) error { | 252 func updateFullResults(c context.Context, data io.Reader) error { |
203 buf := &bytes.Buffer{} | 253 buf := &bytes.Buffer{} |
204 tee := io.TeeReader(data, buf) | 254 tee := io.TeeReader(data, buf) |
205 dec := json.NewDecoder(tee) | 255 dec := json.NewDecoder(tee) |
206 | 256 |
207 var f model.FullResult | 257 var f model.FullResult |
208 if err := dec.Decode(&f); err != nil { | 258 if err := dec.Decode(&f); err != nil { |
209 return statusError{err, http.StatusBadRequest} | 259 return statusError{err, http.StatusBadRequest} |
210 } | 260 } |
211 | 261 |
212 wg := sync.WaitGroup{} | 262 wg := sync.WaitGroup{} |
213 errCh := make(chan error, 2) | 263 errCh := make(chan error, 2) |
214 | 264 |
215 wg.Add(1) | 265 wg.Add(1) |
216 go func() { | 266 go func() { |
217 defer wg.Done() | 267 defer wg.Done() |
218 » » errCh <- uploadTestFile( | 268 » » errCh <- uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json") |
219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.js on", | |
220 » » ) | |
221 }() | 269 }() |
222 | 270 |
223 wg.Add(1) | 271 wg.Add(1) |
224 go func() { | 272 go func() { |
225 defer wg.Done() | 273 defer wg.Done() |
226 incr, err := f.AggregateResult() | 274 incr, err := f.AggregateResult() |
227 if err != nil { | 275 if err != nil { |
228 errCh <- statusError{err, http.StatusBadRequest} | 276 errCh <- statusError{err, http.StatusBadRequest} |
229 return | 277 return |
230 } | 278 } |
(...skipping 24 matching lines...) Expand all Loading... | |
255 names := []string{"results.json", "results-small.json"} | 303 names := []string{"results.json", "results-small.json"} |
256 files := make([]struct { | 304 files := make([]struct { |
257 tf *model.TestFile | 305 tf *model.TestFile |
258 aggr *model.AggregateResult | 306 aggr *model.AggregateResult |
259 err error | 307 err error |
260 }, len(names)) | 308 }, len(names)) |
261 | 309 |
262 wg := sync.WaitGroup{} | 310 wg := sync.WaitGroup{} |
263 | 311 |
264 for i, name := range names { | 312 for i, name := range names { |
265 » » i, name := i, name | 313 » » i, name, p := i, name, p |
266 wg.Add(1) | 314 wg.Add(1) |
315 | |
267 go func() { | 316 go func() { |
268 defer wg.Done() | 317 defer wg.Done() |
318 p.Name = name | |
269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) | 319 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
270 if err != nil { | 320 if err != nil { |
271 if _, ok := err.(ErrNoMatches); ok { | 321 if _, ok := err.(ErrNoMatches); ok { |
272 files[i].tf = &model.TestFile{ | 322 files[i].tf = &model.TestFile{ |
273 Master: p.Master, | 323 Master: p.Master, |
274 Builder: p.Builder, | 324 Builder: p.Builder, |
275 TestType: p.TestType, | 325 TestType: p.TestType, |
276 BuildNumber: -1, | 326 BuildNumber: -1, |
277 Name: name, | 327 Name: name, |
278 } | 328 } |
279 » » » » } else { | 329 » » » » » return |
280 » » » » » files[i].err = err | |
281 } | 330 } |
331 files[i].err = err | |
282 return | 332 return |
283 } | 333 } |
284 files[i].tf = tf | |
285 if err := tf.GetData(c); err != nil { | 334 if err := tf.GetData(c); err != nil { |
286 files[i].err = err | 335 files[i].err = err |
287 return | 336 return |
288 } | 337 } |
289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr) ; err != nil { | 338 » » » var a model.AggregateResult |
339 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != ni l { | |
290 files[i].err = err | 340 files[i].err = err |
291 return | 341 return |
292 } | 342 } |
343 files[i].tf = tf | |
344 files[i].aggr = &a | |
293 }() | 345 }() |
294 } | 346 } |
295 | 347 |
296 wg.Wait() | 348 wg.Wait() |
297 for _, file := range files { | 349 for _, file := range files { |
298 if file.err != nil { | 350 if file.err != nil { |
299 return file.err | 351 return file.err |
300 } | 352 } |
301 } | 353 } |
302 | 354 |
303 » wg = sync.WaitGroup{} | 355 » return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
304 » errs := make([]error, len(files)) | 356 » » wg = sync.WaitGroup{} |
357 » » errs := make([]error, len(files)) | |
305 | 358 |
306 return datastore.Get(c).RunInTransaction(func(c context.Context) error { | |
307 for i, file := range files { | 359 for i, file := range files { |
308 i, file := i, file | 360 i, file := i, file |
309 wg.Add(1) | 361 wg.Add(1) |
310 go func() { | 362 go func() { |
311 defer wg.Done() | 363 defer wg.Done() |
312 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) | 364 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) |
313 }() | 365 }() |
314 } | 366 } |
315 | 367 |
316 wg.Wait() | 368 wg.Wait() |
317 // Prioritize returning http.StatusInternalServerError status | 369 // Prioritize returning http.StatusInternalServerError status |
318 // code errors over other errors. | 370 // code errors over other errors. |
319 var e error | 371 var e error |
320 for _, err := range errs { | 372 for _, err := range errs { |
321 se, ok := err.(statusError) | 373 se, ok := err.(statusError) |
322 if ok && se.code == http.StatusInternalServerError { | 374 if ok && se.code == http.StatusInternalServerError { |
323 return se | 375 return se |
Vadim Sh.
2016/08/16 18:56:13
beware that RunInTransaction can return an error o
nishanths
2016/08/16 20:35:13
in that case, the error will be propagated at line
| |
324 } | 376 } |
325 e = err | 377 e = err |
326 } | 378 } |
327 return e | 379 return e |
328 }, &datastore.TransactionOptions{XG: true}) | 380 }, &datastore.TransactionOptions{XG: true}) |
Vadim Sh.
2016/08/16 18:56:13
how many entity groups does this transaction modif
nishanths
2016/08/16 20:35:13
Two entity groups. (1) TestFile (2) DataEntry
| |
329 } | 381 } |
330 | 382 |
331 // getTestFileAlt returns the the first TestFile in the datastore for | 383 // getTestFileAlt returns the the first TestFile in the datastore for |
332 // the query formed by calling p.Query(). | 384 // the query formed by calling p.Query(). |
333 // | 385 // |
334 // The function tries to find the first TestFile using p. If no such TestFile | 386 // The function tries to find the first TestFile using p. If no such TestFile |
335 // exists the function sets p.Master to altMaster and tries again. | 387 // exists the function sets p.Master to altMaster and tries again. |
336 // If altMaster is empty, the function does not perform the additional try. | 388 // If altMaster is empty, the function does not perform the additional try. |
337 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { | 389 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { |
338 a, err := getFirstTestFile(c, p.Query()) | 390 a, err := getFirstTestFile(c, p.Query()) |
(...skipping 10 matching lines...) Expand all Loading... | |
349 a, err = getFirstTestFile(c, p.Query()) | 401 a, err = getFirstTestFile(c, p.Query()) |
350 if err == nil { | 402 if err == nil { |
351 a.Master = origMaster | 403 a.Master = origMaster |
352 return a, nil | 404 return a, nil |
353 } | 405 } |
354 | 406 |
355 return nil, err | 407 return nil, err |
356 } | 408 } |
357 | 409 |
358 // updateAggregate updates tf with the result of merging incr into | 410 // updateAggregate updates tf with the result of merging incr into |
359 // aggr. | 411 // aggr, and updates tf in datastore. |
360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error { | 412 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error { |
361 if !model.IsAggregateTestFile(tf.Name) { | 413 if !model.IsAggregateTestFile(tf.Name) { |
362 return errors.New("frontend: tf should be an aggregate test file ") | 414 return errors.New("frontend: tf should be an aggregate test file ") |
363 } | 415 } |
364 | 416 |
365 size := model.ResultsSize | 417 size := model.ResultsSize |
366 if tf.Name == "results-small.json" { | 418 if tf.Name == "results-small.json" { |
367 size = model.ResultsSmallSize | 419 size = model.ResultsSmallSize |
368 } | 420 } |
369 | 421 |
(...skipping 19 matching lines...) Expand all Loading... | |
389 b := &bytes.Buffer{} | 441 b := &bytes.Buffer{} |
390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { | 442 if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
391 return statusError{err, http.StatusInternalServerError} | 443 return statusError{err, http.StatusInternalServerError} |
392 } | 444 } |
393 | 445 |
394 tf.Data = b | 446 tf.Data = b |
395 if err := tf.PutData(c); err != nil { | 447 if err := tf.PutData(c); err != nil { |
396 return statusError{err, http.StatusInternalServerError} | 448 return statusError{err, http.StatusInternalServerError} |
397 } | 449 } |
398 | 450 |
451 if err := datastore.Get(c).Put(tf); err != nil { | |
452 return err | |
Vadim Sh.
2016/08/16 18:56:13
statusError{err, http.StatusInternalServerError} ?
nishanths
2016/08/16 20:35:13
Done.
| |
453 } | |
454 if err := deleteKeys(c, tf.OldDataKeys); err != nil { | |
Vadim Sh.
2016/08/16 18:56:13
ignoring this error?
nishanths
2016/08/16 20:35:13
yes, only logging the error.
Returning an HTTP e
| |
455 logging.Fields{ | |
456 logging.ErrorKey: err, | |
457 "keys": tf.OldDataKeys, | |
458 }.Errorf(c, "upload: failed to delete keys") | |
459 } | |
399 return nil | 460 return nil |
400 } | 461 } |
462 | |
463 func deleteKeys(c context.Context, k []*datastore.Key) error { | |
464 if len(k) == 0 { | |
465 return nil | |
466 } | |
467 | |
468 keys := make([]string, 0, len(k)) | |
469 for _, key := range k { | |
470 keys = append(keys, key.Encode()) | |
471 } | |
472 | |
473 payload, err := json.Marshal(struct { | |
474 Keys []string `json:"keys"` | |
475 }{ | |
476 keys, | |
477 }) | |
478 if err != nil { | |
479 return err | |
480 } | |
481 | |
482 h := make(http.Header) | |
483 h.Set("Content-Type", "application/json") | |
484 | |
485 logging.Fields{ | |
486 "keys": keys, | |
487 }.Infof(c, "deleteKeys: enqueing") | |
488 | |
489 return taskqueue.Get(c).Add(&taskqueue.Task{ | |
490 Path: "/internal/delete", | |
491 Payload: payload, | |
492 Header: h, | |
493 Method: "POST", | |
494 Delay: time.Duration(30) * time.Minute, | |
495 }, deleteKeysQueueName) | |
496 } | |
OLD | NEW |