Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(488)

Unified Diff: milo/common/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: rebase Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « milo/common/config_test.go ('k') | milo/common/pubsub_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: milo/common/pubsub.go
diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go
new file mode 100644
index 0000000000000000000000000000000000000000..56b79c2b05902c61395af9853b59294f0865550e
--- /dev/null
+++ b/milo/common/pubsub.go
@@ -0,0 +1,231 @@
+package common
+
+import (
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "net/url"
+ "strings"
+ "time"
+
+ "cloud.google.com/go/pubsub"
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/service/info"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/milo/api/config"
+)
+
+var pubSubClientKey = "stores a pubsubClient"
+
+type PubSubMessage struct {
+ Attributes map[string]string `json:"attributes"`
+ Data string `json:"data"`
+ MessageID string `json:"message_id"`
+}
+
+type PubSubSubscription struct {
+ Message PubSubMessage `json:"message"`
+ Subscription string `json:"subscription"`
+}
+
+var errNotExist = errors.New("does not exist")
+
+// GetData returns the expanded form of Data (decoded from base64).
+func (m *PubSubSubscription) GetData() ([]byte, error) {
+ return base64.StdEncoding.DecodeString(m.Message.Data)
+}
+
+// pubsubClient is an interface representing a pubsub.Client containing only
+// the functions that Milo calls. Internal use only, can be swapped
+// out for testing.
+type pubsubClient interface {
+ // getTopic returns the pubsub topic if it exists, a notExist error if
+ // it does not exist, or an error if there was an error.
+ getTopic(string) (*pubsub.Topic, error)
+
+ // getSubscription returns the pubsub subscription if it exists,
+ // a notExist error if it does not exist, or an error if there was an error.
+ getSubscription(string) (*pubsub.Subscription, error)
+
+ createSubscription(string, pubsub.SubscriptionConfig) (
+ *pubsub.Subscription, error)
+}
+
+// prodPubSubClient is a wrapper around the production pubsub client.
+type prodPubSubClient struct {
+ ctx context.Context
+ client *pubsub.Client
+}
+
+func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
+ topic := pc.client.Topic(id)
+ exists, err := topic.Exists(pc.ctx)
+ switch {
+ case err != nil:
+ return nil, err
+ case !exists:
+ return nil, errNotExist
+ }
+ return topic, nil
+}
+
+func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) {
+ sub := pc.client.Subscription(id)
+ exists, err := sub.Exists(pc.ctx)
+ switch {
+ case err != nil:
+ return nil, err
+ case !exists:
+ return nil, errNotExist
+ }
+ return sub, nil
+}
+
+func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) (
+ *pubsub.Subscription, error) {
+
+ return pc.client.CreateSubscription(pc.ctx, id, cfg)
+}
+
+// getPubSubClient extracts a debug PubSub client out of the context.
+func getPubSubClient(c context.Context) (pubsubClient, error) {
+ if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
+ return client, nil
+ }
+ return nil, errors.New("no pubsub clients installed")
+}
+
+// withClient returns a context with a pubsub client instantiated to the
+// given project ID
+func withClient(c context.Context, projectID string) (context.Context, error) {
+ if projectID == "" {
+ return nil, errors.New("missing buildbucket project")
+ }
+ client, err := pubsub.NewClient(c, projectID)
+ if err != nil {
+ return nil, err
+ }
+ return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil
+}
+
+func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
+ client, err := getPubSubClient(c)
+ if err != nil {
+ return nil, err
+ }
+ return client.getTopic(id)
+}
+
+func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
+ client, err := getPubSubClient(c)
+ if err != nil {
+ return nil, err
+ }
+ return client.getSubscription(id)
+}
+
+func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (
+ *pubsub.Subscription, error) {
+
+ client, err := getPubSubClient(c)
+ if err != nil {
+ return nil, err
+ }
+ return client.createSubscription(id, cfg)
+}
+
+// EnsurePubSubSubscribed makes sure the following subscriptions are in place:
+// * buildbucket, via the settings.Buildbucket.Topic setting
+func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
+ if settings.Buildbucket != nil {
+ // Install the production pubsub client pointing to the buildbucket project
+ // into the context.
+ c, err := withClient(c, settings.Buildbucket.Project)
+ if err != nil {
+ return err
+ }
+ return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
+ }
+ // TODO(hinoka): Ensure buildbot subscribed.
+ return nil
+}
+
+// ensureSubscribed is called by a cron job and ensures that the Milo
+// instance is properly subscribed to the buildbucket subscription endpoint.
+func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
+ topicID := "builds"
+ // Check to see if the topic exists first.
+ topic, err := getTopic(c, topicID)
+ switch err {
+ case errNotExist:
+ logging.WithError(err).Errorf(c, "%s does not exist", topicID)
+ return err
+ case nil:
+ // continue
+ default:
+ if strings.Contains(err.Error(), "PermissionDenied") {
+ URL := "https://console.cloud.google.com/iam-admin/iam/project?project=" + projectID
+ acct, serr := info.ServiceAccount(c)
+ if serr != nil {
+ acct = fmt.Sprintf("Unknown: %s", serr.Error())
+ }
+ // The documentation is incorrect. We need Editor permission because
+ // the Subscriber permission does NOT permit attaching subscriptions to
+ // topics or to view topics.
+ logging.WithError(err).Errorf(
+ c, "please go to %s and add %s as a Pub/Sub Editor", URL, acct)
+ } else {
+ logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
+ }
+ return err
+ }
+ // Now check to see if the subscription already exists.
+ subID := info.AppID(c)
+ // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
+ // because it returns a version pinned hostname instead of the default route.
+ sub, err := getSubscription(c, subID)
+ switch err {
+ case errNotExist:
+ // continue
+ case nil:
+ logging.Infof(c, "subscription %#v exists, no need to update", sub)
+ return nil
+ default:
+ logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
+ return err
+ }
+ pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
+
+ // No subscription exists, attach a new subscription to the existing topic.
+ endpointURL := url.URL{
+ Scheme: "https",
+ Host: pubsubModuleHost,
+ Path: "/_ah/push-handlers/buildbucket",
+ }
+ subConfig := pubsub.SubscriptionConfig{
+ Topic: topic,
+ PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
+ AckDeadline: time.Minute * 10,
+ }
+ newSub, err := createSubscription(c, subID, subConfig)
+ if err != nil {
+ if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
+ registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
+ verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
+ logging.WithError(err).Errorf(
+ c, "The domain has to be verified and added.\n\n"+
+ "1. Go to %s\n"+
+ "2. Verify the domain\n"+
+ "3. Go to %s\n"+
+ "4. Add %s to allowed domains\n\n",
+ verifyURL, registerURL, pubsubModuleHost)
+ } else {
+ logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
+ }
+ return err
+ }
+ // Success!
+ logging.Infof(c, "successfully created subscription %#v", newSub)
+ return nil
+}
« no previous file with comments | « milo/common/config_test.go ('k') | milo/common/pubsub_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698