Chromium Code Reviews| Index: milo/common/pubsub.go |
| diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..84fd76f6329b98f63c366797903bd0edb026dffa |
| --- /dev/null |
| +++ b/milo/common/pubsub.go |
| @@ -0,0 +1,129 @@ |
| +package common |
| + |
| +import ( |
| + "encoding/base64" |
| + "fmt" |
| + "net/url" |
| + "strings" |
| + "time" |
| + |
| + "cloud.google.com/go/pubsub" |
| + "golang.org/x/net/context" |
| + |
| + "github.com/luci/gae/service/info" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/milo/api/config" |
| +) |
| + |
| +type PubSubMessage struct { |
| + Attributes map[string]string `json:"attributes"` |
| + Data string `json:"data"` |
| + MessageID string `json:"message_id"` |
| +} |
| + |
| +type PubSubSubscription struct { |
| + Message PubSubMessage `json:"message"` |
| + Subscription string `json:"subscription"` |
| +} |
| + |
| +// GetData returns the expanded form of Data (decoded from base64). |
| +func (m *PubSubSubscription) GetData() ([]byte, error) { |
| + return base64.StdEncoding.DecodeString(m.Message.Data) |
| +} |
| + |
| +// ensurePubSubSubscribed makes sure the following subscriptions are in place: |
| +// * buildbucket, via the settings.Buildbucket.Topic setting |
| +func ensurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| + if settings.Buildbucket != nil { |
| + err := ensureBuildbucketSubscribed(c, settings.Buildbucket.Project) |
| + return err |
| + } |
| + // TODO(hinoka): Ensure buildbot subscribed. |
| + return nil |
| +} |
| + |
| +// ensureSubscribed is called by a cron job and ensures that the Milo |
| +// instance is properly subscribed to the buildbucket subscription endpoint. |
| +func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| + topicID := "builds" |
|
Ryan Tseng
2017/06/28 22:14:18
This is hardcoded in the buildbucket code
|
| + if projectID == "" { |
| + return fmt.Errorf("missing buildbucket project", projectID) |
| + } |
| + client, err := pubsub.NewClient(c, projectID) |
| + if err != nil { |
| + return err |
| + } |
| + // Check to see if the topic exists first. |
| + topic := client.Topic(topicID) |
| + if exists, err := topic.Exists(c); err != nil { |
| + if strings.Contains(err.Error(), "PermissionDenied") { |
| + URL := "https://console.cloud.google.com/iam-admin/iam/project?project=" + projectID |
| + acct, serr := info.ServiceAccount(c) |
| + if serr != nil { |
| + acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| + } |
| + // The documentation is incorrect. The following permissions are needed: |
| + // PubSub Viewer - To view Topic (This action) |
| + // PubSub Editor - To subscribe to Topics (Below) |
| + // PubSub Subscriber does NOT allow viewing topics, and it does NOT |
| + // allow for attaching subscriptions to topics. |
| + logging.WithError(err).Errorf( |
| + c, "please go to %s and add %s as a Pub/Sub Editor", URL, acct) |
| + } else { |
| + logging.WithError(err).Errorf(c, "could not check topic %#v", topic) |
| + } |
| + return err |
| + } else if !exists { |
| + err = fmt.Errorf("topic %#v does not exist", topic) |
| + logging.Errorf(c, err.Error()) |
| + return err |
| + } |
| + |
| + // Now check to see if the subscription already exists. |
| + subID := info.AppID(c) |
| + // Get the pubsub module of our app. We do not want to use info.ModuleHostname() |
| + // because it returns a version pinned hostname instead of the default route. |
| + pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| + sub := client.Subscription(subID) |
| + if exists, err := sub.Exists(c); err != nil { |
| + logging.WithError(err).Errorf(c, "could not check subscription %#v", sub) |
| + return err |
| + } else if exists { |
| + logging.Infof(c, "subscription %#v exists, no need to update", sub) |
| + return nil |
| + } |
| + if err != nil { |
| + logging.WithError(err).Errorf(c, "could not get hostname for pubsub module") |
| + return err |
| + } |
| + endpointURL := url.URL{ |
| + Scheme: "https", |
| + Host: pubsubModuleHost, |
| + Path: "/_ah/push-handlers/buildbucket", |
| + } |
| + subConfig := pubsub.SubscriptionConfig{ |
| + Topic: topic, |
| + PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| + AckDeadline: time.Minute * 10, |
| + } |
| + newSub, err := client.CreateSubscription(c, subID, subConfig) |
| + if err != nil { |
| + if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") { |
| + registerURL := "https://pantheon.corp.google.com/apis/credentials/domainverification?project=" + projectID |
| + verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost |
| + logging.WithError(err).Errorf( |
| + c, "The domain has to be verified and added.\n\n"+ |
| + "1. Go to %s\n"+ |
| + "2. Verify the domain\n"+ |
| + "3. Go to %s\n"+ |
| + "4. Add %s to allowed domain\n\n", |
| + verifyURL, registerURL, pubsubModuleHost) |
| + } else { |
| + logging.WithError(err).Errorf(c, "could not create subscription %#v", sub) |
| + } |
| + return err |
| + } |
| + // Success! |
| + logging.Infof(c, "successfully created subscription %#v", newSub) |
| + return nil |
| +} |