Chromium Code Reviews| Index: milo/build_source/buildbucket/pubsub.go |
| diff --git a/milo/build_source/buildbucket/pubsub.go b/milo/build_source/buildbucket/pubsub.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..157cbaef4004d3511e68a7a2a78dc224f0036210 |
| --- /dev/null |
| +++ b/milo/build_source/buildbucket/pubsub.go |
| @@ -0,0 +1,90 @@ |
| +package buildbucket |
| + |
| +import ( |
| + "encoding/json" |
| + "net/http" |
| + "strings" |
| + |
| + "golang.org/x/net/context" |
| + |
| + bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/tsmon/field" |
| + "github.com/luci/luci-go/common/tsmon/metric" |
| + "github.com/luci/luci-go/milo/common" |
| + "github.com/luci/luci-go/server/router" |
| +) |
| + |
| +var ( |
| + buildCounter = metric.NewCounter( |
| + "luci/milo/buildbucket_pubsub/builds", |
| + "The number of buildbucket builds received by Milo from PubSub", |
| + nil, |
| + field.String("bucket"), |
| + // True for luci build, False for non-luci (ie buildbot) build. |
| + field.Bool("luci"), |
| + // Status can be "COMPLETED", "SCHEDULED", or "STARTED" |
| + field.String("status"), |
| + // Action can be one of 3 options. "New", "Replaced", "Rejected". |
| + field.String("action")) |
| +) |
| + |
| +type transientError error |
|
iannucci
2017/07/08 00:05:25
use transient.Tag instead, and ditch these error t
Ryan Tseng
2017/07/08 01:47:33
Done.
|
| +type permanenterror error |
| + |
| +func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool { |
| + // All luci buckets are assumed to be prefixed with luci. |
| + return strings.HasPrefix(build.Bucket, "luci.") |
|
iannucci
2017/07/08 00:05:25
this seems hard-codey? what is this for?
Ryan Tseng
2017/07/08 01:47:33
To filter out buildbot builds. the luci-migration
|
| +} |
| + |
| +// PubSubHandler is a webhook that stores the builds coming in from pubsub. |
| +func PubSubHandler(ctx *router.Context) { |
| + statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) |
| + ctx.Writer.WriteHeader(statusCode) |
| +} |
| + |
| +func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage) error { |
| + if err := buildCounter.Add( |
| + c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil { |
| + logging.WithError(err).Warningf(c, "Failed to send metric") |
| + } |
| + logging.Debugf(c, "Received build %#v", build) |
| + // TODO(hinoka): Save this into datastore. |
|
iannucci
2017/07/08 00:05:25
:)
Ryan Tseng
2017/07/08 01:47:33
Sooooon™
|
| + return nil |
| +} |
| + |
| +func pubSubHandlerImpl(c context.Context, r *http.Request) int { |
|
iannucci
2017/07/08 00:05:25
lets have this return an error and sort out the st
Ryan Tseng
2017/07/08 01:47:33
There's basically just two failure modes that need
iannucci
2017/07/08 02:08:40
I was meaning that we could just tag the error and
|
| + var data struct { |
| + Build bucketApi.ApiCommonBuildMessage |
| + Hostname string |
| + } |
| + |
| + msg := common.PubSubSubscription{} |
| + defer r.Body.Close() |
| + dec := json.NewDecoder(r.Body) |
| + if err := dec.Decode(&msg); err != nil { |
| + logging.WithError(err).Errorf( |
| + c, "could not decode message. %s", err) |
| + return http.StatusOK // This is a hard failure, we don't want PubSub to retry. |
|
iannucci
2017/07/08 00:05:25
i.e. we should return an error tagged as 'badPaylo
Ryan Tseng
2017/07/08 01:47:33
Done.
|
| + } |
| + bData, err := msg.GetData() |
| + if err != nil { |
| + logging.WithError(err).Errorf(c, "could not parse pubsub message string") |
| + return http.StatusOK // This is a hard failure, we don't want PubSub to retry. |
| + } |
| + if err := json.Unmarshal(bData, &data); err != nil { |
| + logging.WithError(err).Errorf(c, "could not parse pubsub message data") |
| + return http.StatusOK // This is a hard failure, we don't want PubSub to retry. |
| + } |
| + |
| + if err := handlePubSubBuild(c, &data.Build); err != nil { |
| + switch err.(type) { |
| + case transientError: |
| + return http.StatusInternalServerError |
| + case permanenterror: |
| + return http.StatusOK |
| + } |
| + } |
| + |
| + return http.StatusOK |
| +} |