Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(247)

Unified Diff: go/src/infra/appengine/test-results/frontend/upload.go

Issue 2240473004: test-results: package frontend: add upload handler (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Address estaab@ comments, set dependent CL Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/appengine/test-results/frontend/upload.go
diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go
new file mode 100644
index 0000000000000000000000000000000000000000..47637b108f05efb9d30ee7a97ec653ccf831d0fe
--- /dev/null
+++ b/go/src/infra/appengine/test-results/frontend/upload.go
@@ -0,0 +1,400 @@
+package frontend
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "io"
+ "mime/multipart"
+ "net/http"
+ "strconv"
+ "sync"
+
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/server/router"
+
+ "infra/appengine/test-results/model"
+)
+
+type statusError struct {
+ error
+ code int
+}
+
+// MarshalJSON marshals status error to JSON.
+func (se *statusError) MarshalJSON() ([]byte, error) {
+ m := map[string]interface{}{}
+
+ if se == nil || se.error == nil {
+ return json.Marshal(m)
+ }
+
+ m["error"] = se.Error()
+ m["status"] = se.code
+ return json.Marshal(m)
+}
+
+// UploadParams is the multipart form values in a
+// TestFile upload request.
+type UploadParams struct {
+ Master string
+ // DeprecatedMaster is set when master.Name was provided
+ // in the request, instead of master.Identifer.
+ DeprecatedMaster string
+ Builder string
+ TestType string
+}
+
+type contextKey int
+
+const uploadContextKey = contextKey(0)
+
+// GetUploadParams returns the UploadParams from the context if
+// present or nil otherwise.
+func GetUploadParams(c context.Context) *UploadParams {
+ if v := c.Value(uploadContextKey); v != nil {
+ return v.(*UploadParams)
+ }
+ return nil
+}
+
+// SetUploadParams returns a new context with the supplied
+// UploadParams added to it.
+func SetUploadParams(c context.Context, p *UploadParams) context.Context {
+ return context.WithValue(c, uploadContextKey, p)
+}
+
+// withParsedUploadForm is middleware that verifies and adds
+// multipart form upload data to the context.
+//
+// If there is an error parsing the form or required
+// values are missing, WithParsed writes the HTTP error
+// to the response writer and stops execution of the request.
+func withParsedUploadForm(ctx *router.Context, next router.Handler) {
+ w, r := ctx.Writer, ctx.Request
+ const _1M = 1 << 20
+
+ if err := r.ParseMultipartForm(_1M); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ u := &UploadParams{}
+
+ if v := r.MultipartForm.Value["master"]; len(v) > 0 {
+ if m := model.MasterByName(v[0]); m != nil {
+ u.Master = m.Identifier
+ u.DeprecatedMaster = v[0]
+ } else {
+ u.Master = v[0]
+ }
+ }
+
+ if v := r.MultipartForm.Value["builder"]; len(v) > 0 {
+ u.Builder = v[0]
+ } else {
+ http.Error(w, "missing builder", http.StatusBadRequest)
+ return
+ }
+
+ if v := r.MultipartForm.Value["testtype"]; len(v) > 0 {
+ u.TestType = cleanTestType(v[0])
+ }
+
+ if _, ok := r.MultipartForm.File["file"]; !ok {
+ http.Error(w, "missing file", http.StatusBadRequest)
+ return
+ }
+
+ ctx.Context = SetUploadParams(ctx.Context, u)
+ next(ctx)
+}
+
+// uploadHandler is the HTTP handler for upload
+// requests.
+func uploadHandler(ctx *router.Context) {
+ c, w, r := ctx.Context, ctx.Writer, ctx.Request
+ fileheaders := r.MultipartForm.File["file"]
+
+ for _, fh := range fileheaders {
+ if err := doFileUpload(c, fh); err != nil {
+ code := http.StatusInternalServerError
+ if se, ok := err.(statusError); ok {
+ code = se.code
+ }
+ http.Error(w, err.Error(), code)
+ return
+ }
+ }
+
+ io.WriteString(w, "OK")
+}
+
+func doFileUpload(c context.Context, fh *multipart.FileHeader) error {
+ file, err := fh.Open()
+ if err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+ defer file.Close()
+
+ switch fh.Filename {
+ case "incremental_results.json":
+ var incr model.AggregateResult
+ if err := json.NewDecoder(file).Decode(&incr); err != nil {
+ return statusError{err, http.StatusBadRequest}
+ }
+ return updateIncremental(c, &incr)
+ case "full_results.json":
+ return updateFullResults(c, file)
+ default:
+ return uploadTestFile(c, file, fh.Filename)
+ }
+}
+
+// uploadTestFile creates a new TestFile from the values in context
+// and supplied data, and puts it to the datastore.
+func uploadTestFile(c context.Context, data io.Reader, filename string) error {
+ var buf bytes.Buffer
+ tee := io.TeeReader(data, &buf)
+
+ aux := struct {
+ N string `json:"build_number,omitempty"`
+ }{}
+
+ dec := json.NewDecoder(tee)
+ if err := dec.Decode(&aux); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+
+ bn := 0
+
+ if aux.N != "" {
+ n, err := strconv.Atoi(aux.N)
+ if err != nil {
+ return statusError{errors.New("invalid build_number"), http.StatusBadRequest}
+ }
+ bn = n
+ }
+
+ p := GetUploadParams(c)
+ tf := model.TestFile{
+ Master: p.Master,
+ Builder: p.Builder,
+ TestType: p.TestType,
+ BuildNumber: model.BuildNum(bn),
+ Name: filename,
+ Data: io.MultiReader(&buf, dec.Buffered()),
+ }
+ if err := tf.PutData(c); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+ return nil
+}
+
+// updateFullResults puts the supplied data as "full_results.json"
+// to the datastore, and updates corresponding "results.json" and
+// "results-small.json" files in the datastore.
+//
+// The supplied data should unmarshal into model.FullResults.
+// Otherwise, an error is returned.
+func updateFullResults(c context.Context, data io.Reader) error {
+ buf := &bytes.Buffer{}
+ tee := io.TeeReader(data, buf)
+ dec := json.NewDecoder(tee)
+
+ var f model.FullResult
+ if err := dec.Decode(&f); err != nil {
+ return statusError{err, http.StatusBadRequest}
+ }
+
+ wg := sync.WaitGroup{}
+ errCh := make(chan error, 2)
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ errCh <- uploadTestFile(
+ c, io.MultiReader(buf, dec.Buffered()), "full_results.json",
+ )
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ incr, err := f.AggregateResult()
+ if err != nil {
+ errCh <- statusError{err, http.StatusBadRequest}
+ return
+ }
+ errCh <- updateIncremental(c, &incr)
+ }()
+
+ wg.Wait()
+ close(errCh)
+ for err := range errCh {
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// updateIncremental gets "results.json" and "results-small.json"
+// for values in context, merges incr into them, and puts the updated
+// files to the datastore.
+func updateIncremental(c context.Context, incr *model.AggregateResult) error {
+ u := GetUploadParams(c)
+ p := model.TestFileParams{
+ Master: u.Master,
+ Builder: u.Builder,
+ TestType: u.TestType,
+ }
+
+ names := []string{"results.json", "results-small.json"}
+ files := make([]struct {
+ tf *model.TestFile
+ aggr *model.AggregateResult
+ err error
+ }, len(names))
+
+ wg := sync.WaitGroup{}
+
+ for i, name := range names {
+ i, name := i, name
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
+ if err != nil {
+ if _, ok := err.(ErrNoMatches); ok {
+ files[i].tf = &model.TestFile{
+ Master: p.Master,
+ Builder: p.Builder,
+ TestType: p.TestType,
+ BuildNumber: -1,
+ Name: name,
+ }
+ } else {
+ files[i].err = err
+ }
+ return
+ }
+ files[i].tf = tf
+ if err := tf.GetData(c); err != nil {
+ files[i].err = err
+ return
+ }
+ if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil {
+ files[i].err = err
+ return
+ }
+ }()
+ }
+
+ wg.Wait()
+ for _, file := range files {
+ if file.err != nil {
+ return file.err
+ }
+ }
+
+ wg = sync.WaitGroup{}
+ errs := make([]error, len(files))
+
+ return datastore.Get(c).RunInTransaction(func(c context.Context) error {
+ for i, file := range files {
+ i, file := i, file
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ errs[i] = updateAggregate(c, file.tf, file.aggr, incr)
+ }()
+ }
+
+ wg.Wait()
+ // Prioritize returning http.StatusInternalServerError status
+ // code errors over other errors.
+ var e error
+ for _, err := range errs {
+ se, ok := err.(statusError)
+ if ok && se.code == http.StatusInternalServerError {
+ return se
+ }
+ e = err
+ }
+ return e
+ }, &datastore.TransactionOptions{XG: true})
+}
+
+// getTestFileAlt returns the the first TestFile in the datastore for
+// the query formed by calling p.Query().
+//
+// The function tries to find the first TestFile using p. If no such TestFile
+// exists the function sets p.Master to altMaster and tries again.
+// If altMaster is empty, the function does not perform the additional try.
+func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) {
+ a, err := getFirstTestFile(c, p.Query())
+ if err == nil {
+ return a, nil
+ }
+ if _, ok := err.(ErrNoMatches); ok && altMaster == "" {
+ return nil, err
+ }
+
+ origMaster := p.Master
+ p.Master = altMaster
+
+ a, err = getFirstTestFile(c, p.Query())
+ if err == nil {
+ a.Master = origMaster
+ return a, nil
+ }
+
+ return nil, err
+}
+
+// updateAggregate updates tf with the result of merging incr into
+// aggr.
+func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error {
+ if !model.IsAggregateTestFile(tf.Name) {
+ return errors.New("frontend: tf should be an aggregate test file")
+ }
+
+ size := model.ResultsSize
+ if tf.Name == "results-small.json" {
+ size = model.ResultsSmallSize
+ }
+
+ if aggr == nil {
+ aggr = incr
+ } else {
+ if err := aggr.Merge(incr); err != nil {
+ switch err {
+ case model.ErrBuilderNameConflict:
+ return statusError{err, http.StatusBadRequest}
+ case model.ErrBuildNumberConflict:
+ return statusError{err, http.StatusConflict}
+ default:
+ return statusError{err, http.StatusInternalServerError}
+ }
+ }
+ }
+
+ if err := aggr.Trim(size); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+
+ b := &bytes.Buffer{}
+ if err := json.NewEncoder(b).Encode(&aggr); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+
+ tf.Data = b
+ if err := tf.PutData(c); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+
+ return nil
+}

Powered by Google App Engine
This is Rietveld 408576698