Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(341)

Side by Side Diff: milo/common/pubsub.go

Issue 2981683002: Milo: Move buildbucket pubsub sub from buildbucket project to milo project (Closed)
Patch Set: Fixes Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | milo/common/pubsub_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package common 1 package common
2 2
3 import ( 3 import (
4 "encoding/base64" 4 "encoding/base64"
5 "errors" 5 "errors"
6 "fmt" 6 "fmt"
7 "net/url" 7 "net/url"
8 "strings" 8 "strings"
9 "sync"
9 "time" 10 "time"
10 11
11 "cloud.google.com/go/pubsub" 12 "cloud.google.com/go/pubsub"
12 "golang.org/x/net/context" 13 "golang.org/x/net/context"
13 14
14 "github.com/luci/gae/service/info" 15 "github.com/luci/gae/service/info"
16 luciErrors "github.com/luci/luci-go/common/errors"
iannucci 2017/07/13 01:54:02 just use this instead of errors. there's no reason
Ryan Tseng 2017/07/14 00:51:06 Done.
15 "github.com/luci/luci-go/common/logging" 17 "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/milo/api/config" 18 "github.com/luci/luci-go/milo/api/config"
17 ) 19 )
18 20
19 var pubSubClientKey = "stores a pubsubClient" 21 var pubSubClientKey = "stores a pubsubClient"
iannucci 2017/07/13 01:54:02 *pubsubClients ?
Ryan Tseng 2017/07/14 00:51:06 Done.
20 22
21 type PubSubMessage struct { 23 type PubSubMessage struct {
22 Attributes map[string]string `json:"attributes"` 24 Attributes map[string]string `json:"attributes"`
23 Data string `json:"data"` 25 Data string `json:"data"`
24 MessageID string `json:"message_id"` 26 MessageID string `json:"message_id"`
25 } 27 }
26 28
27 type PubSubSubscription struct { 29 type PubSubSubscription struct {
28 Message PubSubMessage `json:"message"` 30 Message PubSubMessage `json:"message"`
29 Subscription string `json:"subscription"` 31 Subscription string `json:"subscription"`
30 } 32 }
31 33
32 var errNotExist = errors.New("does not exist") 34 var errNotExist = errors.New("does not exist")
33 35
34 // GetData returns the expanded form of Data (decoded from base64). 36 // GetData returns the expanded form of Data (decoded from base64).
35 func (m *PubSubSubscription) GetData() ([]byte, error) { 37 func (m *PubSubSubscription) GetData() ([]byte, error) {
36 return base64.StdEncoding.DecodeString(m.Message.Data) 38 return base64.StdEncoding.DecodeString(m.Message.Data)
37 } 39 }
38 40
39 // pubsubClient is an interface representing a pubsub.Client containing only 41 // pubsubClient is an interface representing a pubsub.Client containing only
40 // the functions that Milo calls. Internal use only, can be swapped 42 // the functions that Milo calls. Internal use only, can be swapped
41 // out for testing. 43 // out for testing.
42 type pubsubClient interface { 44 type pubsubClient interface {
43 // getTopic returns the pubsub topic if it exists, a notExist error if 45 // getTopic returns the pubsub topic if it exists, a notExist error if
44 // it does not exist, or an error if there was an error. 46 // it does not exist, or an error if there was an error.
45 » getTopic(string) (*pubsub.Topic, error) 47 » getTopic(context.Context, string) (*pubsub.Topic, error)
46 48
47 // getSubscription returns the pubsub subscription if it exists, 49 // getSubscription returns the pubsub subscription if it exists,
48 // a notExist error if it does not exist, or an error if there was an er ror. 50 // a notExist error if it does not exist, or an error if there was an er ror.
49 » getSubscription(string) (*pubsub.Subscription, error) 51 » getSubscription(context.Context, string) (*pubsub.Subscription, error)
50 52
51 » createSubscription(string, pubsub.SubscriptionConfig) ( 53 » createSubscription(context.Context, string, pubsub.SubscriptionConfig) (
52 *pubsub.Subscription, error) 54 *pubsub.Subscription, error)
53 } 55 }
54 56
57 type pubsubClients struct {
iannucci 2017/07/13 02:16:27 now that I'm looking at this again... this is a bi
Ryan Tseng 2017/07/14 00:51:06 Made this back into a client factory
58 clients map[string]pubsubClient
59 lock sync.RWMutex
60 }
61
62 func newClientsBundle() *pubsubClients {
63 return &pubsubClients{
64 clients: map[string]pubsubClient{},
65 }
66 }
67
55 // prodPubSubClient is a wrapper around the production pubsub client. 68 // prodPubSubClient is a wrapper around the production pubsub client.
56 type prodPubSubClient struct { 69 type prodPubSubClient struct {
57 » ctx context.Context 70 » *pubsub.Client
58 » client *pubsub.Client
59 } 71 }
60 72
61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { 73 func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topi c, error) {
62 » topic := pc.client.Topic(id) 74 » topic := pc.Client.Topic(id)
63 » exists, err := topic.Exists(pc.ctx) 75 » exists, err := topic.Exists(c)
64 switch { 76 switch {
65 case err != nil: 77 case err != nil:
66 return nil, err 78 return nil, err
67 case !exists: 79 case !exists:
68 return nil, errNotExist 80 return nil, errNotExist
69 } 81 }
70 return topic, nil 82 return topic, nil
71 } 83 }
72 84
73 func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er ror) { 85 func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubs ub.Subscription, error) {
74 » sub := pc.client.Subscription(id) 86 » sub := pc.Client.Subscription(id)
75 » exists, err := sub.Exists(pc.ctx) 87 » exists, err := sub.Exists(c)
76 switch { 88 switch {
77 case err != nil: 89 case err != nil:
78 return nil, err 90 return nil, err
79 case !exists: 91 case !exists:
80 return nil, errNotExist 92 return nil, errNotExist
81 } 93 }
82 return sub, nil 94 return sub, nil
83 } 95 }
84 96
85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) ( 97 func (pc *prodPubSubClient) createSubscription(
98 » c context.Context, id string, cfg pubsub.SubscriptionConfig) (
86 *pubsub.Subscription, error) { 99 *pubsub.Subscription, error) {
87 100
88 » return pc.client.CreateSubscription(pc.ctx, id, cfg) 101 » return pc.Client.CreateSubscription(c, id, cfg)
89 } 102 }
90 103
91 // getPubSubClient extracts a debug PubSub client out of the context. 104 // getPubSubClient extracts a debug PubSub client out of the context.
92 func getPubSubClient(c context.Context) (pubsubClient, error) { 105 func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
93 » if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { 106 » v := c.Value(&pubSubClientKey)
94 » » return client, nil 107 » if clients, ok := v.(*pubsubClients); ok {
108 » » clients.lock.RLock()
109 » » client, ok := clients.clients[projectID]
110 » » clients.lock.RUnlock()
111 » » if ok {
112 » » » return client, nil
113 » » }
114 » » return nil, fmt.Errorf("no pubsub clients installed for %s", pro jectID)
95 } 115 }
96 return nil, errors.New("no pubsub clients installed") 116 return nil, errors.New("no pubsub clients installed")
97 } 117 }
98 118
99 // withClient returns a context with a pubsub client instantiated to the 119 // withClient returns a context with a pubsub client instantiated to the
100 // given project ID 120 // given project ID
101 func withClient(c context.Context, projectID string) (context.Context, error) { 121 func withClient(c context.Context, projectID string) (context.Context, error) {
102 if projectID == "" { 122 if projectID == "" {
103 » » return nil, errors.New("missing buildbucket project") 123 » » return nil, errors.New("missing project id")
104 } 124 }
105 client, err := pubsub.NewClient(c, projectID) 125 client, err := pubsub.NewClient(c, projectID)
nodir 2017/07/13 02:07:11 Just noticed c here. Memory leakish problem happen
Ryan Tseng 2017/07/13 02:27:18 The client is in the context because of the guidel
106 if err != nil { 126 if err != nil {
107 return nil, err 127 return nil, err
108 } 128 }
109 » return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien t}), nil 129 » var clients *pubsubClients
130 » var ok bool
131 » if clients, ok = c.Value(&pubSubClientKey).(*pubsubClients); !ok {
132 » » clients = newClientsBundle()
133 » » c = context.WithValue(c, &pubSubClientKey, clients)
134 » }
135 » clients.lock.Lock()
136 » clients.clients[projectID] = &prodPubSubClient{client}
nodir 2017/07/13 02:07:11 What if it is already there?
Ryan Tseng 2017/07/13 02:27:18 it's overwritten
137 » clients.lock.Unlock()
138 » return c, nil
110 } 139 }
111 140
112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { 141 func getTopic(c context.Context, project, id string) (*pubsub.Topic, error) {
113 » client, err := getPubSubClient(c) 142 » client, err := getPubSubClient(c, project)
114 if err != nil { 143 if err != nil {
115 return nil, err 144 return nil, err
116 } 145 }
117 » return client.getTopic(id) 146 » return client.getTopic(c, id)
118 } 147 }
119 148
120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { 149 func getSubscription(c context.Context, project, id string) (*pubsub.Subscriptio n, error) {
121 » client, err := getPubSubClient(c) 150 » client, err := getPubSubClient(c, project)
122 if err != nil { 151 if err != nil {
123 return nil, err 152 return nil, err
124 } 153 }
125 » return client.getSubscription(id) 154 » return client.getSubscription(c, id)
126 } 155 }
127 156
128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon fig) ( 157 func createSubscription(c context.Context, project, id string, cfg pubsub.Subscr iptionConfig) (
129 *pubsub.Subscription, error) { 158 *pubsub.Subscription, error) {
130 159
131 » client, err := getPubSubClient(c) 160 » client, err := getPubSubClient(c, project)
132 if err != nil { 161 if err != nil {
133 return nil, err 162 return nil, err
134 } 163 }
135 » return client.createSubscription(id, cfg) 164 » return client.createSubscription(c, id, cfg)
136 } 165 }
137 166
138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: 167 // EnsurePubSubSubscribed makes sure the following subscriptions are in place:
139 // * buildbucket, via the settings.Buildbucket.Topic setting 168 // * buildbucket, via the settings.Buildbucket.Topic setting
140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { 169 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
141 if settings.Buildbucket != nil { 170 if settings.Buildbucket != nil {
142 // Install the production pubsub client pointing to the buildbuc ket project 171 // Install the production pubsub client pointing to the buildbuc ket project
143 » » // into the context. 172 » » // into the context, so that we can get a reference to the topic .
144 c, err := withClient(c, settings.Buildbucket.Project) 173 c, err := withClient(c, settings.Buildbucket.Project)
145 if err != nil { 174 if err != nil {
146 return err 175 return err
147 } 176 }
177 // We also need a client for this project for the subscription.
178 c, err = withClient(c, info.AppID(c))
179 if err != nil {
180 return err
181 }
148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) 182 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct)
149 } 183 }
150 // TODO(hinoka): Ensure buildbot subscribed. 184 // TODO(hinoka): Ensure buildbot subscribed.
151 return nil 185 return nil
152 } 186 }
153 187
154 // ensureSubscribed is called by a cron job and ensures that the Milo 188 // ensureSubscribed is called by a cron job and ensures that the Milo
155 // instance is properly subscribed to the buildbucket subscription endpoint. 189 // instance is properly subscribed to the buildbucket subscription endpoint.
156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { 190 func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
157 topicID := "builds" 191 topicID := "builds"
158 // Check to see if the topic exists first. 192 // Check to see if the topic exists first.
159 » topic, err := getTopic(c, topicID) 193 » topic, err := getTopic(c, projectID, topicID)
160 switch err { 194 switch err {
161 case errNotExist: 195 case errNotExist:
162 » » logging.WithError(err).Errorf(c, "%s does not exist", topicID) 196 » » return luciErrors.Annotate(err, "%s does not exist", topicID).Er r()
163 » » return err
164 case nil: 197 case nil:
165 // continue 198 // continue
166 default: 199 default:
167 if strings.Contains(err.Error(), "PermissionDenied") { 200 if strings.Contains(err.Error(), "PermissionDenied") {
168 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID 201 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID
169 acct, serr := info.ServiceAccount(c) 202 acct, serr := info.ServiceAccount(c)
170 if serr != nil { 203 if serr != nil {
171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) 204 acct = fmt.Sprintf("Unknown: %s", serr.Error())
172 } 205 }
173 // The documentation is incorrect. We need Editor permi ssion because 206 // The documentation is incorrect. We need Editor permi ssion because
174 // the Subscriber permission does NOT permit attaching s ubscriptions to 207 // the Subscriber permission does NOT permit attaching s ubscriptions to
175 // topics or to view topics. 208 // topics or to view topics.
176 logging.WithError(err).Errorf( 209 logging.WithError(err).Errorf(
177 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) 210 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct)
178 } else { 211 } else {
179 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) 212 logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
180 } 213 }
181 return err 214 return err
182 } 215 }
183 // Now check to see if the subscription already exists. 216 // Now check to see if the subscription already exists.
184 » subID := info.AppID(c) 217 » appID := info.AppID(c)
185 » // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() 218 » sub, err := getSubscription(c, appID, "buildbucket")
186 » // because it returns a version pinned hostname instead of the default r oute.
187 » sub, err := getSubscription(c, subID)
188 switch err { 219 switch err {
189 case errNotExist: 220 case errNotExist:
190 // continue 221 // continue
191 case nil: 222 case nil:
192 logging.Infof(c, "subscription %#v exists, no need to update", s ub) 223 logging.Infof(c, "subscription %#v exists, no need to update", s ub)
193 return nil 224 return nil
194 default: 225 default:
195 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) 226 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub)
196 return err 227 return err
197 } 228 }
229 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname()
230 // because it returns a version pinned hostname instead of the default r oute.
198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) 231 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
199 232
200 // No subscription exists, attach a new subscription to the existing top ic. 233 // No subscription exists, attach a new subscription to the existing top ic.
201 endpointURL := url.URL{ 234 endpointURL := url.URL{
202 Scheme: "https", 235 Scheme: "https",
203 Host: pubsubModuleHost, 236 Host: pubsubModuleHost,
204 Path: "/_ah/push-handlers/buildbucket", 237 Path: "/_ah/push-handlers/buildbucket",
205 } 238 }
206 subConfig := pubsub.SubscriptionConfig{ 239 subConfig := pubsub.SubscriptionConfig{
207 Topic: topic, 240 Topic: topic,
208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, 241 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
209 AckDeadline: time.Minute * 10, 242 AckDeadline: time.Minute * 10,
210 } 243 }
211 » newSub, err := createSubscription(c, subID, subConfig) 244 » newSub, err := createSubscription(c, appID, "buildbucket", subConfig)
212 if err != nil { 245 if err != nil {
213 » » if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { 246 » » return luciErrors.Annotate(err, "could not create subscription % #v", sub).Err()
214 » » » registerURL := "https://console.cloud.google.com/apis/cr edentials/domainverification?project=" + projectID
215 » » » verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
216 » » » logging.WithError(err).Errorf(
217 » » » » c, "The domain has to be verified and added.\n\n "+
218 » » » » » "1. Go to %s\n"+
219 » » » » » "2. Verify the domain\n"+
220 » » » » » "3. Go to %s\n"+
221 » » » » » "4. Add %s to allowed domains\n\n",
222 » » » » verifyURL, registerURL, pubsubModuleHost)
223 » » } else {
224 » » » logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub)
225 » » }
226 » » return err
227 } 247 }
228 // Success! 248 // Success!
229 logging.Infof(c, "successfully created subscription %#v", newSub) 249 logging.Infof(c, "successfully created subscription %#v", newSub)
230 return nil 250 return nil
231 } 251 }
OLDNEW
« no previous file with comments | « no previous file | milo/common/pubsub_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698