Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package frontend | 1 package frontend |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "bytes" | 4 "bytes" |
| 5 "encoding/json" | 5 "encoding/json" |
| 6 "errors" | 6 "errors" |
| 7 "io" | 7 "io" |
| 8 "mime/multipart" | 8 "mime/multipart" |
| 9 "net/http" | 9 "net/http" |
| 10 "net/url" | |
| 10 "strconv" | 11 "strconv" |
| 11 "sync" | 12 "sync" |
| 13 "time" | |
| 12 | 14 |
| 13 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 14 | 16 |
| 15 "github.com/luci/gae/service/datastore" | 17 "github.com/luci/gae/service/datastore" |
| 18 "github.com/luci/gae/service/taskqueue" | |
| 19 "github.com/luci/luci-go/common/logging" | |
| 16 "github.com/luci/luci-go/server/router" | 20 "github.com/luci/luci-go/server/router" |
| 17 | 21 |
| 22 "infra/appengine/test-results/builderstate" | |
| 18 "infra/appengine/test-results/model" | 23 "infra/appengine/test-results/model" |
| 19 ) | 24 ) |
| 20 | 25 |
| 21 type statusError struct { | 26 type statusError struct { |
| 22 error | 27 error |
| 23 code int | 28 code int |
| 24 } | 29 } |
| 25 | 30 |
| 26 // MarshalJSON marshals status error to JSON. | 31 // MarshalJSON marshals status error to JSON. |
| 27 func (se *statusError) MarshalJSON() ([]byte, error) { | 32 func (se *statusError) MarshalJSON() ([]byte, error) { |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 139 } | 144 } |
| 140 defer file.Close() | 145 defer file.Close() |
| 141 | 146 |
| 142 switch fh.Filename { | 147 switch fh.Filename { |
| 143 case "incremental_results.json": | 148 case "incremental_results.json": |
| 144 var incr model.AggregateResult | 149 var incr model.AggregateResult |
| 145 if err := json.NewDecoder(file).Decode(&incr); err != nil { | 150 if err := json.NewDecoder(file).Decode(&incr); err != nil { |
| 146 return statusError{err, http.StatusBadRequest} | 151 return statusError{err, http.StatusBadRequest} |
| 147 } | 152 } |
| 148 return updateIncremental(c, &incr) | 153 return updateIncremental(c, &incr) |
| 154 | |
| 149 case "full_results.json": | 155 case "full_results.json": |
| 150 » » return updateFullResults(c, file) | 156 » » bn, data, err := extractBuildNumber(file) |
| 157 » » if err != nil { | |
| 158 » » » if err == ErrInvalidBuildNumber { | |
| 159 » » » » return statusError{err, http.StatusBadRequest} | |
| 160 » » » } | |
| 161 » » » return statusError{err, http.StatusInternalServerError} | |
| 162 » » } | |
| 163 » » if err := updateFullResults(c, data); err != nil { | |
| 164 » » » return err | |
| 165 » » } | |
| 166 | |
| 167 » » p := GetUploadParams(c) | |
| 169 » » go builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC())
|
Vadim Sh.
2016/08/16 18:56:13
you need to wait for these goroutines to finish so
nishanths
2016/08/16 20:35:13
Done.
| |
| 169 » » go taskqueue.Get(c).Add( | |
| 170 » » » taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{
|
Vadim Sh.
2016/08/16 18:56:13
does this handler exist?
nishanths
2016/08/16 20:35:13
It exists in the python implementation.
| |
| 171 » » » » "master": {p.Master}, | |
| 172 » » » » "builder": {p.Builder}, | |
| 173 » » » » "build_number": {strconv.Itoa(bn)}, | |
| 174 » » » » "test_type": {p.TestType}, | |
| 175 » » » }), | |
| 176 » » » defaultQueueName, | |
| 177 » » ) | |
| 178 » » return nil | |
| 179 | |
| 151 default: | 180 default: |
| 152 return uploadTestFile(c, file, fh.Filename) | 181 return uploadTestFile(c, file, fh.Filename) |
| 153 } | 182 } |
| 154 } | 183 } |
| 155 | 184 |
| 156 // uploadTestFile creates a new TestFile from the values in context | 185 // ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
| 157 // and supplied data, and puts it to the datastore. | 186 // to convert the build number value to an int. |
| 158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | 187 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int") |
| 188 | |
| 189 // extractBuildNumber extracts the value of "build_number" key from | |
| 190 // the supplied JSON encoded data. The returned io.Reader will have | |
| 191 // the same contents as the supplied io.Reader. | |
| 192 // | |
| 193 // The error is ErrInvalidBuildNumber if the build number value | |
| 194 // could not be converted to an int. | |
| 195 func extractBuildNumber(data io.Reader) (int, io.Reader, error) { | |
| 159 var buf bytes.Buffer | 196 var buf bytes.Buffer |
| 160 tee := io.TeeReader(data, &buf) | 197 tee := io.TeeReader(data, &buf) |
| 161 | 198 |
| 162 aux := struct { | 199 aux := struct { |
| 163 N string `json:"build_number,omitempty"` | 200 N string `json:"build_number,omitempty"` |
| 164 }{} | 201 }{} |
| 165 | |
| 166 dec := json.NewDecoder(tee) | 202 dec := json.NewDecoder(tee) |
| 167 if err := dec.Decode(&aux); err != nil { | 203 if err := dec.Decode(&aux); err != nil { |
| 168 » » return statusError{err, http.StatusInternalServerError} | 204 » » return 0, io.MultiReader(&buf, dec.Buffered()), err |
| 169 } | 205 } |
| 170 | 206 |
| 171 » bn := 0 | 207 » var bn int |
| 172 | |
| 173 if aux.N != "" { | 208 if aux.N != "" { |
| 174 n, err := strconv.Atoi(aux.N) | 209 n, err := strconv.Atoi(aux.N) |
| 175 if err != nil { | 210 if err != nil { |
| 176 » » » return statusError{errors.New("invalid build_number"), http.StatusBadRequest} | 211 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber |
| 177 } | 212 } |
| 178 bn = n | 213 bn = n |
| 179 } | 214 } |
| 180 | 215 |
| 216 return bn, io.MultiReader(&buf, dec.Buffered()), nil | |
| 217 } | |
| 218 | |
| 219 // uploadTestFile creates a new TestFile from the values in context | |
| 220 // and supplied data, and puts it to the datastore. | |
| 221 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | |
| 222 bn, data, err := extractBuildNumber(data) | |
| 223 if err != nil { | |
| 224 if err == ErrInvalidBuildNumber { | |
| 225 return statusError{err, http.StatusBadRequest} | |
| 226 } | |
| 227 return statusError{err, http.StatusInternalServerError} | |
| 228 } | |
| 229 | |
| 181 p := GetUploadParams(c) | 230 p := GetUploadParams(c) |
| 182 tf := model.TestFile{ | 231 tf := model.TestFile{ |
| 183 Master: p.Master, | 232 Master: p.Master, |
| 184 Builder: p.Builder, | 233 Builder: p.Builder, |
| 185 TestType: p.TestType, | 234 TestType: p.TestType, |
| 186 BuildNumber: model.BuildNum(bn), | 235 BuildNumber: model.BuildNum(bn), |
| 187 Name: filename, | 236 Name: filename, |
| 188 » » Data: io.MultiReader(&buf, dec.Buffered()), | 237 » » Data: data, |
| 189 } | 238 } |
| 190 if err := tf.PutData(c); err != nil { | 239 if err := tf.PutData(c); err != nil { |
| 191 return statusError{err, http.StatusInternalServerError} | 240 return statusError{err, http.StatusInternalServerError} |
| 192 } | 241 } |
| 193 » return nil | 242 |
| 243 » return datastore.Get(c).Put(&tf) | |
| 194 } | 244 } |
| 195 | 245 |
| 196 // updateFullResults puts the supplied data as "full_results.json" | 246 // updateFullResults puts the supplied data as "full_results.json" |
| 197 // to the datastore, and updates corresponding "results.json" and | 247 // to the datastore, and updates corresponding "results.json" and |
| 198 // "results-small.json" files in the datastore. | 248 // "results-small.json" files in the datastore. |
| 199 // | 249 // |
| 200 // The supplied data should unmarshal into model.FullResults. | 250 // The supplied data should unmarshal into model.FullResults. |
| 201 // Otherwise, an error is returned. | 251 // Otherwise, an error is returned. |
| 202 func updateFullResults(c context.Context, data io.Reader) error { | 252 func updateFullResults(c context.Context, data io.Reader) error { |
| 203 buf := &bytes.Buffer{} | 253 buf := &bytes.Buffer{} |
| 204 tee := io.TeeReader(data, buf) | 254 tee := io.TeeReader(data, buf) |
| 205 dec := json.NewDecoder(tee) | 255 dec := json.NewDecoder(tee) |
| 206 | 256 |
| 207 var f model.FullResult | 257 var f model.FullResult |
| 208 if err := dec.Decode(&f); err != nil { | 258 if err := dec.Decode(&f); err != nil { |
| 209 return statusError{err, http.StatusBadRequest} | 259 return statusError{err, http.StatusBadRequest} |
| 210 } | 260 } |
| 211 | 261 |
| 212 wg := sync.WaitGroup{} | 262 wg := sync.WaitGroup{} |
| 213 errCh := make(chan error, 2) | 263 errCh := make(chan error, 2) |
| 214 | 264 |
| 215 wg.Add(1) | 265 wg.Add(1) |
| 216 go func() { | 266 go func() { |
| 217 defer wg.Done() | 267 defer wg.Done() |
| 218 » » errCh <- uploadTestFile( | 268 » » errCh <- uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json") |
| 219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.json", | |
| 220 » » ) | |
| 221 }() | 269 }() |
| 222 | 270 |
| 223 wg.Add(1) | 271 wg.Add(1) |
| 224 go func() { | 272 go func() { |
| 225 defer wg.Done() | 273 defer wg.Done() |
| 226 incr, err := f.AggregateResult() | 274 incr, err := f.AggregateResult() |
| 227 if err != nil { | 275 if err != nil { |
| 228 errCh <- statusError{err, http.StatusBadRequest} | 276 errCh <- statusError{err, http.StatusBadRequest} |
| 229 return | 277 return |
| 230 } | 278 } |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 255 names := []string{"results.json", "results-small.json"} | 303 names := []string{"results.json", "results-small.json"} |
| 256 files := make([]struct { | 304 files := make([]struct { |
| 257 tf *model.TestFile | 305 tf *model.TestFile |
| 258 aggr *model.AggregateResult | 306 aggr *model.AggregateResult |
| 259 err error | 307 err error |
| 260 }, len(names)) | 308 }, len(names)) |
| 261 | 309 |
| 262 wg := sync.WaitGroup{} | 310 wg := sync.WaitGroup{} |
| 263 | 311 |
| 264 for i, name := range names { | 312 for i, name := range names { |
| 265 » » i, name := i, name | 313 » » i, name, p := i, name, p |
| 266 wg.Add(1) | 314 wg.Add(1) |
| 315 | |
| 267 go func() { | 316 go func() { |
| 268 defer wg.Done() | 317 defer wg.Done() |
| 318 p.Name = name | |
| 269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) | 319 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
| 270 if err != nil { | 320 if err != nil { |
| 271 if _, ok := err.(ErrNoMatches); ok { | 321 if _, ok := err.(ErrNoMatches); ok { |
| 272 files[i].tf = &model.TestFile{ | 322 files[i].tf = &model.TestFile{ |
| 273 Master: p.Master, | 323 Master: p.Master, |
| 274 Builder: p.Builder, | 324 Builder: p.Builder, |
| 275 TestType: p.TestType, | 325 TestType: p.TestType, |
| 276 BuildNumber: -1, | 326 BuildNumber: -1, |
| 277 Name: name, | 327 Name: name, |
| 278 } | 328 } |
| 279 » » » » } else { | 329 » » » » » return |
| 280 » » » » » files[i].err = err | |
| 281 } | 330 } |
| 331 files[i].err = err | |
| 282 return | 332 return |
| 283 } | 333 } |
| 284 files[i].tf = tf | |
| 285 if err := tf.GetData(c); err != nil { | 334 if err := tf.GetData(c); err != nil { |
| 286 files[i].err = err | 335 files[i].err = err |
| 287 return | 336 return |
| 288 } | 337 } |
| 289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil { | 338 » » » var a model.AggregateResult |
| 339 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != nil { |
| 290 files[i].err = err | 340 files[i].err = err |
| 291 return | 341 return |
| 292 } | 342 } |
| 343 files[i].tf = tf | |
| 344 files[i].aggr = &a | |
| 293 }() | 345 }() |
| 294 } | 346 } |
| 295 | 347 |
| 296 wg.Wait() | 348 wg.Wait() |
| 297 for _, file := range files { | 349 for _, file := range files { |
| 298 if file.err != nil { | 350 if file.err != nil { |
| 299 return file.err | 351 return file.err |
| 300 } | 352 } |
| 301 } | 353 } |
| 302 | 354 |
| 303 » wg = sync.WaitGroup{} | 355 » return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
| 304 » errs := make([]error, len(files)) | 356 » » wg = sync.WaitGroup{} |
| 357 » » errs := make([]error, len(files)) | |
| 305 | 358 |
| 306 return datastore.Get(c).RunInTransaction(func(c context.Context) error { | |
| 307 for i, file := range files { | 359 for i, file := range files { |
| 308 i, file := i, file | 360 i, file := i, file |
| 309 wg.Add(1) | 361 wg.Add(1) |
| 310 go func() { | 362 go func() { |
| 311 defer wg.Done() | 363 defer wg.Done() |
| 312 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) | 364 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) |
| 313 }() | 365 }() |
| 314 } | 366 } |
| 315 | 367 |
| 316 wg.Wait() | 368 wg.Wait() |
| 317 // Prioritize returning http.StatusInternalServerError status | 369 // Prioritize returning http.StatusInternalServerError status |
| 318 // code errors over other errors. | 370 // code errors over other errors. |
| 319 var e error | 371 var e error |
| 320 for _, err := range errs { | 372 for _, err := range errs { |
| 321 se, ok := err.(statusError) | 373 se, ok := err.(statusError) |
| 322 if ok && se.code == http.StatusInternalServerError { | 374 if ok && se.code == http.StatusInternalServerError { |
| 323 return se | 375 return se |
|
Vadim Sh.
2016/08/16 18:56:13
beware that RunInTransaction can return an error o
nishanths
2016/08/16 20:35:13
in that case, the error will be propagated at line
| |
| 324 } | 376 } |
| 325 e = err | 377 e = err |
| 326 } | 378 } |
| 327 return e | 379 return e |
| 328 }, &datastore.TransactionOptions{XG: true}) | 380 }, &datastore.TransactionOptions{XG: true}) |
|
Vadim Sh.
2016/08/16 18:56:13
how many entity groups does this transaction modif
nishanths
2016/08/16 20:35:13
Two entity groups. (1) TestFile (2) DataEntry
| |
| 329 } | 381 } |
| 330 | 382 |
| 331 // getTestFileAlt returns the the first TestFile in the datastore for | 383 // getTestFileAlt returns the the first TestFile in the datastore for |
| 332 // the query formed by calling p.Query(). | 384 // the query formed by calling p.Query(). |
| 333 // | 385 // |
| 334 // The function tries to find the first TestFile using p. If no such TestFile | 386 // The function tries to find the first TestFile using p. If no such TestFile |
| 335 // exists the function sets p.Master to altMaster and tries again. | 387 // exists the function sets p.Master to altMaster and tries again. |
| 336 // If altMaster is empty, the function does not perform the additional try. | 388 // If altMaster is empty, the function does not perform the additional try. |
| 337 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { | 389 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { |
| 338 a, err := getFirstTestFile(c, p.Query()) | 390 a, err := getFirstTestFile(c, p.Query()) |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 349 a, err = getFirstTestFile(c, p.Query()) | 401 a, err = getFirstTestFile(c, p.Query()) |
| 350 if err == nil { | 402 if err == nil { |
| 351 a.Master = origMaster | 403 a.Master = origMaster |
| 352 return a, nil | 404 return a, nil |
| 353 } | 405 } |
| 354 | 406 |
| 355 return nil, err | 407 return nil, err |
| 356 } | 408 } |
| 357 | 409 |
| 358 // updateAggregate updates tf with the result of merging incr into | 410 // updateAggregate updates tf with the result of merging incr into |
| 359 // aggr. | 411 // aggr, and updates tf in datastore. |
| 360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { | 412 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { |
| 361 if !model.IsAggregateTestFile(tf.Name) { | 413 if !model.IsAggregateTestFile(tf.Name) { |
| 362 return errors.New("frontend: tf should be an aggregate test file") | 414 return errors.New("frontend: tf should be an aggregate test file") |
| 363 } | 415 } |
| 364 | 416 |
| 365 size := model.ResultsSize | 417 size := model.ResultsSize |
| 366 if tf.Name == "results-small.json" { | 418 if tf.Name == "results-small.json" { |
| 367 size = model.ResultsSmallSize | 419 size = model.ResultsSmallSize |
| 368 } | 420 } |
| 369 | 421 |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 389 b := &bytes.Buffer{} | 441 b := &bytes.Buffer{} |
| 390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { | 442 if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
| 391 return statusError{err, http.StatusInternalServerError} | 443 return statusError{err, http.StatusInternalServerError} |
| 392 } | 444 } |
| 393 | 445 |
| 394 tf.Data = b | 446 tf.Data = b |
| 395 if err := tf.PutData(c); err != nil { | 447 if err := tf.PutData(c); err != nil { |
| 396 return statusError{err, http.StatusInternalServerError} | 448 return statusError{err, http.StatusInternalServerError} |
| 397 } | 449 } |
| 398 | 450 |
| 451 if err := datastore.Get(c).Put(tf); err != nil { | |
| 452 return err | |
|
Vadim Sh.
2016/08/16 18:56:13
statusError{err, http.StatusInternalServerError} ?
nishanths
2016/08/16 20:35:13
Done.
| |
| 453 } | |
| 454 if err := deleteKeys(c, tf.OldDataKeys); err != nil { | |
|
Vadim Sh.
2016/08/16 18:56:13
ignoring this error?
nishanths
2016/08/16 20:35:13
yes, only logging the error.
Returning an HTTP e
| |
| 455 logging.Fields{ | |
| 456 logging.ErrorKey: err, | |
| 457 "keys": tf.OldDataKeys, | |
| 458 }.Errorf(c, "upload: failed to delete keys") | |
| 459 } | |
| 399 return nil | 460 return nil |
| 400 } | 461 } |
| 462 | |
| 463 func deleteKeys(c context.Context, k []*datastore.Key) error { | |
| 464 if len(k) == 0 { | |
| 465 return nil | |
| 466 } | |
| 467 | |
| 468 keys := make([]string, 0, len(k)) | |
| 469 for _, key := range k { | |
| 470 keys = append(keys, key.Encode()) | |
| 471 } | |
| 472 | |
| 473 payload, err := json.Marshal(struct { | |
| 474 Keys []string `json:"keys"` | |
| 475 }{ | |
| 476 keys, | |
| 477 }) | |
| 478 if err != nil { | |
| 479 return err | |
| 480 } | |
| 481 | |
| 482 h := make(http.Header) | |
| 483 h.Set("Content-Type", "application/json") | |
| 484 | |
| 485 logging.Fields{ | |
| 486 "keys": keys, | |
| 487 }.Infof(c, "deleteKeys: enqueing") | |
| 488 | |
| 489 return taskqueue.Get(c).Add(&taskqueue.Task{ | |
| 490 Path: "/internal/delete", | |
| 491 Payload: payload, | |
| 492 Header: h, | |
| 493 Method: "POST", | |
| 494 Delay: time.Duration(30) * time.Minute, | |
| 495 }, deleteKeysQueueName) | |
| 496 } | |
| OLD | NEW |