Chromium Code Reviews| Index: milo/common/pubsub.go |
| diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go |
| index 56b79c2b05902c61395af9853b59294f0865550e..c39dfa1a87ca20a7ce477f0416320db15fc088ca 100644 |
| --- a/milo/common/pubsub.go |
| +++ b/milo/common/pubsub.go |
| @@ -2,7 +2,6 @@ package common |
| import ( |
| "encoding/base64" |
| - "errors" |
| "fmt" |
| "net/url" |
| "strings" |
| @@ -12,11 +11,12 @@ import ( |
| "golang.org/x/net/context" |
| "github.com/luci/gae/service/info" |
| + "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/milo/api/config" |
| ) |
| -var pubSubClientKey = "stores a pubsubClient" |
| +var pubsubClientFactoryKey = "stores a pubsubClientFactory" |
| type PubSubMessage struct { |
| Attributes map[string]string `json:"attributes"` |
| @@ -42,25 +42,30 @@ func (m *PubSubSubscription) GetData() ([]byte, error) { |
| type pubsubClient interface { |
| // getTopic returns the pubsub topic if it exists, a notExist error if |
| // it does not exist, or an error if there was an error. |
| - getTopic(string) (*pubsub.Topic, error) |
| + getTopic(context.Context, string) (*pubsub.Topic, error) |
| // getSubscription returns the pubsub subscription if it exists, |
| // a notExist error if it does not exist, or an error if there was an error. |
| - getSubscription(string) (*pubsub.Subscription, error) |
| + getSubscription(context.Context, string) (*pubsub.Subscription, error) |
| - createSubscription(string, pubsub.SubscriptionConfig) ( |
| + createSubscription(context.Context, string, pubsub.SubscriptionConfig) ( |
| *pubsub.Subscription, error) |
| } |
| +// pubsubClientFactory is a stubbable factory that produces pubsubClients bound |
| +// to project IDs. |
| +type pubsubClientFactory interface { |
|
nodir
2017/07/14 01:09:04
wouldn't
type pubsubClientFactory func(ctx conte
Ryan Tseng
2017/07/14 17:41:49
Thanks, I forgot this was legal
edit: this is pos
nodir
2017/07/14 18:20:03
I am not following. The following compiles: https:
|
| + newClient(context.Context, string) (pubsubClient, error) |
| +} |
| + |
| // prodPubSubClient is a wrapper around the production pubsub client. |
| type prodPubSubClient struct { |
| - ctx context.Context |
| - client *pubsub.Client |
| + *pubsub.Client |
| } |
| -func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { |
| - topic := pc.client.Topic(id) |
| - exists, err := topic.Exists(pc.ctx) |
| +func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) { |
| + topic := pc.Client.Topic(id) |
| + exists, err := topic.Exists(c) |
| switch { |
| case err != nil: |
| return nil, err |
| @@ -70,9 +75,9 @@ func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { |
| return topic, nil |
| } |
| -func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) { |
| - sub := pc.client.Subscription(id) |
| - exists, err := sub.Exists(pc.ctx) |
| +func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { |
| + sub := pc.Client.Subscription(id) |
| + exists, err := sub.Exists(c) |
| switch { |
| case err != nil: |
| return nil, err |
| @@ -82,69 +87,39 @@ func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er |
| return sub, nil |
| } |
| -func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) ( |
| +func (pc *prodPubSubClient) createSubscription( |
| + c context.Context, id string, cfg pubsub.SubscriptionConfig) ( |
| *pubsub.Subscription, error) { |
| - return pc.client.CreateSubscription(pc.ctx, id, cfg) |
| + return pc.Client.CreateSubscription(c, id, cfg) |
| } |
| -// getPubSubClient extracts a debug PubSub client out of the context. |
| -func getPubSubClient(c context.Context) (pubsubClient, error) { |
| - if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { |
| - return client, nil |
| - } |
| - return nil, errors.New("no pubsub clients installed") |
| +type prodPubSubClientFactory struct{} |
| + |
| +func (fac *prodPubSubClientFactory) newClient(c context.Context, projectID string) (pubsubClient, error) { |
| + cli, err := pubsub.NewClient(c, projectID) |
| + return &prodPubSubClient{cli}, err |
| } |
| // withClient returns a context with a pubsub client instantiated to the |
| // given project ID |
|
nodir
2017/07/14 01:09:04
this comment is stale
Ryan Tseng
2017/07/14 17:41:50
Done.
|
| -func withClient(c context.Context, projectID string) (context.Context, error) { |
| - if projectID == "" { |
| - return nil, errors.New("missing buildbucket project") |
| - } |
| - client, err := pubsub.NewClient(c, projectID) |
| - if err != nil { |
| - return nil, err |
| - } |
| - return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil |
| +func withClientFactory(c context.Context, fac pubsubClientFactory) context.Context { |
| + return context.WithValue(c, &pubsubClientFactoryKey, fac) |
| } |
| -func getTopic(c context.Context, id string) (*pubsub.Topic, error) { |
| - client, err := getPubSubClient(c) |
| - if err != nil { |
| - return nil, err |
| - } |
| - return client.getTopic(id) |
| -} |
| - |
| -func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { |
| - client, err := getPubSubClient(c) |
| - if err != nil { |
| - return nil, err |
| - } |
| - return client.getSubscription(id) |
| -} |
| - |
| -func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) ( |
| - *pubsub.Subscription, error) { |
| - |
| - client, err := getPubSubClient(c) |
| - if err != nil { |
| - return nil, err |
| +func newPubSubClient(c context.Context, projectID string) (pubsubClient, error) { |
| + if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !ok { |
| + panic("no pubsub client factory installed") |
| + } else { |
| + return fac.newClient(c, projectID) |
| } |
| - return client.createSubscription(id, cfg) |
| } |
| // EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| // * buildbucket, via the settings.Buildbucket.Topic setting |
| func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| if settings.Buildbucket != nil { |
| - // Install the production pubsub client pointing to the buildbucket project |
| - // into the context. |
| - c, err := withClient(c, settings.Buildbucket.Project) |
| - if err != nil { |
| - return err |
| - } |
| + c = withClientFactory(c, &prodPubSubClientFactory{}) |
| return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project) |
| } |
| // TODO(hinoka): Ensure buildbot subscribed. |
| @@ -155,12 +130,15 @@ func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error |
| // instance is properly subscribed to the buildbucket subscription endpoint. |
| func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| topicID := "builds" |
| - // Check to see if the topic exists first. |
| - topic, err := getTopic(c, topicID) |
| + // Check the buildbucket project to see if the topic exists first. |
| + bbClient, err := newPubSubClient(c, projectID) |
| + if err != nil { |
| + return err |
| + } |
| + topic, err := bbClient.getTopic(c, topicID) |
| switch err { |
| case errNotExist: |
| - logging.WithError(err).Errorf(c, "%s does not exist", topicID) |
| - return err |
| + return errors.Annotate(err, "%s does not exist", topicID).Err() |
| case nil: |
| // continue |
| default: |
| @@ -181,10 +159,11 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| return err |
| } |
| // Now check to see if the subscription already exists. |
| - subID := info.AppID(c) |
| - // Get the pubsub module of our app. We do not want to use info.ModuleHostname() |
| - // because it returns a version pinned hostname instead of the default route. |
| - sub, err := getSubscription(c, subID) |
| + miloClient, err := newPubSubClient(c, info.AppID(c)) |
| + if err != nil { |
| + return err |
| + } |
| + sub, err := miloClient.getSubscription(c, "buildbucket") |
| switch err { |
| case errNotExist: |
| // continue |
| @@ -195,6 +174,8 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| logging.WithError(err).Errorf(c, "could not check subscription %#v", sub) |
| return err |
| } |
| + // Get the pubsub module of our app. We do not want to use info.ModuleHostname() |
| + // because it returns a version pinned hostname instead of the default route. |
| pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| // No subscription exists, attach a new subscription to the existing topic. |
| @@ -208,22 +189,9 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| AckDeadline: time.Minute * 10, |
| } |
| - newSub, err := createSubscription(c, subID, subConfig) |
| + newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig) |
| if err != nil { |
| - if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") { |
| - registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID |
| - verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost |
| - logging.WithError(err).Errorf( |
| - c, "The domain has to be verified and added.\n\n"+ |
| - "1. Go to %s\n"+ |
| - "2. Verify the domain\n"+ |
| - "3. Go to %s\n"+ |
| - "4. Add %s to allowed domains\n\n", |
| - verifyURL, registerURL, pubsubModuleHost) |
| - } else { |
| - logging.WithError(err).Errorf(c, "could not create subscription %#v", sub) |
| - } |
| - return err |
| + return errors.Annotate(err, "could not create subscription %#v", sub).Err() |
| } |
| // Success! |
| logging.Infof(c, "successfully created subscription %#v", newSub) |