Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: milo/common/pubsub.go

Issue 2981683002: Milo: Move buildbucket pubsub sub from buildbucket project to milo project (Closed)
Patch Set: subID -> appID Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | milo/common/pubsub_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package common 1 package common
2 2
3 import ( 3 import (
4 "encoding/base64" 4 "encoding/base64"
5 "errors" 5 "errors"
6 "fmt" 6 "fmt"
7 "net/url" 7 "net/url"
8 "strings" 8 "strings"
9 "time" 9 "time"
10 10
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
45 getTopic(string) (*pubsub.Topic, error) 45 getTopic(string) (*pubsub.Topic, error)
46 46
47 // getSubscription returns the pubsub subscription if it exists, 47 // getSubscription returns the pubsub subscription if it exists,
48 // a notExist error if it does not exist, or an error if there was an er ror. 48 // a notExist error if it does not exist, or an error if there was an er ror.
49 getSubscription(string) (*pubsub.Subscription, error) 49 getSubscription(string) (*pubsub.Subscription, error)
50 50
51 createSubscription(string, pubsub.SubscriptionConfig) ( 51 createSubscription(string, pubsub.SubscriptionConfig) (
52 *pubsub.Subscription, error) 52 *pubsub.Subscription, error)
53 } 53 }
54 54
55 type pubsubClients map[string]pubsubClient
56
55 // prodPubSubClient is a wrapper around the production pubsub client. 57 // prodPubSubClient is a wrapper around the production pubsub client.
56 type prodPubSubClient struct { 58 type prodPubSubClient struct {
57 ctx context.Context 59 ctx context.Context
58 client *pubsub.Client 60 client *pubsub.Client
59 } 61 }
60 62
61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { 63 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
62 topic := pc.client.Topic(id) 64 topic := pc.client.Topic(id)
63 exists, err := topic.Exists(pc.ctx) 65 exists, err := topic.Exists(pc.ctx)
64 switch { 66 switch {
(...skipping 17 matching lines...) Expand all
82 return sub, nil 84 return sub, nil
83 } 85 }
84 86
85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) ( 87 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) (
86 *pubsub.Subscription, error) { 88 *pubsub.Subscription, error) {
87 89
88 return pc.client.CreateSubscription(pc.ctx, id, cfg) 90 return pc.client.CreateSubscription(pc.ctx, id, cfg)
89 } 91 }
90 92
91 // getPubSubClient extracts a debug PubSub client out of the context. 93 // getPubSubClient extracts a debug PubSub client out of the context.
92 func getPubSubClient(c context.Context) (pubsubClient, error) { 94 func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
93 » if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { 95 » v := c.Value(&pubSubClientKey)
94 » » return client, nil 96 » if clients, ok := v.(pubsubClients); ok {
97 » » if client, ok := clients[projectID]; ok {
98 » » » return client, nil
99 » » }
100 » » return nil, fmt.Errorf("no pubsub clients installed for" + proje ctID)
nodir 2017/07/12 21:31:12 space missing ...for %s"
Ryan Tseng 2017/07/12 22:26:08 Done.
95 } 101 }
96 return nil, errors.New("no pubsub clients installed") 102 return nil, errors.New("no pubsub clients installed")
97 } 103 }
98 104
99 // withClient returns a context with a pubsub client instantiated to the 105 // withClient returns a context with a pubsub client instantiated to the
100 // given project ID 106 // given project ID
101 func withClient(c context.Context, projectID string) (context.Context, error) { 107 func withClient(c context.Context, projectID string) (context.Context, error) {
102 if projectID == "" { 108 if projectID == "" {
103 » » return nil, errors.New("missing buildbucket project") 109 » » return nil, errors.New("missing project id")
104 } 110 }
105 client, err := pubsub.NewClient(c, projectID) 111 client, err := pubsub.NewClient(c, projectID)
106 if err != nil { 112 if err != nil {
107 return nil, err 113 return nil, err
108 } 114 }
109 » return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien t}), nil 115 » var clients pubsubClients
116 » var ok bool
117 » if clients, ok = c.Value(&pubSubClientKey).(pubsubClients); !ok {
118 » » clients = pubsubClients{}
119 » » c = context.WithValue(c, &pubSubClientKey, clients)
120 » }
121 » clients[projectID] = &prodPubSubClient{c, client}
nodir 2017/07/12 21:31:12 it used to be gouroutine safe, but now it is not.
Ryan Tseng 2017/07/12 22:26:08 Done.
122 » return c, nil
110 } 123 }
111 124
112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { 125 func getTopic(c context.Context, project, id string) (*pubsub.Topic, error) {
113 » client, err := getPubSubClient(c) 126 » client, err := getPubSubClient(c, project)
114 if err != nil { 127 if err != nil {
115 return nil, err 128 return nil, err
116 } 129 }
117 return client.getTopic(id) 130 return client.getTopic(id)
118 } 131 }
119 132
120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { 133 func getSubscription(c context.Context, project, id string) (*pubsub.Subscriptio n, error) {
121 » client, err := getPubSubClient(c) 134 » client, err := getPubSubClient(c, project)
122 if err != nil { 135 if err != nil {
123 return nil, err 136 return nil, err
124 } 137 }
125 return client.getSubscription(id) 138 return client.getSubscription(id)
126 } 139 }
127 140
128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon fig) ( 141 func createSubscription(c context.Context, project, id string, cfg pubsub.Subscr iptionConfig) (
129 *pubsub.Subscription, error) { 142 *pubsub.Subscription, error) {
130 143
131 » client, err := getPubSubClient(c) 144 » client, err := getPubSubClient(c, project)
132 if err != nil { 145 if err != nil {
133 return nil, err 146 return nil, err
134 } 147 }
135 return client.createSubscription(id, cfg) 148 return client.createSubscription(id, cfg)
136 } 149 }
137 150
138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: 151 // EnsurePubSubSubscribed makes sure the following subscriptions are in place:
139 // * buildbucket, via the settings.Buildbucket.Topic setting 152 // * buildbucket, via the settings.Buildbucket.Topic setting
140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { 153 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
141 if settings.Buildbucket != nil { 154 if settings.Buildbucket != nil {
142 // Install the production pubsub client pointing to the buildbuc ket project 155 // Install the production pubsub client pointing to the buildbuc ket project
143 » » // into the context. 156 » » // into the context, so that we can get a reference to the topic .
144 c, err := withClient(c, settings.Buildbucket.Project) 157 c, err := withClient(c, settings.Buildbucket.Project)
145 if err != nil { 158 if err != nil {
146 return err 159 return err
147 } 160 }
161 // We also need a client for this project for the subscription.
162 c, err = withClient(c, info.AppID(c))
163 if err != nil {
164 return err
165 }
148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) 166 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct)
149 } 167 }
150 // TODO(hinoka): Ensure buildbot subscribed. 168 // TODO(hinoka): Ensure buildbot subscribed.
151 return nil 169 return nil
152 } 170 }
153 171
154 // ensureSubscribed is called by a cron job and ensures that the Milo 172 // ensureSubscribed is called by a cron job and ensures that the Milo
155 // instance is properly subscribed to the buildbucket subscription endpoint. 173 // instance is properly subscribed to the buildbucket subscription endpoint.
156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { 174 func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
157 topicID := "builds" 175 topicID := "builds"
158 // Check to see if the topic exists first. 176 // Check to see if the topic exists first.
159 » topic, err := getTopic(c, topicID) 177 » topic, err := getTopic(c, projectID, topicID)
160 switch err { 178 switch err {
161 case errNotExist: 179 case errNotExist:
162 logging.WithError(err).Errorf(c, "%s does not exist", topicID) 180 logging.WithError(err).Errorf(c, "%s does not exist", topicID)
163 return err 181 return err
164 case nil: 182 case nil:
165 // continue 183 // continue
166 default: 184 default:
167 if strings.Contains(err.Error(), "PermissionDenied") { 185 if strings.Contains(err.Error(), "PermissionDenied") {
168 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID 186 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID
169 acct, serr := info.ServiceAccount(c) 187 acct, serr := info.ServiceAccount(c)
170 if serr != nil { 188 if serr != nil {
171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) 189 acct = fmt.Sprintf("Unknown: %s", serr.Error())
172 } 190 }
173 // The documentation is incorrect. We need Editor permi ssion because 191 // The documentation is incorrect. We need Editor permi ssion because
174 // the Subscriber permission does NOT permit attaching s ubscriptions to 192 // the Subscriber permission does NOT permit attaching s ubscriptions to
175 // topics or to view topics. 193 // topics or to view topics.
176 logging.WithError(err).Errorf( 194 logging.WithError(err).Errorf(
177 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) 195 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct)
178 } else { 196 } else {
179 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) 197 logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
180 } 198 }
181 return err 199 return err
182 } 200 }
183 // Now check to see if the subscription already exists. 201 // Now check to see if the subscription already exists.
184 » subID := info.AppID(c) 202 » appID := info.AppID(c)
185 » // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() 203 » sub, err := getSubscription(c, appID, "buildbucket")
186 » // because it returns a version pinned hostname instead of the default r oute.
187 » sub, err := getSubscription(c, subID)
188 switch err { 204 switch err {
189 case errNotExist: 205 case errNotExist:
190 // continue 206 // continue
191 case nil: 207 case nil:
192 logging.Infof(c, "subscription %#v exists, no need to update", s ub) 208 logging.Infof(c, "subscription %#v exists, no need to update", s ub)
193 return nil 209 return nil
194 default: 210 default:
195 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) 211 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub)
196 return err 212 return err
197 } 213 }
214 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname()
215 // because it returns a version pinned hostname instead of the default r oute.
198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) 216 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
199 217
200 // No subscription exists, attach a new subscription to the existing top ic. 218 // No subscription exists, attach a new subscription to the existing top ic.
201 endpointURL := url.URL{ 219 endpointURL := url.URL{
202 Scheme: "https", 220 Scheme: "https",
203 Host: pubsubModuleHost, 221 Host: pubsubModuleHost,
204 Path: "/_ah/push-handlers/buildbucket", 222 Path: "/_ah/push-handlers/buildbucket",
205 } 223 }
206 subConfig := pubsub.SubscriptionConfig{ 224 subConfig := pubsub.SubscriptionConfig{
207 Topic: topic, 225 Topic: topic,
208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, 226 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
209 AckDeadline: time.Minute * 10, 227 AckDeadline: time.Minute * 10,
210 } 228 }
211 » newSub, err := createSubscription(c, subID, subConfig) 229 » newSub, err := createSubscription(c, appID, "buildbucket", subConfig)
212 if err != nil { 230 if err != nil {
213 » » if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { 231 » » logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
nodir 2017/07/12 21:31:12 why do we both log error and return it? the caller
Ryan Tseng 2017/07/12 22:26:08 Done.
214 » » » registerURL := "https://console.cloud.google.com/apis/cr edentials/domainverification?project=" + projectID
215 » » » verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
216 » » » logging.WithError(err).Errorf(
217 » » » » c, "The domain has to be verified and added.\n\n "+
218 » » » » » "1. Go to %s\n"+
219 » » » » » "2. Verify the domain\n"+
220 » » » » » "3. Go to %s\n"+
221 » » » » » "4. Add %s to allowed domains\n\n",
222 » » » » verifyURL, registerURL, pubsubModuleHost)
223 » » } else {
224 » » » logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub)
225 » » }
226 return err 232 return err
227 } 233 }
228 // Success! 234 // Success!
229 logging.Infof(c, "successfully created subscription %#v", newSub) 235 logging.Infof(c, "successfully created subscription %#v", newSub)
230 return nil 236 return nil
231 } 237 }
OLDNEW
« no previous file with comments | « no previous file | milo/common/pubsub_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698