| OLD | NEW |
| (Empty) | |
| 1 package buildbucket |
| 2 |
| 3 import ( |
| 4 "encoding/json" |
| 5 "errors" |
| 6 "net/http" |
| 7 "strings" |
| 8 |
| 9 "golang.org/x/net/context" |
| 10 |
| 11 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1
" |
| 12 "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/retry/transient" |
| 14 "github.com/luci/luci-go/common/tsmon/field" |
| 15 "github.com/luci/luci-go/common/tsmon/metric" |
| 16 "github.com/luci/luci-go/milo/common" |
| 17 "github.com/luci/luci-go/server/router" |
| 18 ) |
| 19 |
| 20 var ( |
| 21 buildCounter = metric.NewCounter( |
| 22 "luci/milo/buildbucket_pubsub/builds", |
| 23 "The number of buildbucket builds received by Milo from PubSub", |
| 24 nil, |
| 25 field.String("bucket"), |
| 26 // True for luci build, False for non-luci (ie buildbot) build. |
| 27 field.Bool("luci"), |
| 28 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" |
| 29 field.String("status"), |
| 30 // Action can be one of 3 options. "New", "Replaced", "Rejected
". |
| 31 field.String("action")) |
| 32 ) |
| 33 |
| 34 var ( |
| 35 errNoLogLocation = errors.New("log_location tag not found") |
| 36 errNoProject = errors.New("project tag not found") |
| 37 ) |
| 38 |
| 39 type parameters struct { |
| 40 builderName string `json:"builder_name"` |
| 41 properties string `json:"properties"` |
| 42 } |
| 43 |
| 44 func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool { |
| 45 // All luci buckets are assumed to be prefixed with luci. |
| 46 return strings.HasPrefix(build.Bucket, "luci.") |
| 47 } |
| 48 |
| 49 // PubSubHandler is a webhook that stores the builds coming in from pubsub. |
| 50 func PubSubHandler(ctx *router.Context) { |
| 51 err := pubSubHandlerImpl(ctx.Context, ctx.Request) |
| 52 if err != nil { |
| 53 logging.WithError(err).Errorf(ctx.Context, "error while updating
buildbucket") |
| 54 } |
| 55 if transient.Tag.In(err) { |
| 56 // Transient errors are 500 so that PubSub retries them. |
| 57 ctx.Writer.WriteHeader(http.StatusInternalServerError) |
| 58 } else { |
| 59 // No errors or non-transient errors are 200s so that PubSub doe
s not retry |
| 60 // them. |
| 61 ctx.Writer.WriteHeader(http.StatusOK) |
| 62 } |
| 63 |
| 64 } |
| 65 |
| 66 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage
) error { |
| 67 if err := buildCounter.Add( |
| 68 c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err !=
nil { |
| 69 logging.WithError(err).Warningf(c, "Failed to send metric") |
| 70 } |
| 71 logging.Debugf(c, "Received build %#v", build) |
| 72 // TODO(hinoka): Save this into datastore. |
| 73 return nil |
| 74 } |
| 75 |
| 76 // This returns 500 (Internal Server Error) if it encounters a transient error, |
| 77 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err
or. |
| 78 func pubSubHandlerImpl(c context.Context, r *http.Request) error { |
| 79 var data struct { |
| 80 Build bucketApi.ApiCommonBuildMessage |
| 81 Hostname string |
| 82 } |
| 83 |
| 84 msg := common.PubSubSubscription{} |
| 85 defer r.Body.Close() |
| 86 dec := json.NewDecoder(r.Body) |
| 87 if err := dec.Decode(&msg); err != nil { |
| 88 logging.WithError(err).Errorf(c, "could not decode message:\n%s"
, r.Body) |
| 89 // This might be a transient error, e.g. when the json format ch
anges |
| 90 // and Milo isn't updated yet. |
| 91 return transient.Tag.Apply(err) |
| 92 } |
| 93 bData, err := msg.GetData() |
| 94 if err != nil { |
| 95 logging.WithError(err).Errorf(c, "could not parse pubsub message
string") |
| 96 return err |
| 97 } |
| 98 if err := json.Unmarshal(bData, &data); err != nil { |
| 99 logging.WithError(err).Errorf(c, "could not parse pubsub message
data") |
| 100 return err |
| 101 } |
| 102 |
| 103 return handlePubSubBuild(c, &data.Build) |
| 104 } |
| OLD | NEW |