| OLD | NEW |
| 1 package frontend | 1 package frontend |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "bytes" | 4 "bytes" |
| 5 "encoding/json" | 5 "encoding/json" |
| 6 "errors" | 6 "errors" |
| 7 "io" | 7 "io" |
| 8 "mime/multipart" | 8 "mime/multipart" |
| 9 "net/http" | 9 "net/http" |
| 10 "net/url" |
| 10 "strconv" | 11 "strconv" |
| 11 "sync" | 12 "sync" |
| 13 "time" |
| 12 | 14 |
| 13 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 14 | 16 |
| 15 "github.com/luci/gae/service/datastore" | 17 "github.com/luci/gae/service/datastore" |
| 18 "github.com/luci/gae/service/taskqueue" |
| 19 "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/server/router" | 20 "github.com/luci/luci-go/server/router" |
| 17 | 21 |
| 22 "infra/appengine/test-results/builderstate" |
| 18 "infra/appengine/test-results/model" | 23 "infra/appengine/test-results/model" |
| 19 ) | 24 ) |
| 20 | 25 |
| 21 type statusError struct { | 26 type statusError struct { |
| 22 error | 27 error |
| 23 code int | 28 code int |
| 24 } | 29 } |
| 25 | 30 |
| 26 // MarshalJSON marshals status error to JSON. | 31 // MarshalJSON marshals status error to JSON. |
| 27 func (se *statusError) MarshalJSON() ([]byte, error) { | 32 func (se *statusError) MarshalJSON() ([]byte, error) { |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 139 } | 144 } |
| 140 defer file.Close() | 145 defer file.Close() |
| 141 | 146 |
| 142 switch fh.Filename { | 147 switch fh.Filename { |
| 143 case "incremental_results.json": | 148 case "incremental_results.json": |
| 144 var incr model.AggregateResult | 149 var incr model.AggregateResult |
| 145 if err := json.NewDecoder(file).Decode(&incr); err != nil { | 150 if err := json.NewDecoder(file).Decode(&incr); err != nil { |
| 146 return statusError{err, http.StatusBadRequest} | 151 return statusError{err, http.StatusBadRequest} |
| 147 } | 152 } |
| 148 return updateIncremental(c, &incr) | 153 return updateIncremental(c, &incr) |
| 154 |
| 149 case "full_results.json": | 155 case "full_results.json": |
| 150 » » return updateFullResults(c, file) | 156 » » bn, data, err := extractBuildNumber(file) |
| 157 » » if err != nil { |
| 158 » » » if err == ErrInvalidBuildNumber { |
| 159 » » » » return statusError{err, http.StatusBadRequest} |
| 160 » » » } |
| 161 » » » return statusError{err, http.StatusInternalServerError} |
| 162 » » } |
| 163 » » if err := updateFullResults(c, data); err != nil { |
| 164 » » » return err |
| 165 » » } |
| 166 |
| 167 » » p := GetUploadParams(c) |
| 168 » » wg := sync.WaitGroup{} |
| 169 |
| 170 » » wg.Add(1) |
| 171 » » go func() { |
| 172 » » » defer wg.Done() |
| 173 » » » if err := builderstate.Update(c, p.Master, p.Builder, p.
TestType, time.Now().UTC()); err != nil { |
| 174 » » » » logging.WithError(err).Errorf(c, "builderstate u
pdate") |
| 175 » » » } |
| 176 » » }() |
| 177 |
| 178 » » wg.Add(1) |
| 179 » » go func() { |
| 180 » » » defer wg.Done() |
| 181 » » » if err := taskqueue.Get(c).Add( |
| 182 » » » » taskqueue.NewPOSTTask("/internal/monitoring/uplo
ad", url.Values{ |
| 183 » » » » » "master": {p.Master}, |
| 184 » » » » » "builder": {p.Builder}, |
| 185 » » » » » "build_number": {strconv.Itoa(bn)}, |
| 186 » » » » » "test_type": {p.TestType}, |
| 187 » » » » }), |
| 188 » » » » defaultQueueName, |
| 189 » » » ); err != nil { |
| 190 » » » » logging.WithError(err).Errorf(c, "taskqueue add:
/internal/monitoring/upload") |
| 191 » » » } |
| 192 » » }() |
| 193 |
| 194 » » wg.Wait() |
| 195 » » return nil |
| 196 |
| 151 default: | 197 default: |
| 152 return uploadTestFile(c, file, fh.Filename) | 198 return uploadTestFile(c, file, fh.Filename) |
| 153 } | 199 } |
| 154 } | 200 } |
| 155 | 201 |
| 156 // uploadTestFile creates a new TestFile from the values in context | 202 // ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
| 157 // and supplied data, and puts it to the datastore. | 203 // to convert the build number value to an int. |
| 158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | 204 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to
int") |
| 205 |
| 206 // extractBuildNumber extracts the value of "build_number" key from |
| 207 // the supplied JSON encoded data. The returned io.Reader will have |
| 208 // the same contents as the supplied io.Reader. |
| 209 // |
| 210 // The error is ErrInvalidBuildNumber if the build number value |
| 211 // could not be converted to an int. |
| 212 func extractBuildNumber(data io.Reader) (int, io.Reader, error) { |
| 159 var buf bytes.Buffer | 213 var buf bytes.Buffer |
| 160 tee := io.TeeReader(data, &buf) | 214 tee := io.TeeReader(data, &buf) |
| 161 | 215 |
| 162 aux := struct { | 216 aux := struct { |
| 163 N string `json:"build_number,omitempty"` | 217 N string `json:"build_number,omitempty"` |
| 164 }{} | 218 }{} |
| 165 | |
| 166 dec := json.NewDecoder(tee) | 219 dec := json.NewDecoder(tee) |
| 167 if err := dec.Decode(&aux); err != nil { | 220 if err := dec.Decode(&aux); err != nil { |
| 168 » » return statusError{err, http.StatusInternalServerError} | 221 » » return 0, io.MultiReader(&buf, dec.Buffered()), err |
| 169 } | 222 } |
| 170 | 223 |
| 171 » bn := 0 | 224 » var bn int |
| 172 | |
| 173 if aux.N != "" { | 225 if aux.N != "" { |
| 174 n, err := strconv.Atoi(aux.N) | 226 n, err := strconv.Atoi(aux.N) |
| 175 if err != nil { | 227 if err != nil { |
| 176 » » » return statusError{errors.New("invalid build_number"), h
ttp.StatusBadRequest} | 228 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInval
idBuildNumber |
| 177 } | 229 } |
| 178 bn = n | 230 bn = n |
| 179 } | 231 } |
| 180 | 232 |
| 233 return bn, io.MultiReader(&buf, dec.Buffered()), nil |
| 234 } |
| 235 |
| 236 // uploadTestFile creates a new TestFile from the values in context |
| 237 // and supplied data, and puts it to the datastore. |
| 238 func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| 239 bn, data, err := extractBuildNumber(data) |
| 240 if err != nil { |
| 241 if err == ErrInvalidBuildNumber { |
| 242 return statusError{err, http.StatusBadRequest} |
| 243 } |
| 244 return statusError{err, http.StatusInternalServerError} |
| 245 } |
| 246 |
| 181 p := GetUploadParams(c) | 247 p := GetUploadParams(c) |
| 182 tf := model.TestFile{ | 248 tf := model.TestFile{ |
| 183 Master: p.Master, | 249 Master: p.Master, |
| 184 Builder: p.Builder, | 250 Builder: p.Builder, |
| 185 TestType: p.TestType, | 251 TestType: p.TestType, |
| 186 BuildNumber: model.BuildNum(bn), | 252 BuildNumber: model.BuildNum(bn), |
| 187 Name: filename, | 253 Name: filename, |
| 188 » » Data: io.MultiReader(&buf, dec.Buffered()), | 254 » » Data: data, |
| 189 } | 255 } |
| 190 if err := tf.PutData(c); err != nil { | 256 if err := tf.PutData(c); err != nil { |
| 191 return statusError{err, http.StatusInternalServerError} | 257 return statusError{err, http.StatusInternalServerError} |
| 192 } | 258 } |
| 193 » return nil | 259 |
| 260 » return datastore.Get(c).Put(&tf) |
| 194 } | 261 } |
| 195 | 262 |
| 196 // updateFullResults puts the supplied data as "full_results.json" | 263 // updateFullResults puts the supplied data as "full_results.json" |
| 197 // to the datastore, and updates corresponding "results.json" and | 264 // to the datastore, and updates corresponding "results.json" and |
| 198 // "results-small.json" files in the datastore. | 265 // "results-small.json" files in the datastore. |
| 199 // | 266 // |
| 200 // The supplied data should unmarshal into model.FullResults. | 267 // The supplied data should unmarshal into model.FullResults. |
| 201 // Otherwise, an error is returned. | 268 // Otherwise, an error is returned. |
| 202 func updateFullResults(c context.Context, data io.Reader) error { | 269 func updateFullResults(c context.Context, data io.Reader) error { |
| 203 buf := &bytes.Buffer{} | 270 buf := &bytes.Buffer{} |
| 204 tee := io.TeeReader(data, buf) | 271 tee := io.TeeReader(data, buf) |
| 205 dec := json.NewDecoder(tee) | 272 dec := json.NewDecoder(tee) |
| 206 | 273 |
| 207 var f model.FullResult | 274 var f model.FullResult |
| 208 if err := dec.Decode(&f); err != nil { | 275 if err := dec.Decode(&f); err != nil { |
| 209 return statusError{err, http.StatusBadRequest} | 276 return statusError{err, http.StatusBadRequest} |
| 210 } | 277 } |
| 211 | 278 |
| 212 » wg := sync.WaitGroup{} | 279 » if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_r
esults.json"); err != nil { |
| 213 » errCh := make(chan error, 2) | 280 » » return statusError{err, http.StatusInternalServerError} |
| 281 » } |
| 214 | 282 |
| 215 » wg.Add(1) | 283 » incr, err := f.AggregateResult() |
| 216 » go func() { | 284 » if err != nil { |
| 217 » » defer wg.Done() | 285 » » return statusError{err, http.StatusBadRequest} |
| 218 » » errCh <- uploadTestFile( | 286 » } |
| 219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.js
on", | 287 » if err := updateIncremental(c, &incr); err != nil { |
| 220 » » ) | 288 » » return statusError{err, http.StatusInternalServerError} |
| 221 » }() | 289 » } |
| 222 | 290 |
| 223 wg.Add(1) | |
| 224 go func() { | |
| 225 defer wg.Done() | |
| 226 incr, err := f.AggregateResult() | |
| 227 if err != nil { | |
| 228 errCh <- statusError{err, http.StatusBadRequest} | |
| 229 return | |
| 230 } | |
| 231 errCh <- updateIncremental(c, &incr) | |
| 232 }() | |
| 233 | |
| 234 wg.Wait() | |
| 235 close(errCh) | |
| 236 for err := range errCh { | |
| 237 if err != nil { | |
| 238 return err | |
| 239 } | |
| 240 } | |
| 241 return nil | 291 return nil |
| 242 } | 292 } |
| 243 | 293 |
| 244 // updateIncremental gets "results.json" and "results-small.json" | 294 // updateIncremental gets "results.json" and "results-small.json" |
| 245 // for values in context, merges incr into them, and puts the updated | 295 // for values in context, merges incr into them, and puts the updated |
| 246 // files to the datastore. | 296 // files to the datastore. |
| 247 func updateIncremental(c context.Context, incr *model.AggregateResult) error { | 297 func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| 248 u := GetUploadParams(c) | 298 u := GetUploadParams(c) |
| 249 p := model.TestFileParams{ | 299 p := model.TestFileParams{ |
| 250 Master: u.Master, | 300 Master: u.Master, |
| 251 Builder: u.Builder, | 301 Builder: u.Builder, |
| 252 TestType: u.TestType, | 302 TestType: u.TestType, |
| 253 } | 303 } |
| 254 | 304 |
| 255 names := []string{"results.json", "results-small.json"} | 305 names := []string{"results.json", "results-small.json"} |
| 256 files := make([]struct { | 306 files := make([]struct { |
| 257 tf *model.TestFile | 307 tf *model.TestFile |
| 258 aggr *model.AggregateResult | 308 aggr *model.AggregateResult |
| 259 err error | 309 err error |
| 260 }, len(names)) | 310 }, len(names)) |
| 261 | 311 |
| 262 wg := sync.WaitGroup{} | 312 wg := sync.WaitGroup{} |
| 263 | 313 |
| 264 for i, name := range names { | 314 for i, name := range names { |
| 265 » » i, name := i, name | 315 » » i, name, p := i, name, p |
| 266 wg.Add(1) | 316 wg.Add(1) |
| 317 |
| 267 go func() { | 318 go func() { |
| 268 defer wg.Done() | 319 defer wg.Done() |
| 320 p.Name = name |
| 269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) | 321 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
| 270 if err != nil { | 322 if err != nil { |
| 271 if _, ok := err.(ErrNoMatches); ok { | 323 if _, ok := err.(ErrNoMatches); ok { |
| 272 files[i].tf = &model.TestFile{ | 324 files[i].tf = &model.TestFile{ |
| 273 Master: p.Master, | 325 Master: p.Master, |
| 274 Builder: p.Builder, | 326 Builder: p.Builder, |
| 275 TestType: p.TestType, | 327 TestType: p.TestType, |
| 276 BuildNumber: -1, | 328 BuildNumber: -1, |
| 277 Name: name, | 329 Name: name, |
| 278 } | 330 } |
| 279 » » » » } else { | 331 » » » » » return |
| 280 » » » » » files[i].err = err | |
| 281 } | 332 } |
| 333 files[i].err = err |
| 282 return | 334 return |
| 283 } | 335 } |
| 284 files[i].tf = tf | |
| 285 if err := tf.GetData(c); err != nil { | 336 if err := tf.GetData(c); err != nil { |
| 286 files[i].err = err | 337 files[i].err = err |
| 287 return | 338 return |
| 288 } | 339 } |
| 289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr)
; err != nil { | 340 » » » var a model.AggregateResult |
| 341 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != ni
l { |
| 290 files[i].err = err | 342 files[i].err = err |
| 291 return | 343 return |
| 292 } | 344 } |
| 345 files[i].tf = tf |
| 346 files[i].aggr = &a |
| 293 }() | 347 }() |
| 294 } | 348 } |
| 295 | 349 |
| 296 wg.Wait() | 350 wg.Wait() |
| 297 for _, file := range files { | 351 for _, file := range files { |
| 298 if file.err != nil { | 352 if file.err != nil { |
| 299 return file.err | 353 return file.err |
| 300 } | 354 } |
| 301 } | 355 } |
| 302 | 356 |
| 303 » wg = sync.WaitGroup{} | 357 » return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
| 304 » errs := make([]error, len(files)) | 358 » » wg = sync.WaitGroup{} |
| 359 » » errs := make([]error, len(files)) |
| 305 | 360 |
| 306 return datastore.Get(c).RunInTransaction(func(c context.Context) error { | |
| 307 for i, file := range files { | 361 for i, file := range files { |
| 308 i, file := i, file | 362 i, file := i, file |
| 309 wg.Add(1) | 363 wg.Add(1) |
| 310 go func() { | 364 go func() { |
| 311 defer wg.Done() | 365 defer wg.Done() |
| 312 errs[i] = updateAggregate(c, file.tf, file.aggr,
incr) | 366 errs[i] = updateAggregate(c, file.tf, file.aggr,
incr) |
| 313 }() | 367 }() |
| 314 } | 368 } |
| 315 | 369 |
| 316 wg.Wait() | 370 wg.Wait() |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 349 a, err = getFirstTestFile(c, p.Query()) | 403 a, err = getFirstTestFile(c, p.Query()) |
| 350 if err == nil { | 404 if err == nil { |
| 351 a.Master = origMaster | 405 a.Master = origMaster |
| 352 return a, nil | 406 return a, nil |
| 353 } | 407 } |
| 354 | 408 |
| 355 return nil, err | 409 return nil, err |
| 356 } | 410 } |
| 357 | 411 |
| 358 // updateAggregate updates tf with the result of merging incr into | 412 // updateAggregate updates tf with the result of merging incr into |
| 359 // aggr. | 413 // aggr, and updates tf in datastore. |
| 360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
gregateResult) error { | 414 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
gregateResult) error { |
| 361 if !model.IsAggregateTestFile(tf.Name) { | 415 if !model.IsAggregateTestFile(tf.Name) { |
| 362 return errors.New("frontend: tf should be an aggregate test file
") | 416 return errors.New("frontend: tf should be an aggregate test file
") |
| 363 } | 417 } |
| 364 | 418 |
| 365 size := model.ResultsSize | 419 size := model.ResultsSize |
| 366 if tf.Name == "results-small.json" { | 420 if tf.Name == "results-small.json" { |
| 367 size = model.ResultsSmallSize | 421 size = model.ResultsSmallSize |
| 368 } | 422 } |
| 369 | 423 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 389 b := &bytes.Buffer{} | 443 b := &bytes.Buffer{} |
| 390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { | 444 if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
| 391 return statusError{err, http.StatusInternalServerError} | 445 return statusError{err, http.StatusInternalServerError} |
| 392 } | 446 } |
| 393 | 447 |
| 394 tf.Data = b | 448 tf.Data = b |
| 395 if err := tf.PutData(c); err != nil { | 449 if err := tf.PutData(c); err != nil { |
| 396 return statusError{err, http.StatusInternalServerError} | 450 return statusError{err, http.StatusInternalServerError} |
| 397 } | 451 } |
| 398 | 452 |
| 453 if err := datastore.Get(c).Put(tf); err != nil { |
| 454 return statusError{err, http.StatusInternalServerError} |
| 455 } |
| 456 if err := deleteKeys(c, tf.OldDataKeys); err != nil { |
| 457 logging.Fields{ |
| 458 logging.ErrorKey: err, |
| 459 "keys": tf.OldDataKeys, |
| 460 }.Errorf(c, "upload: failed to delete keys") |
| 461 } |
| 399 return nil | 462 return nil |
| 400 } | 463 } |
| 464 |
| 465 func deleteKeys(c context.Context, k []*datastore.Key) error { |
| 466 if len(k) == 0 { |
| 467 return nil |
| 468 } |
| 469 |
| 470 keys := make([]string, 0, len(k)) |
| 471 for _, key := range k { |
| 472 keys = append(keys, key.Encode()) |
| 473 } |
| 474 |
| 475 payload, err := json.Marshal(struct { |
| 476 Keys []string `json:"keys"` |
| 477 }{ |
| 478 keys, |
| 479 }) |
| 480 if err != nil { |
| 481 return err |
| 482 } |
| 483 |
| 484 h := make(http.Header) |
| 485 h.Set("Content-Type", "application/json") |
| 486 |
| 487 logging.Fields{ |
| 488 "keys": keys, |
| 489 }.Infof(c, "deleteKeys: enqueing") |
| 490 |
| 491 return taskqueue.Get(c).Add(&taskqueue.Task{ |
| 492 Path: "/internal/delete", |
| 493 Payload: payload, |
| 494 Header: h, |
| 495 Method: "POST", |
| 496 Delay: time.Duration(30) * time.Minute, |
| 497 }, deleteKeysQueueName) |
| 498 } |
| OLD | NEW |