Index: go/src/infra/appengine/test-results/frontend/upload.go |
diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go |
index 47637b108f05efb9d30ee7a97ec653ccf831d0fe..d184d25b5bc1cd2cc477147d22bf4a436a171fbb 100644 |
--- a/go/src/infra/appengine/test-results/frontend/upload.go |
+++ b/go/src/infra/appengine/test-results/frontend/upload.go |
@@ -7,14 +7,19 @@ import ( |
"io" |
"mime/multipart" |
"net/http" |
+ "net/url" |
"strconv" |
"sync" |
+ "time" |
"golang.org/x/net/context" |
"github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/taskqueue" |
+ "github.com/luci/luci-go/common/logging" |
"github.com/luci/luci-go/server/router" |
+ "infra/appengine/test-results/builderstate" |
"infra/appengine/test-results/model" |
) |
@@ -146,38 +151,99 @@ func doFileUpload(c context.Context, fh *multipart.FileHeader) error { |
return statusError{err, http.StatusBadRequest} |
} |
return updateIncremental(c, &incr) |
+ |
case "full_results.json": |
- return updateFullResults(c, file) |
+ bn, data, err := extractBuildNumber(file) |
+ if err != nil { |
+ if err == ErrInvalidBuildNumber { |
+ return statusError{err, http.StatusBadRequest} |
+ } |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ if err := updateFullResults(c, data); err != nil { |
+ return err |
+ } |
+ |
+ p := GetUploadParams(c) |
+ wg := sync.WaitGroup{} |
+ |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ if err := builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC()); err != nil { |
+ logging.WithError(err).Errorf(c, "builderstate update") |
+ } |
+ }() |
+ |
+ wg.Add(1) |
+ go func() { |
+ defer wg.Done() |
+ if err := taskqueue.Get(c).Add( |
+ taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{ |
+ "master": {p.Master}, |
+ "builder": {p.Builder}, |
+ "build_number": {strconv.Itoa(bn)}, |
+ "test_type": {p.TestType}, |
+ }), |
+ defaultQueueName, |
+ ); err != nil { |
+ logging.WithError(err).Errorf(c, "taskqueue add: /internal/monitoring/upload") |
+ } |
+ }() |
+ |
+ wg.Wait() |
+ return nil |
+ |
default: |
return uploadTestFile(c, file, fh.Filename) |
} |
} |
-// uploadTestFile creates a new TestFile from the values in context |
-// and supplied data, and puts it to the datastore. |
-func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
+// ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
+// to convert the build number value to an int. |
+var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int") |
+ |
+// extractBuildNumber extracts the value of "build_number" key from |
+// the supplied JSON encoded data. The returned io.Reader will have |
+// the same contents as the supplied io.Reader. |
+// |
+// The error is ErrInvalidBuildNumber if the build number value |
+// could not be converted to an int. |
+func extractBuildNumber(data io.Reader) (int, io.Reader, error) { |
var buf bytes.Buffer |
tee := io.TeeReader(data, &buf) |
aux := struct { |
N string `json:"build_number,omitempty"` |
}{} |
- |
dec := json.NewDecoder(tee) |
if err := dec.Decode(&aux); err != nil { |
- return statusError{err, http.StatusInternalServerError} |
+ return 0, io.MultiReader(&buf, dec.Buffered()), err |
} |
- bn := 0 |
- |
+ var bn int |
if aux.N != "" { |
n, err := strconv.Atoi(aux.N) |
if err != nil { |
- return statusError{errors.New("invalid build_number"), http.StatusBadRequest} |
+ return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber |
} |
bn = n |
} |
+ return bn, io.MultiReader(&buf, dec.Buffered()), nil |
+} |
+ |
+// uploadTestFile creates a new TestFile from the values in context |
+// and supplied data, and puts it to the datastore. |
+func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
+ bn, data, err := extractBuildNumber(data) |
+ if err != nil { |
+ if err == ErrInvalidBuildNumber { |
+ return statusError{err, http.StatusBadRequest} |
+ } |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ |
p := GetUploadParams(c) |
tf := model.TestFile{ |
Master: p.Master, |
@@ -185,12 +251,13 @@ func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
TestType: p.TestType, |
BuildNumber: model.BuildNum(bn), |
Name: filename, |
- Data: io.MultiReader(&buf, dec.Buffered()), |
+ Data: data, |
} |
if err := tf.PutData(c); err != nil { |
return statusError{err, http.StatusInternalServerError} |
} |
- return nil |
+ |
+ return datastore.Get(c).Put(&tf) |
} |
// updateFullResults puts the supplied data as "full_results.json" |
@@ -209,35 +276,18 @@ func updateFullResults(c context.Context, data io.Reader) error { |
return statusError{err, http.StatusBadRequest} |
} |
- wg := sync.WaitGroup{} |
- errCh := make(chan error, 2) |
- |
- wg.Add(1) |
- go func() { |
- defer wg.Done() |
- errCh <- uploadTestFile( |
- c, io.MultiReader(buf, dec.Buffered()), "full_results.json", |
- ) |
- }() |
- |
- wg.Add(1) |
- go func() { |
- defer wg.Done() |
- incr, err := f.AggregateResult() |
- if err != nil { |
- errCh <- statusError{err, http.StatusBadRequest} |
- return |
- } |
- errCh <- updateIncremental(c, &incr) |
- }() |
+ if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json"); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
- wg.Wait() |
- close(errCh) |
- for err := range errCh { |
- if err != nil { |
- return err |
- } |
+ incr, err := f.AggregateResult() |
+ if err != nil { |
+ return statusError{err, http.StatusBadRequest} |
+ } |
+ if err := updateIncremental(c, &incr); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
} |
+ |
return nil |
} |
@@ -262,10 +312,12 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
wg := sync.WaitGroup{} |
for i, name := range names { |
- i, name := i, name |
+ i, name, p := i, name, p |
wg.Add(1) |
+ |
go func() { |
defer wg.Done() |
+ p.Name = name |
tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
if err != nil { |
if _, ok := err.(ErrNoMatches); ok { |
@@ -276,20 +328,22 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
BuildNumber: -1, |
Name: name, |
} |
- } else { |
- files[i].err = err |
+ return |
} |
+ files[i].err = err |
return |
} |
- files[i].tf = tf |
if err := tf.GetData(c); err != nil { |
files[i].err = err |
return |
} |
- if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil { |
+ var a model.AggregateResult |
+ if err := json.NewDecoder(tf.Data).Decode(&a); err != nil { |
files[i].err = err |
return |
} |
+ files[i].tf = tf |
+ files[i].aggr = &a |
}() |
} |
@@ -300,10 +354,10 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
} |
} |
- wg = sync.WaitGroup{} |
- errs := make([]error, len(files)) |
- |
return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
+ wg = sync.WaitGroup{} |
+ errs := make([]error, len(files)) |
+ |
for i, file := range files { |
i, file := i, file |
wg.Add(1) |
@@ -356,7 +410,7 @@ func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) |
} |
// updateAggregate updates tf with the result of merging incr into |
-// aggr. |
+// aggr, and updates tf in datastore. |
func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { |
if !model.IsAggregateTestFile(tf.Name) { |
return errors.New("frontend: tf should be an aggregate test file") |
@@ -396,5 +450,49 @@ func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag |
return statusError{err, http.StatusInternalServerError} |
} |
+ if err := datastore.Get(c).Put(tf); err != nil { |
+ return statusError{err, http.StatusInternalServerError} |
+ } |
+ if err := deleteKeys(c, tf.OldDataKeys); err != nil { |
+ logging.Fields{ |
+ logging.ErrorKey: err, |
+ "keys": tf.OldDataKeys, |
+ }.Errorf(c, "upload: failed to delete keys") |
+ } |
return nil |
} |
+ |
+func deleteKeys(c context.Context, k []*datastore.Key) error { |
+ if len(k) == 0 { |
+ return nil |
+ } |
+ |
+ keys := make([]string, 0, len(k)) |
+ for _, key := range k { |
+ keys = append(keys, key.Encode()) |
+ } |
+ |
+ payload, err := json.Marshal(struct { |
+ Keys []string `json:"keys"` |
+ }{ |
+ keys, |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ h := make(http.Header) |
+ h.Set("Content-Type", "application/json") |
+ |
+ logging.Fields{ |
+ "keys": keys, |
+ }.Infof(c, "deleteKeys: enqueing") |
+ |
+ return taskqueue.Get(c).Add(&taskqueue.Task{ |
+ Path: "/internal/delete", |
+ Payload: payload, |
+ Header: h, |
+ Method: "POST", |
+ Delay: time.Duration(30) * time.Minute, |
+ }, deleteKeysQueueName) |
+} |