Chromium Code Reviews| Index: milo/common/pubsub.go |
| diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go |
| index 56b79c2b05902c61395af9853b59294f0865550e..1ee2f6b1c3c486525aebfc378315769ae1b8e352 100644 |
| --- a/milo/common/pubsub.go |
| +++ b/milo/common/pubsub.go |
| @@ -52,6 +52,8 @@ type pubsubClient interface { |
| *pubsub.Subscription, error) |
| } |
| +type pubsubClients map[string]pubsubClient |
| + |
| // prodPubSubClient is a wrapper around the production pubsub client. |
| type prodPubSubClient struct { |
| ctx context.Context |
| @@ -89,9 +91,13 @@ func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio |
| } |
| // getPubSubClient extracts a debug PubSub client out of the context. |
| -func getPubSubClient(c context.Context) (pubsubClient, error) { |
| - if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { |
| - return client, nil |
| +func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) { |
| + v := c.Value(&pubSubClientKey) |
| + if clients, ok := v.(pubsubClients); ok { |
| + if client, ok := clients[projectID]; ok { |
| + return client, nil |
| + } |
| + return nil, fmt.Errorf("no pubsub clients installed for" + projectID) |
|
nodir
2017/07/12 21:31:12
space missing
...for %s"
Ryan Tseng
2017/07/12 22:26:08
Done.
|
| } |
| return nil, errors.New("no pubsub clients installed") |
| } |
| @@ -100,35 +106,42 @@ func getPubSubClient(c context.Context) (pubsubClient, error) { |
| // given project ID |
| func withClient(c context.Context, projectID string) (context.Context, error) { |
| if projectID == "" { |
| - return nil, errors.New("missing buildbucket project") |
| + return nil, errors.New("missing project id") |
| } |
| client, err := pubsub.NewClient(c, projectID) |
| if err != nil { |
| return nil, err |
| } |
| - return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil |
| + var clients pubsubClients |
| + var ok bool |
| + if clients, ok = c.Value(&pubSubClientKey).(pubsubClients); !ok { |
| + clients = pubsubClients{} |
| + c = context.WithValue(c, &pubSubClientKey, clients) |
| + } |
| + clients[projectID] = &prodPubSubClient{c, client} |
|
nodir
2017/07/12 21:31:12
it used to be gouroutine safe, but now it is not.
Ryan Tseng
2017/07/12 22:26:08
Done.
|
| + return c, nil |
| } |
| -func getTopic(c context.Context, id string) (*pubsub.Topic, error) { |
| - client, err := getPubSubClient(c) |
| +func getTopic(c context.Context, project, id string) (*pubsub.Topic, error) { |
| + client, err := getPubSubClient(c, project) |
| if err != nil { |
| return nil, err |
| } |
| return client.getTopic(id) |
| } |
| -func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { |
| - client, err := getPubSubClient(c) |
| +func getSubscription(c context.Context, project, id string) (*pubsub.Subscription, error) { |
| + client, err := getPubSubClient(c, project) |
| if err != nil { |
| return nil, err |
| } |
| return client.getSubscription(id) |
| } |
| -func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) ( |
| +func createSubscription(c context.Context, project, id string, cfg pubsub.SubscriptionConfig) ( |
| *pubsub.Subscription, error) { |
| - client, err := getPubSubClient(c) |
| + client, err := getPubSubClient(c, project) |
| if err != nil { |
| return nil, err |
| } |
| @@ -140,11 +153,16 @@ func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon |
| func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| if settings.Buildbucket != nil { |
| // Install the production pubsub client pointing to the buildbucket project |
| - // into the context. |
| + // into the context, so that we can get a reference to the topic. |
| c, err := withClient(c, settings.Buildbucket.Project) |
| if err != nil { |
| return err |
| } |
| + // We also need a client for this project for the subscription. |
| + c, err = withClient(c, info.AppID(c)) |
| + if err != nil { |
| + return err |
| + } |
| return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project) |
| } |
| // TODO(hinoka): Ensure buildbot subscribed. |
| @@ -156,7 +174,7 @@ func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error |
| func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| topicID := "builds" |
| // Check to see if the topic exists first. |
| - topic, err := getTopic(c, topicID) |
| + topic, err := getTopic(c, projectID, topicID) |
| switch err { |
| case errNotExist: |
| logging.WithError(err).Errorf(c, "%s does not exist", topicID) |
| @@ -181,10 +199,8 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| return err |
| } |
| // Now check to see if the subscription already exists. |
| - subID := info.AppID(c) |
| - // Get the pubsub module of our app. We do not want to use info.ModuleHostname() |
| - // because it returns a version pinned hostname instead of the default route. |
| - sub, err := getSubscription(c, subID) |
| + appID := info.AppID(c) |
| + sub, err := getSubscription(c, appID, "buildbucket") |
| switch err { |
| case errNotExist: |
| // continue |
| @@ -195,6 +211,8 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| logging.WithError(err).Errorf(c, "could not check subscription %#v", sub) |
| return err |
| } |
| + // Get the pubsub module of our app. We do not want to use info.ModuleHostname() |
| + // because it returns a version pinned hostname instead of the default route. |
| pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| // No subscription exists, attach a new subscription to the existing topic. |
| @@ -208,21 +226,9 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| AckDeadline: time.Minute * 10, |
| } |
| - newSub, err := createSubscription(c, subID, subConfig) |
| + newSub, err := createSubscription(c, appID, "buildbucket", subConfig) |
| if err != nil { |
| - if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") { |
| - registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID |
| - verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost |
| - logging.WithError(err).Errorf( |
| - c, "The domain has to be verified and added.\n\n"+ |
| - "1. Go to %s\n"+ |
| - "2. Verify the domain\n"+ |
| - "3. Go to %s\n"+ |
| - "4. Add %s to allowed domains\n\n", |
| - verifyURL, registerURL, pubsubModuleHost) |
| - } else { |
| - logging.WithError(err).Errorf(c, "could not create subscription %#v", sub) |
| - } |
| + logging.WithError(err).Errorf(c, "could not create subscription %#v", sub) |
|
nodir
2017/07/12 21:31:12
why do we both log error and return it? the caller
Ryan Tseng
2017/07/12 22:26:08
Done.
|
| return err |
| } |
| // Success! |