Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(700)

Unified Diff: milo/buildsource/buildbucket/pubsub.go

Issue 2964143002: Buildbucket: Save buildbucket build info and summary on pubsub push (Closed)
Patch Set: nits Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « milo/api/resp/build.go ('k') | milo/buildsource/buildbucket/struct.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: milo/buildsource/buildbucket/pubsub.go
diff --git a/milo/buildsource/buildbucket/pubsub.go b/milo/buildsource/buildbucket/pubsub.go
index b51460769a2994a736ea3850ccfb4d0e9f9059b4..a80bbfea35f68f57350fb3c55fae1f11bbadecf7 100644
--- a/milo/buildsource/buildbucket/pubsub.go
+++ b/milo/buildsource/buildbucket/pubsub.go
@@ -1,19 +1,29 @@
+// Copyright 2017 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
package buildbucket
import (
"encoding/json"
"errors"
+ "fmt"
"net/http"
"strings"
"golang.org/x/net/context"
+ "github.com/luci/gae/service/datastore"
bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1"
+ "github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/retry/transient"
"github.com/luci/luci-go/common/tsmon/field"
"github.com/luci/luci-go/common/tsmon/metric"
+ "github.com/luci/luci-go/milo/api/resp"
+ "github.com/luci/luci-go/milo/buildsource/swarming"
"github.com/luci/luci-go/milo/common"
+ "github.com/luci/luci-go/milo/common/model"
"github.com/luci/luci-go/server/router"
)
@@ -31,6 +41,11 @@ var (
field.String("action"))
)
+type psMsg struct {
+ Build bucketApi.ApiCommonBuildMessage
+ Hostname string
+}
+
var (
errNoLogLocation = errors.New("log_location tag not found")
errNoProject = errors.New("project tag not found")
@@ -63,23 +78,141 @@ func PubSubHandler(ctx *router.Context) {
}
-func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage) error {
- if err := buildCounter.Add(
- c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil {
- logging.WithError(err).Warningf(c, "Failed to send metric")
+func maybeGetBuild(
+ c context.Context, build *bucketApi.ApiCommonBuildMessage) (*resp.MiloBuild, error) {
+
+ // Hasn't started yet, so definitely no buildinfo ready yet.
+ if build.Status == "SCHEDULED" {
+ return nil, nil
+ }
+ tags := ParseTags(build.Tags)
+ var host, task string
+ var ok bool
+ if host, ok = tags["swarming_hostname"]; !ok {
+ return nil, errors.New("no swarming hostname tag")
+ }
+ if task, ok = tags["swarming_task_id"]; !ok {
+ return nil, errors.New("no swarming task id")
+ }
+ swarmingSvc, err := swarming.NewProdService(c, host)
+ if err != nil {
+ return nil, err
+ }
+ bl := swarming.BuildLoader{}
+ // The linkBase is not necessary for the summary.
+ return bl.SwarmingBuildImpl(c, swarmingSvc, "", task)
+}
+
+// processBuild queries swarming and logdog for annotation data, then adds or
+// updates a buildEntry in datastore.
+func processBuild(
+ c context.Context, host string, build *bucketApi.ApiCommonBuildMessage) (
+ *buildEntry, error) {
+
+ now := clock.Now(c).UTC()
+ entry := buildEntry{key: buildEntryKey(host, build.Id)}
+
+ err := datastore.Get(c)
+ switch err {
+ case datastore.ErrNoSuchEntity:
+ logging.Infof(c, "%s does not exist, will create", entry.key)
+ entry.created = now
+ case nil:
+ // continue
+ default:
+ return nil, err
+ }
+
+ // If the build is running, try to get the annotation data.
+ respBuild, err := maybeGetBuild(c, build)
+ if err != nil {
+ return nil, err
+ }
+ entry.respBuild = respBuild
+
+ entry.modified = now
+ entry.buildbucketData, err = json.Marshal(build)
+ if err != nil {
+ return nil, err
+ }
+
+ err = datastore.Put(c, &entry)
+ return &entry, err
+}
+
+// saveBuildSummary creates or updates a build summary based off a buildbucket
+// build entry.
+func saveBuildSummary(
+ c context.Context, key *datastore.Key, builderName string,
+ entry *buildEntry) error {
+
+ build, err := entry.getBuild()
+ if err != nil {
+ return err
+ }
+ status, err := parseStatus(build)
+ if err != nil {
+ return err
+ }
+ // TODO(hinoka): Console related items.
+ bs := model.BuildSummary{
+ BuildKey: key,
+ BuilderID: fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builderName),
+ Created: parseTimestamp(build.CreatedTs),
+ Summary: model.Summary{
+ Status: status,
+ Start: parseTimestamp(build.StartedTs),
+ },
+ }
+ if entry.respBuild != nil {
+ // Add info from the respBuild into the build summary if we have the data.
+ entry.respBuild.SummarizeTo(&bs)
+ }
+ logging.Debugf(c, "Created build summary: %#v", bs)
+ // Make datastore flakes transient errors
+ return transient.Tag.Apply(datastore.Put(c, &bs))
+}
+
+func handlePubSubBuild(c context.Context, data *psMsg) error {
+ host := data.Hostname
+ build := &data.Build
+ p := parameters{}
+ err := json.Unmarshal([]byte(build.ParametersJson), &p)
+ if err != nil {
+ logging.WithError(err).Errorf(c, "could not unmarshal build parameters")
+ buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "Rejected")
+ // Permanent error, since this is probably a type of build we do not recognize.
+ return err
}
- logging.Debugf(c, "Received build %#v", build)
- // TODO(hinoka): Save this into datastore.
- return nil
+ logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%s",
+ host, build.Bucket, p.builderName, build.Status, build)
+ if !isLUCI(build) {
+ logging.Infof(c, "This is not a luci build, ignoring")
+ buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "Rejected")
+ return nil
+ }
+
+ buildEntry, err := processBuild(c, host, build)
+ if err != nil {
+ logging.WithError(err).Errorf(c, "failed to update build")
+ buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "Rejected")
+ // Probably a datastore or network flake, make this into a transient error
+ return transient.Tag.Apply(err)
+ }
+ action := "Created"
+ if buildEntry.created != buildEntry.modified {
+ action = "Modified"
+ }
+ buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, action)
+
+ return saveBuildSummary(
+ c, datastore.MakeKey(c, "buildEntry", buildEntry.key), p.builderName, buildEntry)
}
// This returns 500 (Internal Server Error) if it encounters a transient error,
// and returns 200 (OK) if everything is OK, or if it encounters a permanent error.
func pubSubHandlerImpl(c context.Context, r *http.Request) error {
- var data struct {
- Build bucketApi.ApiCommonBuildMessage
- Hostname string
- }
+ var data psMsg
msg := common.PubSubSubscription{}
defer r.Body.Close()
@@ -100,5 +233,5 @@ func pubSubHandlerImpl(c context.Context, r *http.Request) error {
return err
}
- return handlePubSubBuild(c, &data.Build)
+ return handlePubSubBuild(c, &data)
}
« no previous file with comments | « milo/api/resp/build.go ('k') | milo/buildsource/buildbucket/struct.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698