Chromium Code Reviews| Index: milo/buildsource/buildbucket/pubsub.go |
| diff --git a/milo/buildsource/buildbucket/pubsub.go b/milo/buildsource/buildbucket/pubsub.go |
| index b51460769a2994a736ea3850ccfb4d0e9f9059b4..aa45666d99e30960ad213087d32fa683fbe63850 100644 |
| --- a/milo/buildsource/buildbucket/pubsub.go |
| +++ b/milo/buildsource/buildbucket/pubsub.go |
| @@ -1,19 +1,29 @@ |
| +// Copyright 2017 The LUCI Authors. All rights reserved. |
| +// Use of this source code is governed under the Apache License, Version 2.0 |
| +// that can be found in the LICENSE file. |
| + |
| package buildbucket |
| import ( |
| "encoding/json" |
| "errors" |
| + "fmt" |
| "net/http" |
| "strings" |
| "golang.org/x/net/context" |
| + "github.com/luci/gae/service/datastore" |
| bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1" |
| + "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/retry/transient" |
| "github.com/luci/luci-go/common/tsmon/field" |
| "github.com/luci/luci-go/common/tsmon/metric" |
| + "github.com/luci/luci-go/milo/api/resp" |
| + "github.com/luci/luci-go/milo/buildsource/swarming" |
| "github.com/luci/luci-go/milo/common" |
| + "github.com/luci/luci-go/milo/common/model" |
| "github.com/luci/luci-go/server/router" |
| ) |
| @@ -31,6 +41,11 @@ var ( |
| field.String("action")) |
| ) |
| +type psMsg struct { |
| + Build bucketApi.ApiCommonBuildMessage |
| + Hostname string |
| +} |
| + |
| var ( |
| errNoLogLocation = errors.New("log_location tag not found") |
| errNoProject = errors.New("project tag not found") |
| @@ -63,23 +78,143 @@ func PubSubHandler(ctx *router.Context) { |
| } |
| -func handlePubSubBuild(c context.Context, build *bucketApi.ApiCommonBuildMessage) error { |
| - if err := buildCounter.Add( |
| - c, 1, build.Bucket, isLUCI(build), build.Status, "New"); err != nil { |
| - logging.WithError(err).Warningf(c, "Failed to send metric") |
| +func maybeGetBuild( |
| + c context.Context, |
| + build *bucketApi.ApiCommonBuildMessage) ( |
| + *resp.MiloBuild, error) { |
|
iannucci
2017/07/11 21:51:20
one line?
Ryan Tseng
2017/07/11 23:09:22
Done.
|
| + |
| + // Hasn't started yet, so definitely no buildinfo ready yet. |
| + if build.Status == "SCHEDULED" { |
| + return nil, nil |
| + } |
| + tags := ParseTags(build.Tags) |
| + var host, task string |
| + var ok bool |
| + if host, ok = tags["swarming_hostname"]; !ok { |
| + return nil, errors.New("no swarming hostname tag") |
| + } |
| + if task, ok = tags["swarming_task_id"]; !ok { |
| + return nil, errors.New("no swarming task id") |
| + } |
| + swarmingSvc, err := swarming.NewProdService(c, host) |
|
iannucci
2017/07/11 21:51:20
I assume you'll change this in a subsequent CL?
Ryan Tseng
2017/07/11 23:09:22
Yep
|
| + if err != nil { |
| + return nil, err |
| + } |
| + bl := swarming.BuildLoader{} |
| + // The linkBase is not necessary for the summary. |
| + return bl.SwarmingBuildImpl(c, swarmingSvc, "", task) |
| +} |
| + |
| +// processBuild queries swarming and logdog for annotation data, then adds or |
| +// updates a buildEntry in datastore. |
| +func processBuild( |
| + c context.Context, host string, build *bucketApi.ApiCommonBuildMessage) ( |
| + *buildEntry, error) { |
| + |
| + now := clock.Now(c).UTC() |
| + entry := buildEntry{key: buildEntryKey(host, build.Id)} |
| + |
| + err := datastore.Get(c) |
| + switch err { |
| + case datastore.ErrNoSuchEntity: |
| + logging.Infof(c, "%s does not exist, will create", entry.key) |
| + entry.created = now |
| + case nil: |
| + // continue |
| + default: |
| + return nil, err |
| + } |
| + |
| + // If the build is running, try to get the annotation data. |
| + respBuild, err := maybeGetBuild(c, build) |
| + if err != nil { |
| + return nil, err |
| + } |
| + entry.respBuild = respBuild |
| + |
| + entry.modified = now |
| + entry.buildbucketData, err = json.Marshal(build) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + err = datastore.Put(c, &entry) |
| + return &entry, err |
| +} |
| + |
| +// createBuildSummary creates or updates a build summary based off a buildbucket |
|
iannucci
2017/07/11 21:51:20
this is actually putting the build summary, not ju
Ryan Tseng
2017/07/11 23:09:22
renamed.
|
| +// build entry. |
| +func createBuildSummary( |
| + c context.Context, key *datastore.Key, builderName string, |
| + entry *buildEntry) error { |
| + |
| + build, err := entry.getBuild() |
| + if err != nil { |
| + return err |
| + } |
| + status, err := parseStatus(build) |
| + if err != nil { |
| + return err |
| + } |
| + // TODO(hinoka): Console related items. |
| + bs := model.BuildSummary{ |
| + BuildKey: key, |
| + BuilderID: fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builderName), |
| + Created: parseTimestamp(build.CreatedTs), |
| + Summary: model.Summary{ |
| + Status: status, |
| + Start: parseTimestamp(build.StartedTs), |
| + }, |
| + } |
| + if entry.respBuild != nil { |
| + // Add info from the respBuild into the build summary if we have the data. |
| + entry.respBuild.SummarizeTo(&bs) |
| + } |
| + logging.Debugf(c, "Created build summary: %#v", bs) |
| + // Make datastore flakes transient errors |
| + return transient.Tag.Apply(datastore.Put(c, &bs)) |
| +} |
| + |
| +func handlePubSubBuild(c context.Context, data *psMsg) error { |
| + host := data.Hostname |
| + build := &data.Build |
| + p := parameters{} |
| + err := json.Unmarshal([]byte(build.ParametersJson), &p) |
| + if err != nil { |
| + logging.WithError(err).Errorf(c, "could not unmarshal build parameters") |
| + buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "Rejected") |
| + // Permanent error, since this is probably a type of build we do not recognize. |
| + return err |
| } |
| - logging.Debugf(c, "Received build %#v", build) |
| - // TODO(hinoka): Save this into datastore. |
| - return nil |
| + logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%s", |
| + host, build.Bucket, p.builderName, build.Status, build) |
| + if !isLUCI(build) { |
| + logging.Infof(c, "This is not a luci build, ignoring") |
| + buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "Rejected") |
| + return nil |
| + } |
| + |
| + buildEntry, err := processBuild(c, host, build) |
| + if err != nil { |
| + logging.WithError(err).Errorf(c, "failed to update build") |
| + buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, "Rejected") |
| + // Probably a datastore or network flake, make this into a transient error |
| + return transient.Tag.Apply(err) |
| + } |
| + action := "Created" |
| + if buildEntry.created != buildEntry.modified { |
| + action = "Modified" |
| + } |
| + buildCounter.Add(c, 1, build.Bucket, isLUCI(build), build.Status, action) |
| + |
| + return createBuildSummary( |
| + c, datastore.MakeKey(c, "buildEntry", buildEntry.key), p.builderName, buildEntry) |
| } |
| // This returns 500 (Internal Server Error) if it encounters a transient error, |
| // and returns 200 (OK) if everything is OK, or if it encounters a permanent error. |
| func pubSubHandlerImpl(c context.Context, r *http.Request) error { |
| - var data struct { |
| - Build bucketApi.ApiCommonBuildMessage |
| - Hostname string |
| - } |
| + var data psMsg |
| msg := common.PubSubSubscription{} |
| defer r.Body.Close() |
| @@ -100,5 +235,5 @@ func pubSubHandlerImpl(c context.Context, r *http.Request) error { |
| return err |
| } |
| - return handlePubSubBuild(c, &data.Build) |
| + return handlePubSubBuild(c, &data) |
| } |