Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1014)

Unified Diff: milo/common/pubsub.go

Issue 2981683002: Milo: Move buildbucket pubsub sub from buildbucket project to milo project (Closed)
Patch Set: subID -> appID Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | milo/common/pubsub_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: milo/common/pubsub.go
diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go
index 56b79c2b05902c61395af9853b59294f0865550e..1ee2f6b1c3c486525aebfc378315769ae1b8e352 100644
--- a/milo/common/pubsub.go
+++ b/milo/common/pubsub.go
@@ -52,6 +52,8 @@ type pubsubClient interface {
*pubsub.Subscription, error)
}
+type pubsubClients map[string]pubsubClient
+
// prodPubSubClient is a wrapper around the production pubsub client.
type prodPubSubClient struct {
ctx context.Context
@@ -89,9 +91,13 @@ func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio
}
// getPubSubClient extracts a debug PubSub client out of the context.
-func getPubSubClient(c context.Context) (pubsubClient, error) {
- if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
- return client, nil
+func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
+ v := c.Value(&pubSubClientKey)
+ if clients, ok := v.(pubsubClients); ok {
+ if client, ok := clients[projectID]; ok {
+ return client, nil
+ }
+ return nil, fmt.Errorf("no pubsub clients installed for" + projectID)
nodir 2017/07/12 21:31:12 space missing ...for %s"
Ryan Tseng 2017/07/12 22:26:08 Done.
}
return nil, errors.New("no pubsub clients installed")
}
@@ -100,35 +106,42 @@ func getPubSubClient(c context.Context) (pubsubClient, error) {
// given project ID
func withClient(c context.Context, projectID string) (context.Context, error) {
if projectID == "" {
- return nil, errors.New("missing buildbucket project")
+ return nil, errors.New("missing project id")
}
client, err := pubsub.NewClient(c, projectID)
if err != nil {
return nil, err
}
- return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil
+ var clients pubsubClients
+ var ok bool
+ if clients, ok = c.Value(&pubSubClientKey).(pubsubClients); !ok {
+ clients = pubsubClients{}
+ c = context.WithValue(c, &pubSubClientKey, clients)
+ }
+ clients[projectID] = &prodPubSubClient{c, client}
nodir 2017/07/12 21:31:12 it used to be gouroutine safe, but now it is not.
Ryan Tseng 2017/07/12 22:26:08 Done.
+ return c, nil
}
-func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
- client, err := getPubSubClient(c)
+func getTopic(c context.Context, project, id string) (*pubsub.Topic, error) {
+ client, err := getPubSubClient(c, project)
if err != nil {
return nil, err
}
return client.getTopic(id)
}
-func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
- client, err := getPubSubClient(c)
+func getSubscription(c context.Context, project, id string) (*pubsub.Subscription, error) {
+ client, err := getPubSubClient(c, project)
if err != nil {
return nil, err
}
return client.getSubscription(id)
}
-func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (
+func createSubscription(c context.Context, project, id string, cfg pubsub.SubscriptionConfig) (
*pubsub.Subscription, error) {
- client, err := getPubSubClient(c)
+ client, err := getPubSubClient(c, project)
if err != nil {
return nil, err
}
@@ -140,11 +153,16 @@ func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon
func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
if settings.Buildbucket != nil {
// Install the production pubsub client pointing to the buildbucket project
- // into the context.
+ // into the context, so that we can get a reference to the topic.
c, err := withClient(c, settings.Buildbucket.Project)
if err != nil {
return err
}
+ // We also need a client for this project for the subscription.
+ c, err = withClient(c, info.AppID(c))
+ if err != nil {
+ return err
+ }
return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
}
// TODO(hinoka): Ensure buildbot subscribed.
@@ -156,7 +174,7 @@ func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error
func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
topicID := "builds"
// Check to see if the topic exists first.
- topic, err := getTopic(c, topicID)
+ topic, err := getTopic(c, projectID, topicID)
switch err {
case errNotExist:
logging.WithError(err).Errorf(c, "%s does not exist", topicID)
@@ -181,10 +199,8 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
return err
}
// Now check to see if the subscription already exists.
- subID := info.AppID(c)
- // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
- // because it returns a version pinned hostname instead of the default route.
- sub, err := getSubscription(c, subID)
+ appID := info.AppID(c)
+ sub, err := getSubscription(c, appID, "buildbucket")
switch err {
case errNotExist:
// continue
@@ -195,6 +211,8 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
return err
}
+ // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
+ // because it returns a version pinned hostname instead of the default route.
pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
// No subscription exists, attach a new subscription to the existing topic.
@@ -208,21 +226,9 @@ func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
AckDeadline: time.Minute * 10,
}
- newSub, err := createSubscription(c, subID, subConfig)
+ newSub, err := createSubscription(c, appID, "buildbucket", subConfig)
if err != nil {
- if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
- registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
- verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
- logging.WithError(err).Errorf(
- c, "The domain has to be verified and added.\n\n"+
- "1. Go to %s\n"+
- "2. Verify the domain\n"+
- "3. Go to %s\n"+
- "4. Add %s to allowed domains\n\n",
- verifyURL, registerURL, pubsubModuleHost)
- } else {
- logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
- }
+ logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
nodir 2017/07/12 21:31:12 why do we both log error and return it? the caller
Ryan Tseng 2017/07/12 22:26:08 Done.
return err
}
// Success!
« no previous file with comments | « no previous file | milo/common/pubsub_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698