| OLD | NEW |
| 1 // Copyright 2017 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 1 package buildbucket | 5 package buildbucket |
| 2 | 6 |
| 3 import ( | 7 import ( |
| 4 "encoding/json" | 8 "encoding/json" |
| 5 "errors" | 9 "errors" |
| 10 "fmt" |
| 6 "net/http" | 11 "net/http" |
| 7 "strings" | 12 "strings" |
| 8 | 13 |
| 9 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 10 | 15 |
| 16 "github.com/luci/gae/service/datastore" |
| 11 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1
" | 17 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1
" |
| 18 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/logging" | 19 "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/retry/transient" | 20 "github.com/luci/luci-go/common/retry/transient" |
| 14 "github.com/luci/luci-go/common/tsmon/field" | 21 "github.com/luci/luci-go/common/tsmon/field" |
| 15 "github.com/luci/luci-go/common/tsmon/metric" | 22 "github.com/luci/luci-go/common/tsmon/metric" |
| 23 "github.com/luci/luci-go/milo/api/resp" |
| 24 "github.com/luci/luci-go/milo/buildsource/swarming" |
| 16 "github.com/luci/luci-go/milo/common" | 25 "github.com/luci/luci-go/milo/common" |
| 26 "github.com/luci/luci-go/milo/common/model" |
| 17 "github.com/luci/luci-go/server/router" | 27 "github.com/luci/luci-go/server/router" |
| 18 ) | 28 ) |
| 19 | 29 |
| 20 var ( | 30 var ( |
| 21 buildCounter = metric.NewCounter( | 31 buildCounter = metric.NewCounter( |
| 22 "luci/milo/buildbucket_pubsub/builds", | 32 "luci/milo/buildbucket_pubsub/builds", |
| 23 "The number of buildbucket builds received by Milo from PubSub", | 33 "The number of buildbucket builds received by Milo from PubSub", |
| 24 nil, | 34 nil, |
| 25 field.String("bucket"), | 35 field.String("bucket"), |
| 26 // True for luci build, False for non-luci (ie buildbot) build. | 36 // True for luci build, False for non-luci (ie buildbot) build. |
| 27 field.Bool("luci"), | 37 field.Bool("luci"), |
| 28 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" | 38 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" |
| 29 field.String("status"), | 39 field.String("status"), |
| 30 // Action can be one of 3 options. "New", "Replaced", "Rejected
". | 40 // Action can be one of 3 options. "New", "Replaced", "Rejected
". |
| 31 field.String("action")) | 41 field.String("action")) |
| 32 ) | 42 ) |
| 33 | 43 |
| 44 type psMsg struct { |
| 45 Build bucketApi.ApiCommonBuildMessage |
| 46 Hostname string |
| 47 } |
| 48 |
| 34 var ( | 49 var ( |
| 35 errNoLogLocation = errors.New("log_location tag not found") | 50 errNoLogLocation = errors.New("log_location tag not found") |
| 36 errNoProject = errors.New("project tag not found") | 51 errNoProject = errors.New("project tag not found") |
| 37 ) | 52 ) |
| 38 | 53 |
| 39 type parameters struct { | 54 type parameters struct { |
| 40 builderName string `json:"builder_name"` | 55 builderName string `json:"builder_name"` |
| 41 properties string `json:"properties"` | 56 properties string `json:"properties"` |
| 42 } | 57 } |
| 43 | 58 |
| (...skipping 12 matching lines...) Expand all Loading... |
| 56 // Transient errors are 500 so that PubSub retries them. | 71 // Transient errors are 500 so that PubSub retries them. |
| 57 ctx.Writer.WriteHeader(http.StatusInternalServerError) | 72 ctx.Writer.WriteHeader(http.StatusInternalServerError) |
| 58 } else { | 73 } else { |
| 59 // No errors or non-transient errors are 200s so that PubSub doe
s not retry | 74 // No errors or non-transient errors are 200s so that PubSub doe
s not retry |
| 60 // them. | 75 // them. |
| 61 ctx.Writer.WriteHeader(http.StatusOK) | 76 ctx.Writer.WriteHeader(http.StatusOK) |
| 62 } | 77 } |
| 63 | 78 |
| 64 } | 79 } |
| 65 | 80 |
| 66 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage
) error { | 81 func maybeGetBuild( |
| 67 » if err := buildCounter.Add( | 82 » c context.Context, build *bucketApi.ApiCommonBuildMessage) (*resp.MiloBu
ild, error) { |
| 68 » » c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err !=
nil { | 83 |
| 69 » » logging.WithError(err).Warningf(c, "Failed to send metric") | 84 » // Hasn't started yet, so definitely no buildinfo ready yet. |
| 85 » if build.Status == "SCHEDULED" { |
| 86 » » return nil, nil |
| 70 } | 87 } |
| 71 » logging.Debugf(c, "Received build %#v", build) | 88 » tags := ParseTags(build.Tags) |
| 72 » // TODO(hinoka): Save this into datastore. | 89 » var host, task string |
| 73 » return nil | 90 » var ok bool |
| 91 » if host, ok = tags["swarming_hostname"]; !ok { |
| 92 » » return nil, errors.New("no swarming hostname tag") |
| 93 » } |
| 94 » if task, ok = tags["swarming_task_id"]; !ok { |
| 95 » » return nil, errors.New("no swarming task id") |
| 96 » } |
| 97 » swarmingSvc, err := swarming.NewProdService(c, host) |
| 98 » if err != nil { |
| 99 » » return nil, err |
| 100 » } |
| 101 » bl := swarming.BuildLoader{} |
| 102 » // The linkBase is not necessary for the summary. |
| 103 » return bl.SwarmingBuildImpl(c, swarmingSvc, "", task) |
| 104 } |
| 105 |
| 106 // processBuild queries swarming and logdog for annotation data, then adds or |
| 107 // updates a buildEntry in datastore. |
| 108 func processBuild( |
| 109 » c context.Context, host string, build *bucketApi.ApiCommonBuildMessage)
( |
| 110 » *buildEntry, error) { |
| 111 |
| 112 » now := clock.Now(c).UTC() |
| 113 » entry := buildEntry{key: buildEntryKey(host, build.Id)} |
| 114 |
| 115 » err := datastore.Get(c) |
| 116 » switch err { |
| 117 » case datastore.ErrNoSuchEntity: |
| 118 » » logging.Infof(c, "%s does not exist, will create", entry.key) |
| 119 » » entry.created = now |
| 120 » case nil: |
| 121 » » // continue |
| 122 » default: |
| 123 » » return nil, err |
| 124 » } |
| 125 |
| 126 » // If the build is running, try to get the annotation data. |
| 127 » respBuild, err := maybeGetBuild(c, build) |
| 128 » if err != nil { |
| 129 » » return nil, err |
| 130 » } |
| 131 » entry.respBuild = respBuild |
| 132 |
| 133 » entry.modified = now |
| 134 » entry.buildbucketData, err = json.Marshal(build) |
| 135 » if err != nil { |
| 136 » » return nil, err |
| 137 » } |
| 138 |
| 139 » err = datastore.Put(c, &entry) |
| 140 » return &entry, err |
| 141 } |
| 142 |
| 143 // saveBuildSummary creates or updates a build summary based off a buildbucket |
| 144 // build entry. |
| 145 func saveBuildSummary( |
| 146 » c context.Context, key *datastore.Key, builderName string, |
| 147 » entry *buildEntry) error { |
| 148 |
| 149 » build, err := entry.getBuild() |
| 150 » if err != nil { |
| 151 » » return err |
| 152 » } |
| 153 » status, err := parseStatus(build) |
| 154 » if err != nil { |
| 155 » » return err |
| 156 » } |
| 157 » // TODO(hinoka): Console related items. |
| 158 » bs := model.BuildSummary{ |
| 159 » » BuildKey: key, |
| 160 » » BuilderID: fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builde
rName), |
| 161 » » Created: parseTimestamp(build.CreatedTs), |
| 162 » » Summary: model.Summary{ |
| 163 » » » Status: status, |
| 164 » » » Start: parseTimestamp(build.StartedTs), |
| 165 » » }, |
| 166 » } |
| 167 » if entry.respBuild != nil { |
| 168 » » // Add info from the respBuild into the build summary if we have
the data. |
| 169 » » entry.respBuild.SummarizeTo(&bs) |
| 170 » } |
| 171 » logging.Debugf(c, "Created build summary: %#v", bs) |
| 172 » // Make datastore flakes transient errors |
| 173 » return transient.Tag.Apply(datastore.Put(c, &bs)) |
| 174 } |
| 175 |
| 176 func handlePubSubBuild(c context.Context, data *psMsg) error { |
| 177 » host := data.Hostname |
| 178 » build := &data.Build |
| 179 » p := parameters{} |
| 180 » err := json.Unmarshal([]byte(build.ParametersJson), &p) |
| 181 » if err != nil { |
| 182 » » logging.WithError(err).Errorf(c, "could not unmarshal build para
meters") |
| 183 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status
, "Rejected") |
| 184 » » // Permanent error, since this is probably a type of build we do
not recognize. |
| 185 » » return err |
| 186 » } |
| 187 » logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%s", |
| 188 » » host, build.Bucket, p.builderName, build.Status, build) |
| 189 » if !isLUCI(build) { |
| 190 » » logging.Infof(c, "This is not a luci build, ignoring") |
| 191 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status
, "Rejected") |
| 192 » » return nil |
| 193 » } |
| 194 |
| 195 » buildEntry, err := processBuild(c, host, build) |
| 196 » if err != nil { |
| 197 » » logging.WithError(err).Errorf(c, "failed to update build") |
| 198 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status
, "Rejected") |
| 199 » » // Probably a datastore or network flake, make this into a trans
ient error |
| 200 » » return transient.Tag.Apply(err) |
| 201 » } |
| 202 » action := "Created" |
| 203 » if buildEntry.created != buildEntry.modified { |
| 204 » » action = "Modified" |
| 205 » } |
| 206 » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, action
) |
| 207 |
| 208 » return saveBuildSummary( |
| 209 » » c, datastore.MakeKey(c, "buildEntry", buildEntry.key), p.builder
Name, buildEntry) |
| 74 } | 210 } |
| 75 | 211 |
| 76 // This returns 500 (Internal Server Error) if it encounters a transient error, | 212 // This returns 500 (Internal Server Error) if it encounters a transient error, |
| 77 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err
or. | 213 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err
or. |
| 78 func pubSubHandlerImpl(c context.Context, r *http.Request) error { | 214 func pubSubHandlerImpl(c context.Context, r *http.Request) error { |
| 79 » var data struct { | 215 » var data psMsg |
| 80 » » Build bucketApi.ApiCommonBuildMessage | |
| 81 » » Hostname string | |
| 82 » } | |
| 83 | 216 |
| 84 msg := common.PubSubSubscription{} | 217 msg := common.PubSubSubscription{} |
| 85 defer r.Body.Close() | 218 defer r.Body.Close() |
| 86 dec := json.NewDecoder(r.Body) | 219 dec := json.NewDecoder(r.Body) |
| 87 if err := dec.Decode(&msg); err != nil { | 220 if err := dec.Decode(&msg); err != nil { |
| 88 logging.WithError(err).Errorf(c, "could not decode message:\n%s"
, r.Body) | 221 logging.WithError(err).Errorf(c, "could not decode message:\n%s"
, r.Body) |
| 89 // This might be a transient error, e.g. when the json format ch
anges | 222 // This might be a transient error, e.g. when the json format ch
anges |
| 90 // and Milo isn't updated yet. | 223 // and Milo isn't updated yet. |
| 91 return transient.Tag.Apply(err) | 224 return transient.Tag.Apply(err) |
| 92 } | 225 } |
| 93 bData, err := msg.GetData() | 226 bData, err := msg.GetData() |
| 94 if err != nil { | 227 if err != nil { |
| 95 logging.WithError(err).Errorf(c, "could not parse pubsub message
string") | 228 logging.WithError(err).Errorf(c, "could not parse pubsub message
string") |
| 96 return err | 229 return err |
| 97 } | 230 } |
| 98 if err := json.Unmarshal(bData, &data); err != nil { | 231 if err := json.Unmarshal(bData, &data); err != nil { |
| 99 logging.WithError(err).Errorf(c, "could not parse pubsub message
data") | 232 logging.WithError(err).Errorf(c, "could not parse pubsub message
data") |
| 100 return err | 233 return err |
| 101 } | 234 } |
| 102 | 235 |
| 103 » return handlePubSubBuild(c, &data.Build) | 236 » return handlePubSubBuild(c, &data) |
| 104 } | 237 } |
| OLD | NEW |