| Index: milo/common/pubsub.go
|
| diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..56b79c2b05902c61395af9853b59294f0865550e
|
| --- /dev/null
|
| +++ b/milo/common/pubsub.go
|
| @@ -0,0 +1,231 @@
|
| +package common
|
| +
|
| +import (
|
| + "encoding/base64"
|
| + "errors"
|
| + "fmt"
|
| + "net/url"
|
| + "strings"
|
| + "time"
|
| +
|
| + "cloud.google.com/go/pubsub"
|
| + "golang.org/x/net/context"
|
| +
|
| + "github.com/luci/gae/service/info"
|
| + "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/milo/api/config"
|
| +)
|
| +
|
| +var pubSubClientKey = "stores a pubsubClient"
|
| +
|
| +type PubSubMessage struct {
|
| + Attributes map[string]string `json:"attributes"`
|
| + Data string `json:"data"`
|
| + MessageID string `json:"message_id"`
|
| +}
|
| +
|
| +type PubSubSubscription struct {
|
| + Message PubSubMessage `json:"message"`
|
| + Subscription string `json:"subscription"`
|
| +}
|
| +
|
| +var errNotExist = errors.New("does not exist")
|
| +
|
| +// GetData returns the expanded form of Data (decoded from base64).
|
| +func (m *PubSubSubscription) GetData() ([]byte, error) {
|
| + return base64.StdEncoding.DecodeString(m.Message.Data)
|
| +}
|
| +
|
| +// pubsubClient is an interface representing a pubsub.Client containing only
|
| +// the functions that Milo calls. Internal use only, can be swapped
|
| +// out for testing.
|
| +type pubsubClient interface {
|
| + // getTopic returns the pubsub topic if it exists, a notExist error if
|
| + // it does not exist, or an error if there was an error.
|
| + getTopic(string) (*pubsub.Topic, error)
|
| +
|
| + // getSubscription returns the pubsub subscription if it exists,
|
| + // a notExist error if it does not exist, or an error if there was an error.
|
| + getSubscription(string) (*pubsub.Subscription, error)
|
| +
|
| + createSubscription(string, pubsub.SubscriptionConfig) (
|
| + *pubsub.Subscription, error)
|
| +}
|
| +
|
| +// prodPubSubClient is a wrapper around the production pubsub client.
|
| +type prodPubSubClient struct {
|
| + ctx context.Context
|
| + client *pubsub.Client
|
| +}
|
| +
|
| +func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
|
| + topic := pc.client.Topic(id)
|
| + exists, err := topic.Exists(pc.ctx)
|
| + switch {
|
| + case err != nil:
|
| + return nil, err
|
| + case !exists:
|
| + return nil, errNotExist
|
| + }
|
| + return topic, nil
|
| +}
|
| +
|
| +func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) {
|
| + sub := pc.client.Subscription(id)
|
| + exists, err := sub.Exists(pc.ctx)
|
| + switch {
|
| + case err != nil:
|
| + return nil, err
|
| + case !exists:
|
| + return nil, errNotExist
|
| + }
|
| + return sub, nil
|
| +}
|
| +
|
| +func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) (
|
| + *pubsub.Subscription, error) {
|
| +
|
| + return pc.client.CreateSubscription(pc.ctx, id, cfg)
|
| +}
|
| +
|
| +// getPubSubClient extracts a debug PubSub client out of the context.
|
| +func getPubSubClient(c context.Context) (pubsubClient, error) {
|
| + if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
|
| + return client, nil
|
| + }
|
| + return nil, errors.New("no pubsub clients installed")
|
| +}
|
| +
|
| +// withClient returns a context with a pubsub client instantiated to the
|
| +// given project ID
|
| +func withClient(c context.Context, projectID string) (context.Context, error) {
|
| + if projectID == "" {
|
| + return nil, errors.New("missing buildbucket project")
|
| + }
|
| + client, err := pubsub.NewClient(c, projectID)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil
|
| +}
|
| +
|
| +func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
|
| + client, err := getPubSubClient(c)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return client.getTopic(id)
|
| +}
|
| +
|
| +func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
|
| + client, err := getPubSubClient(c)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return client.getSubscription(id)
|
| +}
|
| +
|
| +func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (
|
| + *pubsub.Subscription, error) {
|
| +
|
| + client, err := getPubSubClient(c)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return client.createSubscription(id, cfg)
|
| +}
|
| +
|
| +// EnsurePubSubSubscribed makes sure the following subscriptions are in place:
|
| +// * buildbucket, via the settings.Buildbucket.Topic setting
|
| +func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
|
| + if settings.Buildbucket != nil {
|
| + // Install the production pubsub client pointing to the buildbucket project
|
| + // into the context.
|
| + c, err := withClient(c, settings.Buildbucket.Project)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
|
| + }
|
| + // TODO(hinoka): Ensure buildbot subscribed.
|
| + return nil
|
| +}
|
| +
|
| +// ensureSubscribed is called by a cron job and ensures that the Milo
|
| +// instance is properly subscribed to the buildbucket subscription endpoint.
|
| +func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
|
| + topicID := "builds"
|
| + // Check to see if the topic exists first.
|
| + topic, err := getTopic(c, topicID)
|
| + switch err {
|
| + case errNotExist:
|
| + logging.WithError(err).Errorf(c, "%s does not exist", topicID)
|
| + return err
|
| + case nil:
|
| + // continue
|
| + default:
|
| + if strings.Contains(err.Error(), "PermissionDenied") {
|
| + URL := "https://console.cloud.google.com/iam-admin/iam/project?project=" + projectID
|
| + acct, serr := info.ServiceAccount(c)
|
| + if serr != nil {
|
| + acct = fmt.Sprintf("Unknown: %s", serr.Error())
|
| + }
|
| + // The documentation is incorrect. We need Editor permission because
|
| + // the Subscriber permission does NOT permit attaching subscriptions to
|
| + // topics or to view topics.
|
| + logging.WithError(err).Errorf(
|
| + c, "please go to %s and add %s as a Pub/Sub Editor", URL, acct)
|
| + } else {
|
| + logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
|
| + }
|
| + return err
|
| + }
|
| + // Now check to see if the subscription already exists.
|
| + subID := info.AppID(c)
|
| + // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
|
| + // because it returns a version pinned hostname instead of the default route.
|
| + sub, err := getSubscription(c, subID)
|
| + switch err {
|
| + case errNotExist:
|
| + // continue
|
| + case nil:
|
| + logging.Infof(c, "subscription %#v exists, no need to update", sub)
|
| + return nil
|
| + default:
|
| + logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
|
| + return err
|
| + }
|
| + pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
|
| +
|
| + // No subscription exists, attach a new subscription to the existing topic.
|
| + endpointURL := url.URL{
|
| + Scheme: "https",
|
| + Host: pubsubModuleHost,
|
| + Path: "/_ah/push-handlers/buildbucket",
|
| + }
|
| + subConfig := pubsub.SubscriptionConfig{
|
| + Topic: topic,
|
| + PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
|
| + AckDeadline: time.Minute * 10,
|
| + }
|
| + newSub, err := createSubscription(c, subID, subConfig)
|
| + if err != nil {
|
| + if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
|
| + registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
|
| + verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
|
| + logging.WithError(err).Errorf(
|
| + c, "The domain has to be verified and added.\n\n"+
|
| + "1. Go to %s\n"+
|
| + "2. Verify the domain\n"+
|
| + "3. Go to %s\n"+
|
| + "4. Add %s to allowed domains\n\n",
|
| + verifyURL, registerURL, pubsubModuleHost)
|
| + } else {
|
| + logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
|
| + }
|
| + return err
|
| + }
|
| + // Success!
|
| + logging.Infof(c, "successfully created subscription %#v", newSub)
|
| + return nil
|
| +}
|
|
|