Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2017 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 1 package buildbucket | 5 package buildbucket |
| 2 | 6 |
| 3 import ( | 7 import ( |
| 4 "encoding/json" | 8 "encoding/json" |
| 5 "errors" | 9 "errors" |
| 10 "fmt" | |
| 6 "net/http" | 11 "net/http" |
| 7 "strings" | 12 "strings" |
| 8 | 13 |
| 9 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 10 | 15 |
| 16 "github.com/luci/gae/service/datastore" | |
| 11 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 " | 17 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 " |
| 18 "github.com/luci/luci-go/common/clock" | |
| 12 "github.com/luci/luci-go/common/logging" | 19 "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/retry/transient" | 20 "github.com/luci/luci-go/common/retry/transient" |
| 14 "github.com/luci/luci-go/common/tsmon/field" | 21 "github.com/luci/luci-go/common/tsmon/field" |
| 15 "github.com/luci/luci-go/common/tsmon/metric" | 22 "github.com/luci/luci-go/common/tsmon/metric" |
| 23 "github.com/luci/luci-go/milo/api/resp" | |
| 24 "github.com/luci/luci-go/milo/buildsource/swarming" | |
| 16 "github.com/luci/luci-go/milo/common" | 25 "github.com/luci/luci-go/milo/common" |
| 26 "github.com/luci/luci-go/milo/common/model" | |
| 17 "github.com/luci/luci-go/server/router" | 27 "github.com/luci/luci-go/server/router" |
| 18 ) | 28 ) |
| 19 | 29 |
| 20 var ( | 30 var ( |
| 21 buildCounter = metric.NewCounter( | 31 buildCounter = metric.NewCounter( |
| 22 "luci/milo/buildbucket_pubsub/builds", | 32 "luci/milo/buildbucket_pubsub/builds", |
| 23 "The number of buildbucket builds received by Milo from PubSub", | 33 "The number of buildbucket builds received by Milo from PubSub", |
| 24 nil, | 34 nil, |
| 25 field.String("bucket"), | 35 field.String("bucket"), |
| 26 // True for luci build, False for non-luci (ie buildbot) build. | 36 // True for luci build, False for non-luci (ie buildbot) build. |
| 27 field.Bool("luci"), | 37 field.Bool("luci"), |
| 28 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" | 38 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" |
| 29 field.String("status"), | 39 field.String("status"), |
| 30 // Action can be one of 3 options. "New", "Replaced", "Rejected ". | 40 // Action can be one of 3 options. "New", "Replaced", "Rejected ". |
| 31 field.String("action")) | 41 field.String("action")) |
| 32 ) | 42 ) |
| 33 | 43 |
| 44 type psMsg struct { | |
| 45 Build bucketApi.ApiCommonBuildMessage | |
| 46 Hostname string | |
| 47 } | |
| 48 | |
| 34 var ( | 49 var ( |
| 35 errNoLogLocation = errors.New("log_location tag not found") | 50 errNoLogLocation = errors.New("log_location tag not found") |
| 36 errNoProject = errors.New("project tag not found") | 51 errNoProject = errors.New("project tag not found") |
| 37 ) | 52 ) |
| 38 | 53 |
| 39 type parameters struct { | 54 type parameters struct { |
| 40 builderName string `json:"builder_name"` | 55 builderName string `json:"builder_name"` |
| 41 properties string `json:"properties"` | 56 properties string `json:"properties"` |
| 42 } | 57 } |
| 43 | 58 |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 56 // Transient errors are 500 so that PubSub retries them. | 71 // Transient errors are 500 so that PubSub retries them. |
| 57 ctx.Writer.WriteHeader(http.StatusInternalServerError) | 72 ctx.Writer.WriteHeader(http.StatusInternalServerError) |
| 58 } else { | 73 } else { |
| 59 // No errors or non-transient errors are 200s so that PubSub doe s not retry | 74 // No errors or non-transient errors are 200s so that PubSub doe s not retry |
| 60 // them. | 75 // them. |
| 61 ctx.Writer.WriteHeader(http.StatusOK) | 76 ctx.Writer.WriteHeader(http.StatusOK) |
| 62 } | 77 } |
| 63 | 78 |
| 64 } | 79 } |
| 65 | 80 |
| 66 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage ) error { | 81 func maybeGetBuild( |
| 67 » if err := buildCounter.Add( | 82 » c context.Context, |
| 68 » » c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil { | 83 » build *bucketApi.ApiCommonBuildMessage) ( |
| 69 » » logging.WithError(err).Warningf(c, "Failed to send metric") | 84 » *resp.MiloBuild, error) { |
|
iannucci
2017/07/11 21:51:20
one line?
Ryan Tseng
2017/07/11 23:09:22
Done.
| |
| 85 | |
| 86 » // Hasn't started yet, so definitely no buildinfo ready yet. | |
| 87 » if build.Status == "SCHEDULED" { | |
| 88 » » return nil, nil | |
| 70 } | 89 } |
| 71 » logging.Debugf(c, "Received build %#v", build) | 90 » tags := ParseTags(build.Tags) |
| 72 » // TODO(hinoka): Save this into datastore. | 91 » var host, task string |
| 73 » return nil | 92 » var ok bool |
| 93 » if host, ok = tags["swarming_hostname"]; !ok { | |
| 94 » » return nil, errors.New("no swarming hostname tag") | |
| 95 » } | |
| 96 » if task, ok = tags["swarming_task_id"]; !ok { | |
| 97 » » return nil, errors.New("no swarming task id") | |
| 98 » } | |
| 99 » swarmingSvc, err := swarming.NewProdService(c, host) | |
|
iannucci
2017/07/11 21:51:20
I assume you'll change this in a subsequent CL?
Ryan Tseng
2017/07/11 23:09:22
Yep
| |
| 100 » if err != nil { | |
| 101 » » return nil, err | |
| 102 » } | |
| 103 » bl := swarming.BuildLoader{} | |
| 104 » // The linkBase is not necessary for the summary. | |
| 105 » return bl.SwarmingBuildImpl(c, swarmingSvc, "", task) | |
| 106 } | |
| 107 | |
| 108 // processBuild queries swarming and logdog for annotation data, then adds or | |
| 109 // updates a buildEntry in datastore. | |
| 110 func processBuild( | |
| 111 » c context.Context, host string, build *bucketApi.ApiCommonBuildMessage) ( | |
| 112 » *buildEntry, error) { | |
| 113 | |
| 114 » now := clock.Now(c).UTC() | |
| 115 » entry := buildEntry{key: buildEntryKey(host, build.Id)} | |
| 116 | |
| 117 » err := datastore.Get(c) | |
| 118 » switch err { | |
| 119 » case datastore.ErrNoSuchEntity: | |
| 120 » » logging.Infof(c, "%s does not exist, will create", entry.key) | |
| 121 » » entry.created = now | |
| 122 » case nil: | |
| 123 » » // continue | |
| 124 » default: | |
| 125 » » return nil, err | |
| 126 » } | |
| 127 | |
| 128 » // If the build is running, try to get the annotation data. | |
| 129 » respBuild, err := maybeGetBuild(c, build) | |
| 130 » if err != nil { | |
| 131 » » return nil, err | |
| 132 » } | |
| 133 » entry.respBuild = respBuild | |
| 134 | |
| 135 » entry.modified = now | |
| 136 » entry.buildbucketData, err = json.Marshal(build) | |
| 137 » if err != nil { | |
| 138 » » return nil, err | |
| 139 » } | |
| 140 | |
| 141 » err = datastore.Put(c, &entry) | |
| 142 » return &entry, err | |
| 143 } | |
| 144 | |
| 145 // createBuildSummary creates or updates a build summary based off a buildbucket | |
|
iannucci
2017/07/11 21:51:20
this is actually putting the build summary, not ju
Ryan Tseng
2017/07/11 23:09:22
renamed.
| |
| 146 // build entry. | |
| 147 func createBuildSummary( | |
| 148 » c context.Context, key *datastore.Key, builderName string, | |
| 149 » entry *buildEntry) error { | |
| 150 | |
| 151 » build, err := entry.getBuild() | |
| 152 » if err != nil { | |
| 153 » » return err | |
| 154 » } | |
| 155 » status, err := parseStatus(build) | |
| 156 » if err != nil { | |
| 157 » » return err | |
| 158 » } | |
| 159 » // TODO(hinoka): Console related items. | |
| 160 » bs := model.BuildSummary{ | |
| 161 » » BuildKey: key, | |
| 162 » » BuilderID: fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builde rName), | |
| 163 » » Created: parseTimestamp(build.CreatedTs), | |
| 164 » » Summary: model.Summary{ | |
| 165 » » » Status: status, | |
| 166 » » » Start: parseTimestamp(build.StartedTs), | |
| 167 » » }, | |
| 168 » } | |
| 169 » if entry.respBuild != nil { | |
| 170 » » // Add info from the respBuild into the build summary if we have the data. | |
| 171 » » entry.respBuild.SummarizeTo(&bs) | |
| 172 » } | |
| 173 » logging.Debugf(c, "Created build summary: %#v", bs) | |
| 174 » // Make datastore flakes transient errors | |
| 175 » return transient.Tag.Apply(datastore.Put(c, &bs)) | |
| 176 } | |
| 177 | |
| 178 func handlePubSubBuild(c context.Context, data *psMsg) error { | |
| 179 » host := data.Hostname | |
| 180 » build := &data.Build | |
| 181 » p := parameters{} | |
| 182 » err := json.Unmarshal([]byte(build.ParametersJson), &p) | |
| 183 » if err != nil { | |
| 184 » » logging.WithError(err).Errorf(c, "could not unmarshal build para meters") | |
| 185 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected") | |
| 186 » » // Permanent error, since this is probably a type of build we do not recognize. | |
| 187 » » return err | |
| 188 » } | |
| 189 » logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%s", | |
| 190 » » host, build.Bucket, p.builderName, build.Status, build) | |
| 191 » if !isLUCI(build) { | |
| 192 » » logging.Infof(c, "This is not a luci build, ignoring") | |
| 193 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected") | |
| 194 » » return nil | |
| 195 » } | |
| 196 | |
| 197 » buildEntry, err := processBuild(c, host, build) | |
| 198 » if err != nil { | |
| 199 » » logging.WithError(err).Errorf(c, "failed to update build") | |
| 200 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected") | |
| 201 » » // Probably a datastore or network flake, make this into a trans ient error | |
| 202 » » return transient.Tag.Apply(err) | |
| 203 » } | |
| 204 » action := "Created" | |
| 205 » if buildEntry.created != buildEntry.modified { | |
| 206 » » action = "Modified" | |
| 207 » } | |
| 208 » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, action ) | |
| 209 | |
| 210 » return createBuildSummary( | |
| 211 » » c, datastore.MakeKey(c, "buildEntry", buildEntry.key), p.builder Name, buildEntry) | |
| 74 } | 212 } |
| 75 | 213 |
| 76 // This returns 500 (Internal Server Error) if it encounters a transient error, | 214 // This returns 500 (Internal Server Error) if it encounters a transient error, |
| 77 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err or. | 215 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err or. |
| 78 func pubSubHandlerImpl(c context.Context, r *http.Request) error { | 216 func pubSubHandlerImpl(c context.Context, r *http.Request) error { |
| 79 » var data struct { | 217 » var data psMsg |
| 80 » » Build bucketApi.ApiCommonBuildMessage | |
| 81 » » Hostname string | |
| 82 » } | |
| 83 | 218 |
| 84 msg := common.PubSubSubscription{} | 219 msg := common.PubSubSubscription{} |
| 85 defer r.Body.Close() | 220 defer r.Body.Close() |
| 86 dec := json.NewDecoder(r.Body) | 221 dec := json.NewDecoder(r.Body) |
| 87 if err := dec.Decode(&msg); err != nil { | 222 if err := dec.Decode(&msg); err != nil { |
| 88 logging.WithError(err).Errorf(c, "could not decode message:\n%s" , r.Body) | 223 logging.WithError(err).Errorf(c, "could not decode message:\n%s" , r.Body) |
| 89 // This might be a transient error, e.g. when the json format ch anges | 224 // This might be a transient error, e.g. when the json format ch anges |
| 90 // and Milo isn't updated yet. | 225 // and Milo isn't updated yet. |
| 91 return transient.Tag.Apply(err) | 226 return transient.Tag.Apply(err) |
| 92 } | 227 } |
| 93 bData, err := msg.GetData() | 228 bData, err := msg.GetData() |
| 94 if err != nil { | 229 if err != nil { |
| 95 logging.WithError(err).Errorf(c, "could not parse pubsub message string") | 230 logging.WithError(err).Errorf(c, "could not parse pubsub message string") |
| 96 return err | 231 return err |
| 97 } | 232 } |
| 98 if err := json.Unmarshal(bData, &data); err != nil { | 233 if err := json.Unmarshal(bData, &data); err != nil { |
| 99 logging.WithError(err).Errorf(c, "could not parse pubsub message data") | 234 logging.WithError(err).Errorf(c, "could not parse pubsub message data") |
| 100 return err | 235 return err |
| 101 } | 236 } |
| 102 | 237 |
| 103 » return handlePubSubBuild(c, &data.Build) | 238 » return handlePubSubBuild(c, &data) |
| 104 } | 239 } |
| OLD | NEW |