Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Unified Diff: milo/build_source/buildbucket/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: milo/build_source/buildbucket/pubsub.go
diff --git a/milo/build_source/buildbucket/pubsub.go b/milo/build_source/buildbucket/pubsub.go
new file mode 100644
index 0000000000000000000000000000000000000000..72cc66fac3d7fb1349b8999f0783b66289b8a021
--- /dev/null
+++ b/milo/build_source/buildbucket/pubsub.go
@@ -0,0 +1,89 @@
+package buildbucket
+
+import (
+ "encoding/json"
+ "net/http"
+ "strings"
+
+ "golang.org/x/net/context"
+
+ bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/tsmon/field"
+ "github.com/luci/luci-go/common/tsmon/metric"
+ "github.com/luci/luci-go/milo/common"
+ "github.com/luci/luci-go/server/router"
+)
+
+var (
+ buildCounter = metric.NewCounter(
+ "luci/milo/buildbucket_pubsub/builds",
+ "The number of buildbucket builds received by Milo from PubSub",
+ nil,
+ field.String("bucket"),
+ // True for luci build, False for non-luci (ie buildbot) build.
+ field.Bool("luci"),
+ // Status can be "COMPLETED", "SCHEDULED", or "STARTED"
+ field.String("status"),
+ // Action can be one of 3 options. "New", "Replaced", "Rejected".
+ field.String("action"))
+)
+
+type transientError error
+type permanenterror error
+
+func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool {
+ // All luci buckets are assumed to be prefixed with luci.
+ return strings.HasPrefix(build.Bucket, "luci.")
+}
+
+// PubSubHandler is a webhook that stores the builds coming in from pubsub.
+func PubSubHandler(ctx *router.Context) {
+ statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request)
+ ctx.Writer.WriteHeader(statusCode)
+}
+
+func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage) error {
+ if err := buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil {
+ logging.WithError(err).Warningf(c, "Failed to send metric")
+ }
+ logging.Debugf(c, "Received build %#v", build)
+ // TODO(hinoka): Save this into datastore.
+ return nil
+}
+
+func pubSubHandlerImpl(c context.Context, r *http.Request) int {
+ var data struct {
+ Build bucketApi.ApiCommonBuildMessage
+ Hostname string
+ }
+
+ msg := common.PubSubSubscription{}
+ defer r.Body.Close()
+ dec := json.NewDecoder(r.Body)
+ if err := dec.Decode(&msg); err != nil {
+ logging.WithError(err).Errorf(
+ c, "could not decode message. %s", err)
+ return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
+ }
+ bData, err := msg.GetData()
+ if err != nil {
+ logging.WithError(err).Errorf(c, "could not parse pubsub message string")
+ return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
+ }
+ if err := json.Unmarshal(bData, &data); err != nil {
+ logging.WithError(err).Errorf(c, "could not parse pubsub message data")
+ return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
+ }
+
+ if err := handlePubSubBuild(c, &data.Build); err != nil {
+ switch err.(type) {
+ case transientError:
+ return http.StatusInternalServerError
+ case permanenterror:
+ return http.StatusOK
+ }
+ }
+
+ return http.StatusOK
+}

Powered by Google App Engine
This is Rietveld 408576698