Index: milo/appengine/buildbot/pubsub.go |
diff --git a/milo/appengine/buildbot/pubsub.go b/milo/appengine/buildbot/pubsub.go |
deleted file mode 100644 |
index 6f922215be930a91beaf11fb1024e0fd93bda9cf..0000000000000000000000000000000000000000 |
--- a/milo/appengine/buildbot/pubsub.go |
+++ /dev/null |
@@ -1,407 +0,0 @@ |
-// Copyright 2016 The LUCI Authors. All rights reserved. |
-// Use of this source code is governed under the Apache License, Version 2.0 |
-// that can be found in the LICENSE file. |
- |
-package buildbot |
- |
-import ( |
- "bytes" |
- "compress/gzip" |
- "compress/zlib" |
- "encoding/base64" |
- "encoding/json" |
- "fmt" |
- "net/http" |
- "strings" |
- "time" |
- |
- ds "github.com/luci/gae/service/datastore" |
- "github.com/luci/luci-go/common/clock" |
- "github.com/luci/luci-go/common/iotools" |
- "github.com/luci/luci-go/common/logging" |
- "github.com/luci/luci-go/milo/appengine/common" |
- "github.com/luci/luci-go/server/router" |
- |
- "golang.org/x/net/context" |
- |
- "github.com/luci/luci-go/common/tsmon/field" |
- "github.com/luci/luci-go/common/tsmon/metric" |
-) |
- |
-var ( |
- // Metrics |
- buildCounter = metric.NewCounter( |
- "luci/milo/buildbot_pubsub/builds", |
- "The number of buildbot builds received by Milo from PubSub", |
- nil, |
- field.Bool("internal"), |
- field.String("master"), |
- field.String("builder"), |
- field.Bool("finished"), |
- // Status can be one of 3 options. "New", "Replaced", "Rejected". |
- field.String("status")) |
- |
- masterCounter = metric.NewCounter( |
- "luci/milo/buildbot_pubsub/masters", |
- "The number of buildbot master jsons received by Milo from PubSub", |
- nil, |
- field.Bool("internal"), |
- field.String("master"), |
- // Status can be one of 2 options. "success", "failure". |
- field.String("status")) |
-) |
- |
-type pubSubMessage struct { |
- Attributes map[string]string `json:"attributes"` |
- Data string `json:"data"` |
- MessageID string `json:"message_id"` |
-} |
- |
-type pubSubSubscription struct { |
- Message pubSubMessage `json:"message"` |
- Subscription string `json:"subscription"` |
-} |
- |
-type buildMasterMsg struct { |
- Master *buildbotMaster `json:"master"` |
- Builds []*buildbotBuild `json:"builds"` |
-} |
- |
-// buildbotMasterEntry is a container for a marshaled and packed buildbot |
-// master json. |
-type buildbotMasterEntry struct { |
- // Name of the buildbot master. |
- Name string `gae:"$id"` |
- // Internal |
- Internal bool |
- // Data is the json serialzed and gzipped blob of the master data. |
- Data []byte `gae:",noindex"` |
- // Modified is when this entry was last modified. |
- Modified time.Time |
-} |
- |
-func putDSMasterJSON( |
- c context.Context, master *buildbotMaster, internal bool) error { |
- for _, builder := range master.Builders { |
- // Trim out extra info in the "Changes" portion of the pending build state, |
- // we don't actually need comments, files, and properties |
- for _, pbs := range builder.PendingBuildStates { |
- for i := range pbs.Source.Changes { |
- pbs.Source.Changes[i].Comments = "" |
- pbs.Source.Changes[i].Files = nil |
- pbs.Source.Changes[i].Properties = nil |
- } |
- } |
- } |
- entry := buildbotMasterEntry{ |
- Name: master.Name, |
- Internal: internal, |
- Modified: clock.Now(c).UTC(), |
- } |
- gzbs := bytes.Buffer{} |
- gsw := gzip.NewWriter(&gzbs) |
- cw := iotools.CountingWriter{Writer: gsw} |
- e := json.NewEncoder(&cw) |
- if err := e.Encode(master); err != nil { |
- return err |
- } |
- gsw.Close() |
- entry.Data = gzbs.Bytes() |
- logging.Debugf(c, "Length of json data: %d", cw.Count) |
- logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data)) |
- return ds.Put(c, &entry) |
-} |
- |
-// GetData returns the expanded form of Data (decoded from base64). |
-func (m *pubSubSubscription) GetData() ([]byte, error) { |
- return base64.StdEncoding.DecodeString(m.Message.Data) |
-} |
- |
-// unmarshal a gzipped byte stream into a list of buildbot builds and masters. |
-func unmarshal( |
- c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error) { |
- bm := buildMasterMsg{} |
- if len(msg) == 0 { |
- return bm.Builds, bm.Master, nil |
- } |
- reader, err := zlib.NewReader(bytes.NewReader(msg)) |
- if err != nil { |
- logging.WithError(err).Errorf(c, "gzip decompression error") |
- return nil, nil, err |
- } |
- defer reader.Close() |
- d := json.NewDecoder(reader) |
- if err = d.Decode(&bm); err != nil { |
- logging.WithError(err).Errorf(c, "could not unmarshal message") |
- return nil, nil, err |
- } |
- // Extract the builds out of master and append it onto builds. |
- if bm.Master != nil { |
- for _, slave := range bm.Master.Slaves { |
- if slave.RunningbuildsMap == nil { |
- slave.RunningbuildsMap = map[string][]int{} |
- } |
- for _, build := range slave.Runningbuilds { |
- build.Master = bm.Master.Name |
- bm.Builds = append(bm.Builds, build) |
- slave.RunningbuildsMap[build.Buildername] = append( |
- slave.RunningbuildsMap[build.Buildername], build.Number) |
- } |
- slave.Runningbuilds = nil |
- } |
- } |
- return bm.Builds, bm.Master, nil |
-} |
- |
-// getOSInfo fetches the os family and version of the slave the build was |
-// running on from the master json on a best-effort basis. |
-func getOSInfo(c context.Context, b *buildbotBuild, m *buildbotMaster) ( |
- family, version string) { |
- // Fetch the master info from datastore if not provided. |
- if m.Name == "" { |
- logging.Infof(c, "Fetching info for master %s", b.Master) |
- entry := buildbotMasterEntry{Name: b.Master} |
- err := ds.Get(c, &entry) |
- if err != nil { |
- logging.WithError(err).Errorf( |
- c, "Encountered error while fetching entry for %s", b.Master) |
- return |
- } |
- err = decodeMasterEntry(c, &entry, m) |
- if err != nil { |
- logging.WithError(err).Warningf( |
- c, "Failed to decode master information for OS info on master %s", b.Master) |
- return |
- } |
- if entry.Internal && !b.Internal { |
- logging.Errorf(c, "Build references an internal master, but build is not internal.") |
- return |
- } |
- } |
- |
- s, ok := m.Slaves[b.Slave] |
- if !ok { |
- logging.Warningf(c, "Could not find slave %s in master %s", b.Slave, b.Master) |
- return |
- } |
- hostInfo := map[string]string{} |
- for _, v := range strings.Split(s.Host, "\n") { |
- if info := strings.SplitN(v, ":", 2); len(info) == 2 { |
- hostInfo[info[0]] = strings.TrimSpace(info[1]) |
- } |
- } |
- // Extract OS and OS Family |
- if v, ok := hostInfo["os family"]; ok { |
- family = v |
- } |
- if v, ok := hostInfo["os version"]; ok { |
- version = v |
- } |
- return |
-} |
- |
-// Marks a build as finished and expired. |
-func expireBuild(c context.Context, b *buildbotBuild) error { |
- finished := float64(clock.Now(c).Unix()) |
- if b.TimeStamp != nil { |
- finished = float64(*b.TimeStamp) |
- } |
- results := int(4) // Exception |
- b.Times[1] = &finished |
- b.Finished = true |
- b.Results = &results |
- b.Currentstep = nil |
- b.Text = append(b.Text, "Build expired on Milo") |
- return ds.Put(c, b) |
-} |
- |
-func doMaster(c context.Context, master *buildbotMaster, internal bool) int { |
- // Store the master json into the datastore. |
- err := putDSMasterJSON(c, master, internal) |
- fullname := fmt.Sprintf("master.%s", master.Name) |
- if err != nil { |
- logging.WithError(err).Errorf( |
- c, "Could not save master in datastore %s", err) |
- masterCounter.Add(c, 1, internal, fullname, "failure") |
- // This is transient, we do want PubSub to retry. |
- return http.StatusInternalServerError |
- } |
- masterCounter.Add(c, 1, internal, fullname, "success") |
- |
- // Extract current builds data out of the master json, and use it to |
- // clean up expired builds. |
- q := ds.NewQuery("buildbotBuild"). |
- Eq("finished", false). |
- Eq("master", master.Name) |
- builds := []*buildbotBuild{} |
- err = getBuildQueryBatcher(c).GetAll(c, q, &builds) |
- if err != nil { |
- logging.WithError(err).Errorf(c, "Could not load current builds from master %s", |
- master.Name) |
- return http.StatusInternalServerError |
- } |
- for _, b := range builds { |
- builder, ok := master.Builders[b.Buildername] |
- if !ok { |
- // Mark this build due to builder being removed. |
- buildCounter.Add( |
- c, 1, internal, b.Master, b.Buildername, b.Finished, "Expired") |
- logging.Infof(c, "Expiring %s/%s/%d due to builder being removed", |
- master.Name, b.Buildername, b.Number) |
- err = expireBuild(c, b) |
- if err != nil { |
- logging.WithError(err).Errorf(c, "Could not expire build") |
- return http.StatusInternalServerError |
- } |
- continue |
- } |
- |
- found := false |
- for _, bnum := range builder.CurrentBuilds { |
- if b.Number == bnum { |
- found = true |
- break |
- } |
- } |
- if !found { |
- now := int(clock.Now(c).Unix()) |
- if b.TimeStamp == nil || ((*b.TimeStamp)+20*60 < now) { |
- // Expire builds after 20 minutes of not getting data. |
- // Mark this build due to build not current anymore. |
- buildCounter.Add( |
- c, 1, internal, b.Master, b.Buildername, b.Finished, "Expired") |
- logging.Infof(c, "Expiring %s/%s/%d due to build not current", |
- master.Name, b.Buildername, b.Number) |
- err = expireBuild(c, b) |
- if err != nil { |
- logging.WithError(err).Errorf(c, "Could not expire build") |
- return http.StatusInternalServerError |
- } |
- } |
- } |
- } |
- return 0 |
-} |
- |
-// PubSubHandler is a webhook that stores the builds coming in from pubsub. |
-func PubSubHandler(ctx *router.Context) { |
- statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) |
- ctx.Writer.WriteHeader(statusCode) |
-} |
- |
-// This is the actual implementation of the pubsub handler. Returns |
-// a status code. StatusOK (200) for okay (ACK implied, don't retry). |
-// Anything else will signal to pubsub to retry. |
-func pubSubHandlerImpl(c context.Context, r *http.Request) int { |
- msg := pubSubSubscription{} |
- now := int(clock.Now(c).Unix()) |
- defer r.Body.Close() |
- dec := json.NewDecoder(r.Body) |
- if err := dec.Decode(&msg); err != nil { |
- logging.WithError(err).Errorf( |
- c, "Could not decode message. %s", err) |
- return http.StatusOK // This is a hard failure, we don't want PubSub to retry. |
- } |
- internal := true |
- // Get the name of the subscription on luci-config |
- settings := common.GetSettings(c) |
- switch msg.Subscription { |
- case settings.Buildbot.PublicSubscription: |
- internal = false |
- case settings.Buildbot.InternalSubscription: |
- // internal = true, but that's already set. |
- default: |
- logging.Errorf( |
- c, "Subscription name %s does not match %s or %s", |
- msg.Subscription, settings.Buildbot.PublicSubscription, |
- settings.Buildbot.InternalSubscription) |
- // This is a configuration error. Tell PubSub to retry until we fix our |
- // configs. |
- return http.StatusInternalServerError |
- } |
- logging.Infof( |
- c, "Message ID \"%s\" from subscription %s is %d bytes long", |
- msg.Message.MessageID, msg.Subscription, r.ContentLength) |
- bbMsg, err := msg.GetData() |
- if err != nil { |
- logging.WithError(err).Errorf(c, "Could not base64 decode message %s", err) |
- return http.StatusOK |
- } |
- builds, master, err := unmarshal(c, bbMsg) |
- if err != nil { |
- logging.WithError(err).Errorf(c, "Could not unmarshal message %s", err) |
- return http.StatusOK |
- } |
- logging.Infof(c, "There are %d builds", len(builds)) |
- if master != nil { |
- logging.Infof(c, "The master name is %s", master.Name) |
- } else { |
- logging.Infof(c, "No master in this message") |
- } |
- // This is used to cache the master used for extracting OS information. |
- cachedMaster := buildbotMaster{} |
- // Do not use PutMulti because we might hit the 1MB limit. |
- for _, build := range builds { |
- if build.Master == "" { |
- logging.Errorf(c, "Invalid message, missing master name") |
- return http.StatusOK |
- } |
- existingBuild := &buildbotBuild{ |
- Master: build.Master, |
- Buildername: build.Buildername, |
- Number: build.Number, |
- } |
- buildExists := false |
- if err := ds.Get(c, existingBuild); err == nil { |
- if existingBuild.Finished { |
- // Never replace a completed build. |
- buildCounter.Add( |
- c, 1, false, build.Master, build.Buildername, false, "Rejected") |
- continue |
- } |
- buildExists = true |
- } |
- // Also set the finished, timestamp, and internal bit. |
- build.Finished = false |
- if build.TimeStamp == nil { |
- build.TimeStamp = &now |
- } |
- if len(build.Times) == 2 && build.Times[1] != nil { |
- build.Finished = true |
- logging.Infof( |
- c, "Recording finished build %s/%s/%d", build.Master, |
- build.Buildername, build.Number) |
- } |
- build.Internal = internal |
- // Try to get the OS information on a best-effort basis. This assumes that all |
- // builds come from one master. |
- build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMaster) |
- err = ds.Put(c, build) |
- if err != nil { |
- if _, ok := err.(errTooBig); ok { |
- // This will never work, we don't want PubSub to retry. |
- logging.WithError(err).Errorf( |
- c, "Could not save build to datastore, failing permanently") |
- return http.StatusOK |
- } |
- // This is transient, we do want PubSub to retry. |
- logging.WithError(err).Errorf(c, "Could not save build in datastore") |
- return http.StatusInternalServerError |
- } |
- if buildExists { |
- buildCounter.Add( |
- c, 1, false, build.Master, build.Buildername, build.Finished, "Replaced") |
- } else { |
- buildCounter.Add( |
- c, 1, false, build.Master, build.Buildername, build.Finished, "New") |
- } |
- |
- } |
- if master != nil { |
- code := doMaster(c, master, internal) |
- if code != 0 { |
- return code |
- } |
- } |
- return http.StatusOK |
-} |