Chromium Code Reviews
| Index: go/src/infra/appengine/test-results/frontend/upload.go |
| diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go |
| index 47637b108f05efb9d30ee7a97ec653ccf831d0fe..d184d25b5bc1cd2cc477147d22bf4a436a171fbb 100644 |
| --- a/go/src/infra/appengine/test-results/frontend/upload.go |
| +++ b/go/src/infra/appengine/test-results/frontend/upload.go |
| @@ -7,14 +7,19 @@ import ( |
| "io" |
| "mime/multipart" |
| "net/http" |
| + "net/url" |
| "strconv" |
| "sync" |
| + "time" |
| "golang.org/x/net/context" |
| "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/taskqueue" |
| + "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/server/router" |
| + "infra/appengine/test-results/builderstate" |
| "infra/appengine/test-results/model" |
| ) |
| @@ -146,38 +151,99 @@ func doFileUpload(c context.Context, fh *multipart.FileHeader) error { |
| return statusError{err, http.StatusBadRequest} |
| } |
| return updateIncremental(c, &incr) |
| + |
| case "full_results.json": |
| - return updateFullResults(c, file) |
| + bn, data, err := extractBuildNumber(file) |
| + if err != nil { |
| + if err == ErrInvalidBuildNumber { |
| + return statusError{err, http.StatusBadRequest} |
| + } |
| + return statusError{err, http.StatusInternalServerError} |
| + } |
| + if err := updateFullResults(c, data); err != nil { |
| + return err |
| + } |
| + |
| + p := GetUploadParams(c) |
| + wg := sync.WaitGroup{} |
| + |
| + wg.Add(1) |
| + go func() { |
| + defer wg.Done() |
| + if err := builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC()); err != nil { |
| + logging.WithError(err).Errorf(c, "builderstate update") |
| + } |
| + }() |
| + |
| + wg.Add(1) |
| + go func() { |
| + defer wg.Done() |
| + if err := taskqueue.Get(c).Add( |
| + taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{ |
| + "master": {p.Master}, |
| + "builder": {p.Builder}, |
| + "build_number": {strconv.Itoa(bn)}, |
| + "test_type": {p.TestType}, |
| + }), |
| + defaultQueueName, |
| + ); err != nil { |
| + logging.WithError(err).Errorf(c, "taskqueue add: /internal/monitoring/upload") |
| + } |
| + }() |
| + |
| + wg.Wait() |
| + return nil |
|
Vadim Sh. (2016/08/16 21:27:12): no reporting errors back to the user?
nishanths (2016/08/16 21:29:54): These errors are not critical to the POST request.
Vadim Sh. (2016/08/16 21:34:46): I have no idea how it all works on high level, I j
|
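For context on the exchange above, here is a minimal sketch of how the two background updates could surface failures to the caller instead of only logging them, reusing the buffered error-channel pattern that the previous updateFullResults code (removed later in this diff) relied on. It assumes the same c, p, bn, and defaultQueueName that are in scope in doFileUpload; whether a failed builderstate or monitoring update should actually fail the upload is the open question in the thread.

// Sketch only: collect errors from both goroutines and report them, assuming
// the identifiers already in scope in doFileUpload above.
errCh := make(chan error, 2)
wg := sync.WaitGroup{}

wg.Add(1)
go func() {
    defer wg.Done()
    errCh <- builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC())
}()

wg.Add(1)
go func() {
    defer wg.Done()
    errCh <- taskqueue.Get(c).Add(
        taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{
            "master":       {p.Master},
            "builder":      {p.Builder},
            "build_number": {strconv.Itoa(bn)},
            "test_type":    {p.TestType},
        }),
        defaultQueueName,
    )
}()

wg.Wait()
close(errCh)
for err := range errCh {
    if err != nil {
        return statusError{err, http.StatusInternalServerError}
    }
}
return nil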
| + |
| default: |
| return uploadTestFile(c, file, fh.Filename) |
| } |
| } |
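The error paths above wrap errors in statusError together with an explicit HTTP status code. The type itself is defined elsewhere in this package and is not part of this diff; the shape below is only inferred from composite literals such as statusError{err, http.StatusBadRequest}, and the real definition may differ.

// Assumed shape, inferred from usage in this file; not the actual definition.
type statusError struct {
    error     // underlying error returned to the caller
    code int  // HTTP status code to report for this error
}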
| -// uploadTestFile creates a new TestFile from the values in context |
| -// and supplied data, and puts it to the datastore. |
| -func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| +// ErrInvalidBuildNumber is returned when extractBuildNumber fails |
| +// to convert the build number value to an int. |
| +var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int") |
| + |
| +// extractBuildNumber extracts the value of the "build_number" key from |
| +// the supplied JSON-encoded data. The returned io.Reader has the same |
| +// contents as the supplied io.Reader. |
| +// |
| +// The returned error is ErrInvalidBuildNumber if the build number value |
| +// could not be converted to an int. |
| +func extractBuildNumber(data io.Reader) (int, io.Reader, error) { |
| var buf bytes.Buffer |
| tee := io.TeeReader(data, &buf) |
| aux := struct { |
| N string `json:"build_number,omitempty"` |
| }{} |
| - |
| dec := json.NewDecoder(tee) |
| if err := dec.Decode(&aux); err != nil { |
| - return statusError{err, http.StatusInternalServerError} |
| + return 0, io.MultiReader(&buf, dec.Buffered()), err |
| } |
| - bn := 0 |
| - |
| + var bn int |
| if aux.N != "" { |
| n, err := strconv.Atoi(aux.N) |
| if err != nil { |
| - return statusError{errors.New("invalid build_number"), http.StatusBadRequest} |
| + return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber |
| } |
| bn = n |
| } |
| + return bn, io.MultiReader(&buf, dec.Buffered()), nil |
| +} |
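A small usage sketch of extractBuildNumber, e.g. from a test in this package: it shows that the build number is expected as a JSON string and that the returned reader replays the full original body, so later code can still decode or store it. The payload literal is made up for illustration, and the snippet assumes the standard fmt, strings, and io/ioutil imports in addition to those in this file.

// Sketch only; the payload below is a hypothetical example, not real data.
body := strings.NewReader(`{"build_number": "123", "tests": {}}`)

bn, r, err := extractBuildNumber(body)
if err != nil {
    // err is ErrInvalidBuildNumber when the value is present but not an int.
    panic(err)
}
fmt.Println(bn) // 123

// r carries the same bytes as the original body, so it can be decoded again
// or written to the datastore unchanged.
rest, _ := ioutil.ReadAll(r)
fmt.Println(string(rest)) // {"build_number": "123", "tests": {}}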
| + |
| +// uploadTestFile creates a new TestFile from the values in context |
| +// and supplied data, and puts it to the datastore. |
| +func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| + bn, data, err := extractBuildNumber(data) |
| + if err != nil { |
| + if err == ErrInvalidBuildNumber { |
| + return statusError{err, http.StatusBadRequest} |
| + } |
| + return statusError{err, http.StatusInternalServerError} |
| + } |
| + |
| p := GetUploadParams(c) |
| tf := model.TestFile{ |
| Master: p.Master, |
| @@ -185,12 +251,13 @@ func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| TestType: p.TestType, |
| BuildNumber: model.BuildNum(bn), |
| Name: filename, |
| - Data: io.MultiReader(&buf, dec.Buffered()), |
| + Data: data, |
| } |
| if err := tf.PutData(c); err != nil { |
| return statusError{err, http.StatusInternalServerError} |
| } |
| - return nil |
| + |
| + return datastore.Get(c).Put(&tf) |
| } |
| // updateFullResults puts the supplied data as "full_results.json" |
| @@ -209,35 +276,18 @@ func updateFullResults(c context.Context, data io.Reader) error { |
| return statusError{err, http.StatusBadRequest} |
| } |
| - wg := sync.WaitGroup{} |
| - errCh := make(chan error, 2) |
| - |
| - wg.Add(1) |
| - go func() { |
| - defer wg.Done() |
| - errCh <- uploadTestFile( |
| - c, io.MultiReader(buf, dec.Buffered()), "full_results.json", |
| - ) |
| - }() |
| - |
| - wg.Add(1) |
| - go func() { |
| - defer wg.Done() |
| - incr, err := f.AggregateResult() |
| - if err != nil { |
| - errCh <- statusError{err, http.StatusBadRequest} |
| - return |
| - } |
| - errCh <- updateIncremental(c, &incr) |
| - }() |
| + if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json"); err != nil { |
| + return err |
| + } |
| - wg.Wait() |
| - close(errCh) |
| - for err := range errCh { |
| - if err != nil { |
| - return err |
| - } |
| + incr, err := f.AggregateResult() |
| + if err != nil { |
| + return statusError{err, http.StatusBadRequest} |
| + } |
| + if err := updateIncremental(c, &incr); err != nil { |
| + return statusError{err, http.StatusInternalServerError} |
| } |
| + |
| return nil |
| } |
| @@ -262,10 +312,12 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| wg := sync.WaitGroup{} |
| for i, name := range names { |
| - i, name := i, name |
| + i, name, p := i, name, p |
| wg.Add(1) |
| + |
| go func() { |
| defer wg.Done() |
| + p.Name = name |
| tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
| if err != nil { |
| if _, ok := err.(ErrNoMatches); ok { |
| @@ -276,20 +328,22 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| BuildNumber: -1, |
| Name: name, |
| } |
| - } else { |
| - files[i].err = err |
| + return |
| } |
| + files[i].err = err |
| return |
| } |
| - files[i].tf = tf |
| if err := tf.GetData(c); err != nil { |
| files[i].err = err |
| return |
| } |
| - if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil { |
| + var a model.AggregateResult |
| + if err := json.NewDecoder(tf.Data).Decode(&a); err != nil { |
| files[i].err = err |
| return |
| } |
| + files[i].tf = tf |
| + files[i].aggr = &a |
| }() |
| } |
| @@ -300,10 +354,10 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| } |
| } |
| - wg = sync.WaitGroup{} |
| - errs := make([]error, len(files)) |
| - |
| return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
| + wg = sync.WaitGroup{} |
| + errs := make([]error, len(files)) |
| + |
| for i, file := range files { |
| i, file := i, file |
| wg.Add(1) |
| @@ -356,7 +410,7 @@ func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) |
| } |
| // updateAggregate updates tf with the result of merging incr into |
| -// aggr. |
| +// aggr, and updates tf in datastore. |
| func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { |
| if !model.IsAggregateTestFile(tf.Name) { |
| return errors.New("frontend: tf should be an aggregate test file") |
| @@ -396,5 +450,49 @@ func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag |
| return statusError{err, http.StatusInternalServerError} |
| } |
| + if err := datastore.Get(c).Put(tf); err != nil { |
| + return statusError{err, http.StatusInternalServerError} |
| + } |
| + if err := deleteKeys(c, tf.OldDataKeys); err != nil { |
| + logging.Fields{ |
| + logging.ErrorKey: err, |
| + "keys": tf.OldDataKeys, |
| + }.Errorf(c, "upload: failed to delete keys") |
| + } |
| return nil |
| } |
| + |
| +func deleteKeys(c context.Context, k []*datastore.Key) error { |
| + if len(k) == 0 { |
| + return nil |
| + } |
| + |
| + keys := make([]string, 0, len(k)) |
| + for _, key := range k { |
| + keys = append(keys, key.Encode()) |
| + } |
| + |
| + payload, err := json.Marshal(struct { |
| + Keys []string `json:"keys"` |
| + }{ |
| + keys, |
| + }) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + h := make(http.Header) |
| + h.Set("Content-Type", "application/json") |
| + |
| + logging.Fields{ |
| + "keys": keys, |
| + }.Infof(c, "deleteKeys: enqueuing") |
| + |
| + return taskqueue.Get(c).Add(&taskqueue.Task{ |
| + Path: "/internal/delete", |
| + Payload: payload, |
| + Header: h, |
| + Method: "POST", |
| + Delay: 30 * time.Minute, |
| + }, deleteKeysQueueName) |
| +} |
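deleteKeys only enqueues the work; the handler behind /internal/delete is not part of this change. As a rough illustration of the consumer side, the sketch below decodes the payload shape produced above into the encoded key strings. The handler name and plumbing are assumptions, and the actual key decoding and datastore deletion are omitted because that code is outside this CL.

// Hypothetical consumer of the task enqueued by deleteKeys; only the payload
// shape comes from the code above.
func handleDeleteKeys(w http.ResponseWriter, r *http.Request) {
    var body struct {
        Keys []string `json:"keys"` // keys in their datastore Encode() form
    }
    if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Decoding each entry back into a *datastore.Key and deleting it is the
    // job of the real handler, which is not shown in this diff.
    w.WriteHeader(http.StatusOK)
}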