Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(505)

Side by Side Diff: milo/buildsource/buildbucket/pubsub.go

Issue 2964143002: Buildbucket: Save buildbucket build info and summary on pubsub push (Closed)
Patch Set: comment Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
1 package buildbucket 5 package buildbucket
2 6
3 import ( 7 import (
4 "encoding/json" 8 "encoding/json"
5 "errors" 9 "errors"
10 "fmt"
6 "net/http" 11 "net/http"
7 "strings" 12 "strings"
8 13
9 "golang.org/x/net/context" 14 "golang.org/x/net/context"
10 15
16 "github.com/luci/gae/service/datastore"
11 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 " 17 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 "
18 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/logging" 19 "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/retry/transient" 20 "github.com/luci/luci-go/common/retry/transient"
14 "github.com/luci/luci-go/common/tsmon/field" 21 "github.com/luci/luci-go/common/tsmon/field"
15 "github.com/luci/luci-go/common/tsmon/metric" 22 "github.com/luci/luci-go/common/tsmon/metric"
23 "github.com/luci/luci-go/milo/api/resp"
24 "github.com/luci/luci-go/milo/buildsource/swarming"
16 "github.com/luci/luci-go/milo/common" 25 "github.com/luci/luci-go/milo/common"
26 "github.com/luci/luci-go/milo/common/model"
17 "github.com/luci/luci-go/server/router" 27 "github.com/luci/luci-go/server/router"
18 ) 28 )
19 29
20 var ( 30 var (
21 buildCounter = metric.NewCounter( 31 buildCounter = metric.NewCounter(
22 "luci/milo/buildbucket_pubsub/builds", 32 "luci/milo/buildbucket_pubsub/builds",
23 "The number of buildbucket builds received by Milo from PubSub", 33 "The number of buildbucket builds received by Milo from PubSub",
24 nil, 34 nil,
25 field.String("bucket"), 35 field.String("bucket"),
26 // True for luci build, False for non-luci (ie buildbot) build. 36 // True for luci build, False for non-luci (ie buildbot) build.
27 field.Bool("luci"), 37 field.Bool("luci"),
28 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" 38 // Status can be "COMPLETED", "SCHEDULED", or "STARTED"
29 field.String("status"), 39 field.String("status"),
30 // Action can be one of 3 options. "New", "Replaced", "Rejected ". 40 // Action can be one of 3 options. "New", "Replaced", "Rejected ".
31 field.String("action")) 41 field.String("action"))
32 ) 42 )
33 43
44 type psMsg struct {
45 Build bucketApi.ApiCommonBuildMessage
46 Hostname string
47 }
48
34 var ( 49 var (
35 errNoLogLocation = errors.New("log_location tag not found") 50 errNoLogLocation = errors.New("log_location tag not found")
36 errNoProject = errors.New("project tag not found") 51 errNoProject = errors.New("project tag not found")
37 ) 52 )
38 53
39 type parameters struct { 54 type parameters struct {
40 builderName string `json:"builder_name"` 55 builderName string `json:"builder_name"`
41 properties string `json:"properties"` 56 properties string `json:"properties"`
42 } 57 }
43 58
(...skipping 12 matching lines...) Expand all
56 // Transient errors are 500 so that PubSub retries them. 71 // Transient errors are 500 so that PubSub retries them.
57 ctx.Writer.WriteHeader(http.StatusInternalServerError) 72 ctx.Writer.WriteHeader(http.StatusInternalServerError)
58 } else { 73 } else {
59 // No errors or non-transient errors are 200s so that PubSub doe s not retry 74 // No errors or non-transient errors are 200s so that PubSub doe s not retry
60 // them. 75 // them.
61 ctx.Writer.WriteHeader(http.StatusOK) 76 ctx.Writer.WriteHeader(http.StatusOK)
62 } 77 }
63 78
64 } 79 }
65 80
66 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage ) error { 81 func maybeGetBuild(
67 » if err := buildCounter.Add( 82 » c context.Context,
68 » » c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil { 83 » build *bucketApi.ApiCommonBuildMessage) (
69 » » logging.WithError(err).Warningf(c, "Failed to send metric") 84 » *resp.MiloBuild, error) {
iannucci 2017/07/11 21:51:20 one line?
Ryan Tseng 2017/07/11 23:09:22 Done.
85
86 » // Hasn't started yet, so definitely no buildinfo ready yet.
87 » if build.Status == "SCHEDULED" {
88 » » return nil, nil
70 } 89 }
71 » logging.Debugf(c, "Received build %#v", build) 90 » tags := ParseTags(build.Tags)
72 » // TODO(hinoka): Save this into datastore. 91 » var host, task string
73 » return nil 92 » var ok bool
93 » if host, ok = tags["swarming_hostname"]; !ok {
94 » » return nil, errors.New("no swarming hostname tag")
95 » }
96 » if task, ok = tags["swarming_task_id"]; !ok {
97 » » return nil, errors.New("no swarming task id")
98 » }
99 » swarmingSvc, err := swarming.NewProdService(c, host)
iannucci 2017/07/11 21:51:20 I assume you'll change this in a subsequent CL?
Ryan Tseng 2017/07/11 23:09:22 Yep
100 » if err != nil {
101 » » return nil, err
102 » }
103 » bl := swarming.BuildLoader{}
104 » // The linkBase is not necessary for the summary.
105 » return bl.SwarmingBuildImpl(c, swarmingSvc, "", task)
106 }
107
108 // processBuild queries swarming and logdog for annotation data, then adds or
109 // updates a buildEntry in datastore.
110 func processBuild(
111 » c context.Context, host string, build *bucketApi.ApiCommonBuildMessage) (
112 » *buildEntry, error) {
113
114 » now := clock.Now(c).UTC()
115 » entry := buildEntry{key: buildEntryKey(host, build.Id)}
116
117 » err := datastore.Get(c)
118 » switch err {
119 » case datastore.ErrNoSuchEntity:
120 » » logging.Infof(c, "%s does not exist, will create", entry.key)
121 » » entry.created = now
122 » case nil:
123 » » // continue
124 » default:
125 » » return nil, err
126 » }
127
128 » // If the build is running, try to get the annotation data.
129 » respBuild, err := maybeGetBuild(c, build)
130 » if err != nil {
131 » » return nil, err
132 » }
133 » entry.respBuild = respBuild
134
135 » entry.modified = now
136 » entry.buildbucketData, err = json.Marshal(build)
137 » if err != nil {
138 » » return nil, err
139 » }
140
141 » err = datastore.Put(c, &entry)
142 » return &entry, err
143 }
144
145 // createBuildSummary creates or updates a build summary based off a buildbucket
iannucci 2017/07/11 21:51:20 this is actually putting the build summary, not ju
Ryan Tseng 2017/07/11 23:09:22 renamed.
146 // build entry.
147 func createBuildSummary(
148 » c context.Context, key *datastore.Key, builderName string,
149 » entry *buildEntry) error {
150
151 » build, err := entry.getBuild()
152 » if err != nil {
153 » » return err
154 » }
155 » status, err := parseStatus(build)
156 » if err != nil {
157 » » return err
158 » }
159 » // TODO(hinoka): Console related items.
160 » bs := model.BuildSummary{
161 » » BuildKey: key,
162 » » BuilderID: fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builde rName),
163 » » Created: parseTimestamp(build.CreatedTs),
164 » » Summary: model.Summary{
165 » » » Status: status,
166 » » » Start: parseTimestamp(build.StartedTs),
167 » » },
168 » }
169 » if entry.respBuild != nil {
170 » » // Add info from the respBuild into the build summary if we have the data.
171 » » entry.respBuild.SummarizeTo(&bs)
172 » }
173 » logging.Debugf(c, "Created build summary: %#v", bs)
174 » // Make datastore flakes transient errors
175 » return transient.Tag.Apply(datastore.Put(c, &bs))
176 }
177
178 func handlePubSubBuild(c context.Context, data *psMsg) error {
179 » host := data.Hostname
180 » build := &data.Build
181 » p := parameters{}
182 » err := json.Unmarshal([]byte(build.ParametersJson), &p)
183 » if err != nil {
184 » » logging.WithError(err).Errorf(c, "could not unmarshal build para meters")
185 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected")
186 » » // Permanent error, since this is probably a type of build we do not recognize.
187 » » return err
188 » }
189 » logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%s",
190 » » host, build.Bucket, p.builderName, build.Status, build)
191 » if !isLUCI(build) {
192 » » logging.Infof(c, "This is not a luci build, ignoring")
193 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected")
194 » » return nil
195 » }
196
197 » buildEntry, err := processBuild(c, host, build)
198 » if err != nil {
199 » » logging.WithError(err).Errorf(c, "failed to update build")
200 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected")
201 » » // Probably a datastore or network flake, make this into a trans ient error
202 » » return transient.Tag.Apply(err)
203 » }
204 » action := "Created"
205 » if buildEntry.created != buildEntry.modified {
206 » » action = "Modified"
207 » }
208 » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, action )
209
210 » return createBuildSummary(
211 » » c, datastore.MakeKey(c, "buildEntry", buildEntry.key), p.builder Name, buildEntry)
74 } 212 }
75 213
76 // This returns 500 (Internal Server Error) if it encounters a transient error, 214 // This returns 500 (Internal Server Error) if it encounters a transient error,
77 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err or. 215 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err or.
78 func pubSubHandlerImpl(c context.Context, r *http.Request) error { 216 func pubSubHandlerImpl(c context.Context, r *http.Request) error {
79 » var data struct { 217 » var data psMsg
80 » » Build bucketApi.ApiCommonBuildMessage
81 » » Hostname string
82 » }
83 218
84 msg := common.PubSubSubscription{} 219 msg := common.PubSubSubscription{}
85 defer r.Body.Close() 220 defer r.Body.Close()
86 dec := json.NewDecoder(r.Body) 221 dec := json.NewDecoder(r.Body)
87 if err := dec.Decode(&msg); err != nil { 222 if err := dec.Decode(&msg); err != nil {
88 logging.WithError(err).Errorf(c, "could not decode message:\n%s" , r.Body) 223 logging.WithError(err).Errorf(c, "could not decode message:\n%s" , r.Body)
89 // This might be a transient error, e.g. when the json format ch anges 224 // This might be a transient error, e.g. when the json format ch anges
90 // and Milo isn't updated yet. 225 // and Milo isn't updated yet.
91 return transient.Tag.Apply(err) 226 return transient.Tag.Apply(err)
92 } 227 }
93 bData, err := msg.GetData() 228 bData, err := msg.GetData()
94 if err != nil { 229 if err != nil {
95 logging.WithError(err).Errorf(c, "could not parse pubsub message string") 230 logging.WithError(err).Errorf(c, "could not parse pubsub message string")
96 return err 231 return err
97 } 232 }
98 if err := json.Unmarshal(bData, &data); err != nil { 233 if err := json.Unmarshal(bData, &data); err != nil {
99 logging.WithError(err).Errorf(c, "could not parse pubsub message data") 234 logging.WithError(err).Errorf(c, "could not parse pubsub message data")
100 return err 235 return err
101 } 236 }
102 237
103 » return handlePubSubBuild(c, &data.Build) 238 » return handlePubSubBuild(c, &data)
104 } 239 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698