OLD | NEW |
| (Empty) |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package buildbot | |
6 | |
7 import ( | |
8 "bytes" | |
9 "compress/gzip" | |
10 "compress/zlib" | |
11 "encoding/base64" | |
12 "encoding/json" | |
13 "fmt" | |
14 "net/http" | |
15 "strings" | |
16 "time" | |
17 | |
18 ds "github.com/luci/gae/service/datastore" | |
19 "github.com/luci/luci-go/common/clock" | |
20 "github.com/luci/luci-go/common/iotools" | |
21 "github.com/luci/luci-go/common/logging" | |
22 "github.com/luci/luci-go/milo/appengine/common" | |
23 "github.com/luci/luci-go/server/router" | |
24 | |
25 "golang.org/x/net/context" | |
26 | |
27 "github.com/luci/luci-go/common/tsmon/field" | |
28 "github.com/luci/luci-go/common/tsmon/metric" | |
29 ) | |
30 | |
31 var ( | |
32 // Metrics | |
33 buildCounter = metric.NewCounter( | |
34 "luci/milo/buildbot_pubsub/builds", | |
35 "The number of buildbot builds received by Milo from PubSub", | |
36 nil, | |
37 field.Bool("internal"), | |
38 field.String("master"), | |
39 field.String("builder"), | |
40 field.Bool("finished"), | |
41 // Status can be one of 3 options. "New", "Replaced", "Rejected
". | |
42 field.String("status")) | |
43 | |
44 masterCounter = metric.NewCounter( | |
45 "luci/milo/buildbot_pubsub/masters", | |
46 "The number of buildbot master jsons received by Milo from PubSu
b", | |
47 nil, | |
48 field.Bool("internal"), | |
49 field.String("master"), | |
50 // Status can be one of 2 options. "success", "failure". | |
51 field.String("status")) | |
52 ) | |
53 | |
54 type pubSubMessage struct { | |
55 Attributes map[string]string `json:"attributes"` | |
56 Data string `json:"data"` | |
57 MessageID string `json:"message_id"` | |
58 } | |
59 | |
60 type pubSubSubscription struct { | |
61 Message pubSubMessage `json:"message"` | |
62 Subscription string `json:"subscription"` | |
63 } | |
64 | |
65 type buildMasterMsg struct { | |
66 Master *buildbotMaster `json:"master"` | |
67 Builds []*buildbotBuild `json:"builds"` | |
68 } | |
69 | |
70 // buildbotMasterEntry is a container for a marshaled and packed buildbot | |
71 // master json. | |
72 type buildbotMasterEntry struct { | |
73 // Name of the buildbot master. | |
74 Name string `gae:"$id"` | |
75 // Internal | |
76 Internal bool | |
77 // Data is the json serialzed and gzipped blob of the master data. | |
78 Data []byte `gae:",noindex"` | |
79 // Modified is when this entry was last modified. | |
80 Modified time.Time | |
81 } | |
82 | |
83 func putDSMasterJSON( | |
84 c context.Context, master *buildbotMaster, internal bool) error { | |
85 for _, builder := range master.Builders { | |
86 // Trim out extra info in the "Changes" portion of the pending b
uild state, | |
87 // we don't actually need comments, files, and properties | |
88 for _, pbs := range builder.PendingBuildStates { | |
89 for i := range pbs.Source.Changes { | |
90 pbs.Source.Changes[i].Comments = "" | |
91 pbs.Source.Changes[i].Files = nil | |
92 pbs.Source.Changes[i].Properties = nil | |
93 } | |
94 } | |
95 } | |
96 entry := buildbotMasterEntry{ | |
97 Name: master.Name, | |
98 Internal: internal, | |
99 Modified: clock.Now(c).UTC(), | |
100 } | |
101 gzbs := bytes.Buffer{} | |
102 gsw := gzip.NewWriter(&gzbs) | |
103 cw := iotools.CountingWriter{Writer: gsw} | |
104 e := json.NewEncoder(&cw) | |
105 if err := e.Encode(master); err != nil { | |
106 return err | |
107 } | |
108 gsw.Close() | |
109 entry.Data = gzbs.Bytes() | |
110 logging.Debugf(c, "Length of json data: %d", cw.Count) | |
111 logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data)) | |
112 return ds.Put(c, &entry) | |
113 } | |
114 | |
115 // GetData returns the expanded form of Data (decoded from base64). | |
116 func (m *pubSubSubscription) GetData() ([]byte, error) { | |
117 return base64.StdEncoding.DecodeString(m.Message.Data) | |
118 } | |
119 | |
120 // unmarshal a gzipped byte stream into a list of buildbot builds and masters. | |
121 func unmarshal( | |
122 c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error
) { | |
123 bm := buildMasterMsg{} | |
124 if len(msg) == 0 { | |
125 return bm.Builds, bm.Master, nil | |
126 } | |
127 reader, err := zlib.NewReader(bytes.NewReader(msg)) | |
128 if err != nil { | |
129 logging.WithError(err).Errorf(c, "gzip decompression error") | |
130 return nil, nil, err | |
131 } | |
132 defer reader.Close() | |
133 d := json.NewDecoder(reader) | |
134 if err = d.Decode(&bm); err != nil { | |
135 logging.WithError(err).Errorf(c, "could not unmarshal message") | |
136 return nil, nil, err | |
137 } | |
138 // Extract the builds out of master and append it onto builds. | |
139 if bm.Master != nil { | |
140 for _, slave := range bm.Master.Slaves { | |
141 if slave.RunningbuildsMap == nil { | |
142 slave.RunningbuildsMap = map[string][]int{} | |
143 } | |
144 for _, build := range slave.Runningbuilds { | |
145 build.Master = bm.Master.Name | |
146 bm.Builds = append(bm.Builds, build) | |
147 slave.RunningbuildsMap[build.Buildername] = appe
nd( | |
148 slave.RunningbuildsMap[build.Buildername
], build.Number) | |
149 } | |
150 slave.Runningbuilds = nil | |
151 } | |
152 } | |
153 return bm.Builds, bm.Master, nil | |
154 } | |
155 | |
156 // getOSInfo fetches the os family and version of the slave the build was | |
157 // running on from the master json on a best-effort basis. | |
158 func getOSInfo(c context.Context, b *buildbotBuild, m *buildbotMaster) ( | |
159 family, version string) { | |
160 // Fetch the master info from datastore if not provided. | |
161 if m.Name == "" { | |
162 logging.Infof(c, "Fetching info for master %s", b.Master) | |
163 entry := buildbotMasterEntry{Name: b.Master} | |
164 err := ds.Get(c, &entry) | |
165 if err != nil { | |
166 logging.WithError(err).Errorf( | |
167 c, "Encountered error while fetching entry for %
s", b.Master) | |
168 return | |
169 } | |
170 err = decodeMasterEntry(c, &entry, m) | |
171 if err != nil { | |
172 logging.WithError(err).Warningf( | |
173 c, "Failed to decode master information for OS i
nfo on master %s", b.Master) | |
174 return | |
175 } | |
176 if entry.Internal && !b.Internal { | |
177 logging.Errorf(c, "Build references an internal master,
but build is not internal.") | |
178 return | |
179 } | |
180 } | |
181 | |
182 s, ok := m.Slaves[b.Slave] | |
183 if !ok { | |
184 logging.Warningf(c, "Could not find slave %s in master %s", b.Sl
ave, b.Master) | |
185 return | |
186 } | |
187 hostInfo := map[string]string{} | |
188 for _, v := range strings.Split(s.Host, "\n") { | |
189 if info := strings.SplitN(v, ":", 2); len(info) == 2 { | |
190 hostInfo[info[0]] = strings.TrimSpace(info[1]) | |
191 } | |
192 } | |
193 // Extract OS and OS Family | |
194 if v, ok := hostInfo["os family"]; ok { | |
195 family = v | |
196 } | |
197 if v, ok := hostInfo["os version"]; ok { | |
198 version = v | |
199 } | |
200 return | |
201 } | |
202 | |
203 // Marks a build as finished and expired. | |
204 func expireBuild(c context.Context, b *buildbotBuild) error { | |
205 finished := float64(clock.Now(c).Unix()) | |
206 if b.TimeStamp != nil { | |
207 finished = float64(*b.TimeStamp) | |
208 } | |
209 results := int(4) // Exception | |
210 b.Times[1] = &finished | |
211 b.Finished = true | |
212 b.Results = &results | |
213 b.Currentstep = nil | |
214 b.Text = append(b.Text, "Build expired on Milo") | |
215 return ds.Put(c, b) | |
216 } | |
217 | |
218 func doMaster(c context.Context, master *buildbotMaster, internal bool) int { | |
219 // Store the master json into the datastore. | |
220 err := putDSMasterJSON(c, master, internal) | |
221 fullname := fmt.Sprintf("master.%s", master.Name) | |
222 if err != nil { | |
223 logging.WithError(err).Errorf( | |
224 c, "Could not save master in datastore %s", err) | |
225 masterCounter.Add(c, 1, internal, fullname, "failure") | |
226 // This is transient, we do want PubSub to retry. | |
227 return http.StatusInternalServerError | |
228 } | |
229 masterCounter.Add(c, 1, internal, fullname, "success") | |
230 | |
231 // Extract current builds data out of the master json, and use it to | |
232 // clean up expired builds. | |
233 q := ds.NewQuery("buildbotBuild"). | |
234 Eq("finished", false). | |
235 Eq("master", master.Name) | |
236 builds := []*buildbotBuild{} | |
237 err = getBuildQueryBatcher(c).GetAll(c, q, &builds) | |
238 if err != nil { | |
239 logging.WithError(err).Errorf(c, "Could not load current builds
from master %s", | |
240 master.Name) | |
241 return http.StatusInternalServerError | |
242 } | |
243 for _, b := range builds { | |
244 builder, ok := master.Builders[b.Buildername] | |
245 if !ok { | |
246 // Mark this build due to builder being removed. | |
247 buildCounter.Add( | |
248 c, 1, internal, b.Master, b.Buildername, b.Finis
hed, "Expired") | |
249 logging.Infof(c, "Expiring %s/%s/%d due to builder being
removed", | |
250 master.Name, b.Buildername, b.Number) | |
251 err = expireBuild(c, b) | |
252 if err != nil { | |
253 logging.WithError(err).Errorf(c, "Could not expi
re build") | |
254 return http.StatusInternalServerError | |
255 } | |
256 continue | |
257 } | |
258 | |
259 found := false | |
260 for _, bnum := range builder.CurrentBuilds { | |
261 if b.Number == bnum { | |
262 found = true | |
263 break | |
264 } | |
265 } | |
266 if !found { | |
267 now := int(clock.Now(c).Unix()) | |
268 if b.TimeStamp == nil || ((*b.TimeStamp)+20*60 < now) { | |
269 // Expire builds after 20 minutes of not getting
data. | |
270 // Mark this build due to build not current anym
ore. | |
271 buildCounter.Add( | |
272 c, 1, internal, b.Master, b.Buildername,
b.Finished, "Expired") | |
273 logging.Infof(c, "Expiring %s/%s/%d due to build
not current", | |
274 master.Name, b.Buildername, b.Number) | |
275 err = expireBuild(c, b) | |
276 if err != nil { | |
277 logging.WithError(err).Errorf(c, "Could
not expire build") | |
278 return http.StatusInternalServerError | |
279 } | |
280 } | |
281 } | |
282 } | |
283 return 0 | |
284 } | |
285 | |
286 // PubSubHandler is a webhook that stores the builds coming in from pubsub. | |
287 func PubSubHandler(ctx *router.Context) { | |
288 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) | |
289 ctx.Writer.WriteHeader(statusCode) | |
290 } | |
291 | |
292 // This is the actual implementation of the pubsub handler. Returns | |
293 // a status code. StatusOK (200) for okay (ACK implied, don't retry). | |
294 // Anything else will signal to pubsub to retry. | |
295 func pubSubHandlerImpl(c context.Context, r *http.Request) int { | |
296 msg := pubSubSubscription{} | |
297 now := int(clock.Now(c).Unix()) | |
298 defer r.Body.Close() | |
299 dec := json.NewDecoder(r.Body) | |
300 if err := dec.Decode(&msg); err != nil { | |
301 logging.WithError(err).Errorf( | |
302 c, "Could not decode message. %s", err) | |
303 return http.StatusOK // This is a hard failure, we don't want Pu
bSub to retry. | |
304 } | |
305 internal := true | |
306 // Get the name of the subscription on luci-config | |
307 settings := common.GetSettings(c) | |
308 switch msg.Subscription { | |
309 case settings.Buildbot.PublicSubscription: | |
310 internal = false | |
311 case settings.Buildbot.InternalSubscription: | |
312 // internal = true, but that's already set. | |
313 default: | |
314 logging.Errorf( | |
315 c, "Subscription name %s does not match %s or %s", | |
316 msg.Subscription, settings.Buildbot.PublicSubscription, | |
317 settings.Buildbot.InternalSubscription) | |
318 // This is a configuration error. Tell PubSub to retry until we
fix our | |
319 // configs. | |
320 return http.StatusInternalServerError | |
321 } | |
322 logging.Infof( | |
323 c, "Message ID \"%s\" from subscription %s is %d bytes long", | |
324 msg.Message.MessageID, msg.Subscription, r.ContentLength) | |
325 bbMsg, err := msg.GetData() | |
326 if err != nil { | |
327 logging.WithError(err).Errorf(c, "Could not base64 decode messag
e %s", err) | |
328 return http.StatusOK | |
329 } | |
330 builds, master, err := unmarshal(c, bbMsg) | |
331 if err != nil { | |
332 logging.WithError(err).Errorf(c, "Could not unmarshal message %s
", err) | |
333 return http.StatusOK | |
334 } | |
335 logging.Infof(c, "There are %d builds", len(builds)) | |
336 if master != nil { | |
337 logging.Infof(c, "The master name is %s", master.Name) | |
338 } else { | |
339 logging.Infof(c, "No master in this message") | |
340 } | |
341 // This is used to cache the master used for extracting OS information. | |
342 cachedMaster := buildbotMaster{} | |
343 // Do not use PutMulti because we might hit the 1MB limit. | |
344 for _, build := range builds { | |
345 if build.Master == "" { | |
346 logging.Errorf(c, "Invalid message, missing master name"
) | |
347 return http.StatusOK | |
348 } | |
349 existingBuild := &buildbotBuild{ | |
350 Master: build.Master, | |
351 Buildername: build.Buildername, | |
352 Number: build.Number, | |
353 } | |
354 buildExists := false | |
355 if err := ds.Get(c, existingBuild); err == nil { | |
356 if existingBuild.Finished { | |
357 // Never replace a completed build. | |
358 buildCounter.Add( | |
359 c, 1, false, build.Master, build.Builder
name, false, "Rejected") | |
360 continue | |
361 } | |
362 buildExists = true | |
363 } | |
364 // Also set the finished, timestamp, and internal bit. | |
365 build.Finished = false | |
366 if build.TimeStamp == nil { | |
367 build.TimeStamp = &now | |
368 } | |
369 if len(build.Times) == 2 && build.Times[1] != nil { | |
370 build.Finished = true | |
371 logging.Infof( | |
372 c, "Recording finished build %s/%s/%d", build.Ma
ster, | |
373 build.Buildername, build.Number) | |
374 } | |
375 build.Internal = internal | |
376 // Try to get the OS information on a best-effort basis. This a
ssumes that all | |
377 // builds come from one master. | |
378 build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMas
ter) | |
379 err = ds.Put(c, build) | |
380 if err != nil { | |
381 if _, ok := err.(errTooBig); ok { | |
382 // This will never work, we don't want PubSub to
retry. | |
383 logging.WithError(err).Errorf( | |
384 c, "Could not save build to datastore, f
ailing permanently") | |
385 return http.StatusOK | |
386 } | |
387 // This is transient, we do want PubSub to retry. | |
388 logging.WithError(err).Errorf(c, "Could not save build i
n datastore") | |
389 return http.StatusInternalServerError | |
390 } | |
391 if buildExists { | |
392 buildCounter.Add( | |
393 c, 1, false, build.Master, build.Buildername, bu
ild.Finished, "Replaced") | |
394 } else { | |
395 buildCounter.Add( | |
396 c, 1, false, build.Master, build.Buildername, bu
ild.Finished, "New") | |
397 } | |
398 | |
399 } | |
400 if master != nil { | |
401 code := doMaster(c, master, internal) | |
402 if code != 0 { | |
403 return code | |
404 } | |
405 } | |
406 return http.StatusOK | |
407 } | |
OLD | NEW |