Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Side by Side Diff: milo/buildsource/buildbucket/pubsub.go

Issue 2964143002: Buildbucket: Save buildbucket build info and summary on pubsub push (Closed)
Patch Set: nits Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/api/resp/build.go ('k') | milo/buildsource/buildbucket/struct.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2017 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
1 package buildbucket 5 package buildbucket
2 6
3 import ( 7 import (
4 "encoding/json" 8 "encoding/json"
5 "errors" 9 "errors"
10 "fmt"
6 "net/http" 11 "net/http"
7 "strings" 12 "strings"
8 13
9 "golang.org/x/net/context" 14 "golang.org/x/net/context"
10 15
16 "github.com/luci/gae/service/datastore"
11 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 " 17 bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1 "
18 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/logging" 19 "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/retry/transient" 20 "github.com/luci/luci-go/common/retry/transient"
14 "github.com/luci/luci-go/common/tsmon/field" 21 "github.com/luci/luci-go/common/tsmon/field"
15 "github.com/luci/luci-go/common/tsmon/metric" 22 "github.com/luci/luci-go/common/tsmon/metric"
23 "github.com/luci/luci-go/milo/api/resp"
24 "github.com/luci/luci-go/milo/buildsource/swarming"
16 "github.com/luci/luci-go/milo/common" 25 "github.com/luci/luci-go/milo/common"
26 "github.com/luci/luci-go/milo/common/model"
17 "github.com/luci/luci-go/server/router" 27 "github.com/luci/luci-go/server/router"
18 ) 28 )
19 29
20 var ( 30 var (
21 buildCounter = metric.NewCounter( 31 buildCounter = metric.NewCounter(
22 "luci/milo/buildbucket_pubsub/builds", 32 "luci/milo/buildbucket_pubsub/builds",
23 "The number of buildbucket builds received by Milo from PubSub", 33 "The number of buildbucket builds received by Milo from PubSub",
24 nil, 34 nil,
25 field.String("bucket"), 35 field.String("bucket"),
26 // True for luci build, False for non-luci (ie buildbot) build. 36 // True for luci build, False for non-luci (ie buildbot) build.
27 field.Bool("luci"), 37 field.Bool("luci"),
28 // Status can be "COMPLETED", "SCHEDULED", or "STARTED" 38 // Status can be "COMPLETED", "SCHEDULED", or "STARTED"
29 field.String("status"), 39 field.String("status"),
30 // Action can be one of 3 options. "New", "Replaced", "Rejected ". 40 // Action can be one of 3 options. "New", "Replaced", "Rejected ".
31 field.String("action")) 41 field.String("action"))
32 ) 42 )
33 43
44 type psMsg struct {
45 Build bucketApi.ApiCommonBuildMessage
46 Hostname string
47 }
48
34 var ( 49 var (
35 errNoLogLocation = errors.New("log_location tag not found") 50 errNoLogLocation = errors.New("log_location tag not found")
36 errNoProject = errors.New("project tag not found") 51 errNoProject = errors.New("project tag not found")
37 ) 52 )
38 53
39 type parameters struct { 54 type parameters struct {
40 builderName string `json:"builder_name"` 55 builderName string `json:"builder_name"`
41 properties string `json:"properties"` 56 properties string `json:"properties"`
42 } 57 }
43 58
(...skipping 12 matching lines...) Expand all
56 // Transient errors are 500 so that PubSub retries them. 71 // Transient errors are 500 so that PubSub retries them.
57 ctx.Writer.WriteHeader(http.StatusInternalServerError) 72 ctx.Writer.WriteHeader(http.StatusInternalServerError)
58 } else { 73 } else {
59 // No errors or non-transient errors are 200s so that PubSub doe s not retry 74 // No errors or non-transient errors are 200s so that PubSub doe s not retry
60 // them. 75 // them.
61 ctx.Writer.WriteHeader(http.StatusOK) 76 ctx.Writer.WriteHeader(http.StatusOK)
62 } 77 }
63 78
64 } 79 }
65 80
66 func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage ) error { 81 func maybeGetBuild(
67 » if err := buildCounter.Add( 82 » c context.Context, build *bucketApi.ApiCommonBuildMessage) (*resp.MiloBu ild, error) {
68 » » c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil { 83
69 » » logging.WithError(err).Warningf(c, "Failed to send metric") 84 » // Hasn't started yet, so definitely no buildinfo ready yet.
85 » if build.Status == "SCHEDULED" {
86 » » return nil, nil
70 } 87 }
71 » logging.Debugf(c, "Received build %#v", build) 88 » tags := ParseTags(build.Tags)
72 » // TODO(hinoka): Save this into datastore. 89 » var host, task string
73 » return nil 90 » var ok bool
91 » if host, ok = tags["swarming_hostname"]; !ok {
92 » » return nil, errors.New("no swarming hostname tag")
93 » }
94 » if task, ok = tags["swarming_task_id"]; !ok {
95 » » return nil, errors.New("no swarming task id")
96 » }
97 » swarmingSvc, err := swarming.NewProdService(c, host)
98 » if err != nil {
99 » » return nil, err
100 » }
101 » bl := swarming.BuildLoader{}
102 » // The linkBase is not necessary for the summary.
103 » return bl.SwarmingBuildImpl(c, swarmingSvc, "", task)
104 }
105
106 // processBuild queries swarming and logdog for annotation data, then adds or
107 // updates a buildEntry in datastore.
108 func processBuild(
109 » c context.Context, host string, build *bucketApi.ApiCommonBuildMessage) (
110 » *buildEntry, error) {
111
112 » now := clock.Now(c).UTC()
113 » entry := buildEntry{key: buildEntryKey(host, build.Id)}
114
115 » err := datastore.Get(c)
116 » switch err {
117 » case datastore.ErrNoSuchEntity:
118 » » logging.Infof(c, "%s does not exist, will create", entry.key)
119 » » entry.created = now
120 » case nil:
121 » » // continue
122 » default:
123 » » return nil, err
124 » }
125
126 » // If the build is running, try to get the annotation data.
127 » respBuild, err := maybeGetBuild(c, build)
128 » if err != nil {
129 » » return nil, err
130 » }
131 » entry.respBuild = respBuild
132
133 » entry.modified = now
134 » entry.buildbucketData, err = json.Marshal(build)
135 » if err != nil {
136 » » return nil, err
137 » }
138
139 » err = datastore.Put(c, &entry)
140 » return &entry, err
141 }
142
143 // saveBuildSummary creates or updates a build summary based off a buildbucket
144 // build entry.
145 func saveBuildSummary(
146 » c context.Context, key *datastore.Key, builderName string,
147 » entry *buildEntry) error {
148
149 » build, err := entry.getBuild()
150 » if err != nil {
151 » » return err
152 » }
153 » status, err := parseStatus(build)
154 » if err != nil {
155 » » return err
156 » }
157 » // TODO(hinoka): Console related items.
158 » bs := model.BuildSummary{
159 » » BuildKey: key,
160 » » BuilderID: fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builde rName),
161 » » Created: parseTimestamp(build.CreatedTs),
162 » » Summary: model.Summary{
163 » » » Status: status,
164 » » » Start: parseTimestamp(build.StartedTs),
165 » » },
166 » }
167 » if entry.respBuild != nil {
168 » » // Add info from the respBuild into the build summary if we have the data.
169 » » entry.respBuild.SummarizeTo(&bs)
170 » }
171 » logging.Debugf(c, "Created build summary: %#v", bs)
172 » // Make datastore flakes transient errors
173 » return transient.Tag.Apply(datastore.Put(c, &bs))
174 }
175
176 func handlePubSubBuild(c context.Context, data *psMsg) error {
177 » host := data.Hostname
178 » build := &data.Build
179 » p := parameters{}
180 » err := json.Unmarshal([]byte(build.ParametersJson), &p)
181 » if err != nil {
182 » » logging.WithError(err).Errorf(c, "could not unmarshal build para meters")
183 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected")
184 » » // Permanent error, since this is probably a type of build we do not recognize.
185 » » return err
186 » }
187 » logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%s",
188 » » host, build.Bucket, p.builderName, build.Status, build)
189 » if !isLUCI(build) {
190 » » logging.Infof(c, "This is not a luci build, ignoring")
191 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected")
192 » » return nil
193 » }
194
195 » buildEntry, err := processBuild(c, host, build)
196 » if err != nil {
197 » » logging.WithError(err).Errorf(c, "failed to update build")
198 » » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status , "Rejected")
199 » » // Probably a datastore or network flake, make this into a trans ient error
200 » » return transient.Tag.Apply(err)
201 » }
202 » action := "Created"
203 » if buildEntry.created != buildEntry.modified {
204 » » action = "Modified"
205 » }
206 » buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, action )
207
208 » return saveBuildSummary(
209 » » c, datastore.MakeKey(c, "buildEntry", buildEntry.key), p.builder Name, buildEntry)
74 } 210 }
75 211
76 // This returns 500 (Internal Server Error) if it encounters a transient error, 212 // This returns 500 (Internal Server Error) if it encounters a transient error,
77 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err or. 213 // and returns 200 (OK) if everything is OK, or if it encounters a permanent err or.
78 func pubSubHandlerImpl(c context.Context, r *http.Request) error { 214 func pubSubHandlerImpl(c context.Context, r *http.Request) error {
79 » var data struct { 215 » var data psMsg
80 » » Build bucketApi.ApiCommonBuildMessage
81 » » Hostname string
82 » }
83 216
84 msg := common.PubSubSubscription{} 217 msg := common.PubSubSubscription{}
85 defer r.Body.Close() 218 defer r.Body.Close()
86 dec := json.NewDecoder(r.Body) 219 dec := json.NewDecoder(r.Body)
87 if err := dec.Decode(&msg); err != nil { 220 if err := dec.Decode(&msg); err != nil {
88 logging.WithError(err).Errorf(c, "could not decode message:\n%s" , r.Body) 221 logging.WithError(err).Errorf(c, "could not decode message:\n%s" , r.Body)
89 // This might be a transient error, e.g. when the json format ch anges 222 // This might be a transient error, e.g. when the json format ch anges
90 // and Milo isn't updated yet. 223 // and Milo isn't updated yet.
91 return transient.Tag.Apply(err) 224 return transient.Tag.Apply(err)
92 } 225 }
93 bData, err := msg.GetData() 226 bData, err := msg.GetData()
94 if err != nil { 227 if err != nil {
95 logging.WithError(err).Errorf(c, "could not parse pubsub message string") 228 logging.WithError(err).Errorf(c, "could not parse pubsub message string")
96 return err 229 return err
97 } 230 }
98 if err := json.Unmarshal(bData, &data); err != nil { 231 if err := json.Unmarshal(bData, &data); err != nil {
99 logging.WithError(err).Errorf(c, "could not parse pubsub message data") 232 logging.WithError(err).Errorf(c, "could not parse pubsub message data")
100 return err 233 return err
101 } 234 }
102 235
103 » return handlePubSubBuild(c, &data.Build) 236 » return handlePubSubBuild(c, &data)
104 } 237 }
OLDNEW
« no previous file with comments | « milo/api/resp/build.go ('k') | milo/buildsource/buildbucket/struct.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698