Chromium Code Reviews| Index: milo/common/pubsub.go |
| diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..6f15deea7d3f0993826659804be79b21793fa753 |
| --- /dev/null |
| +++ b/milo/common/pubsub.go |
| @@ -0,0 +1,197 @@ |
| +package common |
| + |
| +import ( |
| + "encoding/base64" |
| + "errors" |
| + "fmt" |
| + "net/url" |
| + "strings" |
| + "time" |
| + |
| + "cloud.google.com/go/pubsub" |
| + "golang.org/x/net/context" |
| + |
| + "github.com/luci/gae/service/info" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/milo/api/config" |
| +) |
| + |
| +var pubSubClientKey = "stores a pubsubClient" |
| + |
| +type PubSubMessage struct { |
| + Attributes map[string]string `json:"attributes"` |
| + Data string `json:"data"` |
| + MessageID string `json:"message_id"` |
| +} |
| + |
| +type PubSubSubscription struct { |
| + Message PubSubMessage `json:"message"` |
| + Subscription string `json:"subscription"` |
| +} |
| + |
| +var errNotExist = errors.New("does not exist") |
| + |
| +// GetData returns the expanded form of Data (decoded from base64). |
| +func (m *PubSubSubscription) GetData() ([]byte, error) { |
| + return base64.StdEncoding.DecodeString(m.Message.Data) |
| +} |
| + |
| +// pubsubClient is an interface representing a pubsub.Client containing only |
| +// the functions that Milo calls. Internal use only, can be swapped |
| +// out for testing. |
| +type pubsubClient interface { |
| + getTopic(string) (*pubsub.Topic, error) |
| + getSubscription(string) (*pubsub.Subscription, error) |
| + createSubscription(string, pubsub.SubscriptionConfig) ( |
| + *pubsub.Subscription, error) |
| +} |
| + |
| +// prodPubSubClient is a wrapper around the production pubsub client. |
| +type prodPubSubClient struct { |
| + ctx context.Context |
| + client *pubsub.Client |
| +} |
| + |
| +// getTopic returns the pubsub topic if it exists, a notExist error if |
| +// it does not exist, or an error if there was an error. |
|
iannucci
2017/07/08 00:05:25
move the docstrings to the interface
Ryan Tseng
2017/07/08 01:47:34
Done.
|
| +func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { |
| + topic := pc.client.Topic(id) |
| + exists, err := topic.Exists(pc.ctx) |
| + switch { |
| + case err != nil: |
| + return nil, err |
| + case !exists: |
| + return nil, errNotExist |
| + } |
| + return topic, nil |
| +} |
| + |
| +// getSubscription returns the pubsub subscriptionif it exists, |
|
iannucci
2017/07/08 00:05:25
nit: missing space
Ryan Tseng
2017/07/08 01:47:34
Done.
|
| +// a notExist error if it does not exist, or an error if there was an error. |
| +func (pc *prodPubSubClient) getSubscription(id string) ( |
| + *pubsub.Subscription, error) { |
|
iannucci
2017/07/08 00:05:25
no newline... 80 cols is not a thing in go
Ryan Tseng
2017/07/08 01:47:34
Done.
|
| + sub := pc.client.Subscription(id) |
| + exists, err := sub.Exists(pc.ctx) |
| + switch { |
| + case err != nil: |
| + return nil, err |
| + case !exists: |
| + return nil, errNotExist |
| + } |
| + return sub, nil |
| +} |
| + |
| +func (pc *prodPubSubClient) createSubscription( |
| + id string, cfg pubsub.SubscriptionConfig) ( |
|
iannucci
2017/07/08 00:05:25
nix newlines
Ryan Tseng
2017/07/08 01:47:34
Done.
|
| + *pubsub.Subscription, error) { |
| + |
| + return pc.client.CreateSubscription(pc.ctx, id, cfg) |
| +} |
| + |
| +// getPubSubClient tries to extract a debug PubSub client out of the context. |
|
iannucci
2017/07/08 00:05:25
I would consider instead just having GetTopic(cont
Ryan Tseng
2017/07/08 01:47:34
Done.
|
| +// if none exists, then return a prod client. |
| +func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) { |
| + if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { |
| + return client, nil |
| + } |
| + client, err := pubsub.NewClient(c, projectID) |
| + return &prodPubSubClient{c, client}, err |
| +} |
| + |
| +// EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| +// * buildbucket, via the settings.Buildbucket.Topic setting |
| +func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| + if settings.Buildbucket != nil { |
| + err := ensureBuildbucketSubscribed(c, settings.Buildbucket.Project) |
| + return err |
| + } |
| + // TODO(hinoka): Ensure buildbot subscribed. |
| + return nil |
| +} |
| + |
| +// ensureSubscribed is called by a cron job and ensures that the Milo |
| +// instance is properly subscribed to the buildbucket subscription endpoint. |
| +func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| + topicID := "builds" |
| + if projectID == "" { |
| + return fmt.Errorf("missing buildbucket project") |
|
iannucci
2017/07/08 00:05:25
errors.New
Ryan Tseng
2017/07/08 01:47:33
Done.
|
| + } |
| + client, err := getPubSubClient(c, projectID) |
| + if err != nil { |
| + return err |
| + } |
| + // Check to see if the topic exists first. |
| + topic, err := client.getTopic(topicID) |
| + switch err { |
| + case errNotExist: |
| + err = fmt.Errorf("topic %#v does not exist", topic) |
|
iannucci
2017/07/08 00:05:25
why not have getTopic do this?
Ryan Tseng
2017/07/08 01:47:34
Done.
|
| + logging.Errorf(c, err.Error()) |
| + return err |
| + case nil: |
| + // continue |
| + default: |
| + if strings.Contains(err.Error(), "PermissionDenied") { |
|
iannucci
2017/07/08 00:05:25
oof, is this the best way?
Ryan Tseng
2017/07/08 01:47:34
As far as I can tell :(
|
| + URL := "https://console.cloud.google.com/iam-admin/iam/project?project=" + projectID |
|
iannucci
2017/07/08 00:05:25
may be worth figuring out the gcloud sdk command f
Ryan Tseng
2017/07/08 01:47:34
gcloud doesn't appear to be able to add/manipulate
|
| + acct, serr := info.ServiceAccount(c) |
| + if serr != nil { |
| + acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| + } |
| + // The documentation is incorrect. We need Editor permission because |
| + // the Subscriber permission does NOT permit attaching subscriptions to |
| + // topics or to view topics. |
| + logging.WithError(err).Errorf( |
| + c, "please go to %s and add %s as a Pub/Sub Editor", URL, acct) |
| + } else { |
| + logging.WithError(err).Errorf(c, "could not check topic %#v", topic) |
| + } |
| + return err |
| + } |
| + // Now check to see if the subscription already exists. |
| + subID := info.AppID(c) |
| + // Get the pubsub module of our app. We do not want to use info.ModuleHostname() |
| + // because it returns a version pinned hostname instead of the default route. |
| + sub, err := client.getSubscription(subID) |
| + switch err { |
| + case errNotExist: |
| + // continue |
| + case nil: |
| + logging.Infof(c, "subscription %#v exists, no need to update", sub) |
| + return nil |
| + default: |
| + logging.WithError(err).Errorf(c, "could not check subscription %#v", sub) |
| + return err |
| + } |
| + pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| + |
| + // No subscription exists, attach a new subscription to the existing topic. |
| + endpointURL := url.URL{ |
| + Scheme: "https", |
| + Host: pubsubModuleHost, |
| + Path: "/_ah/push-handlers/buildbucket", |
| + } |
| + subConfig := pubsub.SubscriptionConfig{ |
| + Topic: topic, |
| + PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| + AckDeadline: time.Minute * 10, |
| + } |
| + newSub, err := client.createSubscription(subID, subConfig) |
| + if err != nil { |
| + if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") { |
| + registerURL := "https://pantheon.corp.google.com/apis/credentials/domainverification?project=" + projectID |
|
iannucci
2017/07/08 00:05:25
this should be a console url
Ryan Tseng
2017/07/08 01:47:34
derp. everytime i go to console.cloud.google.com
|
| + verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost |
| + logging.WithError(err).Errorf( |
| + c, "The domain has to be verified and added.\n\n"+ |
| + "1. Go to %s\n"+ |
| + "2. Verify the domain\n"+ |
| + "3. Go to %s\n"+ |
| + "4. Add %s to allowed domains\n\n", |
| + verifyURL, registerURL, pubsubModuleHost) |
| + } else { |
| + logging.WithError(err).Errorf(c, "could not create subscription %#v", sub) |
| + } |
| + return err |
| + } |
| + // Success! |
| + logging.Infof(c, "successfully created subscription %#v", newSub) |
| + return nil |
| +} |