Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package common | 1 package common |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "encoding/base64" | 4 "encoding/base64" |
| 5 "errors" | 5 "errors" |
| 6 "fmt" | 6 "fmt" |
| 7 "net/url" | 7 "net/url" |
| 8 "strings" | 8 "strings" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 45 getTopic(string) (*pubsub.Topic, error) | 45 getTopic(string) (*pubsub.Topic, error) |
| 46 | 46 |
| 47 // getSubscription returns the pubsub subscription if it exists, | 47 // getSubscription returns the pubsub subscription if it exists, |
| 48 // a notExist error if it does not exist, or an error if there was an er ror. | 48 // a notExist error if it does not exist, or an error if there was an er ror. |
| 49 getSubscription(string) (*pubsub.Subscription, error) | 49 getSubscription(string) (*pubsub.Subscription, error) |
| 50 | 50 |
| 51 createSubscription(string, pubsub.SubscriptionConfig) ( | 51 createSubscription(string, pubsub.SubscriptionConfig) ( |
| 52 *pubsub.Subscription, error) | 52 *pubsub.Subscription, error) |
| 53 } | 53 } |
| 54 | 54 |
| 55 type pubsubClients map[string]pubsubClient | |
| 56 | |
| 55 // prodPubSubClient is a wrapper around the production pubsub client. | 57 // prodPubSubClient is a wrapper around the production pubsub client. |
| 56 type prodPubSubClient struct { | 58 type prodPubSubClient struct { |
| 57 ctx context.Context | 59 ctx context.Context |
| 58 client *pubsub.Client | 60 client *pubsub.Client |
| 59 } | 61 } |
| 60 | 62 |
| 61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { | 63 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { |
| 62 topic := pc.client.Topic(id) | 64 topic := pc.client.Topic(id) |
| 63 exists, err := topic.Exists(pc.ctx) | 65 exists, err := topic.Exists(pc.ctx) |
| 64 switch { | 66 switch { |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 82 return sub, nil | 84 return sub, nil |
| 83 } | 85 } |
| 84 | 86 |
| 85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) ( | 87 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) ( |
| 86 *pubsub.Subscription, error) { | 88 *pubsub.Subscription, error) { |
| 87 | 89 |
| 88 return pc.client.CreateSubscription(pc.ctx, id, cfg) | 90 return pc.client.CreateSubscription(pc.ctx, id, cfg) |
| 89 } | 91 } |
| 90 | 92 |
| 91 // getPubSubClient extracts a debug PubSub client out of the context. | 93 // getPubSubClient extracts a debug PubSub client out of the context. |
| 92 func getPubSubClient(c context.Context) (pubsubClient, error) { | 94 func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) { |
| 93 » if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { | 95 » v := c.Value(&pubSubClientKey) |
| 94 » » return client, nil | 96 » if clients, ok := v.(pubsubClients); ok { |
| 97 » » if client, ok := clients[projectID]; ok { | |
| 98 » » » return client, nil | |
| 99 » » } | |
| 100 » » return nil, fmt.Errorf("no pubsub clients installed for" + proje ctID) | |
|
nodir
2017/07/12 21:31:12
space missing
...for %s"
Ryan Tseng
2017/07/12 22:26:08
Done.
| |
| 95 } | 101 } |
| 96 return nil, errors.New("no pubsub clients installed") | 102 return nil, errors.New("no pubsub clients installed") |
| 97 } | 103 } |
| 98 | 104 |
| 99 // withClient returns a context with a pubsub client instantiated to the | 105 // withClient returns a context with a pubsub client instantiated to the |
| 100 // given project ID | 106 // given project ID |
| 101 func withClient(c context.Context, projectID string) (context.Context, error) { | 107 func withClient(c context.Context, projectID string) (context.Context, error) { |
| 102 if projectID == "" { | 108 if projectID == "" { |
| 103 » » return nil, errors.New("missing buildbucket project") | 109 » » return nil, errors.New("missing project id") |
| 104 } | 110 } |
| 105 client, err := pubsub.NewClient(c, projectID) | 111 client, err := pubsub.NewClient(c, projectID) |
| 106 if err != nil { | 112 if err != nil { |
| 107 return nil, err | 113 return nil, err |
| 108 } | 114 } |
| 109 » return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien t}), nil | 115 » var clients pubsubClients |
| 116 » var ok bool | |
| 117 » if clients, ok = c.Value(&pubSubClientKey).(pubsubClients); !ok { | |
| 118 » » clients = pubsubClients{} | |
| 119 » » c = context.WithValue(c, &pubSubClientKey, clients) | |
| 120 » } | |
| 121 » clients[projectID] = &prodPubSubClient{c, client} | |
|
nodir
2017/07/12 21:31:12
it used to be gouroutine safe, but now it is not.
Ryan Tseng
2017/07/12 22:26:08
Done.
| |
| 122 » return c, nil | |
| 110 } | 123 } |
| 111 | 124 |
| 112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { | 125 func getTopic(c context.Context, project, id string) (*pubsub.Topic, error) { |
| 113 » client, err := getPubSubClient(c) | 126 » client, err := getPubSubClient(c, project) |
| 114 if err != nil { | 127 if err != nil { |
| 115 return nil, err | 128 return nil, err |
| 116 } | 129 } |
| 117 return client.getTopic(id) | 130 return client.getTopic(id) |
| 118 } | 131 } |
| 119 | 132 |
| 120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { | 133 func getSubscription(c context.Context, project, id string) (*pubsub.Subscriptio n, error) { |
| 121 » client, err := getPubSubClient(c) | 134 » client, err := getPubSubClient(c, project) |
| 122 if err != nil { | 135 if err != nil { |
| 123 return nil, err | 136 return nil, err |
| 124 } | 137 } |
| 125 return client.getSubscription(id) | 138 return client.getSubscription(id) |
| 126 } | 139 } |
| 127 | 140 |
| 128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon fig) ( | 141 func createSubscription(c context.Context, project, id string, cfg pubsub.Subscr iptionConfig) ( |
| 129 *pubsub.Subscription, error) { | 142 *pubsub.Subscription, error) { |
| 130 | 143 |
| 131 » client, err := getPubSubClient(c) | 144 » client, err := getPubSubClient(c, project) |
| 132 if err != nil { | 145 if err != nil { |
| 133 return nil, err | 146 return nil, err |
| 134 } | 147 } |
| 135 return client.createSubscription(id, cfg) | 148 return client.createSubscription(id, cfg) |
| 136 } | 149 } |
| 137 | 150 |
| 138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: | 151 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| 139 // * buildbucket, via the settings.Buildbucket.Topic setting | 152 // * buildbucket, via the settings.Buildbucket.Topic setting |
| 140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { | 153 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| 141 if settings.Buildbucket != nil { | 154 if settings.Buildbucket != nil { |
| 142 // Install the production pubsub client pointing to the buildbuc ket project | 155 // Install the production pubsub client pointing to the buildbuc ket project |
| 143 » » // into the context. | 156 » » // into the context, so that we can get a reference to the topic . |
| 144 c, err := withClient(c, settings.Buildbucket.Project) | 157 c, err := withClient(c, settings.Buildbucket.Project) |
| 145 if err != nil { | 158 if err != nil { |
| 146 return err | 159 return err |
| 147 } | 160 } |
| 161 // We also need a client for this project for the subscription. | |
| 162 c, err = withClient(c, info.AppID(c)) | |
| 163 if err != nil { | |
| 164 return err | |
| 165 } | |
| 148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) | 166 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) |
| 149 } | 167 } |
| 150 // TODO(hinoka): Ensure buildbot subscribed. | 168 // TODO(hinoka): Ensure buildbot subscribed. |
| 151 return nil | 169 return nil |
| 152 } | 170 } |
| 153 | 171 |
| 154 // ensureSubscribed is called by a cron job and ensures that the Milo | 172 // ensureSubscribed is called by a cron job and ensures that the Milo |
| 155 // instance is properly subscribed to the buildbucket subscription endpoint. | 173 // instance is properly subscribed to the buildbucket subscription endpoint. |
| 156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { | 174 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| 157 topicID := "builds" | 175 topicID := "builds" |
| 158 // Check to see if the topic exists first. | 176 // Check to see if the topic exists first. |
| 159 » topic, err := getTopic(c, topicID) | 177 » topic, err := getTopic(c, projectID, topicID) |
| 160 switch err { | 178 switch err { |
| 161 case errNotExist: | 179 case errNotExist: |
| 162 logging.WithError(err).Errorf(c, "%s does not exist", topicID) | 180 logging.WithError(err).Errorf(c, "%s does not exist", topicID) |
| 163 return err | 181 return err |
| 164 case nil: | 182 case nil: |
| 165 // continue | 183 // continue |
| 166 default: | 184 default: |
| 167 if strings.Contains(err.Error(), "PermissionDenied") { | 185 if strings.Contains(err.Error(), "PermissionDenied") { |
| 168 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID | 186 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID |
| 169 acct, serr := info.ServiceAccount(c) | 187 acct, serr := info.ServiceAccount(c) |
| 170 if serr != nil { | 188 if serr != nil { |
| 171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) | 189 acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| 172 } | 190 } |
| 173 // The documentation is incorrect. We need Editor permi ssion because | 191 // The documentation is incorrect. We need Editor permi ssion because |
| 174 // the Subscriber permission does NOT permit attaching s ubscriptions to | 192 // the Subscriber permission does NOT permit attaching s ubscriptions to |
| 175 // topics or to view topics. | 193 // topics or to view topics. |
| 176 logging.WithError(err).Errorf( | 194 logging.WithError(err).Errorf( |
| 177 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) | 195 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) |
| 178 } else { | 196 } else { |
| 179 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) | 197 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) |
| 180 } | 198 } |
| 181 return err | 199 return err |
| 182 } | 200 } |
| 183 // Now check to see if the subscription already exists. | 201 // Now check to see if the subscription already exists. |
| 184 » subID := info.AppID(c) | 202 » appID := info.AppID(c) |
| 185 » // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | 203 » sub, err := getSubscription(c, appID, "buildbucket") |
| 186 » // because it returns a version pinned hostname instead of the default r oute. | |
| 187 » sub, err := getSubscription(c, subID) | |
| 188 switch err { | 204 switch err { |
| 189 case errNotExist: | 205 case errNotExist: |
| 190 // continue | 206 // continue |
| 191 case nil: | 207 case nil: |
| 192 logging.Infof(c, "subscription %#v exists, no need to update", s ub) | 208 logging.Infof(c, "subscription %#v exists, no need to update", s ub) |
| 193 return nil | 209 return nil |
| 194 default: | 210 default: |
| 195 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) | 211 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) |
| 196 return err | 212 return err |
| 197 } | 213 } |
| 214 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | |
| 215 // because it returns a version pinned hostname instead of the default r oute. | |
| 198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) | 216 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| 199 | 217 |
| 200 // No subscription exists, attach a new subscription to the existing top ic. | 218 // No subscription exists, attach a new subscription to the existing top ic. |
| 201 endpointURL := url.URL{ | 219 endpointURL := url.URL{ |
| 202 Scheme: "https", | 220 Scheme: "https", |
| 203 Host: pubsubModuleHost, | 221 Host: pubsubModuleHost, |
| 204 Path: "/_ah/push-handlers/buildbucket", | 222 Path: "/_ah/push-handlers/buildbucket", |
| 205 } | 223 } |
| 206 subConfig := pubsub.SubscriptionConfig{ | 224 subConfig := pubsub.SubscriptionConfig{ |
| 207 Topic: topic, | 225 Topic: topic, |
| 208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, | 226 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| 209 AckDeadline: time.Minute * 10, | 227 AckDeadline: time.Minute * 10, |
| 210 } | 228 } |
| 211 » newSub, err := createSubscription(c, subID, subConfig) | 229 » newSub, err := createSubscription(c, appID, "buildbucket", subConfig) |
| 212 if err != nil { | 230 if err != nil { |
| 213 » » if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { | 231 » » logging.WithError(err).Errorf(c, "could not create subscription %#v", sub) |
|
nodir
2017/07/12 21:31:12
why do we both log error and return it? the caller
Ryan Tseng
2017/07/12 22:26:08
Done.
| |
| 214 » » » registerURL := "https://console.cloud.google.com/apis/cr edentials/domainverification?project=" + projectID | |
| 215 » » » verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost | |
| 216 » » » logging.WithError(err).Errorf( | |
| 217 » » » » c, "The domain has to be verified and added.\n\n "+ | |
| 218 » » » » » "1. Go to %s\n"+ | |
| 219 » » » » » "2. Verify the domain\n"+ | |
| 220 » » » » » "3. Go to %s\n"+ | |
| 221 » » » » » "4. Add %s to allowed domains\n\n", | |
| 222 » » » » verifyURL, registerURL, pubsubModuleHost) | |
| 223 » » } else { | |
| 224 » » » logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub) | |
| 225 » » } | |
| 226 return err | 232 return err |
| 227 } | 233 } |
| 228 // Success! | 234 // Success! |
| 229 logging.Infof(c, "successfully created subscription %#v", newSub) | 235 logging.Infof(c, "successfully created subscription %#v", newSub) |
| 230 return nil | 236 return nil |
| 231 } | 237 } |
| OLD | NEW |