| Index: milo/build_source/buildbucket/pubsub.go
|
| diff --git a/milo/build_source/buildbucket/pubsub.go b/milo/build_source/buildbucket/pubsub.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..72cc66fac3d7fb1349b8999f0783b66289b8a021
|
| --- /dev/null
|
| +++ b/milo/build_source/buildbucket/pubsub.go
|
| @@ -0,0 +1,89 @@
|
| +package buildbucket
|
| +
|
| +import (
|
| + "encoding/json"
|
| + "net/http"
|
| + "strings"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| + bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1"
|
| + "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/tsmon/field"
|
| + "github.com/luci/luci-go/common/tsmon/metric"
|
| + "github.com/luci/luci-go/milo/common"
|
| + "github.com/luci/luci-go/server/router"
|
| +)
|
| +
|
| +var (
|
| + buildCounter = metric.NewCounter(
|
| + "luci/milo/buildbucket_pubsub/builds",
|
| + "The number of buildbucket builds received by Milo from PubSub",
|
| + nil,
|
| + field.String("bucket"),
|
| + // True for luci build, False for non-luci (ie buildbot) build.
|
| + field.Bool("luci"),
|
| + // Status can be "COMPLETED", "SCHEDULED", or "STARTED"
|
| + field.String("status"),
|
| + // Action can be one of 3 options. "New", "Replaced", "Rejected".
|
| + field.String("action"))
|
| +)
|
| +
|
| +type transientError error
|
| +type permanenterror error
|
| +
|
| +func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool {
|
| + // All luci buckets are assumed to be prefixed with luci.
|
| + return strings.HasPrefix(build.Bucket, "luci.")
|
| +}
|
| +
|
| +// PubSubHandler is a webhook that stores the builds coming in from pubsub.
|
| +func PubSubHandler(ctx *router.Context) {
|
| + statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request)
|
| + ctx.Writer.WriteHeader(statusCode)
|
| +}
|
| +
|
| +func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage) error {
|
| + if err := buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil {
|
| + logging.WithError(err).Warningf(c, "Failed to send metric")
|
| + }
|
| + logging.Debugf(c, "Received build %#v", build)
|
| + // TODO(hinoka): Save this into datastore.
|
| + return nil
|
| +}
|
| +
|
| +func pubSubHandlerImpl(c context.Context, r *http.Request) int {
|
| + var data struct {
|
| + Build bucketApi.ApiCommonBuildMessage
|
| + Hostname string
|
| + }
|
| +
|
| + msg := common.PubSubSubscription{}
|
| + defer r.Body.Close()
|
| + dec := json.NewDecoder(r.Body)
|
| + if err := dec.Decode(&msg); err != nil {
|
| + logging.WithError(err).Errorf(
|
| + c, "could not decode message. %s", err)
|
| + return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
|
| + }
|
| + bData, err := msg.GetData()
|
| + if err != nil {
|
| + logging.WithError(err).Errorf(c, "could not parse pubsub message string")
|
| + return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
|
| + }
|
| + if err := json.Unmarshal(bData, &data); err != nil {
|
| + logging.WithError(err).Errorf(c, "could not parse pubsub message data")
|
| + return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
|
| + }
|
| +
|
| + if err := handlePubSubBuild(c, &data.Build); err != nil {
|
| + switch err.(type) {
|
| + case transientError:
|
| + return http.StatusInternalServerError
|
| + case permanenterror:
|
| + return http.StatusOK
|
| + }
|
| + }
|
| +
|
| + return http.StatusOK
|
| +}
|
|
|