| Index: go/src/infra/appengine/test-results/frontend/upload.go
|
| diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go
|
| index 47637b108f05efb9d30ee7a97ec653ccf831d0fe..d184d25b5bc1cd2cc477147d22bf4a436a171fbb 100644
|
| --- a/go/src/infra/appengine/test-results/frontend/upload.go
|
| +++ b/go/src/infra/appengine/test-results/frontend/upload.go
|
| @@ -7,14 +7,19 @@ import (
|
| "io"
|
| "mime/multipart"
|
| "net/http"
|
| + "net/url"
|
| "strconv"
|
| "sync"
|
| + "time"
|
|
|
| "golang.org/x/net/context"
|
|
|
| "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/taskqueue"
|
| + "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/server/router"
|
|
|
| + "infra/appengine/test-results/builderstate"
|
| "infra/appengine/test-results/model"
|
| )
|
|
|
| @@ -146,38 +151,99 @@ func doFileUpload(c context.Context, fh *multipart.FileHeader) error {
|
| return statusError{err, http.StatusBadRequest}
|
| }
|
| return updateIncremental(c, &incr)
|
| +
|
| case "full_results.json":
|
| - return updateFullResults(c, file)
|
| + bn, data, err := extractBuildNumber(file)
|
| + if err != nil {
|
| + if err == ErrInvalidBuildNumber {
|
| + return statusError{err, http.StatusBadRequest}
|
| + }
|
| + return statusError{err, http.StatusInternalServerError}
|
| + }
|
| + if err := updateFullResults(c, data); err != nil {
|
| + return err
|
| + }
|
| +
|
| + p := GetUploadParams(c)
|
| + wg := sync.WaitGroup{}
|
| +
|
| + wg.Add(1)
|
| + go func() {
|
| + defer wg.Done()
|
| + if err := builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC()); err != nil {
|
| + logging.WithError(err).Errorf(c, "builderstate update")
|
| + }
|
| + }()
|
| +
|
| + wg.Add(1)
|
| + go func() {
|
| + defer wg.Done()
|
| + if err := taskqueue.Get(c).Add(
|
| + taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{
|
| + "master": {p.Master},
|
| + "builder": {p.Builder},
|
| + "build_number": {strconv.Itoa(bn)},
|
| + "test_type": {p.TestType},
|
| + }),
|
| + defaultQueueName,
|
| + ); err != nil {
|
| + logging.WithError(err).Errorf(c, "taskqueue add: /internal/monitoring/upload")
|
| + }
|
| + }()
|
| +
|
| + wg.Wait()
|
| + return nil
|
| +
|
| default:
|
| return uploadTestFile(c, file, fh.Filename)
|
| }
|
| }
|
|
|
| -// uploadTestFile creates a new TestFile from the values in context
|
| -// and supplied data, and puts it to the datastore.
|
| -func uploadTestFile(c context.Context, data io.Reader, filename string) error {
|
| +// ErrInvalidBuildNumber is returned when the extractBuildNumber fails
|
| +// to convert the build number value to an int.
|
| +var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int")
|
| +
|
| +// extractBuildNumber extracts the value of "build_number" key from
|
| +// the supplied JSON encoded data. The returned io.Reader will have
|
| +// the same contents as the supplied io.Reader.
|
| +//
|
| +// The error is ErrInvalidBuildNumber if the build number value
|
| +// could not be converted to an int.
|
| +func extractBuildNumber(data io.Reader) (int, io.Reader, error) {
|
| var buf bytes.Buffer
|
| tee := io.TeeReader(data, &buf)
|
|
|
| aux := struct {
|
| N string `json:"build_number,omitempty"`
|
| }{}
|
| -
|
| dec := json.NewDecoder(tee)
|
| if err := dec.Decode(&aux); err != nil {
|
| - return statusError{err, http.StatusInternalServerError}
|
| + return 0, io.MultiReader(&buf, dec.Buffered()), err
|
| }
|
|
|
| - bn := 0
|
| -
|
| + var bn int
|
| if aux.N != "" {
|
| n, err := strconv.Atoi(aux.N)
|
| if err != nil {
|
| - return statusError{errors.New("invalid build_number"), http.StatusBadRequest}
|
| + return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber
|
| }
|
| bn = n
|
| }
|
|
|
| + return bn, io.MultiReader(&buf, dec.Buffered()), nil
|
| +}
|
| +
|
| +// uploadTestFile creates a new TestFile from the values in context
|
| +// and supplied data, and puts it to the datastore.
|
| +func uploadTestFile(c context.Context, data io.Reader, filename string) error {
|
| + bn, data, err := extractBuildNumber(data)
|
| + if err != nil {
|
| + if err == ErrInvalidBuildNumber {
|
| + return statusError{err, http.StatusBadRequest}
|
| + }
|
| + return statusError{err, http.StatusInternalServerError}
|
| + }
|
| +
|
| p := GetUploadParams(c)
|
| tf := model.TestFile{
|
| Master: p.Master,
|
| @@ -185,12 +251,13 @@ func uploadTestFile(c context.Context, data io.Reader, filename string) error {
|
| TestType: p.TestType,
|
| BuildNumber: model.BuildNum(bn),
|
| Name: filename,
|
| - Data: io.MultiReader(&buf, dec.Buffered()),
|
| + Data: data,
|
| }
|
| if err := tf.PutData(c); err != nil {
|
| return statusError{err, http.StatusInternalServerError}
|
| }
|
| - return nil
|
| +
|
| + return datastore.Get(c).Put(&tf)
|
| }
|
|
|
| // updateFullResults puts the supplied data as "full_results.json"
|
| @@ -209,35 +276,18 @@ func updateFullResults(c context.Context, data io.Reader) error {
|
| return statusError{err, http.StatusBadRequest}
|
| }
|
|
|
| - wg := sync.WaitGroup{}
|
| - errCh := make(chan error, 2)
|
| -
|
| - wg.Add(1)
|
| - go func() {
|
| - defer wg.Done()
|
| - errCh <- uploadTestFile(
|
| - c, io.MultiReader(buf, dec.Buffered()), "full_results.json",
|
| - )
|
| - }()
|
| -
|
| - wg.Add(1)
|
| - go func() {
|
| - defer wg.Done()
|
| - incr, err := f.AggregateResult()
|
| - if err != nil {
|
| - errCh <- statusError{err, http.StatusBadRequest}
|
| - return
|
| - }
|
| - errCh <- updateIncremental(c, &incr)
|
| - }()
|
| + if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json"); err != nil {
|
| + return statusError{err, http.StatusInternalServerError}
|
| + }
|
|
|
| - wg.Wait()
|
| - close(errCh)
|
| - for err := range errCh {
|
| - if err != nil {
|
| - return err
|
| - }
|
| + incr, err := f.AggregateResult()
|
| + if err != nil {
|
| + return statusError{err, http.StatusBadRequest}
|
| + }
|
| + if err := updateIncremental(c, &incr); err != nil {
|
| + return statusError{err, http.StatusInternalServerError}
|
| }
|
| +
|
| return nil
|
| }
|
|
|
| @@ -262,10 +312,12 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
|
| wg := sync.WaitGroup{}
|
|
|
| for i, name := range names {
|
| - i, name := i, name
|
| + i, name, p := i, name, p
|
| wg.Add(1)
|
| +
|
| go func() {
|
| defer wg.Done()
|
| + p.Name = name
|
| tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
|
| if err != nil {
|
| if _, ok := err.(ErrNoMatches); ok {
|
| @@ -276,20 +328,22 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
|
| BuildNumber: -1,
|
| Name: name,
|
| }
|
| - } else {
|
| - files[i].err = err
|
| + return
|
| }
|
| + files[i].err = err
|
| return
|
| }
|
| - files[i].tf = tf
|
| if err := tf.GetData(c); err != nil {
|
| files[i].err = err
|
| return
|
| }
|
| - if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil {
|
| + var a model.AggregateResult
|
| + if err := json.NewDecoder(tf.Data).Decode(&a); err != nil {
|
| files[i].err = err
|
| return
|
| }
|
| + files[i].tf = tf
|
| + files[i].aggr = &a
|
| }()
|
| }
|
|
|
| @@ -300,10 +354,10 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
|
| }
|
| }
|
|
|
| - wg = sync.WaitGroup{}
|
| - errs := make([]error, len(files))
|
| -
|
| return datastore.Get(c).RunInTransaction(func(c context.Context) error {
|
| + wg = sync.WaitGroup{}
|
| + errs := make([]error, len(files))
|
| +
|
| for i, file := range files {
|
| i, file := i, file
|
| wg.Add(1)
|
| @@ -356,7 +410,7 @@ func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string)
|
| }
|
|
|
| // updateAggregate updates tf with the result of merging incr into
|
| -// aggr.
|
| +// aggr, and updates tf in datastore.
|
| func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error {
|
| if !model.IsAggregateTestFile(tf.Name) {
|
| return errors.New("frontend: tf should be an aggregate test file")
|
| @@ -396,5 +450,49 @@ func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
|
| return statusError{err, http.StatusInternalServerError}
|
| }
|
|
|
| + if err := datastore.Get(c).Put(tf); err != nil {
|
| + return statusError{err, http.StatusInternalServerError}
|
| + }
|
| + if err := deleteKeys(c, tf.OldDataKeys); err != nil {
|
| + logging.Fields{
|
| + logging.ErrorKey: err,
|
| + "keys": tf.OldDataKeys,
|
| + }.Errorf(c, "upload: failed to delete keys")
|
| + }
|
| return nil
|
| }
|
| +
|
| +func deleteKeys(c context.Context, k []*datastore.Key) error {
|
| + if len(k) == 0 {
|
| + return nil
|
| + }
|
| +
|
| + keys := make([]string, 0, len(k))
|
| + for _, key := range k {
|
| + keys = append(keys, key.Encode())
|
| + }
|
| +
|
| + payload, err := json.Marshal(struct {
|
| + Keys []string `json:"keys"`
|
| + }{
|
| + keys,
|
| + })
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + h := make(http.Header)
|
| + h.Set("Content-Type", "application/json")
|
| +
|
| + logging.Fields{
|
| + "keys": keys,
|
| + }.Infof(c, "deleteKeys: enqueing")
|
| +
|
| + return taskqueue.Get(c).Add(&taskqueue.Task{
|
| + Path: "/internal/delete",
|
| + Payload: payload,
|
| + Header: h,
|
| + Method: "POST",
|
| + Delay: time.Duration(30) * time.Minute,
|
| + }, deleteKeysQueueName)
|
| +}
|
|
|