Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package common | 1 package common |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "encoding/base64" | 4 "encoding/base64" |
| 5 "errors" | |
| 6 "fmt" | 5 "fmt" |
| 7 "net/url" | 6 "net/url" |
| 8 "strings" | 7 "strings" |
| 9 "time" | 8 "time" |
| 10 | 9 |
| 11 "cloud.google.com/go/pubsub" | 10 "cloud.google.com/go/pubsub" |
| 12 "golang.org/x/net/context" | 11 "golang.org/x/net/context" |
| 13 | 12 |
| 14 "github.com/luci/gae/service/info" | 13 "github.com/luci/gae/service/info" |
| 14 "github.com/luci/luci-go/common/errors" | |
| 15 "github.com/luci/luci-go/common/logging" | 15 "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/milo/api/config" | 16 "github.com/luci/luci-go/milo/api/config" |
| 17 ) | 17 ) |
| 18 | 18 |
| 19 var pubSubClientKey = "stores a pubsubClient" | 19 var pubsubClientFactoryKey = "stores a pubsubClientFactory" |
| 20 | 20 |
| 21 type PubSubMessage struct { | 21 type PubSubMessage struct { |
| 22 Attributes map[string]string `json:"attributes"` | 22 Attributes map[string]string `json:"attributes"` |
| 23 Data string `json:"data"` | 23 Data string `json:"data"` |
| 24 MessageID string `json:"message_id"` | 24 MessageID string `json:"message_id"` |
| 25 } | 25 } |
| 26 | 26 |
| 27 type PubSubSubscription struct { | 27 type PubSubSubscription struct { |
| 28 Message PubSubMessage `json:"message"` | 28 Message PubSubMessage `json:"message"` |
| 29 Subscription string `json:"subscription"` | 29 Subscription string `json:"subscription"` |
| 30 } | 30 } |
| 31 | 31 |
| 32 var errNotExist = errors.New("does not exist") | 32 var errNotExist = errors.New("does not exist") |
| 33 | 33 |
| 34 // GetData returns the expanded form of Data (decoded from base64). | 34 // GetData returns the expanded form of Data (decoded from base64). |
| 35 func (m *PubSubSubscription) GetData() ([]byte, error) { | 35 func (m *PubSubSubscription) GetData() ([]byte, error) { |
| 36 return base64.StdEncoding.DecodeString(m.Message.Data) | 36 return base64.StdEncoding.DecodeString(m.Message.Data) |
| 37 } | 37 } |
| 38 | 38 |
| 39 // pubsubClient is an interface representing a pubsub.Client containing only | 39 // pubsubClient is an interface representing a pubsub.Client containing only |
| 40 // the functions that Milo calls. Internal use only, can be swapped | 40 // the functions that Milo calls. Internal use only, can be swapped |
| 41 // out for testing. | 41 // out for testing. |
| 42 type pubsubClient interface { | 42 type pubsubClient interface { |
| 43 // getTopic returns the pubsub topic if it exists, a notExist error if | 43 // getTopic returns the pubsub topic if it exists, a notExist error if |
| 44 // it does not exist, or an error if there was an error. | 44 // it does not exist, or an error if there was an error. |
| 45 » getTopic(string) (*pubsub.Topic, error) | 45 » getTopic(context.Context, string) (*pubsub.Topic, error) |
| 46 | 46 |
| 47 // getSubscription returns the pubsub subscription if it exists, | 47 // getSubscription returns the pubsub subscription if it exists, |
| 48 // a notExist error if it does not exist, or an error if there was an er ror. | 48 // a notExist error if it does not exist, or an error if there was an er ror. |
| 49 » getSubscription(string) (*pubsub.Subscription, error) | 49 » getSubscription(context.Context, string) (*pubsub.Subscription, error) |
| 50 | 50 |
| 51 » createSubscription(string, pubsub.SubscriptionConfig) ( | 51 » createSubscription(context.Context, string, pubsub.SubscriptionConfig) ( |
| 52 *pubsub.Subscription, error) | 52 *pubsub.Subscription, error) |
| 53 } | 53 } |
| 54 | 54 |
| 55 // pubsubClientFactory is a stubbable factory that produces pubsubClients bound | |
| 56 // to project IDs. | |
| 57 type pubsubClientFactory interface { | |
|
nodir
2017/07/14 01:09:04
wouldn't
type pubsubClientFactory func(ctx conte
Ryan Tseng
2017/07/14 17:41:49
Thanks, I forgot this was legal
edit: this is pos
nodir
2017/07/14 18:20:03
I am not following. The following compiles: https:
| |
| 58 newClient(context.Context, string) (pubsubClient, error) | |
| 59 } | |
| 60 | |
| 55 // prodPubSubClient is a wrapper around the production pubsub client. | 61 // prodPubSubClient is a wrapper around the production pubsub client. |
| 56 type prodPubSubClient struct { | 62 type prodPubSubClient struct { |
| 57 » ctx context.Context | 63 » *pubsub.Client |
| 58 » client *pubsub.Client | |
| 59 } | 64 } |
| 60 | 65 |
| 61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { | 66 func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topi c, error) { |
| 62 » topic := pc.client.Topic(id) | 67 » topic := pc.Client.Topic(id) |
| 63 » exists, err := topic.Exists(pc.ctx) | 68 » exists, err := topic.Exists(c) |
| 64 switch { | 69 switch { |
| 65 case err != nil: | 70 case err != nil: |
| 66 return nil, err | 71 return nil, err |
| 67 case !exists: | 72 case !exists: |
| 68 return nil, errNotExist | 73 return nil, errNotExist |
| 69 } | 74 } |
| 70 return topic, nil | 75 return topic, nil |
| 71 } | 76 } |
| 72 | 77 |
| 73 func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er ror) { | 78 func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubs ub.Subscription, error) { |
| 74 » sub := pc.client.Subscription(id) | 79 » sub := pc.Client.Subscription(id) |
| 75 » exists, err := sub.Exists(pc.ctx) | 80 » exists, err := sub.Exists(c) |
| 76 switch { | 81 switch { |
| 77 case err != nil: | 82 case err != nil: |
| 78 return nil, err | 83 return nil, err |
| 79 case !exists: | 84 case !exists: |
| 80 return nil, errNotExist | 85 return nil, errNotExist |
| 81 } | 86 } |
| 82 return sub, nil | 87 return sub, nil |
| 83 } | 88 } |
| 84 | 89 |
| 85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) ( | 90 func (pc *prodPubSubClient) createSubscription( |
| 91 » c context.Context, id string, cfg pubsub.SubscriptionConfig) ( | |
| 86 *pubsub.Subscription, error) { | 92 *pubsub.Subscription, error) { |
| 87 | 93 |
| 88 » return pc.client.CreateSubscription(pc.ctx, id, cfg) | 94 » return pc.Client.CreateSubscription(c, id, cfg) |
| 89 } | 95 } |
| 90 | 96 |
| 91 // getPubSubClient extracts a debug PubSub client out of the context. | 97 type prodPubSubClientFactory struct{} |
| 92 func getPubSubClient(c context.Context) (pubsubClient, error) { | 98 |
| 93 » if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { | 99 func (fac *prodPubSubClientFactory) newClient(c context.Context, projectID strin g) (pubsubClient, error) { |
| 94 » » return client, nil | 100 » cli, err := pubsub.NewClient(c, projectID) |
| 95 » } | 101 » return &prodPubSubClient{cli}, err |
| 96 » return nil, errors.New("no pubsub clients installed") | |
| 97 } | 102 } |
| 98 | 103 |
| 99 // withClient returns a context with a pubsub client instantiated to the | 104 // withClient returns a context with a pubsub client instantiated to the |
| 100 // given project ID | 105 // given project ID |
|
nodir
2017/07/14 01:09:04
this comment is stale
Ryan Tseng
2017/07/14 17:41:50
Done.
| |
| 101 func withClient(c context.Context, projectID string) (context.Context, error) { | 106 func withClientFactory(c context.Context, fac pubsubClientFactory) context.Conte xt { |
| 102 » if projectID == "" { | 107 » return context.WithValue(c, &pubsubClientFactoryKey, fac) |
| 103 » » return nil, errors.New("missing buildbucket project") | |
| 104 » } | |
| 105 » client, err := pubsub.NewClient(c, projectID) | |
| 106 » if err != nil { | |
| 107 » » return nil, err | |
| 108 » } | |
| 109 » return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien t}), nil | |
| 110 } | 108 } |
| 111 | 109 |
| 112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { | 110 func newPubSubClient(c context.Context, projectID string) (pubsubClient, error) { |
| 113 » client, err := getPubSubClient(c) | 111 » if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !o k { |
| 114 » if err != nil { | 112 » » panic("no pubsub client factory installed") |
| 115 » » return nil, err | 113 » } else { |
| 114 » » return fac.newClient(c, projectID) | |
| 116 } | 115 } |
| 117 return client.getTopic(id) | |
| 118 } | |
| 119 | |
| 120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) { | |
| 121 client, err := getPubSubClient(c) | |
| 122 if err != nil { | |
| 123 return nil, err | |
| 124 } | |
| 125 return client.getSubscription(id) | |
| 126 } | |
| 127 | |
| 128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon fig) ( | |
| 129 *pubsub.Subscription, error) { | |
| 130 | |
| 131 client, err := getPubSubClient(c) | |
| 132 if err != nil { | |
| 133 return nil, err | |
| 134 } | |
| 135 return client.createSubscription(id, cfg) | |
| 136 } | 116 } |
| 137 | 117 |
| 138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: | 118 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| 139 // * buildbucket, via the settings.Buildbucket.Topic setting | 119 // * buildbucket, via the settings.Buildbucket.Topic setting |
| 140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { | 120 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { |
| 141 if settings.Buildbucket != nil { | 121 if settings.Buildbucket != nil { |
| 142 » » // Install the production pubsub client pointing to the buildbuc ket project | 122 » » c = withClientFactory(c, &prodPubSubClientFactory{}) |
| 143 » » // into the context. | |
| 144 » » c, err := withClient(c, settings.Buildbucket.Project) | |
| 145 » » if err != nil { | |
| 146 » » » return err | |
| 147 » » } | |
| 148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) | 123 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) |
| 149 } | 124 } |
| 150 // TODO(hinoka): Ensure buildbot subscribed. | 125 // TODO(hinoka): Ensure buildbot subscribed. |
| 151 return nil | 126 return nil |
| 152 } | 127 } |
| 153 | 128 |
| 154 // ensureSubscribed is called by a cron job and ensures that the Milo | 129 // ensureSubscribed is called by a cron job and ensures that the Milo |
| 155 // instance is properly subscribed to the buildbucket subscription endpoint. | 130 // instance is properly subscribed to the buildbucket subscription endpoint. |
| 156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { | 131 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| 157 topicID := "builds" | 132 topicID := "builds" |
| 158 » // Check to see if the topic exists first. | 133 » // Check the buildbucket project to see if the topic exists first. |
| 159 » topic, err := getTopic(c, topicID) | 134 » bbClient, err := newPubSubClient(c, projectID) |
| 135 » if err != nil { | |
| 136 » » return err | |
| 137 » } | |
| 138 » topic, err := bbClient.getTopic(c, topicID) | |
| 160 switch err { | 139 switch err { |
| 161 case errNotExist: | 140 case errNotExist: |
| 162 » » logging.WithError(err).Errorf(c, "%s does not exist", topicID) | 141 » » return errors.Annotate(err, "%s does not exist", topicID).Err() |
| 163 » » return err | |
| 164 case nil: | 142 case nil: |
| 165 // continue | 143 // continue |
| 166 default: | 144 default: |
| 167 if strings.Contains(err.Error(), "PermissionDenied") { | 145 if strings.Contains(err.Error(), "PermissionDenied") { |
| 168 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID | 146 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID |
| 169 acct, serr := info.ServiceAccount(c) | 147 acct, serr := info.ServiceAccount(c) |
| 170 if serr != nil { | 148 if serr != nil { |
| 171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) | 149 acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| 172 } | 150 } |
| 173 // The documentation is incorrect. We need Editor permi ssion because | 151 // The documentation is incorrect. We need Editor permi ssion because |
| 174 // the Subscriber permission does NOT permit attaching s ubscriptions to | 152 // the Subscriber permission does NOT permit attaching s ubscriptions to |
| 175 // topics or to view topics. | 153 // topics or to view topics. |
| 176 logging.WithError(err).Errorf( | 154 logging.WithError(err).Errorf( |
| 177 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) | 155 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) |
| 178 } else { | 156 } else { |
| 179 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) | 157 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) |
| 180 } | 158 } |
| 181 return err | 159 return err |
| 182 } | 160 } |
| 183 // Now check to see if the subscription already exists. | 161 // Now check to see if the subscription already exists. |
| 184 » subID := info.AppID(c) | 162 » miloClient, err := newPubSubClient(c, info.AppID(c)) |
| 185 » // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | 163 » if err != nil { |
| 186 » // because it returns a version pinned hostname instead of the default r oute. | 164 » » return err |
| 187 » sub, err := getSubscription(c, subID) | 165 » } |
| 166 » sub, err := miloClient.getSubscription(c, "buildbucket") | |
| 188 switch err { | 167 switch err { |
| 189 case errNotExist: | 168 case errNotExist: |
| 190 // continue | 169 // continue |
| 191 case nil: | 170 case nil: |
| 192 logging.Infof(c, "subscription %#v exists, no need to update", s ub) | 171 logging.Infof(c, "subscription %#v exists, no need to update", s ub) |
| 193 return nil | 172 return nil |
| 194 default: | 173 default: |
| 195 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) | 174 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) |
| 196 return err | 175 return err |
| 197 } | 176 } |
| 177 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | |
| 178 // because it returns a version pinned hostname instead of the default r oute. | |
| 198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) | 179 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| 199 | 180 |
| 200 // No subscription exists, attach a new subscription to the existing top ic. | 181 // No subscription exists, attach a new subscription to the existing top ic. |
| 201 endpointURL := url.URL{ | 182 endpointURL := url.URL{ |
| 202 Scheme: "https", | 183 Scheme: "https", |
| 203 Host: pubsubModuleHost, | 184 Host: pubsubModuleHost, |
| 204 Path: "/_ah/push-handlers/buildbucket", | 185 Path: "/_ah/push-handlers/buildbucket", |
| 205 } | 186 } |
| 206 subConfig := pubsub.SubscriptionConfig{ | 187 subConfig := pubsub.SubscriptionConfig{ |
| 207 Topic: topic, | 188 Topic: topic, |
| 208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, | 189 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| 209 AckDeadline: time.Minute * 10, | 190 AckDeadline: time.Minute * 10, |
| 210 } | 191 } |
| 211 » newSub, err := createSubscription(c, subID, subConfig) | 192 » newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig ) |
| 212 if err != nil { | 193 if err != nil { |
| 213 » » if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { | 194 » » return errors.Annotate(err, "could not create subscription %#v", sub).Err() |
| 214 » » » registerURL := "https://console.cloud.google.com/apis/cr edentials/domainverification?project=" + projectID | |
| 215 » » » verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost | |
| 216 » » » logging.WithError(err).Errorf( | |
| 217 » » » » c, "The domain has to be verified and added.\n\n "+ | |
| 218 » » » » » "1. Go to %s\n"+ | |
| 219 » » » » » "2. Verify the domain\n"+ | |
| 220 » » » » » "3. Go to %s\n"+ | |
| 221 » » » » » "4. Add %s to allowed domains\n\n", | |
| 222 » » » » verifyURL, registerURL, pubsubModuleHost) | |
| 223 » » } else { | |
| 224 » » » logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub) | |
| 225 » » } | |
| 226 » » return err | |
| 227 } | 195 } |
| 228 // Success! | 196 // Success! |
| 229 logging.Infof(c, "successfully created subscription %#v", newSub) | 197 logging.Infof(c, "successfully created subscription %#v", newSub) |
| 230 return nil | 198 return nil |
| 231 } | 199 } |
| OLD | NEW |