| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package buildbot | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "compress/gzip" | |
| 10 "compress/zlib" | |
| 11 "encoding/base64" | |
| 12 "encoding/json" | |
| 13 "fmt" | |
| 14 "net/http" | |
| 15 "strings" | |
| 16 "time" | |
| 17 | |
| 18 ds "github.com/luci/gae/service/datastore" | |
| 19 "github.com/luci/luci-go/common/clock" | |
| 20 "github.com/luci/luci-go/common/iotools" | |
| 21 "github.com/luci/luci-go/common/logging" | |
| 22 "github.com/luci/luci-go/milo/appengine/common" | |
| 23 "github.com/luci/luci-go/server/router" | |
| 24 | |
| 25 "golang.org/x/net/context" | |
| 26 | |
| 27 "github.com/luci/luci-go/common/tsmon/field" | |
| 28 "github.com/luci/luci-go/common/tsmon/metric" | |
| 29 ) | |
| 30 | |
var (
	// Metrics

	// buildCounter is incremented once per build handled by this file:
	// builds stored or rejected by the PubSub handler, and builds expired
	// while processing a master json.
	buildCounter = metric.NewCounter(
		"luci/milo/buildbot_pubsub/builds",
		"The number of buildbot builds received by Milo from PubSub",
		nil,
		field.Bool("internal"),
		field.String("master"),
		field.String("builder"),
		field.Bool("finished"),
		// Status can be one of 4 options. "New", "Replaced", "Rejected",
		// "Expired".
		field.String("status"))

	// masterCounter is incremented once per master json that this file
	// attempts to store.
	masterCounter = metric.NewCounter(
		"luci/milo/buildbot_pubsub/masters",
		"The number of buildbot master jsons received by Milo from PubSub",
		nil,
		field.Bool("internal"),
		field.String("master"),
		// Status can be one of 2 options. "success", "failure".
		field.String("status"))
)
| 53 | |
// pubSubMessage is the "message" member of the JSON body decoded by the
// PubSub handler.
type pubSubMessage struct {
	// Attributes holds the string key/value pairs found under "attributes".
	Attributes map[string]string `json:"attributes"`
	// Data is the base64 encoded payload; see pubSubSubscription.GetData.
	Data string `json:"data"`
	// MessageID is only used for logging in this file.
	MessageID string `json:"message_id"`
}
| 59 | |
// pubSubSubscription is the top-level JSON object decoded from the request
// body by the PubSub handler.
type pubSubSubscription struct {
	// Message is the wrapped message, including its base64 encoded payload.
	Message pubSubMessage `json:"message"`
	// Subscription is the subscription name. The handler compares it with
	// the configured public and internal subscription names.
	Subscription string `json:"subscription"`
}
| 64 | |
// buildMasterMsg is the JSON document carried (compressed) inside a PubSub
// message payload. Either member may be absent.
type buildMasterMsg struct {
	// Master is the master json, or nil if the message carried none.
	Master *buildbotMaster `json:"master"`
	// Builds is the list of builds carried by the message.
	Builds []*buildbotBuild `json:"builds"`
}
| 69 | |
// buildbotMasterEntry is a container for a marshaled and packed buildbot
// master json.
type buildbotMasterEntry struct {
	// Name of the buildbot master.
	// NOTE(review): the `$id` tag presumably makes this the datastore key;
	// entries are looked up with only Name set.
	Name string `gae:"$id"`
	// Internal records whether the master json arrived on the internal
	// subscription.
	Internal bool
	// Data is the json serialized and gzipped blob of the master data.
	Data []byte `gae:",noindex"`
	// Modified is when this entry was last modified.
	Modified time.Time
}
| 82 | |
| 83 func putDSMasterJSON( | |
| 84 c context.Context, master *buildbotMaster, internal bool) error { | |
| 85 for _, builder := range master.Builders { | |
| 86 // Trim out extra info in the "Changes" portion of the pending b
uild state, | |
| 87 // we don't actually need comments, files, and properties | |
| 88 for _, pbs := range builder.PendingBuildStates { | |
| 89 for i := range pbs.Source.Changes { | |
| 90 pbs.Source.Changes[i].Comments = "" | |
| 91 pbs.Source.Changes[i].Files = nil | |
| 92 pbs.Source.Changes[i].Properties = nil | |
| 93 } | |
| 94 } | |
| 95 } | |
| 96 entry := buildbotMasterEntry{ | |
| 97 Name: master.Name, | |
| 98 Internal: internal, | |
| 99 Modified: clock.Now(c).UTC(), | |
| 100 } | |
| 101 gzbs := bytes.Buffer{} | |
| 102 gsw := gzip.NewWriter(&gzbs) | |
| 103 cw := iotools.CountingWriter{Writer: gsw} | |
| 104 e := json.NewEncoder(&cw) | |
| 105 if err := e.Encode(master); err != nil { | |
| 106 return err | |
| 107 } | |
| 108 gsw.Close() | |
| 109 entry.Data = gzbs.Bytes() | |
| 110 logging.Debugf(c, "Length of json data: %d", cw.Count) | |
| 111 logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data)) | |
| 112 return ds.Put(c, &entry) | |
| 113 } | |
| 114 | |
| 115 // GetData returns the expanded form of Data (decoded from base64). | |
| 116 func (m *pubSubSubscription) GetData() ([]byte, error) { | |
| 117 return base64.StdEncoding.DecodeString(m.Message.Data) | |
| 118 } | |
| 119 | |
| 120 // unmarshal a gzipped byte stream into a list of buildbot builds and masters. | |
| 121 func unmarshal( | |
| 122 c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error
) { | |
| 123 bm := buildMasterMsg{} | |
| 124 if len(msg) == 0 { | |
| 125 return bm.Builds, bm.Master, nil | |
| 126 } | |
| 127 reader, err := zlib.NewReader(bytes.NewReader(msg)) | |
| 128 if err != nil { | |
| 129 logging.WithError(err).Errorf(c, "gzip decompression error") | |
| 130 return nil, nil, err | |
| 131 } | |
| 132 defer reader.Close() | |
| 133 d := json.NewDecoder(reader) | |
| 134 if err = d.Decode(&bm); err != nil { | |
| 135 logging.WithError(err).Errorf(c, "could not unmarshal message") | |
| 136 return nil, nil, err | |
| 137 } | |
| 138 // Extract the builds out of master and append it onto builds. | |
| 139 if bm.Master != nil { | |
| 140 for _, slave := range bm.Master.Slaves { | |
| 141 if slave.RunningbuildsMap == nil { | |
| 142 slave.RunningbuildsMap = map[string][]int{} | |
| 143 } | |
| 144 for _, build := range slave.Runningbuilds { | |
| 145 build.Master = bm.Master.Name | |
| 146 bm.Builds = append(bm.Builds, build) | |
| 147 slave.RunningbuildsMap[build.Buildername] = appe
nd( | |
| 148 slave.RunningbuildsMap[build.Buildername
], build.Number) | |
| 149 } | |
| 150 slave.Runningbuilds = nil | |
| 151 } | |
| 152 } | |
| 153 return bm.Builds, bm.Master, nil | |
| 154 } | |
| 155 | |
| 156 // getOSInfo fetches the os family and version of the slave the build was | |
| 157 // running on from the master json on a best-effort basis. | |
| 158 func getOSInfo(c context.Context, b *buildbotBuild, m *buildbotMaster) ( | |
| 159 family, version string) { | |
| 160 // Fetch the master info from datastore if not provided. | |
| 161 if m.Name == "" { | |
| 162 logging.Infof(c, "Fetching info for master %s", b.Master) | |
| 163 entry := buildbotMasterEntry{Name: b.Master} | |
| 164 err := ds.Get(c, &entry) | |
| 165 if err != nil { | |
| 166 logging.WithError(err).Errorf( | |
| 167 c, "Encountered error while fetching entry for %
s", b.Master) | |
| 168 return | |
| 169 } | |
| 170 err = decodeMasterEntry(c, &entry, m) | |
| 171 if err != nil { | |
| 172 logging.WithError(err).Warningf( | |
| 173 c, "Failed to decode master information for OS i
nfo on master %s", b.Master) | |
| 174 return | |
| 175 } | |
| 176 if entry.Internal && !b.Internal { | |
| 177 logging.Errorf(c, "Build references an internal master,
but build is not internal.") | |
| 178 return | |
| 179 } | |
| 180 } | |
| 181 | |
| 182 s, ok := m.Slaves[b.Slave] | |
| 183 if !ok { | |
| 184 logging.Warningf(c, "Could not find slave %s in master %s", b.Sl
ave, b.Master) | |
| 185 return | |
| 186 } | |
| 187 hostInfo := map[string]string{} | |
| 188 for _, v := range strings.Split(s.Host, "\n") { | |
| 189 if info := strings.SplitN(v, ":", 2); len(info) == 2 { | |
| 190 hostInfo[info[0]] = strings.TrimSpace(info[1]) | |
| 191 } | |
| 192 } | |
| 193 // Extract OS and OS Family | |
| 194 if v, ok := hostInfo["os family"]; ok { | |
| 195 family = v | |
| 196 } | |
| 197 if v, ok := hostInfo["os version"]; ok { | |
| 198 version = v | |
| 199 } | |
| 200 return | |
| 201 } | |
| 202 | |
| 203 // Marks a build as finished and expired. | |
| 204 func expireBuild(c context.Context, b *buildbotBuild) error { | |
| 205 finished := float64(clock.Now(c).Unix()) | |
| 206 if b.TimeStamp != nil { | |
| 207 finished = float64(*b.TimeStamp) | |
| 208 } | |
| 209 results := int(4) // Exception | |
| 210 b.Times[1] = &finished | |
| 211 b.Finished = true | |
| 212 b.Results = &results | |
| 213 b.Currentstep = nil | |
| 214 b.Text = append(b.Text, "Build expired on Milo") | |
| 215 return ds.Put(c, b) | |
| 216 } | |
| 217 | |
| 218 func doMaster(c context.Context, master *buildbotMaster, internal bool) int { | |
| 219 // Store the master json into the datastore. | |
| 220 err := putDSMasterJSON(c, master, internal) | |
| 221 fullname := fmt.Sprintf("master.%s", master.Name) | |
| 222 if err != nil { | |
| 223 logging.WithError(err).Errorf( | |
| 224 c, "Could not save master in datastore %s", err) | |
| 225 masterCounter.Add(c, 1, internal, fullname, "failure") | |
| 226 // This is transient, we do want PubSub to retry. | |
| 227 return http.StatusInternalServerError | |
| 228 } | |
| 229 masterCounter.Add(c, 1, internal, fullname, "success") | |
| 230 | |
| 231 // Extract current builds data out of the master json, and use it to | |
| 232 // clean up expired builds. | |
| 233 q := ds.NewQuery("buildbotBuild"). | |
| 234 Eq("finished", false). | |
| 235 Eq("master", master.Name) | |
| 236 builds := []*buildbotBuild{} | |
| 237 err = getBuildQueryBatcher(c).GetAll(c, q, &builds) | |
| 238 if err != nil { | |
| 239 logging.WithError(err).Errorf(c, "Could not load current builds
from master %s", | |
| 240 master.Name) | |
| 241 return http.StatusInternalServerError | |
| 242 } | |
| 243 for _, b := range builds { | |
| 244 builder, ok := master.Builders[b.Buildername] | |
| 245 if !ok { | |
| 246 // Mark this build due to builder being removed. | |
| 247 buildCounter.Add( | |
| 248 c, 1, internal, b.Master, b.Buildername, b.Finis
hed, "Expired") | |
| 249 logging.Infof(c, "Expiring %s/%s/%d due to builder being
removed", | |
| 250 master.Name, b.Buildername, b.Number) | |
| 251 err = expireBuild(c, b) | |
| 252 if err != nil { | |
| 253 logging.WithError(err).Errorf(c, "Could not expi
re build") | |
| 254 return http.StatusInternalServerError | |
| 255 } | |
| 256 continue | |
| 257 } | |
| 258 | |
| 259 found := false | |
| 260 for _, bnum := range builder.CurrentBuilds { | |
| 261 if b.Number == bnum { | |
| 262 found = true | |
| 263 break | |
| 264 } | |
| 265 } | |
| 266 if !found { | |
| 267 now := int(clock.Now(c).Unix()) | |
| 268 if b.TimeStamp == nil || ((*b.TimeStamp)+20*60 < now) { | |
| 269 // Expire builds after 20 minutes of not getting
data. | |
| 270 // Mark this build due to build not current anym
ore. | |
| 271 buildCounter.Add( | |
| 272 c, 1, internal, b.Master, b.Buildername,
b.Finished, "Expired") | |
| 273 logging.Infof(c, "Expiring %s/%s/%d due to build
not current", | |
| 274 master.Name, b.Buildername, b.Number) | |
| 275 err = expireBuild(c, b) | |
| 276 if err != nil { | |
| 277 logging.WithError(err).Errorf(c, "Could
not expire build") | |
| 278 return http.StatusInternalServerError | |
| 279 } | |
| 280 } | |
| 281 } | |
| 282 } | |
| 283 return 0 | |
| 284 } | |
| 285 | |
| 286 // PubSubHandler is a webhook that stores the builds coming in from pubsub. | |
| 287 func PubSubHandler(ctx *router.Context) { | |
| 288 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) | |
| 289 ctx.Writer.WriteHeader(statusCode) | |
| 290 } | |
| 291 | |
| 292 // This is the actual implementation of the pubsub handler. Returns | |
| 293 // a status code. StatusOK (200) for okay (ACK implied, don't retry). | |
| 294 // Anything else will signal to pubsub to retry. | |
| 295 func pubSubHandlerImpl(c context.Context, r *http.Request) int { | |
| 296 msg := pubSubSubscription{} | |
| 297 now := int(clock.Now(c).Unix()) | |
| 298 defer r.Body.Close() | |
| 299 dec := json.NewDecoder(r.Body) | |
| 300 if err := dec.Decode(&msg); err != nil { | |
| 301 logging.WithError(err).Errorf( | |
| 302 c, "Could not decode message. %s", err) | |
| 303 return http.StatusOK // This is a hard failure, we don't want Pu
bSub to retry. | |
| 304 } | |
| 305 internal := true | |
| 306 // Get the name of the subscription on luci-config | |
| 307 settings := common.GetSettings(c) | |
| 308 switch msg.Subscription { | |
| 309 case settings.Buildbot.PublicSubscription: | |
| 310 internal = false | |
| 311 case settings.Buildbot.InternalSubscription: | |
| 312 // internal = true, but that's already set. | |
| 313 default: | |
| 314 logging.Errorf( | |
| 315 c, "Subscription name %s does not match %s or %s", | |
| 316 msg.Subscription, settings.Buildbot.PublicSubscription, | |
| 317 settings.Buildbot.InternalSubscription) | |
| 318 // This is a configuration error. Tell PubSub to retry until we
fix our | |
| 319 // configs. | |
| 320 return http.StatusInternalServerError | |
| 321 } | |
| 322 logging.Infof( | |
| 323 c, "Message ID \"%s\" from subscription %s is %d bytes long", | |
| 324 msg.Message.MessageID, msg.Subscription, r.ContentLength) | |
| 325 bbMsg, err := msg.GetData() | |
| 326 if err != nil { | |
| 327 logging.WithError(err).Errorf(c, "Could not base64 decode messag
e %s", err) | |
| 328 return http.StatusOK | |
| 329 } | |
| 330 builds, master, err := unmarshal(c, bbMsg) | |
| 331 if err != nil { | |
| 332 logging.WithError(err).Errorf(c, "Could not unmarshal message %s
", err) | |
| 333 return http.StatusOK | |
| 334 } | |
| 335 logging.Infof(c, "There are %d builds", len(builds)) | |
| 336 if master != nil { | |
| 337 logging.Infof(c, "The master name is %s", master.Name) | |
| 338 } else { | |
| 339 logging.Infof(c, "No master in this message") | |
| 340 } | |
| 341 // This is used to cache the master used for extracting OS information. | |
| 342 cachedMaster := buildbotMaster{} | |
| 343 // Do not use PutMulti because we might hit the 1MB limit. | |
| 344 for _, build := range builds { | |
| 345 if build.Master == "" { | |
| 346 logging.Errorf(c, "Invalid message, missing master name"
) | |
| 347 return http.StatusOK | |
| 348 } | |
| 349 existingBuild := &buildbotBuild{ | |
| 350 Master: build.Master, | |
| 351 Buildername: build.Buildername, | |
| 352 Number: build.Number, | |
| 353 } | |
| 354 buildExists := false | |
| 355 if err := ds.Get(c, existingBuild); err == nil { | |
| 356 if existingBuild.Finished { | |
| 357 // Never replace a completed build. | |
| 358 buildCounter.Add( | |
| 359 c, 1, false, build.Master, build.Builder
name, false, "Rejected") | |
| 360 continue | |
| 361 } | |
| 362 buildExists = true | |
| 363 } | |
| 364 // Also set the finished, timestamp, and internal bit. | |
| 365 build.Finished = false | |
| 366 if build.TimeStamp == nil { | |
| 367 build.TimeStamp = &now | |
| 368 } | |
| 369 if len(build.Times) == 2 && build.Times[1] != nil { | |
| 370 build.Finished = true | |
| 371 logging.Infof( | |
| 372 c, "Recording finished build %s/%s/%d", build.Ma
ster, | |
| 373 build.Buildername, build.Number) | |
| 374 } | |
| 375 build.Internal = internal | |
| 376 // Try to get the OS information on a best-effort basis. This a
ssumes that all | |
| 377 // builds come from one master. | |
| 378 build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMas
ter) | |
| 379 err = ds.Put(c, build) | |
| 380 if err != nil { | |
| 381 if _, ok := err.(errTooBig); ok { | |
| 382 // This will never work, we don't want PubSub to
retry. | |
| 383 logging.WithError(err).Errorf( | |
| 384 c, "Could not save build to datastore, f
ailing permanently") | |
| 385 return http.StatusOK | |
| 386 } | |
| 387 // This is transient, we do want PubSub to retry. | |
| 388 logging.WithError(err).Errorf(c, "Could not save build i
n datastore") | |
| 389 return http.StatusInternalServerError | |
| 390 } | |
| 391 if buildExists { | |
| 392 buildCounter.Add( | |
| 393 c, 1, false, build.Master, build.Buildername, bu
ild.Finished, "Replaced") | |
| 394 } else { | |
| 395 buildCounter.Add( | |
| 396 c, 1, false, build.Master, build.Buildername, bu
ild.Finished, "New") | |
| 397 } | |
| 398 | |
| 399 } | |
| 400 if master != nil { | |
| 401 code := doMaster(c, master, internal) | |
| 402 if code != 0 { | |
| 403 return code | |
| 404 } | |
| 405 } | |
| 406 return http.StatusOK | |
| 407 } | |
| OLD | NEW |