Index: go/src/infra/appengine/test-results/frontend/upload.go |
diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..47637b108f05efb9d30ee7a97ec653ccf831d0fe |
--- /dev/null |
+++ b/go/src/infra/appengine/test-results/frontend/upload.go |
@@ -0,0 +1,400 @@ |
+package frontend |
+ |
+import ( |
+ "bytes" |
+ "encoding/json" |
+ "errors" |
+ "io" |
+ "mime/multipart" |
+ "net/http" |
+ "strconv" |
+ "sync" |
+ |
+ "golang.org/x/net/context" |
+ |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/server/router" |
+ |
+ "infra/appengine/test-results/model" |
+) |
+ |
+type statusError struct { |
+ error |
+ code int |
+} |
+ |
+// MarshalJSON marshals status error to JSON. |
+func (se *statusError) MarshalJSON() ([]byte, error) { |
+ m := map[string]interface{}{} |
+ |
+ if se == nil || se.error == nil { |
+ return json.Marshal(m) |
+ } |
+ |
+ m["error"] = se.Error() |
+ m["status"] = se.code |
+ return json.Marshal(m) |
+} |
+ |
+// UploadParams is the multipart form values in a |
+// TestFile upload request. |
+type UploadParams struct { |
+ Master string |
+ // DeprecatedMaster is set when master.Name was provided |
+ // in the request, instead of master.Identifer. |
+ DeprecatedMaster string |
+ Builder string |
+ TestType string |
+} |
+ |
+type contextKey int |
+ |
+const uploadContextKey = contextKey(0) |
+ |
+// GetUploadParams returns the UploadParams from the context if |
+// present or nil otherwise. |
+func GetUploadParams(c context.Context) *UploadParams { |
+ if v := c.Value(uploadContextKey); v != nil { |
+ return v.(*UploadParams) |
+ } |
+ return nil |
+} |
+ |
+// SetUploadParams returns a new context with the supplied |
+// UploadParams added to it. |
+func SetUploadParams(c context.Context, p *UploadParams) context.Context { |
+ return context.WithValue(c, uploadContextKey, p) |
+} |
+ |
+// withParsedUploadForm is middleware that verifies and adds |
+// multipart form upload data to the context. |
+// |
+// If there is an error parsing the form or required |
+// values are missing, WithParsed writes the HTTP error |
+// to the response writer and stops execution of the request. |
+func withParsedUploadForm(ctx *router.Context, next router.Handler) { |
+ w, r := ctx.Writer, ctx.Request |
+ const _1M = 1 << 20 |
+ |
+ if err := r.ParseMultipartForm(_1M); err != nil { |
+ http.Error(w, err.Error(), http.StatusInternalServerError) |
+ return |
+ } |
+ |
+ u := &UploadParams{} |
+ |
+ if v := r.MultipartForm.Value["master"]; len(v) > 0 { |
+ if m := model.MasterByName(v[0]); m != nil { |
+ u.Master = m.Identifier |
+ u.DeprecatedMaster = v[0] |
+ } else { |
+ u.Master = v[0] |
+ } |
+ } |
+ |
+ if v := r.MultipartForm.Value["builder"]; len(v) > 0 { |
+ u.Builder = v[0] |
+ } else { |
+ http.Error(w, "missing builder", http.StatusBadRequest) |
+ return |
+ } |
+ |
+ if v := r.MultipartForm.Value["testtype"]; len(v) > 0 { |
+ u.TestType = cleanTestType(v[0]) |
+ } |
+ |
+ if _, ok := r.MultipartForm.File["file"]; !ok { |
+ http.Error(w, "missing file", http.StatusBadRequest) |
+ return |
+ } |
+ |
+ ctx.Context = SetUploadParams(ctx.Context, u) |
+ next(ctx) |
+} |
+ |
+// uploadHandler is the HTTP handler for upload |
+// requests. |
+func uploadHandler(ctx *router.Context) { |
+ c, w, r := ctx.Context, ctx.Writer, ctx.Request |
+ fileheaders := r.MultipartForm.File["file"] |
+ |
+ for _, fh := range fileheaders { |
+ if err := doFileUpload(c, fh); err != nil { |
+ code := http.StatusInternalServerError |
+ if se, ok := err.(statusError); ok { |
+ code = se.code |
+ } |
+ http.Error(w, err.Error(), code) |
+ return |
+ } |
+ } |
+ |
+ io.WriteString(w, "OK") |
+} |
+ |
+func doFileUpload(c context.Context, fh *multipart.FileHeader) error { |
+ file, err := fh.Open() |
+ if err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ defer file.Close() |
+ |
+ switch fh.Filename { |
+ case "incremental_results.json": |
+ var incr model.AggregateResult |
+ if err := json.NewDecoder(file).Decode(&incr); err != nil { |
+ return statusError{err, http.StatusBadRequest} |
+ } |
+ return updateIncremental(c, &incr) |
+ case "full_results.json": |
+ return updateFullResults(c, file) |
+ default: |
+ return uploadTestFile(c, file, fh.Filename) |
+ } |
+} |
+ |
+// uploadTestFile creates a new TestFile from the values in context |
+// and supplied data, and puts it to the datastore. |
+func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
+ var buf bytes.Buffer |
+ tee := io.TeeReader(data, &buf) |
+ |
+ aux := struct { |
+ N string `json:"build_number,omitempty"` |
+ }{} |
+ |
+ dec := json.NewDecoder(tee) |
+ if err := dec.Decode(&aux); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ |
+ bn := 0 |
+ |
+ if aux.N != "" { |
+ n, err := strconv.Atoi(aux.N) |
+ if err != nil { |
+ return statusError{errors.New("invalid build_number"), http.StatusBadRequest} |
+ } |
+ bn = n |
+ } |
+ |
+ p := GetUploadParams(c) |
+ tf := model.TestFile{ |
+ Master: p.Master, |
+ Builder: p.Builder, |
+ TestType: p.TestType, |
+ BuildNumber: model.BuildNum(bn), |
+ Name: filename, |
+ Data: io.MultiReader(&buf, dec.Buffered()), |
+ } |
+ if err := tf.PutData(c); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ return nil |
+} |
+ |
+// updateFullResults puts the supplied data as "full_results.json" |
+// to the datastore, and updates corresponding "results.json" and |
+// "results-small.json" files in the datastore. |
+// |
+// The supplied data should unmarshal into model.FullResults. |
+// Otherwise, an error is returned. |
+func updateFullResults(c context.Context, data io.Reader) error { |
+ buf := &bytes.Buffer{} |
+ tee := io.TeeReader(data, buf) |
+ dec := json.NewDecoder(tee) |
+ |
+ var f model.FullResult |
+ if err := dec.Decode(&f); err != nil { |
+ return statusError{err, http.StatusBadRequest} |
+ } |
+ |
+ wg := sync.WaitGroup{} |
+ errCh := make(chan error, 2) |
+ |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ errCh <- uploadTestFile( |
+ c, io.MultiReader(buf, dec.Buffered()), "full_results.json", |
+ ) |
+ }() |
+ |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ incr, err := f.AggregateResult() |
+ if err != nil { |
+ errCh <- statusError{err, http.StatusBadRequest} |
+ return |
+ } |
+ errCh <- updateIncremental(c, &incr) |
+ }() |
+ |
+ wg.Wait() |
+ close(errCh) |
+ for err := range errCh { |
+ if err != nil { |
+ return err |
+ } |
+ } |
+ return nil |
+} |
+ |
+// updateIncremental gets "results.json" and "results-small.json" |
+// for values in context, merges incr into them, and puts the updated |
+// files to the datastore. |
+func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
+ u := GetUploadParams(c) |
+ p := model.TestFileParams{ |
+ Master: u.Master, |
+ Builder: u.Builder, |
+ TestType: u.TestType, |
+ } |
+ |
+ names := []string{"results.json", "results-small.json"} |
+ files := make([]struct { |
+ tf *model.TestFile |
+ aggr *model.AggregateResult |
+ err error |
+ }, len(names)) |
+ |
+ wg := sync.WaitGroup{} |
+ |
+ for i, name := range names { |
+ i, name := i, name |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
+ if err != nil { |
+ if _, ok := err.(ErrNoMatches); ok { |
+ files[i].tf = &model.TestFile{ |
+ Master: p.Master, |
+ Builder: p.Builder, |
+ TestType: p.TestType, |
+ BuildNumber: -1, |
+ Name: name, |
+ } |
+ } else { |
+ files[i].err = err |
+ } |
+ return |
+ } |
+ files[i].tf = tf |
+ if err := tf.GetData(c); err != nil { |
+ files[i].err = err |
+ return |
+ } |
+ if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil { |
+ files[i].err = err |
+ return |
+ } |
+ }() |
+ } |
+ |
+ wg.Wait() |
+ for _, file := range files { |
+ if file.err != nil { |
+ return file.err |
+ } |
+ } |
+ |
+ wg = sync.WaitGroup{} |
+ errs := make([]error, len(files)) |
+ |
+ return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
+ for i, file := range files { |
+ i, file := i, file |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ errs[i] = updateAggregate(c, file.tf, file.aggr, incr) |
+ }() |
+ } |
+ |
+ wg.Wait() |
+ // Prioritize returning http.StatusInternalServerError status |
+ // code errors over other errors. |
+ var e error |
+ for _, err := range errs { |
+ se, ok := err.(statusError) |
+ if ok && se.code == http.StatusInternalServerError { |
+ return se |
+ } |
+ e = err |
+ } |
+ return e |
+ }, &datastore.TransactionOptions{XG: true}) |
+} |
+ |
+// getTestFileAlt returns the the first TestFile in the datastore for |
+// the query formed by calling p.Query(). |
+// |
+// The function tries to find the first TestFile using p. If no such TestFile |
+// exists the function sets p.Master to altMaster and tries again. |
+// If altMaster is empty, the function does not perform the additional try. |
+func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { |
+ a, err := getFirstTestFile(c, p.Query()) |
+ if err == nil { |
+ return a, nil |
+ } |
+ if _, ok := err.(ErrNoMatches); ok && altMaster == "" { |
+ return nil, err |
+ } |
+ |
+ origMaster := p.Master |
+ p.Master = altMaster |
+ |
+ a, err = getFirstTestFile(c, p.Query()) |
+ if err == nil { |
+ a.Master = origMaster |
+ return a, nil |
+ } |
+ |
+ return nil, err |
+} |
+ |
+// updateAggregate updates tf with the result of merging incr into |
+// aggr. |
+func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { |
+ if !model.IsAggregateTestFile(tf.Name) { |
+ return errors.New("frontend: tf should be an aggregate test file") |
+ } |
+ |
+ size := model.ResultsSize |
+ if tf.Name == "results-small.json" { |
+ size = model.ResultsSmallSize |
+ } |
+ |
+ if aggr == nil { |
+ aggr = incr |
+ } else { |
+ if err := aggr.Merge(incr); err != nil { |
+ switch err { |
+ case model.ErrBuilderNameConflict: |
+ return statusError{err, http.StatusBadRequest} |
+ case model.ErrBuildNumberConflict: |
+ return statusError{err, http.StatusConflict} |
+ default: |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ } |
+ } |
+ |
+ if err := aggr.Trim(size); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ |
+ b := &bytes.Buffer{} |
+ if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ |
+ tf.Data = b |
+ if err := tf.PutData(c); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ |
+ return nil |
+} |