| Index: milo/common/pubsub.go
|
| diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go
|
| index 56b79c2b05902c61395af9853b59294f0865550e..56ec5c5c963a92740124d1d1b9115fb846d0ca64 100644
|
| --- a/milo/common/pubsub.go
|
| +++ b/milo/common/pubsub.go
|
| @@ -2,7 +2,6 @@ package common
|
|
|
| import (
|
| "encoding/base64"
|
| - "errors"
|
| "fmt"
|
| "net/url"
|
| "strings"
|
| @@ -12,11 +11,12 @@ import (
|
| "golang.org/x/net/context"
|
|
|
| "github.com/luci/gae/service/info"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/milo/api/config"
|
| )
|
|
|
| -var pubSubClientKey = "stores a pubsubClient"
|
| +var pubsubClientFactoryKey = "stores a pubsubClientFactory"
|
|
|
| type PubSubMessage struct {
|
| Attributes map[string]string `json:"attributes"`
|
| @@ -42,25 +42,28 @@ func (m *PubSubSubscription) GetData() ([]byte, error) {
|
| type pubsubClient interface {
|
| // getTopic returns the pubsub topic if it exists, a notExist error if
|
| // it does not exist, or an error if there was an error.
|
| - getTopic(string) (*pubsub.Topic, error)
|
| + getTopic(context.Context, string) (*pubsub.Topic, error)
|
|
|
| // getSubscription returns the pubsub subscription if it exists,
|
| // a notExist error if it does not exist, or an error if there was an error.
|
| - getSubscription(string) (*pubsub.Subscription, error)
|
| + getSubscription(context.Context, string) (*pubsub.Subscription, error)
|
|
|
| - createSubscription(string, pubsub.SubscriptionConfig) (
|
| + createSubscription(context.Context, string, pubsub.SubscriptionConfig) (
|
| *pubsub.Subscription, error)
|
| }
|
|
|
| +// pubsubClientFactory is a stubbable factory that produces pubsubClients bound
|
| +// to project IDs.
|
| +type pubsubClientFactory func(context.Context, string) (pubsubClient, error)
|
| +
|
| // prodPubSubClient is a wrapper around the production pubsub client.
|
| type prodPubSubClient struct {
|
| - ctx context.Context
|
| - client *pubsub.Client
|
| + *pubsub.Client
|
| }
|
|
|
| -func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
|
| - topic := pc.client.Topic(id)
|
| - exists, err := topic.Exists(pc.ctx)
|
| +func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) {
|
| + topic := pc.Client.Topic(id)
|
| + exists, err := topic.Exists(c)
|
| switch {
|
| case err != nil:
|
| return nil, err
|
| @@ -70,9 +73,9 @@ func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
|
| return topic, nil
|
| }
|
|
|
| -func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) {
|
| - sub := pc.client.Subscription(id)
|
| - exists, err := sub.Exists(pc.ctx)
|
| +func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
|
| + sub := pc.Client.Subscription(id)
|
| + exists, err := sub.Exists(c)
|
| switch {
|
| case err != nil:
|
| return nil, err
|
| @@ -82,85 +85,55 @@ func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er
|
| return sub, nil
|
| }
|
|
|
| -func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) (
|
| +func (pc *prodPubSubClient) createSubscription(
|
| + c context.Context, id string, cfg pubsub.SubscriptionConfig) (
|
| *pubsub.Subscription, error) {
|
|
|
| - return pc.client.CreateSubscription(pc.ctx, id, cfg)
|
| -}
|
| -
|
| -// getPubSubClient extracts a debug PubSub client out of the context.
|
| -func getPubSubClient(c context.Context) (pubsubClient, error) {
|
| - if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
|
| - return client, nil
|
| - }
|
| - return nil, errors.New("no pubsub clients installed")
|
| -}
|
| -
|
| -// withClient returns a context with a pubsub client instantiated to the
|
| -// given project ID
|
| -func withClient(c context.Context, projectID string) (context.Context, error) {
|
| - if projectID == "" {
|
| - return nil, errors.New("missing buildbucket project")
|
| - }
|
| - client, err := pubsub.NewClient(c, projectID)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil
|
| + return pc.Client.CreateSubscription(c, id, cfg)
|
| }
|
|
|
| -func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
|
| - client, err := getPubSubClient(c)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - return client.getTopic(id)
|
| +func prodPubSubClientFactory(c context.Context, projectID string) (pubsubClient, error) {
|
| + cli, err := pubsub.NewClient(c, projectID)
|
| + return &prodPubSubClient{cli}, err
|
| }
|
|
|
| -func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
|
| - client, err := getPubSubClient(c)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - return client.getSubscription(id)
|
| +// withClientFactory returns a context with a given pubsub client factory.
|
| +func withClientFactory(c context.Context, fac pubsubClientFactory) context.Context {
|
| + return context.WithValue(c, &pubsubClientFactoryKey, fac)
|
| }
|
|
|
| -func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (
|
| - *pubsub.Subscription, error) {
|
| -
|
| - client, err := getPubSubClient(c)
|
| - if err != nil {
|
| - return nil, err
|
| +func newPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
|
| + if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !ok {
|
| + panic("no pubsub client factory installed")
|
| + } else {
|
| + return fac(c, projectID)
|
| }
|
| - return client.createSubscription(id, cfg)
|
| }
|
|
|
| // EnsurePubSubSubscribed makes sure the following subscriptions are in place:
|
| // * buildbucket, via the settings.Buildbucket.Topic setting
|
| func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
|
| if settings.Buildbucket != nil {
|
| - // Install the production pubsub client pointing to the buildbucket project
|
| - // into the context.
|
| - c, err := withClient(c, settings.Buildbucket.Project)
|
| - if err != nil {
|
| - return err
|
| - }
|
| + c = withClientFactory(c, prodPubSubClientFactory)
|
| return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
|
| }
|
| // TODO(hinoka): Ensure buildbot subscribed.
|
| return nil
|
| }
|
|
|
| -// ensureSubscribed is called by a cron job and ensures that the Milo
|
| +// ensureBuildbucketSubscribedis called by a cron job and ensures that the Milo
|
| // instance is properly subscribed to the buildbucket subscription endpoint.
|
| func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
|
| topicID := "builds"
|
| - // Check to see if the topic exists first.
|
| - topic, err := getTopic(c, topicID)
|
| + // Check the buildbucket project to see if the topic exists first.
|
| + bbClient, err := newPubSubClient(c, projectID)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + topic, err := bbClient.getTopic(c, topicID)
|
| switch err {
|
| case errNotExist:
|
| - logging.WithError(err).Errorf(c, "%s does not exist", topicID)
|
| - return err
|
| + return errors.Annotate(err, "%s does not exist", topicID).Err()
|
| case nil:
|
| // continue
|
| default:
|
| @@ -181,10 +154,11 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
|
| return err
|
| }
|
| // Now check to see if the subscription already exists.
|
| - subID := info.AppID(c)
|
| - // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
|
| - // because it returns a version pinned hostname instead of the default route.
|
| - sub, err := getSubscription(c, subID)
|
| + miloClient, err := newPubSubClient(c, info.AppID(c))
|
| + if err != nil {
|
| + return err
|
| + }
|
| + sub, err := miloClient.getSubscription(c, "buildbucket")
|
| switch err {
|
| case errNotExist:
|
| // continue
|
| @@ -195,6 +169,8 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
|
| logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
|
| return err
|
| }
|
| + // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
|
| + // because it returns a version pinned hostname instead of the default route.
|
| pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
|
|
|
| // No subscription exists, attach a new subscription to the existing topic.
|
| @@ -208,22 +184,9 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
|
| PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
|
| AckDeadline: time.Minute * 10,
|
| }
|
| - newSub, err := createSubscription(c, subID, subConfig)
|
| + newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig)
|
| if err != nil {
|
| - if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
|
| - registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
|
| - verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
|
| - logging.WithError(err).Errorf(
|
| - c, "The domain has to be verified and added.\n\n"+
|
| - "1. Go to %s\n"+
|
| - "2. Verify the domain\n"+
|
| - "3. Go to %s\n"+
|
| - "4. Add %s to allowed domains\n\n",
|
| - verifyURL, registerURL, pubsubModuleHost)
|
| - } else {
|
| - logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
|
| - }
|
| - return err
|
| + return errors.Annotate(err, "could not create subscription %#v", sub).Err()
|
| }
|
| // Success!
|
| logging.Infof(c, "successfully created subscription %#v", newSub)
|
|
|