Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: milo/appengine/buildbot/pubsub.go

Issue 2944983003: [milo] {buildbucket,buildbot,swarming,logdog} -> backends/*. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package buildbot
6
7 import (
8 "bytes"
9 "compress/gzip"
10 "compress/zlib"
11 "encoding/base64"
12 "encoding/json"
13 "fmt"
14 "net/http"
15 "strings"
16 "time"
17
18 ds "github.com/luci/gae/service/datastore"
19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/iotools"
21 "github.com/luci/luci-go/common/logging"
22 "github.com/luci/luci-go/milo/appengine/common"
23 "github.com/luci/luci-go/server/router"
24
25 "golang.org/x/net/context"
26
27 "github.com/luci/luci-go/common/tsmon/field"
28 "github.com/luci/luci-go/common/tsmon/metric"
29 )
30
31 var (
32 // Metrics
33 buildCounter = metric.NewCounter(
34 "luci/milo/buildbot_pubsub/builds",
35 "The number of buildbot builds received by Milo from PubSub",
36 nil,
37 field.Bool("internal"),
38 field.String("master"),
39 field.String("builder"),
40 field.Bool("finished"),
41 // Status can be one of 3 options. "New", "Replaced", "Rejected ".
42 field.String("status"))
43
44 masterCounter = metric.NewCounter(
45 "luci/milo/buildbot_pubsub/masters",
46 "The number of buildbot master jsons received by Milo from PubSu b",
47 nil,
48 field.Bool("internal"),
49 field.String("master"),
50 // Status can be one of 2 options. "success", "failure".
51 field.String("status"))
52 )
53
// pubSubMessage mirrors the "message" object of a PubSub push request body.
type pubSubMessage struct {
	// Attributes are the key/value attributes attached to the message.
	Attributes map[string]string `json:"attributes"`
	// Data is the message payload as a string; pubSubSubscription.GetData
	// decodes it as standard base64.
	Data string `json:"data"`
	// MessageID is the value of the "message_id" key.
	MessageID string `json:"message_id"`
}
59
60 type pubSubSubscription struct {
61 Message pubSubMessage `json:"message"`
62 Subscription string `json:"subscription"`
63 }
64
65 type buildMasterMsg struct {
66 Master *buildbotMaster `json:"master"`
67 Builds []*buildbotBuild `json:"builds"`
68 }
69
// buildbotMasterEntry is the datastore container for one buildbot master: the
// master json is stored marshaled and compressed in Data.
type buildbotMasterEntry struct {
	// Name of the buildbot master; used as the entity ID.
	Name string `gae:"$id"`
	// Internal records whether the master arrived on the internal
	// subscription.
	Internal bool
	// Data is the JSON-serialized and gzipped blob of the master data. It is
	// not indexed.
	Data []byte `gae:",noindex"`
	// Modified is when this entry was last modified.
	Modified time.Time
}
82
83 func putDSMasterJSON(
84 c context.Context, master *buildbotMaster, internal bool) error {
85 for _, builder := range master.Builders {
86 // Trim out extra info in the "Changes" portion of the pending b uild state,
87 // we don't actually need comments, files, and properties
88 for _, pbs := range builder.PendingBuildStates {
89 for i := range pbs.Source.Changes {
90 pbs.Source.Changes[i].Comments = ""
91 pbs.Source.Changes[i].Files = nil
92 pbs.Source.Changes[i].Properties = nil
93 }
94 }
95 }
96 entry := buildbotMasterEntry{
97 Name: master.Name,
98 Internal: internal,
99 Modified: clock.Now(c).UTC(),
100 }
101 gzbs := bytes.Buffer{}
102 gsw := gzip.NewWriter(&gzbs)
103 cw := iotools.CountingWriter{Writer: gsw}
104 e := json.NewEncoder(&cw)
105 if err := e.Encode(master); err != nil {
106 return err
107 }
108 gsw.Close()
109 entry.Data = gzbs.Bytes()
110 logging.Debugf(c, "Length of json data: %d", cw.Count)
111 logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data))
112 return ds.Put(c, &entry)
113 }
114
115 // GetData returns the expanded form of Data (decoded from base64).
116 func (m *pubSubSubscription) GetData() ([]byte, error) {
117 return base64.StdEncoding.DecodeString(m.Message.Data)
118 }
119
120 // unmarshal a gzipped byte stream into a list of buildbot builds and masters.
121 func unmarshal(
122 c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error ) {
123 bm := buildMasterMsg{}
124 if len(msg) == 0 {
125 return bm.Builds, bm.Master, nil
126 }
127 reader, err := zlib.NewReader(bytes.NewReader(msg))
128 if err != nil {
129 logging.WithError(err).Errorf(c, "gzip decompression error")
130 return nil, nil, err
131 }
132 defer reader.Close()
133 d := json.NewDecoder(reader)
134 if err = d.Decode(&bm); err != nil {
135 logging.WithError(err).Errorf(c, "could not unmarshal message")
136 return nil, nil, err
137 }
138 // Extract the builds out of master and append it onto builds.
139 if bm.Master != nil {
140 for _, slave := range bm.Master.Slaves {
141 if slave.RunningbuildsMap == nil {
142 slave.RunningbuildsMap = map[string][]int{}
143 }
144 for _, build := range slave.Runningbuilds {
145 build.Master = bm.Master.Name
146 bm.Builds = append(bm.Builds, build)
147 slave.RunningbuildsMap[build.Buildername] = appe nd(
148 slave.RunningbuildsMap[build.Buildername ], build.Number)
149 }
150 slave.Runningbuilds = nil
151 }
152 }
153 return bm.Builds, bm.Master, nil
154 }
155
156 // getOSInfo fetches the os family and version of the slave the build was
157 // running on from the master json on a best-effort basis.
158 func getOSInfo(c context.Context, b *buildbotBuild, m *buildbotMaster) (
159 family, version string) {
160 // Fetch the master info from datastore if not provided.
161 if m.Name == "" {
162 logging.Infof(c, "Fetching info for master %s", b.Master)
163 entry := buildbotMasterEntry{Name: b.Master}
164 err := ds.Get(c, &entry)
165 if err != nil {
166 logging.WithError(err).Errorf(
167 c, "Encountered error while fetching entry for % s", b.Master)
168 return
169 }
170 err = decodeMasterEntry(c, &entry, m)
171 if err != nil {
172 logging.WithError(err).Warningf(
173 c, "Failed to decode master information for OS i nfo on master %s", b.Master)
174 return
175 }
176 if entry.Internal && !b.Internal {
177 logging.Errorf(c, "Build references an internal master, but build is not internal.")
178 return
179 }
180 }
181
182 s, ok := m.Slaves[b.Slave]
183 if !ok {
184 logging.Warningf(c, "Could not find slave %s in master %s", b.Sl ave, b.Master)
185 return
186 }
187 hostInfo := map[string]string{}
188 for _, v := range strings.Split(s.Host, "\n") {
189 if info := strings.SplitN(v, ":", 2); len(info) == 2 {
190 hostInfo[info[0]] = strings.TrimSpace(info[1])
191 }
192 }
193 // Extract OS and OS Family
194 if v, ok := hostInfo["os family"]; ok {
195 family = v
196 }
197 if v, ok := hostInfo["os version"]; ok {
198 version = v
199 }
200 return
201 }
202
203 // Marks a build as finished and expired.
204 func expireBuild(c context.Context, b *buildbotBuild) error {
205 finished := float64(clock.Now(c).Unix())
206 if b.TimeStamp != nil {
207 finished = float64(*b.TimeStamp)
208 }
209 results := int(4) // Exception
210 b.Times[1] = &finished
211 b.Finished = true
212 b.Results = &results
213 b.Currentstep = nil
214 b.Text = append(b.Text, "Build expired on Milo")
215 return ds.Put(c, b)
216 }
217
218 func doMaster(c context.Context, master *buildbotMaster, internal bool) int {
219 // Store the master json into the datastore.
220 err := putDSMasterJSON(c, master, internal)
221 fullname := fmt.Sprintf("master.%s", master.Name)
222 if err != nil {
223 logging.WithError(err).Errorf(
224 c, "Could not save master in datastore %s", err)
225 masterCounter.Add(c, 1, internal, fullname, "failure")
226 // This is transient, we do want PubSub to retry.
227 return http.StatusInternalServerError
228 }
229 masterCounter.Add(c, 1, internal, fullname, "success")
230
231 // Extract current builds data out of the master json, and use it to
232 // clean up expired builds.
233 q := ds.NewQuery("buildbotBuild").
234 Eq("finished", false).
235 Eq("master", master.Name)
236 builds := []*buildbotBuild{}
237 err = getBuildQueryBatcher(c).GetAll(c, q, &builds)
238 if err != nil {
239 logging.WithError(err).Errorf(c, "Could not load current builds from master %s",
240 master.Name)
241 return http.StatusInternalServerError
242 }
243 for _, b := range builds {
244 builder, ok := master.Builders[b.Buildername]
245 if !ok {
246 // Mark this build due to builder being removed.
247 buildCounter.Add(
248 c, 1, internal, b.Master, b.Buildername, b.Finis hed, "Expired")
249 logging.Infof(c, "Expiring %s/%s/%d due to builder being removed",
250 master.Name, b.Buildername, b.Number)
251 err = expireBuild(c, b)
252 if err != nil {
253 logging.WithError(err).Errorf(c, "Could not expi re build")
254 return http.StatusInternalServerError
255 }
256 continue
257 }
258
259 found := false
260 for _, bnum := range builder.CurrentBuilds {
261 if b.Number == bnum {
262 found = true
263 break
264 }
265 }
266 if !found {
267 now := int(clock.Now(c).Unix())
268 if b.TimeStamp == nil || ((*b.TimeStamp)+20*60 < now) {
269 // Expire builds after 20 minutes of not getting data.
270 // Mark this build due to build not current anym ore.
271 buildCounter.Add(
272 c, 1, internal, b.Master, b.Buildername, b.Finished, "Expired")
273 logging.Infof(c, "Expiring %s/%s/%d due to build not current",
274 master.Name, b.Buildername, b.Number)
275 err = expireBuild(c, b)
276 if err != nil {
277 logging.WithError(err).Errorf(c, "Could not expire build")
278 return http.StatusInternalServerError
279 }
280 }
281 }
282 }
283 return 0
284 }
285
286 // PubSubHandler is a webhook that stores the builds coming in from pubsub.
287 func PubSubHandler(ctx *router.Context) {
288 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request)
289 ctx.Writer.WriteHeader(statusCode)
290 }
291
292 // This is the actual implementation of the pubsub handler. Returns
293 // a status code. StatusOK (200) for okay (ACK implied, don't retry).
294 // Anything else will signal to pubsub to retry.
295 func pubSubHandlerImpl(c context.Context, r *http.Request) int {
296 msg := pubSubSubscription{}
297 now := int(clock.Now(c).Unix())
298 defer r.Body.Close()
299 dec := json.NewDecoder(r.Body)
300 if err := dec.Decode(&msg); err != nil {
301 logging.WithError(err).Errorf(
302 c, "Could not decode message. %s", err)
303 return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry.
304 }
305 internal := true
306 // Get the name of the subscription on luci-config
307 settings := common.GetSettings(c)
308 switch msg.Subscription {
309 case settings.Buildbot.PublicSubscription:
310 internal = false
311 case settings.Buildbot.InternalSubscription:
312 // internal = true, but that's already set.
313 default:
314 logging.Errorf(
315 c, "Subscription name %s does not match %s or %s",
316 msg.Subscription, settings.Buildbot.PublicSubscription,
317 settings.Buildbot.InternalSubscription)
318 // This is a configuration error. Tell PubSub to retry until we fix our
319 // configs.
320 return http.StatusInternalServerError
321 }
322 logging.Infof(
323 c, "Message ID \"%s\" from subscription %s is %d bytes long",
324 msg.Message.MessageID, msg.Subscription, r.ContentLength)
325 bbMsg, err := msg.GetData()
326 if err != nil {
327 logging.WithError(err).Errorf(c, "Could not base64 decode messag e %s", err)
328 return http.StatusOK
329 }
330 builds, master, err := unmarshal(c, bbMsg)
331 if err != nil {
332 logging.WithError(err).Errorf(c, "Could not unmarshal message %s ", err)
333 return http.StatusOK
334 }
335 logging.Infof(c, "There are %d builds", len(builds))
336 if master != nil {
337 logging.Infof(c, "The master name is %s", master.Name)
338 } else {
339 logging.Infof(c, "No master in this message")
340 }
341 // This is used to cache the master used for extracting OS information.
342 cachedMaster := buildbotMaster{}
343 // Do not use PutMulti because we might hit the 1MB limit.
344 for _, build := range builds {
345 if build.Master == "" {
346 logging.Errorf(c, "Invalid message, missing master name" )
347 return http.StatusOK
348 }
349 existingBuild := &buildbotBuild{
350 Master: build.Master,
351 Buildername: build.Buildername,
352 Number: build.Number,
353 }
354 buildExists := false
355 if err := ds.Get(c, existingBuild); err == nil {
356 if existingBuild.Finished {
357 // Never replace a completed build.
358 buildCounter.Add(
359 c, 1, false, build.Master, build.Builder name, false, "Rejected")
360 continue
361 }
362 buildExists = true
363 }
364 // Also set the finished, timestamp, and internal bit.
365 build.Finished = false
366 if build.TimeStamp == nil {
367 build.TimeStamp = &now
368 }
369 if len(build.Times) == 2 && build.Times[1] != nil {
370 build.Finished = true
371 logging.Infof(
372 c, "Recording finished build %s/%s/%d", build.Ma ster,
373 build.Buildername, build.Number)
374 }
375 build.Internal = internal
376 // Try to get the OS information on a best-effort basis. This a ssumes that all
377 // builds come from one master.
378 build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMas ter)
379 err = ds.Put(c, build)
380 if err != nil {
381 if _, ok := err.(errTooBig); ok {
382 // This will never work, we don't want PubSub to retry.
383 logging.WithError(err).Errorf(
384 c, "Could not save build to datastore, f ailing permanently")
385 return http.StatusOK
386 }
387 // This is transient, we do want PubSub to retry.
388 logging.WithError(err).Errorf(c, "Could not save build i n datastore")
389 return http.StatusInternalServerError
390 }
391 if buildExists {
392 buildCounter.Add(
393 c, 1, false, build.Master, build.Buildername, bu ild.Finished, "Replaced")
394 } else {
395 buildCounter.Add(
396 c, 1, false, build.Master, build.Buildername, bu ild.Finished, "New")
397 }
398
399 }
400 if master != nil {
401 code := doMaster(c, master, internal)
402 if code != 0 {
403 return code
404 }
405 }
406 return http.StatusOK
407 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698