| OLD | NEW |
| (Empty) | |
| 1 package buildbucket |
| 2 |
| 3 import ( |
| 4 "encoding/json" |
| 5 "net/http" |
| 6 "strings" |
| 7 |
| 8 "golang.org/x/net/context" |
| 9 |
| 10 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1
" |
| 11 "github.com/luci/luci-go/common/logging" |
| 12 "github.com/luci/luci-go/common/tsmon/field" |
| 13 "github.com/luci/luci-go/common/tsmon/metric" |
| 14 "github.com/luci/luci-go/milo/common" |
| 15 "github.com/luci/luci-go/server/router" |
| 16 ) |
| 17 |
| 18 var ( |
| 19 buildCounter = metric.NewCounter( |
| 20 "luci/milo/buildbucket_pubsub/builds", |
| 21 "The number of buildbucket builds received by Milo from PubSub", |
| 22 nil, |
| 23 field.String("bucket"), |
| 24 // True for luci build, False for non-luci (ie buildbot) build. |
| 25 field.Bool("luci"), |
| 26 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" |
| 27 field.String("status"), |
| 28 // Action can be one of 3 options. "New", "Replaced", "Rejected
". |
| 29 field.String("action")) |
| 30 ) |
| 31 |
| 32 type transientError error |
| 33 type permanenterror error |
| 34 |
| 35 func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool { |
| 36 // All luci buckets are assumed to be prefixed with luci. |
| 37 return strings.HasPrefix(build.Bucket, "luci.") |
| 38 } |
| 39 |
| 40 // PubSubHandler is a webhook that stores the builds coming in from pubsub. |
| 41 func PubSubHandler(ctx *router.Context) { |
| 42 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) |
| 43 ctx.Writer.WriteHeader(statusCode) |
| 44 } |
| 45 |
| 46 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage
) error { |
| 47 if err := buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Stat
us, "New"); err != nil { |
| 48 logging.WithError(err).Warningf(c, "Failed to send metric") |
| 49 } |
| 50 logging.Debugf(c, "Received build %#v", build) |
| 51 // TODO(hinoka): Save this into datastore. |
| 52 return nil |
| 53 } |
| 54 |
| 55 func pubSubHandlerImpl(c context.Context, r *http.Request) int { |
| 56 var data struct { |
| 57 Build bucketApi.ApiCommonBuildMessage |
| 58 Hostname string |
| 59 } |
| 60 |
| 61 msg := common.PubSubSubscription{} |
| 62 defer r.Body.Close() |
| 63 dec := json.NewDecoder(r.Body) |
| 64 if err := dec.Decode(&msg); err != nil { |
| 65 logging.WithError(err).Errorf( |
| 66 c, "could not decode message. %s", err) |
| 67 return http.StatusOK // This is a hard failure, we don't want Pu
bSub to retry. |
| 68 } |
| 69 bData, err := msg.GetData() |
| 70 if err != nil { |
| 71 logging.WithError(err).Errorf(c, "could not parse pubsub message
string") |
| 72 return http.StatusOK // This is a hard failure, we don't want Pu
bSub to retry. |
| 73 } |
| 74 if err := json.Unmarshal(bData, &data); err != nil { |
| 75 logging.WithError(err).Errorf(c, "could not parse pubsub message
data") |
| 76 return http.StatusOK // This is a hard failure, we don't want Pu
bSub to retry. |
| 77 } |
| 78 |
| 79 if err := handlePubSubBuild(c, &data.Build); err != nil { |
| 80 switch err.(type) { |
| 81 case transientError: |
| 82 return http.StatusInternalServerError |
| 83 case permanenterror: |
| 84 return http.StatusOK |
| 85 } |
| 86 } |
| 87 |
| 88 return http.StatusOK |
| 89 } |
| OLD | NEW |