Chromium Code Reviews| Index: go/src/infra/appengine/test-results/frontend/upload.go |
| diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go |
| index 47637b108f05efb9d30ee7a97ec653ccf831d0fe..fc17e50645145dacb361dcdf6e2c8a3fa90d6939 100644 |
| --- a/go/src/infra/appengine/test-results/frontend/upload.go |
| +++ b/go/src/infra/appengine/test-results/frontend/upload.go |
| @@ -7,14 +7,19 @@ import ( |
| "io" |
| "mime/multipart" |
| "net/http" |
| + "net/url" |
| "strconv" |
| "sync" |
| + "time" |
| "golang.org/x/net/context" |
| "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/taskqueue" |
| + "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/server/router" |
| + "infra/appengine/test-results/builderstate" |
| "infra/appengine/test-results/model" |
| ) |
| @@ -146,38 +151,82 @@ func doFileUpload(c context.Context, fh *multipart.FileHeader) error { |
| return statusError{err, http.StatusBadRequest} |
| } |
| return updateIncremental(c, &incr) |
| + |
| case "full_results.json": |
| - return updateFullResults(c, file) |
| + bn, data, err := extractBuildNumber(file) |
| + if err != nil { |
| + if err == ErrInvalidBuildNumber { |
| + return statusError{err, http.StatusBadRequest} |
| + } |
| + return statusError{err, http.StatusInternalServerError} |
| + } |
| + if err := updateFullResults(c, data); err != nil { |
| + return err |
| + } |
| + |
| + p := GetUploadParams(c) |
| + go builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC()) |
|
Vadim Sh.
2016/08/16 18:56:13
you need to wait for these goroutines to finish so
nishanths
2016/08/16 20:35:13
Done.
|
| + go taskqueue.Get(c).Add( |
| + taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{ |
|
Vadim Sh.
2016/08/16 18:56:13
does this handler exist?
nishanths
2016/08/16 20:35:13
It exists in the python implementation.
|
| + "master": {p.Master}, |
| + "builder": {p.Builder}, |
| + "build_number": {strconv.Itoa(bn)}, |
| + "test_type": {p.TestType}, |
| + }), |
| + defaultQueueName, |
| + ) |
| + return nil |
| + |
| default: |
| return uploadTestFile(c, file, fh.Filename) |
| } |
| } |
| -// uploadTestFile creates a new TestFile from the values in context |
| -// and supplied data, and puts it to the datastore. |
| -func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| +// ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
| +// to convert the build number value to an int. |
| +var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int") |
| + |
| +// extractBuildNumber extracts the value of "build_number" key from |
| +// the supplied JSON encoded data. The returned io.Reader will have |
| +// the same contents as the supplied io.Reader. |
| +// |
| +// The error is ErrInvalidBuildNumber if the build number value |
| +// could not be converted to an int. |
| +func extractBuildNumber(data io.Reader) (int, io.Reader, error) { |
| var buf bytes.Buffer |
| tee := io.TeeReader(data, &buf) |
| aux := struct { |
| N string `json:"build_number,omitempty"` |
| }{} |
| - |
| dec := json.NewDecoder(tee) |
| if err := dec.Decode(&aux); err != nil { |
| - return statusError{err, http.StatusInternalServerError} |
| + return 0, io.MultiReader(&buf, dec.Buffered()), err |
| } |
| - bn := 0 |
| - |
| + var bn int |
| if aux.N != "" { |
| n, err := strconv.Atoi(aux.N) |
| if err != nil { |
| - return statusError{errors.New("invalid build_number"), http.StatusBadRequest} |
| + return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber |
| } |
| bn = n |
| } |
| + return bn, io.MultiReader(&buf, dec.Buffered()), nil |
| +} |
| + |
| +// uploadTestFile creates a new TestFile from the values in context |
| +// and supplied data, and puts it to the datastore. |
| +func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| + bn, data, err := extractBuildNumber(data) |
| + if err != nil { |
| + if err == ErrInvalidBuildNumber { |
| + return statusError{err, http.StatusBadRequest} |
| + } |
| + return statusError{err, http.StatusInternalServerError} |
| + } |
| + |
| p := GetUploadParams(c) |
| tf := model.TestFile{ |
| Master: p.Master, |
| @@ -185,12 +234,13 @@ func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| TestType: p.TestType, |
| BuildNumber: model.BuildNum(bn), |
| Name: filename, |
| - Data: io.MultiReader(&buf, dec.Buffered()), |
| + Data: data, |
| } |
| if err := tf.PutData(c); err != nil { |
| return statusError{err, http.StatusInternalServerError} |
| } |
| - return nil |
| + |
| + return datastore.Get(c).Put(&tf) |
| } |
| // updateFullResults puts the supplied data as "full_results.json" |
| @@ -215,9 +265,7 @@ func updateFullResults(c context.Context, data io.Reader) error { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| - errCh <- uploadTestFile( |
| - c, io.MultiReader(buf, dec.Buffered()), "full_results.json", |
| - ) |
| + errCh <- uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json") |
| }() |
| wg.Add(1) |
| @@ -262,10 +310,12 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| wg := sync.WaitGroup{} |
| for i, name := range names { |
| - i, name := i, name |
| + i, name, p := i, name, p |
| wg.Add(1) |
| + |
| go func() { |
| defer wg.Done() |
| + p.Name = name |
| tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
| if err != nil { |
| if _, ok := err.(ErrNoMatches); ok { |
| @@ -276,20 +326,22 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| BuildNumber: -1, |
| Name: name, |
| } |
| - } else { |
| - files[i].err = err |
| + return |
| } |
| + files[i].err = err |
| return |
| } |
| - files[i].tf = tf |
| if err := tf.GetData(c); err != nil { |
| files[i].err = err |
| return |
| } |
| - if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil { |
| + var a model.AggregateResult |
| + if err := json.NewDecoder(tf.Data).Decode(&a); err != nil { |
| files[i].err = err |
| return |
| } |
| + files[i].tf = tf |
| + files[i].aggr = &a |
| }() |
| } |
| @@ -300,10 +352,10 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| } |
| } |
| - wg = sync.WaitGroup{} |
| - errs := make([]error, len(files)) |
| - |
| return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
| + wg = sync.WaitGroup{} |
| + errs := make([]error, len(files)) |
| + |
| for i, file := range files { |
| i, file := i, file |
| wg.Add(1) |
| @@ -356,7 +408,7 @@ func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) |
| } |
| // updateAggregate updates tf with the result of merging incr into |
| -// aggr. |
| +// aggr, and updates tf in datastore. |
| func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { |
| if !model.IsAggregateTestFile(tf.Name) { |
| return errors.New("frontend: tf should be an aggregate test file") |
| @@ -396,5 +448,49 @@ func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag |
| return statusError{err, http.StatusInternalServerError} |
| } |
| + if err := datastore.Get(c).Put(tf); err != nil { |
| + return err |
|
Vadim Sh.
2016/08/16 18:56:13
statusError{err, http.StatusInternalServerError} ?
nishanths
2016/08/16 20:35:13
Done.
|
| + } |
| + if err := deleteKeys(c, tf.OldDataKeys); err != nil { |
|
Vadim Sh.
2016/08/16 18:56:13
ignoring this error?
nishanths
2016/08/16 20:35:13
yes, only logging the error.
Returning an HTTP e
|
| + logging.Fields{ |
| + logging.ErrorKey: err, |
| + "keys": tf.OldDataKeys, |
| + }.Errorf(c, "upload: failed to delete keys") |
| + } |
| return nil |
| } |
| + |
| +func deleteKeys(c context.Context, k []*datastore.Key) error { |
| + if len(k) == 0 { |
| + return nil |
| + } |
| + |
| + keys := make([]string, 0, len(k)) |
| + for _, key := range k { |
| + keys = append(keys, key.Encode()) |
| + } |
| + |
| + payload, err := json.Marshal(struct { |
| + Keys []string `json:"keys"` |
| + }{ |
| + keys, |
| + }) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + h := make(http.Header) |
| + h.Set("Content-Type", "application/json") |
| + |
| + logging.Fields{ |
| + "keys": keys, |
| + }.Infof(c, "deleteKeys: enqueing") |
| + |
| + return taskqueue.Get(c).Add(&taskqueue.Task{ |
| + Path: "/internal/delete", |
| + Payload: payload, |
| + Header: h, |
| + Method: "POST", |
| + Delay: time.Duration(30) * time.Minute, |
| + }, deleteKeysQueueName) |
| +} |