Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(366)

Side by Side Diff: milo/appengine/buildbot/pubsub.go

Issue 2765383002: Milo: Move instance configuration to luci-config (Closed)
Patch Set: Review Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/appengine/buildbot/build_test.go ('k') | milo/appengine/buildbucket/buckets.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package buildbot 5 package buildbot
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "compress/gzip" 9 "compress/gzip"
10 "compress/zlib" 10 "compress/zlib"
11 "encoding/base64" 11 "encoding/base64"
12 "encoding/json" 12 "encoding/json"
13 "fmt" 13 "fmt"
14 "net/http" 14 "net/http"
15 "strings" 15 "strings"
16 "time" 16 "time"
17 17
18 ds "github.com/luci/gae/service/datastore" 18 ds "github.com/luci/gae/service/datastore"
19 "github.com/luci/luci-go/common/clock" 19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/iotools" 20 "github.com/luci/luci-go/common/iotools"
21 "github.com/luci/luci-go/common/logging" 21 "github.com/luci/luci-go/common/logging"
22 "github.com/luci/luci-go/milo/appengine/common"
22 "github.com/luci/luci-go/server/router" 23 "github.com/luci/luci-go/server/router"
23 24
24 "golang.org/x/net/context" 25 "golang.org/x/net/context"
25 26
26 "github.com/luci/luci-go/common/tsmon/field" 27 "github.com/luci/luci-go/common/tsmon/field"
27 "github.com/luci/luci-go/common/tsmon/metric" 28 "github.com/luci/luci-go/common/tsmon/metric"
28 ) 29 )
29 30
30 var ( 31 var (
31 // publicSubName is the name of the pubsub subscription that milo is expecting.
32 // TODO(hinoka): This should be read from luci-config.
33 publicSubName = "projects/luci-milo/subscriptions/buildbot-public"
34 internalSubName = "projects/luci-milo/subscriptions/buildbot-private"
35
36 // Metrics 32 // Metrics
37 buildCounter = metric.NewCounter( 33 buildCounter = metric.NewCounter(
38 "luci/milo/buildbot_pubsub/builds", 34 "luci/milo/buildbot_pubsub/builds",
39 "The number of buildbot builds received by Milo from PubSub", 35 "The number of buildbot builds received by Milo from PubSub",
40 nil, 36 nil,
41 field.Bool("internal"), 37 field.Bool("internal"),
42 field.String("master"), 38 field.String("master"),
43 field.String("builder"), 39 field.String("builder"),
44 field.Bool("finished"), 40 field.Bool("finished"),
45 // Status can be one of 3 options. "New", "Replaced", "Rejected". 41 // Status can be one of 3 options. "New", "Replaced", "Rejected".
(...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 217
222 func doMaster(c context.Context, master *buildbotMaster, internal bool) int { 218 func doMaster(c context.Context, master *buildbotMaster, internal bool) int {
223 // Store the master json into the datastore. 219 // Store the master json into the datastore.
224 err := putDSMasterJSON(c, master, internal) 220 err := putDSMasterJSON(c, master, internal)
225 fullname := fmt.Sprintf("master.%s", master.Name) 221 fullname := fmt.Sprintf("master.%s", master.Name)
226 if err != nil { 222 if err != nil {
227 logging.WithError(err).Errorf( 223 logging.WithError(err).Errorf(
228 c, "Could not save master in datastore %s", err) 224 c, "Could not save master in datastore %s", err)
229 masterCounter.Add(c, 1, internal, fullname, "failure") 225 masterCounter.Add(c, 1, internal, fullname, "failure")
230 // This is transient, we do want PubSub to retry. 226 // This is transient, we do want PubSub to retry.
231 » » return 500 227 » » return http.StatusInternalServerError
232 } 228 }
233 masterCounter.Add(c, 1, internal, fullname, "success") 229 masterCounter.Add(c, 1, internal, fullname, "success")
234 230
235 // Extract current builds data out of the master json, and use it to 231 // Extract current builds data out of the master json, and use it to
236 // clean up expired builds. 232 // clean up expired builds.
237 q := ds.NewQuery("buildbotBuild"). 233 q := ds.NewQuery("buildbotBuild").
238 Eq("finished", false). 234 Eq("finished", false).
239 Eq("master", master.Name) 235 Eq("master", master.Name)
240 builds := []*buildbotBuild{} 236 builds := []*buildbotBuild{}
241 err = getBuildQueryBatcher(c).GetAll(c, q, &builds) 237 err = getBuildQueryBatcher(c).GetAll(c, q, &builds)
242 if err != nil { 238 if err != nil {
243 logging.WithError(err).Errorf(c, "Could not load current builds from master %s", 239 logging.WithError(err).Errorf(c, "Could not load current builds from master %s",
244 master.Name) 240 master.Name)
245 » » return 500 241 » » return http.StatusInternalServerError
246 } 242 }
247 for _, b := range builds { 243 for _, b := range builds {
248 builder, ok := master.Builders[b.Buildername] 244 builder, ok := master.Builders[b.Buildername]
249 if !ok { 245 if !ok {
250 // Mark this build due to builder being removed. 246 // Mark this build due to builder being removed.
251 buildCounter.Add( 247 buildCounter.Add(
252 c, 1, internal, b.Master, b.Buildername, b.Finis hed, "Expired") 248 c, 1, internal, b.Master, b.Buildername, b.Finis hed, "Expired")
253 logging.Infof(c, "Expiring %s/%s/%d due to builder being removed", 249 logging.Infof(c, "Expiring %s/%s/%d due to builder being removed",
254 master.Name, b.Buildername, b.Number) 250 master.Name, b.Buildername, b.Number)
255 err = expireBuild(c, b) 251 err = expireBuild(c, b)
256 if err != nil { 252 if err != nil {
257 logging.WithError(err).Errorf(c, "Could not expi re build") 253 logging.WithError(err).Errorf(c, "Could not expi re build")
258 » » » » return 500 254 » » » » return http.StatusInternalServerError
259 } 255 }
260 continue 256 continue
261 } 257 }
262 258
263 found := false 259 found := false
264 for _, bnum := range builder.CurrentBuilds { 260 for _, bnum := range builder.CurrentBuilds {
265 if b.Number == bnum { 261 if b.Number == bnum {
266 found = true 262 found = true
267 break 263 break
268 } 264 }
269 } 265 }
270 if !found { 266 if !found {
271 now := int(clock.Now(c).Unix()) 267 now := int(clock.Now(c).Unix())
272 if b.TimeStamp == nil || ((*b.TimeStamp)+20*60 < now) { 268 if b.TimeStamp == nil || ((*b.TimeStamp)+20*60 < now) {
273 // Expire builds after 20 minutes of not getting data. 269 // Expire builds after 20 minutes of not getting data.
274 // Mark this build due to build not current anymore. 270 // Mark this build due to build not current anymore.
275 buildCounter.Add( 271 buildCounter.Add(
276 c, 1, internal, b.Master, b.Buildername, b.Finished, "Expired") 272 c, 1, internal, b.Master, b.Buildername, b.Finished, "Expired")
277 logging.Infof(c, "Expiring %s/%s/%d due to build not current", 273 logging.Infof(c, "Expiring %s/%s/%d due to build not current",
278 master.Name, b.Buildername, b.Number) 274 master.Name, b.Buildername, b.Number)
279 err = expireBuild(c, b) 275 err = expireBuild(c, b)
280 if err != nil { 276 if err != nil {
281 logging.WithError(err).Errorf(c, "Could not expire build") 277 logging.WithError(err).Errorf(c, "Could not expire build")
282 » » » » » return 500 278 » » » » » return http.StatusInternalServerError
283 } 279 }
284 } 280 }
285 } 281 }
286 } 282 }
287 return 0 283 return 0
288 } 284 }
289 285
290 // PubSubHandler is a webhook that stores the builds coming in from pubsub. 286 // PubSubHandler is a webhook that stores the builds coming in from pubsub.
291 func PubSubHandler(ctx *router.Context) { 287 func PubSubHandler(ctx *router.Context) {
292 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) 288 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request)
293 ctx.Writer.WriteHeader(statusCode) 289 ctx.Writer.WriteHeader(statusCode)
294 } 290 }
295 291
296 // This is the actual implementation of the pubsub handler. Returns 292 // This is the actual implementation of the pubsub handler. Returns
297 // a status code. 200 for okay (ACK implied, don't retry). Anything else 293 // a status code. StatusOK (200) for okay (ACK implied, don't retry).
298 // will signal to pubsub to retry. 294 // Anything else will signal to pubsub to retry.
299 func pubSubHandlerImpl(c context.Context, r *http.Request) int { 295 func pubSubHandlerImpl(c context.Context, r *http.Request) int {
300 msg := pubSubSubscription{} 296 msg := pubSubSubscription{}
301 now := int(clock.Now(c).Unix()) 297 now := int(clock.Now(c).Unix())
302 defer r.Body.Close() 298 defer r.Body.Close()
303 dec := json.NewDecoder(r.Body) 299 dec := json.NewDecoder(r.Body)
304 if err := dec.Decode(&msg); err != nil { 300 if err := dec.Decode(&msg); err != nil {
305 logging.WithError(err).Errorf( 301 logging.WithError(err).Errorf(
306 c, "Could not decode message. %s", err) 302 c, "Could not decode message. %s", err)
307 » » return 200 // This is a hard failure, we don't want PubSub to re try. 303 » » return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry.
308 } 304 }
309 internal := true 305 internal := true
306 // Get the name of the subscription on luci-config
307 settings, err := common.GetSettings(c)
308 if err != nil {
309 logging.WithError(err).Errorf(c,
310 "Cannot load settings to check subscription name.")
311 // This is a configuration error. Tell PubSub to retry until we fix our
312 // configs.
313 return http.StatusInternalServerError
314 }
310 switch msg.Subscription { 315 switch msg.Subscription {
311 » // TODO(hinoka): Move these names to luci-config 316 » case settings.Buildbot.PublicTopic:
312 » case publicSubName, publicSubName + "-dev":
313 internal = false 317 internal = false
314 » case internalSubName, internalSubName + "-dev": 318 » case settings.Buildbot.InternalTopic:
315 // internal = true, but that's already set. 319 // internal = true, but that's already set.
316 default: 320 default:
317 logging.Errorf( 321 logging.Errorf(
318 c, "Subscription name %s does not match %s or %s", 322 c, "Subscription name %s does not match %s or %s",
319 » » » msg.Subscription, publicSubName, internalSubName) 323 » » » msg.Subscription, settings.Buildbot.PublicTopic,
320 » » return 200 324 » » » settings.Buildbot.InternalTopic)
325 » » // This is a configuration error. Tell PubSub to retry until we fix our
326 » » // configs.
327 » » return http.StatusInternalServerError
321 } 328 }
322 logging.Infof( 329 logging.Infof(
323 c, "Message ID \"%s\" from subscription %s is %d bytes long", 330 c, "Message ID \"%s\" from subscription %s is %d bytes long",
324 msg.Message.MessageID, msg.Subscription, r.ContentLength) 331 msg.Message.MessageID, msg.Subscription, r.ContentLength)
325 bbMsg, err := msg.GetData() 332 bbMsg, err := msg.GetData()
326 if err != nil { 333 if err != nil {
327 logging.WithError(err).Errorf(c, "Could not base64 decode messag e %s", err) 334 logging.WithError(err).Errorf(c, "Could not base64 decode messag e %s", err)
328 » » return 200 335 » » return http.StatusOK
329 } 336 }
330 builds, master, err := unmarshal(c, bbMsg) 337 builds, master, err := unmarshal(c, bbMsg)
331 if err != nil { 338 if err != nil {
332 logging.WithError(err).Errorf(c, "Could not unmarshal message %s ", err) 339 logging.WithError(err).Errorf(c, "Could not unmarshal message %s ", err)
333 » » return 200 340 » » return http.StatusOK
334 } 341 }
335 logging.Infof(c, "There are %d builds", len(builds)) 342 logging.Infof(c, "There are %d builds", len(builds))
336 if master != nil { 343 if master != nil {
337 logging.Infof(c, "The master name is %s", master.Name) 344 logging.Infof(c, "The master name is %s", master.Name)
338 } else { 345 } else {
339 logging.Infof(c, "No master in this message") 346 logging.Infof(c, "No master in this message")
340 } 347 }
341 // This is used to cache the master used for extracting OS information. 348 // This is used to cache the master used for extracting OS information.
342 cachedMaster := buildbotMaster{} 349 cachedMaster := buildbotMaster{}
343 // Do not use PutMulti because we might hit the 1MB limit. 350 // Do not use PutMulti because we might hit the 1MB limit.
344 for _, build := range builds { 351 for _, build := range builds {
345 if build.Master == "" { 352 if build.Master == "" {
346 logging.Errorf(c, "Invalid message, missing master name" ) 353 logging.Errorf(c, "Invalid message, missing master name" )
347 » » » return 200 354 » » » return http.StatusOK
348 } 355 }
349 existingBuild := &buildbotBuild{ 356 existingBuild := &buildbotBuild{
350 Master: build.Master, 357 Master: build.Master,
351 Buildername: build.Buildername, 358 Buildername: build.Buildername,
352 Number: build.Number, 359 Number: build.Number,
353 } 360 }
354 buildExists := false 361 buildExists := false
355 if err := ds.Get(c, existingBuild); err == nil { 362 if err := ds.Get(c, existingBuild); err == nil {
356 if existingBuild.Finished { 363 if existingBuild.Finished {
357 // Never replace a completed build. 364 // Never replace a completed build.
(...skipping 17 matching lines...) Expand all
375 build.Internal = internal 382 build.Internal = internal
376 // Try to get the OS information on a best-effort basis. This assumes that all 383 // Try to get the OS information on a best-effort basis. This assumes that all
377 // builds come from one master. 384 // builds come from one master.
378 build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMas ter) 385 build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMas ter)
379 err = ds.Put(c, build) 386 err = ds.Put(c, build)
380 if err != nil { 387 if err != nil {
381 if _, ok := err.(errTooBig); ok { 388 if _, ok := err.(errTooBig); ok {
382 // This will never work, we don't want PubSub to retry. 389 // This will never work, we don't want PubSub to retry.
383 logging.WithError(err).Errorf( 390 logging.WithError(err).Errorf(
384 c, "Could not save build to datastore, f ailing permanently") 391 c, "Could not save build to datastore, f ailing permanently")
385 » » » » return 200 392 » » » » return http.StatusOK
386 } 393 }
387 // This is transient, we do want PubSub to retry. 394 // This is transient, we do want PubSub to retry.
388 logging.WithError(err).Errorf(c, "Could not save build i n datastore") 395 logging.WithError(err).Errorf(c, "Could not save build i n datastore")
389 » » » return 500 396 » » » return http.StatusInternalServerError
390 } 397 }
391 if buildExists { 398 if buildExists {
392 buildCounter.Add( 399 buildCounter.Add(
393 c, 1, false, build.Master, build.Buildername, bu ild.Finished, "Replaced") 400 c, 1, false, build.Master, build.Buildername, bu ild.Finished, "Replaced")
394 } else { 401 } else {
395 buildCounter.Add( 402 buildCounter.Add(
396 c, 1, false, build.Master, build.Buildername, bu ild.Finished, "New") 403 c, 1, false, build.Master, build.Buildername, bu ild.Finished, "New")
397 } 404 }
398 405
399 } 406 }
400 if master != nil { 407 if master != nil {
401 code := doMaster(c, master, internal) 408 code := doMaster(c, master, internal)
402 if code != 0 { 409 if code != 0 {
403 return code 410 return code
404 } 411 }
405 } 412 }
406 » return 200 413 » return http.StatusOK
407 } 414 }
OLDNEW
« no previous file with comments | « milo/appengine/buildbot/build_test.go ('k') | milo/appengine/buildbucket/buckets.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698