Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Unified Diff: go/src/infra/appengine/test-results/frontend/upload.go

Issue 2250043002: test-results: package frontend: Add delete keys task queue (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@xx_5
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/appengine/test-results/frontend/upload.go
diff --git a/go/src/infra/appengine/test-results/frontend/upload.go b/go/src/infra/appengine/test-results/frontend/upload.go
index 47637b108f05efb9d30ee7a97ec653ccf831d0fe..fc17e50645145dacb361dcdf6e2c8a3fa90d6939 100644
--- a/go/src/infra/appengine/test-results/frontend/upload.go
+++ b/go/src/infra/appengine/test-results/frontend/upload.go
@@ -7,14 +7,19 @@ import (
"io"
"mime/multipart"
"net/http"
+ "net/url"
"strconv"
"sync"
+ "time"
"golang.org/x/net/context"
"github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/taskqueue"
+ "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/server/router"
+ "infra/appengine/test-results/builderstate"
"infra/appengine/test-results/model"
)
@@ -146,38 +151,82 @@ func doFileUpload(c context.Context, fh *multipart.FileHeader) error {
return statusError{err, http.StatusBadRequest}
}
return updateIncremental(c, &incr)
+
case "full_results.json":
- return updateFullResults(c, file)
+ bn, data, err := extractBuildNumber(file)
+ if err != nil {
+ if err == ErrInvalidBuildNumber {
+ return statusError{err, http.StatusBadRequest}
+ }
+ return statusError{err, http.StatusInternalServerError}
+ }
+ if err := updateFullResults(c, data); err != nil {
+ return err
+ }
+
+ p := GetUploadParams(c)
+ go builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC())
Vadim Sh. 2016/08/16 18:56:13 you need to wait for these goroutines to finish so
nishanths 2016/08/16 20:35:13 Done.
+ go taskqueue.Get(c).Add(
+ taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{
Vadim Sh. 2016/08/16 18:56:13 does this handler exist?
nishanths 2016/08/16 20:35:13 It exists in the python implementation.
+ "master": {p.Master},
+ "builder": {p.Builder},
+ "build_number": {strconv.Itoa(bn)},
+ "test_type": {p.TestType},
+ }),
+ defaultQueueName,
+ )
+ return nil
+
default:
return uploadTestFile(c, file, fh.Filename)
}
}
-// uploadTestFile creates a new TestFile from the values in context
-// and supplied data, and puts it to the datastore.
-func uploadTestFile(c context.Context, data io.Reader, filename string) error {
+// ErrInvalidBuildNumber is returned when the extractBuildNumber fails
+// to convert the build number value to an int.
+var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int")
+
+// extractBuildNumber extracts the value of "build_number" key from
+// the supplied JSON encoded data. The returned io.Reader will have
+// the same contents as the supplied io.Reader.
+//
+// The error is ErrInvalidBuildNumber if the build number value
+// could not be converted to an int.
+func extractBuildNumber(data io.Reader) (int, io.Reader, error) {
var buf bytes.Buffer
tee := io.TeeReader(data, &buf)
aux := struct {
N string `json:"build_number,omitempty"`
}{}
-
dec := json.NewDecoder(tee)
if err := dec.Decode(&aux); err != nil {
- return statusError{err, http.StatusInternalServerError}
+ return 0, io.MultiReader(&buf, dec.Buffered()), err
}
- bn := 0
-
+ var bn int
if aux.N != "" {
n, err := strconv.Atoi(aux.N)
if err != nil {
- return statusError{errors.New("invalid build_number"), http.StatusBadRequest}
+ return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber
}
bn = n
}
+ return bn, io.MultiReader(&buf, dec.Buffered()), nil
+}
+
+// uploadTestFile creates a new TestFile from the values in context
+// and supplied data, and puts it to the datastore.
+func uploadTestFile(c context.Context, data io.Reader, filename string) error {
+ bn, data, err := extractBuildNumber(data)
+ if err != nil {
+ if err == ErrInvalidBuildNumber {
+ return statusError{err, http.StatusBadRequest}
+ }
+ return statusError{err, http.StatusInternalServerError}
+ }
+
p := GetUploadParams(c)
tf := model.TestFile{
Master: p.Master,
@@ -185,12 +234,13 @@ func uploadTestFile(c context.Context, data io.Reader, filename string) error {
TestType: p.TestType,
BuildNumber: model.BuildNum(bn),
Name: filename,
- Data: io.MultiReader(&buf, dec.Buffered()),
+ Data: data,
}
if err := tf.PutData(c); err != nil {
return statusError{err, http.StatusInternalServerError}
}
- return nil
+
+ return datastore.Get(c).Put(&tf)
}
// updateFullResults puts the supplied data as "full_results.json"
@@ -215,9 +265,7 @@ func updateFullResults(c context.Context, data io.Reader) error {
wg.Add(1)
go func() {
defer wg.Done()
- errCh <- uploadTestFile(
- c, io.MultiReader(buf, dec.Buffered()), "full_results.json",
- )
+ errCh <- uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json")
}()
wg.Add(1)
@@ -262,10 +310,12 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
wg := sync.WaitGroup{}
for i, name := range names {
- i, name := i, name
+ i, name, p := i, name, p
wg.Add(1)
+
go func() {
defer wg.Done()
+ p.Name = name
tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
if err != nil {
if _, ok := err.(ErrNoMatches); ok {
@@ -276,20 +326,22 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
BuildNumber: -1,
Name: name,
}
- } else {
- files[i].err = err
+ return
}
+ files[i].err = err
return
}
- files[i].tf = tf
if err := tf.GetData(c); err != nil {
files[i].err = err
return
}
- if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil {
+ var a model.AggregateResult
+ if err := json.NewDecoder(tf.Data).Decode(&a); err != nil {
files[i].err = err
return
}
+ files[i].tf = tf
+ files[i].aggr = &a
}()
}
@@ -300,10 +352,10 @@ func updateIncremental(c context.Context, incr *model.AggregateResult) error {
}
}
- wg = sync.WaitGroup{}
- errs := make([]error, len(files))
-
return datastore.Get(c).RunInTransaction(func(c context.Context) error {
+ wg = sync.WaitGroup{}
+ errs := make([]error, len(files))
+
for i, file := range files {
i, file := i, file
wg.Add(1)
@@ -356,7 +408,7 @@ func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string)
}
// updateAggregate updates tf with the result of merging incr into
-// aggr.
+// aggr, and updates tf in datastore.
func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error {
if !model.IsAggregateTestFile(tf.Name) {
return errors.New("frontend: tf should be an aggregate test file")
@@ -396,5 +448,49 @@ func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
return statusError{err, http.StatusInternalServerError}
}
+ if err := datastore.Get(c).Put(tf); err != nil {
+ return err
Vadim Sh. 2016/08/16 18:56:13 statusError{err, http.StatusInternalServerError} ?
nishanths 2016/08/16 20:35:13 Done.
+ }
+ if err := deleteKeys(c, tf.OldDataKeys); err != nil {
Vadim Sh. 2016/08/16 18:56:13 ignoring this error?
nishanths 2016/08/16 20:35:13 yes, only logging the error. Returning an HTTP e
+ logging.Fields{
+ logging.ErrorKey: err,
+ "keys": tf.OldDataKeys,
+ }.Errorf(c, "upload: failed to delete keys")
+ }
return nil
}
+
+func deleteKeys(c context.Context, k []*datastore.Key) error {
+ if len(k) == 0 {
+ return nil
+ }
+
+ keys := make([]string, 0, len(k))
+ for _, key := range k {
+ keys = append(keys, key.Encode())
+ }
+
+ payload, err := json.Marshal(struct {
+ Keys []string `json:"keys"`
+ }{
+ keys,
+ })
+ if err != nil {
+ return err
+ }
+
+ h := make(http.Header)
+ h.Set("Content-Type", "application/json")
+
+ logging.Fields{
+ "keys": keys,
+ }.Infof(c, "deleteKeys: enqueing")
+
+ return taskqueue.Get(c).Add(&taskqueue.Task{
+ Path: "/internal/delete",
+ Payload: payload,
+ Header: h,
+ Method: "POST",
+ Delay: time.Duration(30) * time.Minute,
+ }, deleteKeysQueueName)
+}

Powered by Google App Engine
This is Rietveld 408576698