Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package common | 1 package common |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "encoding/base64" | 4 "encoding/base64" |
| 5 "errors" | 5 "errors" |
| 6 "fmt" | 6 "fmt" |
| 7 "net/url" | 7 "net/url" |
| 8 "strings" | 8 "strings" |
| 9 "sync" | |
| 9 "time" | 10 "time" |
| 10 | 11 |
| 11 "cloud.google.com/go/pubsub" | 12 "cloud.google.com/go/pubsub" |
| 12 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
| 13 | 14 |
| 14 "github.com/luci/gae/service/info" | 15 "github.com/luci/gae/service/info" |
| 16 luciErrors "github.com/luci/luci-go/common/errors" | |
|
iannucci
2017/07/13 01:54:02
just use this instead of errors. there's no reason
Ryan Tseng
2017/07/14 00:51:06
Done.
| |
| 15 "github.com/luci/luci-go/common/logging" | 17 "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/milo/api/config" | 18 "github.com/luci/luci-go/milo/api/config" |
| 17 ) | 19 ) |
| 18 | 20 |
| 19 var pubSubClientKey = "stores a pubsubClient" | 21 var pubSubClientKey = "stores a pubsubClient" |
|
iannucci
2017/07/13 01:54:02
*pubsubClients ?
Ryan Tseng
2017/07/14 00:51:06
Done.
| |
| 20 | 22 |
| 21 type PubSubMessage struct { | 23 type PubSubMessage struct { |
| 22 Attributes map[string]string `json:"attributes"` | 24 Attributes map[string]string `json:"attributes"` |
| 23 Data string `json:"data"` | 25 Data string `json:"data"` |
| 24 MessageID string `json:"message_id"` | 26 MessageID string `json:"message_id"` |
| 25 } | 27 } |
| 26 | 28 |
| 27 type PubSubSubscription struct { | 29 type PubSubSubscription struct { |
| 28 Message PubSubMessage `json:"message"` | 30 Message PubSubMessage `json:"message"` |
| 29 Subscription string `json:"subscription"` | 31 Subscription string `json:"subscription"` |
| 30 } | 32 } |
| 31 | 33 |
| 32 var errNotExist = errors.New("does not exist") | 34 var errNotExist = errors.New("does not exist") |
| 33 | 35 |
| 34 // GetData returns the expanded form of Data (decoded from base64). | 36 // GetData returns the expanded form of Data (decoded from base64). |
| 35 func (m *PubSubSubscription) GetData() ([]byte, error) { | 37 func (m *PubSubSubscription) GetData() ([]byte, error) { |
| 36 return base64.StdEncoding.DecodeString(m.Message.Data) | 38 return base64.StdEncoding.DecodeString(m.Message.Data) |
| 37 } | 39 } |
| 38 | 40 |
| 39 // pubsubClient is an interface representing a pubsub.Client containing only | 41 // pubsubClient is an interface representing a pubsub.Client containing only |
| 40 // the functions that Milo calls. Internal use only, can be swapped | 42 // the functions that Milo calls. Internal use only, can be swapped |
| 41 // out for testing. | 43 // out for testing. |
| 42 type pubsubClient interface { | 44 type pubsubClient interface { |
| 43 // getTopic returns the pubsub topic if it exists, a notExist error if | 45 // getTopic returns the pubsub topic if it exists, a notExist error if |
| 44 // it does not exist, or an error if there was an error. | 46 // it does not exist, or an error if there was an error. |
| 45 » getTopic(string) (*pubsub.Topic, error) | 47 » getTopic(context.Context, string) (*pubsub.Topic, error) |
| 46 | 48 |
| 47 // getSubscription returns the pubsub subscription if it exists, | 49 // getSubscription returns the pubsub subscription if it exists, |
| 48 // a notExist error if it does not exist, or an error if there was an er ror. | 50 // a notExist error if it does not exist, or an error if there was an er ror. |
| 49 » getSubscription(string) (*pubsub.Subscription, error) | 51 » getSubscription(context.Context, string) (*pubsub.Subscription, error) |
| 50 | 52 |
| 51 » createSubscription(string, pubsub.SubscriptionConfig) ( | 53 » createSubscription(context.Context, string, pubsub.SubscriptionConfig) ( |
| 52 *pubsub.Subscription, error) | 54 *pubsub.Subscription, error) |
| 53 } | 55 } |
| 54 | 56 |
| 57 type pubsubClients struct { | |
|
iannucci
2017/07/13 02:16:27
now that I'm looking at this again... this is a bi
Ryan Tseng
2017/07/14 00:51:06
Made this back into a client factory
| |
| 58 clients map[string]pubsubClient | |
| 59 lock sync.RWMutex | |
| 60 } | |
| 61 | |
| 62 func newClientsBundle() *pubsubClients { | |
| 63 return &pubsubClients{ | |
| 64 clients: map[string]pubsubClient{}, | |
| 65 } | |
| 66 } | |
| 67 | |
| 55 // prodPubSubClient is a wrapper around the production pubsub client. | 68 // prodPubSubClient is a wrapper around the production pubsub client. |
| 56 type prodPubSubClient struct { | 69 type prodPubSubClient struct { |
| 57 » ctx context.Context | 70 » *pubsub.Client |
| 58 » client *pubsub.Client | |
| 59 } | 71 } |
| 60 | 72 |
| 61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { | 73 func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topi c, error) { |
| 62 » topic := pc.client.Topic(id) | 74 » topic := pc.Client.Topic(id) |
| 63 » exists, err := topic.Exists(pc.ctx) | 75 » exists, err := topic.Exists(c) |
| 64 switch { | 76 switch { |
| 65 case err != nil: | 77 case err != nil: |
| 66 return nil, err | 78 return nil, err |
| 67 case !exists: | 79 case !exists: |
| 68 return nil, errNotExist | 80 return nil, errNotExist |
| 69 } | 81 } |
| 70 return topic, nil | 82 return topic, nil |
| 71 } | 83 } |
| 72 | 84 |
| 73 func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er ror) { | 85 func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubs ub.Subscription, error) { |
| 74 » sub := pc.client.Subscription(id) | 86 » sub := pc.Client.Subscription(id) |
| 75 » exists, err := sub.Exists(pc.ctx) | 87 » exists, err := sub.Exists(c) |
| 76 switch { | 88 switch { |
| 77 case err != nil: | 89 case err != nil: |
| 78 return nil, err | 90 return nil, err |
| 79 case !exists: | 91 case !exists: |
| 80 return nil, errNotExist | 92 return nil, errNotExist |
| 81 } | 93 } |
| 82 return sub, nil | 94 return sub, nil |
| 83 } | 95 } |
| 84 | 96 |
| 85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) ( | 97 func (pc *prodPubSubClient) createSubscription( |
| 98 » c context.Context, id string, cfg pubsub.SubscriptionConfig) ( | |
| 86 *pubsub.Subscription, error) { | 99 *pubsub.Subscription, error) { |
| 87 | 100 |
| 88 » return pc.client.CreateSubscription(pc.ctx, id, cfg) | 101 » return pc.Client.CreateSubscription(c, id, cfg) |
| 89 } | 102 } |
| 90 | 103 |
| 91 // getPubSubClient extracts a debug PubSub client out of the context. | 104 // getPubSubClient extracts a debug PubSub client out of the context. |
| 92 func getPubSubClient(c context.Context) (pubsubClient, error) { | 105 func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) { |
| 93 » if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { | 106 » v := c.Value(&pubSubClientKey) |
| 94 » » return client, nil | 107 » if clients, ok := v.(*pubsubClients); ok { |
| 108 » » clients.lock.RLock() | |
| 109 » » client, ok := clients.clients[projectID] | |
| 110 » » clients.lock.RUnlock() | |
| 111 » » if ok { | |
| 112 » » » return client, nil | |
| 113 » » } | |
| 114 » » return nil, fmt.Errorf("no pubsub clients installed for %s", pro jectID) | |
| 95 } | 115 } |
| 96 return nil, errors.New("no pubsub clients installed") | 116 return nil, errors.New("no pubsub clients installed") |
| 97 } | 117 } |
| 98 | 118 |
| 99 // withClient returns a context with a pubsub client instantiated to the | 119 // withClient returns a context with a pubsub client instantiated to the |
| 100 // given project ID | 120 // given project ID |
| 101 func withClient(c context.Context, projectID string) (context.Context, error) { | 121 func withClient(c context.Context, projectID string) (context.Context, error) { |
| 102 if projectID == "" { | 122 if projectID == "" { |
| 103 » » return nil, errors.New("missing buildbucket project") | 123 » » return nil, errors.New("missing project id") |
| 104 } | 124 } |
| 105 client, err := pubsub.NewClient(c, projectID) | 125 client, err := pubsub.NewClient(c, projectID) |
|
nodir
2017/07/13 02:07:11
Just noticed c here. Memory leakish problem happen
Ryan Tseng
2017/07/13 02:27:18
The client is in the context because of the guidel
| |
| 106 if err != nil { | 126 if err != nil { |
| 107 return nil, err | 127 return nil, err |
| 108 } | 128 } |
| 109 » return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien t}), nil | 129 » var clients *pubsubClients |
| 130 » var ok bool | |
| 131 » if clients, ok = c.Value(&pubSubClientKey).(*pubsubClients); !ok { | |
| 132 » » clients = newClientsBundle() | |
| 133 » » c = context.WithValue(c, &pubSubClientKey, clients) | |
| 134 » } | |
| 135 » clients.lock.Lock() | |
| 136 » clients.clients[projectID] = &prodPubSubClient{client} | |
|
nodir
2017/07/13 02:07:11
What if it is already there?
Ryan Tseng
2017/07/13 02:27:18
it's overwritten
| |
| 137 » clients.lock.Unlock() | |
| 138 » return c, nil | |
| 110 } | 139 } |
| 111 | 140 |
| 112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { | 141 func getTopic(c context.Context, project, id string) (*pubsub.Topic, error) { |
| 113 » client, err := getPubSubClient(c) | 142 » client, err := getPubSubClient(c, project) |
| 114 if err != nil { | 143 if err != nil { |
| 115 return nil, err | 144 return nil, err |
| 116 } | 145 } |
| 117 » return client.getTopic(id) | 146 » return client.getTopic(c, id) |
| 118 } | 147 } |
| 119 | 148 |
| 120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { | 149 func getSubscription(c context.Context, project, id string) (*pubsub.Subscriptio n, error) { |
| 121 » client, err := getPubSubClient(c) | 150 » client, err := getPubSubClient(c, project) |
| 122 if err != nil { | 151 if err != nil { |
| 123 return nil, err | 152 return nil, err |
| 124 } | 153 } |
| 125 » return client.getSubscription(id) | 154 » return client.getSubscription(c, id) |
| 126 } | 155 } |
| 127 | 156 |
| 128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon fig) ( | 157 func createSubscription(c context.Context, project, id string, cfg pubsub.Subscr iptionConfig) ( |
| 129 *pubsub.Subscription, error) { | 158 *pubsub.Subscription, error) { |
| 130 | 159 |
| 131 » client, err := getPubSubClient(c) | 160 » client, err := getPubSubClient(c, project) |
| 132 if err != nil { | 161 if err != nil { |
| 133 return nil, err | 162 return nil, err |
| 134 } | 163 } |
| 135 » return client.createSubscription(id, cfg) | 164 » return client.createSubscription(c, id, cfg) |
| 136 } | 165 } |
| 137 | 166 |
| 138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: | 167 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| 139 // * buildbucket, via the settings.Buildbucket.Topic setting | 168 // * buildbucket, via the settings.Buildbucket.Topic setting |
| 140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { | 169 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| 141 if settings.Buildbucket != nil { | 170 if settings.Buildbucket != nil { |
| 142 // Install the production pubsub client pointing to the buildbuc ket project | 171 // Install the production pubsub client pointing to the buildbuc ket project |
| 143 » » // into the context. | 172 » » // into the context, so that we can get a reference to the topic . |
| 144 c, err := withClient(c, settings.Buildbucket.Project) | 173 c, err := withClient(c, settings.Buildbucket.Project) |
| 145 if err != nil { | 174 if err != nil { |
| 146 return err | 175 return err |
| 147 } | 176 } |
| 177 // We also need a client for this project for the subscription. | |
| 178 c, err = withClient(c, info.AppID(c)) | |
| 179 if err != nil { | |
| 180 return err | |
| 181 } | |
| 148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) | 182 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) |
| 149 } | 183 } |
| 150 // TODO(hinoka): Ensure buildbot subscribed. | 184 // TODO(hinoka): Ensure buildbot subscribed. |
| 151 return nil | 185 return nil |
| 152 } | 186 } |
| 153 | 187 |
| 154 // ensureSubscribed is called by a cron job and ensures that the Milo | 188 // ensureSubscribed is called by a cron job and ensures that the Milo |
| 155 // instance is properly subscribed to the buildbucket subscription endpoint. | 189 // instance is properly subscribed to the buildbucket subscription endpoint. |
| 156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { | 190 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| 157 topicID := "builds" | 191 topicID := "builds" |
| 158 // Check to see if the topic exists first. | 192 // Check to see if the topic exists first. |
| 159 » topic, err := getTopic(c, topicID) | 193 » topic, err := getTopic(c, projectID, topicID) |
| 160 switch err { | 194 switch err { |
| 161 case errNotExist: | 195 case errNotExist: |
| 162 » » logging.WithError(err).Errorf(c, "%s does not exist", topicID) | 196 » » return luciErrors.Annotate(err, "%s does not exist", topicID).Er r() |
| 163 » » return err | |
| 164 case nil: | 197 case nil: |
| 165 // continue | 198 // continue |
| 166 default: | 199 default: |
| 167 if strings.Contains(err.Error(), "PermissionDenied") { | 200 if strings.Contains(err.Error(), "PermissionDenied") { |
| 168 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID | 201 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID |
| 169 acct, serr := info.ServiceAccount(c) | 202 acct, serr := info.ServiceAccount(c) |
| 170 if serr != nil { | 203 if serr != nil { |
| 171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) | 204 acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| 172 } | 205 } |
| 173 // The documentation is incorrect. We need Editor permi ssion because | 206 // The documentation is incorrect. We need Editor permi ssion because |
| 174 // the Subscriber permission does NOT permit attaching s ubscriptions to | 207 // the Subscriber permission does NOT permit attaching s ubscriptions to |
| 175 // topics or to view topics. | 208 // topics or to view topics. |
| 176 logging.WithError(err).Errorf( | 209 logging.WithError(err).Errorf( |
| 177 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) | 210 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) |
| 178 } else { | 211 } else { |
| 179 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) | 212 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) |
| 180 } | 213 } |
| 181 return err | 214 return err |
| 182 } | 215 } |
| 183 // Now check to see if the subscription already exists. | 216 // Now check to see if the subscription already exists. |
| 184 » subID := info.AppID(c) | 217 » appID := info.AppID(c) |
| 185 » // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | 218 » sub, err := getSubscription(c, appID, "buildbucket") |
| 186 » // because it returns a version pinned hostname instead of the default r oute. | |
| 187 » sub, err := getSubscription(c, subID) | |
| 188 switch err { | 219 switch err { |
| 189 case errNotExist: | 220 case errNotExist: |
| 190 // continue | 221 // continue |
| 191 case nil: | 222 case nil: |
| 192 logging.Infof(c, "subscription %#v exists, no need to update", s ub) | 223 logging.Infof(c, "subscription %#v exists, no need to update", s ub) |
| 193 return nil | 224 return nil |
| 194 default: | 225 default: |
| 195 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) | 226 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) |
| 196 return err | 227 return err |
| 197 } | 228 } |
| 229 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | |
| 230 // because it returns a version pinned hostname instead of the default r oute. | |
| 198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) | 231 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| 199 | 232 |
| 200 // No subscription exists, attach a new subscription to the existing top ic. | 233 // No subscription exists, attach a new subscription to the existing top ic. |
| 201 endpointURL := url.URL{ | 234 endpointURL := url.URL{ |
| 202 Scheme: "https", | 235 Scheme: "https", |
| 203 Host: pubsubModuleHost, | 236 Host: pubsubModuleHost, |
| 204 Path: "/_ah/push-handlers/buildbucket", | 237 Path: "/_ah/push-handlers/buildbucket", |
| 205 } | 238 } |
| 206 subConfig := pubsub.SubscriptionConfig{ | 239 subConfig := pubsub.SubscriptionConfig{ |
| 207 Topic: topic, | 240 Topic: topic, |
| 208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, | 241 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| 209 AckDeadline: time.Minute * 10, | 242 AckDeadline: time.Minute * 10, |
| 210 } | 243 } |
| 211 » newSub, err := createSubscription(c, subID, subConfig) | 244 » newSub, err := createSubscription(c, appID, "buildbucket", subConfig) |
| 212 if err != nil { | 245 if err != nil { |
| 213 » » if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { | 246 » » return luciErrors.Annotate(err, "could not create subscription % #v", sub).Err() |
| 214 » » » registerURL := "https://console.cloud.google.com/apis/cr edentials/domainverification?project=" + projectID | |
| 215 » » » verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost | |
| 216 » » » logging.WithError(err).Errorf( | |
| 217 » » » » c, "The domain has to be verified and added.\n\n "+ | |
| 218 » » » » » "1. Go to %s\n"+ | |
| 219 » » » » » "2. Verify the domain\n"+ | |
| 220 » » » » » "3. Go to %s\n"+ | |
| 221 » » » » » "4. Add %s to allowed domains\n\n", | |
| 222 » » » » verifyURL, registerURL, pubsubModuleHost) | |
| 223 » » } else { | |
| 224 » » » logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub) | |
| 225 » » } | |
| 226 » » return err | |
| 227 } | 247 } |
| 228 // Success! | 248 // Success! |
| 229 logging.Infof(c, "successfully created subscription %#v", newSub) | 249 logging.Infof(c, "successfully created subscription %#v", newSub) |
| 230 return nil | 250 return nil |
| 231 } | 251 } |
| OLD | NEW |