Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(174)

Side by Side Diff: milo/build_source/buildbucket/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package buildbucket
2
3 import (
4 "encoding/json"
5 "net/http"
6 "strings"
7
8 "golang.org/x/net/context"
9
10 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 "
11 "github.com/luci/luci-go/common/logging"
12 "github.com/luci/luci-go/common/tsmon/field"
13 "github.com/luci/luci-go/common/tsmon/metric"
14 "github.com/luci/luci-go/milo/common"
15 "github.com/luci/luci-go/server/router"
16 )
17
18 var (
19 buildCounter = metric.NewCounter(
20 "luci/milo/buildbucket_pubsub/builds",
21 "The number of buildbucket builds received by Milo from PubSub",
22 nil,
23 field.String("bucket"),
24 // True for luci build, False for non-luci (ie buildbot) build.
25 field.Bool("luci"),
26 // Status can be "COMPLETED", "SCHEDULED", or "STARTED"
27 field.String("status"),
28 // Action can be one of 3 options. "New", "Replaced", "Rejected ".
29 field.String("action"))
30 )
31
32 type transientError error
33 type permanenterror error
34
35 func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool {
36 // All luci buckets are assumed to be prefixed with luci.
37 return strings.HasPrefix(build.Bucket, "luci.")
38 }
39
40 // PubSubHandler is a webhook that stores the builds coming in from pubsub.
41 func PubSubHandler(ctx *router.Context) {
42 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request)
43 ctx.Writer.WriteHeader(statusCode)
44 }
45
46 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage ) error {
47 if err := buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Stat us, "New"); err != nil {
48 logging.WithError(err).Warningf(c, "Failed to send metric")
49 }
50 logging.Debugf(c, "Received build %#v", build)
51 // TODO(hinoka): Save this into datastore.
52 return nil
53 }
54
55 func pubSubHandlerImpl(c context.Context, r *http.Request) int {
56 var data struct {
57 Build bucketApi.ApiCommonBuildMessage
58 Hostname string
59 }
60
61 msg := common.PubSubSubscription{}
62 defer r.Body.Close()
63 dec := json.NewDecoder(r.Body)
64 if err := dec.Decode(&msg); err != nil {
65 logging.WithError(err).Errorf(
66 c, "could not decode message. %s", err)
67 return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry.
68 }
69 bData, err := msg.GetData()
70 if err != nil {
71 logging.WithError(err).Errorf(c, "could not parse pubsub message string")
72 return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry.
73 }
74 if err := json.Unmarshal(bData, &data); err != nil {
75 logging.WithError(err).Errorf(c, "could not parse pubsub message data")
76 return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry.
77 }
78
79 if err := handlePubSubBuild(c, &data.Build); err != nil {
80 switch err.(type) {
81 case transientError:
82 return http.StatusInternalServerError
83 case permanenterror:
84 return http.StatusOK
85 }
86 }
87
88 return http.StatusOK
89 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698