Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Unified Diff: go/src/infra/appengine/test-results/frontend/upload.go

Issue 2250043002: test-results: package frontend: Add delete keys task queue (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@xx_5
Patch Set: (Rebase) Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/appengine/test-results/frontend/upload.go
diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go
index 47637b108f05efb9d30ee7a97ec653ccf831d0fe..d184d25b5bc1cd2cc477147d22bf4a436a171fbb 100644
--- a/go/src/infra/appengine/test-results/frontend/upload.go
+++ b/go/src/infra/appengine/test-results/frontend/upload.go
@@ -7,14 +7,19 @@ import (
"io"
"mime/multipart"
"net/http"
+ "net/url"
"strconv"
"sync"
+ "time"
"golang.org/x/net/context"
"github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/taskqueue"
+ "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/server/router"
+ "infra/appengine/test-results/builderstate"
"infra/appengine/test-results/model"
)
@@ -146,38 +151,99 @@ func doFileUpload(c context.Context, fh *multipart.FileHeader) error {
return statusError{err, http.StatusBadRequest}
}
return updateIncremental(c, &incr)
+
case "full_results.json":
- return updateFullResults(c, file)
+ bn, data, err := extractBuildNumber(file)
+ if err != nil {
+ if err == ErrInvalidBuildNumber {
+ return statusError{err, http.StatusBadRequest}
+ }
+ return statusError{err, http.StatusInternalServerError}
+ }
+ if err := updateFullResults(c, data); err != nil {
+ return err
+ }
+
+ p := GetUploadParams(c)
+ wg := sync.WaitGroup{}
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC()); err != nil {
+ logging.WithError(err).Errorf(c, "builderstate update")
+ }
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := taskqueue.Get(c).Add(
+ taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{
+ "master": {p.Master},
+ "builder": {p.Builder},
+ "build_number": {strconv.Itoa(bn)},
+ "test_type": {p.TestType},
+ }),
+ defaultQueueName,
+ ); err != nil {
+ logging.WithError(err).Errorf(c, "taskqueue add: /internal/monitoring/upload")
+ }
+ }()
+
+ wg.Wait()
+ return nil
+
default:
return uploadTestFile(c, file, fh.Filename)
}
}
-// uploadTestFile creates a new TestFile from the values in context
-// and supplied data, and puts it to the datastore.
-func uploadTestFile(c context.Context, data io.Reader, filename string) error {
+// ErrInvalidBuildNumber is returned when the extractBuildNumber fails
+// to convert the build number value to an int.
+var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int")
+
+// extractBuildNumber extracts the value of "build_number" key from
+// the supplied JSON encoded data. The returned io.Reader will have
+// the same contents as the supplied io.Reader.
+//
+// The error is ErrInvalidBuildNumber if the build number value
+// could not be converted to an int.
+func extractBuildNumber(data io.Reader) (int, io.Reader, error) {
var buf bytes.Buffer
tee := io.TeeReader(data, &buf)
aux := struct {
N string `json:"build_number,omitempty"`
}{}
-
dec := json.NewDecoder(tee)
if err := dec.Decode(&aux); err != nil {
- return statusError{err, http.StatusInternalServerError}
+ return 0, io.MultiReader(&buf, dec.Buffered()), err
}
- bn := 0
-
+ var bn int
if aux.N != "" {
n, err := strconv.Atoi(aux.N)
if err != nil {
- return statusError{errors.New("invalid build_number"), http.StatusBadRequest}
+ return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber
}
bn = n
}
+ return bn, io.MultiReader(&buf, dec.Buffered()), nil
+}
+
+// uploadTestFile creates a new TestFile from the values in context
+// and supplied data, and puts it to the datastore.
+func uploadTestFile(c context.Context, data io.Reader, filename string) error {
+ bn, data, err := extractBuildNumber(data)
+ if err != nil {
+ if err == ErrInvalidBuildNumber {
+ return statusError{err, http.StatusBadRequest}
+ }
+ return statusError{err, http.StatusInternalServerError}
+ }
+
p := GetUploadParams(c)
tf := model.TestFile{
Master: p.Master,
@@ -185,12 +251,13 @@ func uploadTestFile(c context.Context, data io.Reader, filename string) error {
TestType: p.TestType,
BuildNumber: model.BuildNum(bn),
Name: filename,
- Data: io.MultiReader(&buf, dec.Buffered()),
+ Data: data,
}
if err := tf.PutData(c); err != nil {
return statusError{err, http.StatusInternalServerError}
}
- return nil
+
+ return datastore.Get(c).Put(&tf)
}
// updateFullResults puts the supplied data as "full_results.json"
@@ -209,35 +276,18 @@ func updateFullResults(c context.Context, data io.Reader) error {
return statusError{err, http.StatusBadRequest}
}
- wg := sync.WaitGroup{}
- errCh := make(chan error, 2)
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- errCh <- uploadTestFile(
- c, io.MultiReader(buf, dec.Buffered()), "full_results.json",
- )
- }()
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- incr, err := f.AggregateResult()
- if err != nil {
- errCh <- statusError{err, http.StatusBadRequest}
- return
- }
- errCh <- updateIncremental(c, &incr)
- }()
+ if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json"); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
- wg.Wait()
- close(errCh)
- for err := range errCh {
- if err != nil {
- return err
- }
+ incr, err := f.AggregateResult()
+ if err != nil {
+ return statusError{err, http.StatusBadRequest}
+ }
+ if err := updateIncremental(c, &incr); err != nil {
+ return statusError{err, http.StatusInternalServerError}
}
+
return nil
}
@@ -262,10 +312,12 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
wg := sync.WaitGroup{}
for i, name := range names {
- i, name := i, name
+ i, name, p := i, name, p
wg.Add(1)
+
go func() {
defer wg.Done()
+ p.Name = name
tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
if err != nil {
if _, ok := err.(ErrNoMatches); ok {
@@ -276,20 +328,22 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
BuildNumber: -1,
Name: name,
}
- } else {
- files[i].err = err
+ return
}
+ files[i].err = err
return
}
- files[i].tf = tf
if err := tf.GetData(c); err != nil {
files[i].err = err
return
}
- if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil {
+ var a model.AggregateResult
+ if err := json.NewDecoder(tf.Data).Decode(&a); err != nil {
files[i].err = err
return
}
+ files[i].tf = tf
+ files[i].aggr = &a
}()
}
@@ -300,10 +354,10 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
}
}
- wg = sync.WaitGroup{}
- errs := make([]error, len(files))
-
return datastore.Get(c).RunInTransaction(func(c context.Context) error {
+ wg = sync.WaitGroup{}
+ errs := make([]error, len(files))
+
for i, file := range files {
i, file := i, file
wg.Add(1)
@@ -356,7 +410,7 @@ func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string)
}
// updateAggregate updates tf with the result of merging incr into
-// aggr.
+// aggr, and updates tf in datastore.
func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error {
if !model.IsAggregateTestFile(tf.Name) {
return errors.New("frontend: tf should be an aggregate test file")
@@ -396,5 +450,49 @@ func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
return statusError{err, http.StatusInternalServerError}
}
+ if err := datastore.Get(c).Put(tf); err != nil {
+ return statusError{err, http.StatusInternalServerError}
+ }
+ if err := deleteKeys(c, tf.OldDataKeys); err != nil {
+ logging.Fields{
+ logging.ErrorKey: err,
+ "keys": tf.OldDataKeys,
+ }.Errorf(c, "upload: failed to delete keys")
+ }
return nil
}
+
+func deleteKeys(c context.Context, k []*datastore.Key) error {
+ if len(k) == 0 {
+ return nil
+ }
+
+ keys := make([]string, 0, len(k))
+ for _, key := range k {
+ keys = append(keys, key.Encode())
+ }
+
+ payload, err := json.Marshal(struct {
+ Keys []string `json:"keys"`
+ }{
+ keys,
+ })
+ if err != nil {
+ return err
+ }
+
+ h := make(http.Header)
+ h.Set("Content-Type", "application/json")
+
+ logging.Fields{
+ "keys": keys,
+ }.Infof(c, "deleteKeys: enqueing")
+
+ return taskqueue.Get(c).Add(&taskqueue.Task{
+ Path: "/internal/delete",
+ Payload: payload,
+ Header: h,
+ Method: "POST",
+ Delay: time.Duration(30) * time.Minute,
+ }, deleteKeysQueueName)
+}

Powered by Google App Engine
This is Rietveld 408576698