| OLD | NEW |
| 1 package common | 1 package common |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "encoding/base64" | 4 "encoding/base64" |
| 5 "errors" | |
| 6 "fmt" | 5 "fmt" |
| 7 "net/url" | 6 "net/url" |
| 8 "strings" | 7 "strings" |
| 9 "time" | 8 "time" |
| 10 | 9 |
| 11 "cloud.google.com/go/pubsub" | 10 "cloud.google.com/go/pubsub" |
| 12 "golang.org/x/net/context" | 11 "golang.org/x/net/context" |
| 13 | 12 |
| 14 "github.com/luci/gae/service/info" | 13 "github.com/luci/gae/service/info" |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/logging" | 15 "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/milo/api/config" | 16 "github.com/luci/luci-go/milo/api/config" |
| 17 ) | 17 ) |
| 18 | 18 |
| 19 var pubSubClientKey = "stores a pubsubClient" | 19 var pubsubClientFactoryKey = "stores a pubsubClientFactory" |
| 20 | 20 |
| 21 type PubSubMessage struct { | 21 type PubSubMessage struct { |
| 22 Attributes map[string]string `json:"attributes"` | 22 Attributes map[string]string `json:"attributes"` |
| 23 Data string `json:"data"` | 23 Data string `json:"data"` |
| 24 MessageID string `json:"message_id"` | 24 MessageID string `json:"message_id"` |
| 25 } | 25 } |
| 26 | 26 |
| 27 type PubSubSubscription struct { | 27 type PubSubSubscription struct { |
| 28 Message PubSubMessage `json:"message"` | 28 Message PubSubMessage `json:"message"` |
| 29 Subscription string `json:"subscription"` | 29 Subscription string `json:"subscription"` |
| 30 } | 30 } |
| 31 | 31 |
| 32 var errNotExist = errors.New("does not exist") | 32 var errNotExist = errors.New("does not exist") |
| 33 | 33 |
| 34 // GetData returns the expanded form of Data (decoded from base64). | 34 // GetData returns the expanded form of Data (decoded from base64). |
| 35 func (m *PubSubSubscription) GetData() ([]byte, error) { | 35 func (m *PubSubSubscription) GetData() ([]byte, error) { |
| 36 return base64.StdEncoding.DecodeString(m.Message.Data) | 36 return base64.StdEncoding.DecodeString(m.Message.Data) |
| 37 } | 37 } |
| 38 | 38 |
| 39 // pubsubClient is an interface representing a pubsub.Client containing only | 39 // pubsubClient is an interface representing a pubsub.Client containing only |
| 40 // the functions that Milo calls. Internal use only, can be swapped | 40 // the functions that Milo calls. Internal use only, can be swapped |
| 41 // out for testing. | 41 // out for testing. |
| 42 type pubsubClient interface { | 42 type pubsubClient interface { |
| 43 // getTopic returns the pubsub topic if it exists, a notExist error if | 43 // getTopic returns the pubsub topic if it exists, a notExist error if |
| 44 // it does not exist, or an error if there was an error. | 44 // it does not exist, or an error if there was an error. |
| 45 » getTopic(string) (*pubsub.Topic, error) | 45 » getTopic(context.Context, string) (*pubsub.Topic, error) |
| 46 | 46 |
| 47 // getSubscription returns the pubsub subscription if it exists, | 47 // getSubscription returns the pubsub subscription if it exists, |
| 48 // a notExist error if it does not exist, or an error if there was an er
ror. | 48 // a notExist error if it does not exist, or an error if there was an er
ror. |
| 49 » getSubscription(string) (*pubsub.Subscription, error) | 49 » getSubscription(context.Context, string) (*pubsub.Subscription, error) |
| 50 | 50 |
| 51 » createSubscription(string, pubsub.SubscriptionConfig) ( | 51 » createSubscription(context.Context, string, pubsub.SubscriptionConfig) ( |
| 52 *pubsub.Subscription, error) | 52 *pubsub.Subscription, error) |
| 53 } | 53 } |
| 54 | 54 |
| 55 // pubsubClientFactory is a stubbable factory that produces pubsubClients bound |
| 56 // to project IDs. |
| 57 type pubsubClientFactory func(context.Context, string) (pubsubClient, error) |
| 58 |
| 55 // prodPubSubClient is a wrapper around the production pubsub client. | 59 // prodPubSubClient is a wrapper around the production pubsub client. |
| 56 type prodPubSubClient struct { | 60 type prodPubSubClient struct { |
| 57 » ctx context.Context | 61 » *pubsub.Client |
| 58 » client *pubsub.Client | |
| 59 } | 62 } |
| 60 | 63 |
| 61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { | 64 func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topi
c, error) { |
| 62 » topic := pc.client.Topic(id) | 65 » topic := pc.Client.Topic(id) |
| 63 » exists, err := topic.Exists(pc.ctx) | 66 » exists, err := topic.Exists(c) |
| 64 switch { | 67 switch { |
| 65 case err != nil: | 68 case err != nil: |
| 66 return nil, err | 69 return nil, err |
| 67 case !exists: | 70 case !exists: |
| 68 return nil, errNotExist | 71 return nil, errNotExist |
| 69 } | 72 } |
| 70 return topic, nil | 73 return topic, nil |
| 71 } | 74 } |
| 72 | 75 |
| 73 func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er
ror) { | 76 func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubs
ub.Subscription, error) { |
| 74 » sub := pc.client.Subscription(id) | 77 » sub := pc.Client.Subscription(id) |
| 75 » exists, err := sub.Exists(pc.ctx) | 78 » exists, err := sub.Exists(c) |
| 76 switch { | 79 switch { |
| 77 case err != nil: | 80 case err != nil: |
| 78 return nil, err | 81 return nil, err |
| 79 case !exists: | 82 case !exists: |
| 80 return nil, errNotExist | 83 return nil, errNotExist |
| 81 } | 84 } |
| 82 return sub, nil | 85 return sub, nil |
| 83 } | 86 } |
| 84 | 87 |
| 85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio
nConfig) ( | 88 func (pc *prodPubSubClient) createSubscription( |
| 89 » c context.Context, id string, cfg pubsub.SubscriptionConfig) ( |
| 86 *pubsub.Subscription, error) { | 90 *pubsub.Subscription, error) { |
| 87 | 91 |
| 88 » return pc.client.CreateSubscription(pc.ctx, id, cfg) | 92 » return pc.Client.CreateSubscription(c, id, cfg) |
| 89 } | 93 } |
| 90 | 94 |
| 91 // getPubSubClient extracts a debug PubSub client out of the context. | 95 func prodPubSubClientFactory(c context.Context, projectID string) (pubsubClient,
error) { |
| 92 func getPubSubClient(c context.Context) (pubsubClient, error) { | 96 » cli, err := pubsub.NewClient(c, projectID) |
| 93 » if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { | 97 » return &prodPubSubClient{cli}, err |
| 94 » » return client, nil | |
| 95 » } | |
| 96 » return nil, errors.New("no pubsub clients installed") | |
| 97 } | 98 } |
| 98 | 99 |
| 99 // withClient returns a context with a pubsub client instantiated to the | 100 // withClientFactory returns a context with a given pubsub client factory. |
| 100 // given project ID | 101 func withClientFactory(c context.Context, fac pubsubClientFactory) context.Conte
xt { |
| 101 func withClient(c context.Context, projectID string) (context.Context, error) { | 102 » return context.WithValue(c, &pubsubClientFactoryKey, fac) |
| 102 » if projectID == "" { | |
| 103 » » return nil, errors.New("missing buildbucket project") | |
| 104 » } | |
| 105 » client, err := pubsub.NewClient(c, projectID) | |
| 106 » if err != nil { | |
| 107 » » return nil, err | |
| 108 » } | |
| 109 » return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien
t}), nil | |
| 110 } | 103 } |
| 111 | 104 |
| 112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { | 105 func newPubSubClient(c context.Context, projectID string) (pubsubClient, error)
{ |
| 113 » client, err := getPubSubClient(c) | 106 » if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !o
k { |
| 114 » if err != nil { | 107 » » panic("no pubsub client factory installed") |
| 115 » » return nil, err | 108 » } else { |
| 109 » » return fac(c, projectID) |
| 116 } | 110 } |
| 117 return client.getTopic(id) | |
| 118 } | |
| 119 | |
| 120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error)
{ | |
| 121 client, err := getPubSubClient(c) | |
| 122 if err != nil { | |
| 123 return nil, err | |
| 124 } | |
| 125 return client.getSubscription(id) | |
| 126 } | |
| 127 | |
| 128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon
fig) ( | |
| 129 *pubsub.Subscription, error) { | |
| 130 | |
| 131 client, err := getPubSubClient(c) | |
| 132 if err != nil { | |
| 133 return nil, err | |
| 134 } | |
| 135 return client.createSubscription(id, cfg) | |
| 136 } | 111 } |
| 137 | 112 |
| 138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: | 113 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| 139 // * buildbucket, via the settings.Buildbucket.Topic setting | 114 // * buildbucket, via the settings.Buildbucket.Topic setting |
| 140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error
{ | 115 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error
{ |
| 141 if settings.Buildbucket != nil { | 116 if settings.Buildbucket != nil { |
| 142 » » // Install the production pubsub client pointing to the buildbuc
ket project | 117 » » c = withClientFactory(c, prodPubSubClientFactory) |
| 143 » » // into the context. | |
| 144 » » c, err := withClient(c, settings.Buildbucket.Project) | |
| 145 » » if err != nil { | |
| 146 » » » return err | |
| 147 » » } | |
| 148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje
ct) | 118 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje
ct) |
| 149 } | 119 } |
| 150 // TODO(hinoka): Ensure buildbot subscribed. | 120 // TODO(hinoka): Ensure buildbot subscribed. |
| 151 return nil | 121 return nil |
| 152 } | 122 } |
| 153 | 123 |
| 154 // ensureSubscribed is called by a cron job and ensures that the Milo | 124 // ensureBuildbucketSubscribedis called by a cron job and ensures that the Milo |
| 155 // instance is properly subscribed to the buildbucket subscription endpoint. | 125 // instance is properly subscribed to the buildbucket subscription endpoint. |
| 156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { | 126 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| 157 topicID := "builds" | 127 topicID := "builds" |
| 158 » // Check to see if the topic exists first. | 128 » // Check the buildbucket project to see if the topic exists first. |
| 159 » topic, err := getTopic(c, topicID) | 129 » bbClient, err := newPubSubClient(c, projectID) |
| 130 » if err != nil { |
| 131 » » return err |
| 132 » } |
| 133 » topic, err := bbClient.getTopic(c, topicID) |
| 160 switch err { | 134 switch err { |
| 161 case errNotExist: | 135 case errNotExist: |
| 162 » » logging.WithError(err).Errorf(c, "%s does not exist", topicID) | 136 » » return errors.Annotate(err, "%s does not exist", topicID).Err() |
| 163 » » return err | |
| 164 case nil: | 137 case nil: |
| 165 // continue | 138 // continue |
| 166 default: | 139 default: |
| 167 if strings.Contains(err.Error(), "PermissionDenied") { | 140 if strings.Contains(err.Error(), "PermissionDenied") { |
| 168 URL := "https://console.cloud.google.com/iam-admin/iam/p
roject?project=" + projectID | 141 URL := "https://console.cloud.google.com/iam-admin/iam/p
roject?project=" + projectID |
| 169 acct, serr := info.ServiceAccount(c) | 142 acct, serr := info.ServiceAccount(c) |
| 170 if serr != nil { | 143 if serr != nil { |
| 171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) | 144 acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| 172 } | 145 } |
| 173 // The documentation is incorrect. We need Editor permi
ssion because | 146 // The documentation is incorrect. We need Editor permi
ssion because |
| 174 // the Subscriber permission does NOT permit attaching s
ubscriptions to | 147 // the Subscriber permission does NOT permit attaching s
ubscriptions to |
| 175 // topics or to view topics. | 148 // topics or to view topics. |
| 176 logging.WithError(err).Errorf( | 149 logging.WithError(err).Errorf( |
| 177 c, "please go to %s and add %s as a Pub/Sub Edit
or", URL, acct) | 150 c, "please go to %s and add %s as a Pub/Sub Edit
or", URL, acct) |
| 178 } else { | 151 } else { |
| 179 logging.WithError(err).Errorf(c, "could not check topic
%#v", topic) | 152 logging.WithError(err).Errorf(c, "could not check topic
%#v", topic) |
| 180 } | 153 } |
| 181 return err | 154 return err |
| 182 } | 155 } |
| 183 // Now check to see if the subscription already exists. | 156 // Now check to see if the subscription already exists. |
| 184 » subID := info.AppID(c) | 157 » miloClient, err := newPubSubClient(c, info.AppID(c)) |
| 185 » // Get the pubsub module of our app. We do not want to use info.ModuleH
ostname() | 158 » if err != nil { |
| 186 » // because it returns a version pinned hostname instead of the default r
oute. | 159 » » return err |
| 187 » sub, err := getSubscription(c, subID) | 160 » } |
| 161 » sub, err := miloClient.getSubscription(c, "buildbucket") |
| 188 switch err { | 162 switch err { |
| 189 case errNotExist: | 163 case errNotExist: |
| 190 // continue | 164 // continue |
| 191 case nil: | 165 case nil: |
| 192 logging.Infof(c, "subscription %#v exists, no need to update", s
ub) | 166 logging.Infof(c, "subscription %#v exists, no need to update", s
ub) |
| 193 return nil | 167 return nil |
| 194 default: | 168 default: |
| 195 logging.WithError(err).Errorf(c, "could not check subscription %
#v", sub) | 169 logging.WithError(err).Errorf(c, "could not check subscription %
#v", sub) |
| 196 return err | 170 return err |
| 197 } | 171 } |
| 172 // Get the pubsub module of our app. We do not want to use info.ModuleH
ostname() |
| 173 // because it returns a version pinned hostname instead of the default r
oute. |
| 198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) | 174 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| 199 | 175 |
| 200 // No subscription exists, attach a new subscription to the existing top
ic. | 176 // No subscription exists, attach a new subscription to the existing top
ic. |
| 201 endpointURL := url.URL{ | 177 endpointURL := url.URL{ |
| 202 Scheme: "https", | 178 Scheme: "https", |
| 203 Host: pubsubModuleHost, | 179 Host: pubsubModuleHost, |
| 204 Path: "/_ah/push-handlers/buildbucket", | 180 Path: "/_ah/push-handlers/buildbucket", |
| 205 } | 181 } |
| 206 subConfig := pubsub.SubscriptionConfig{ | 182 subConfig := pubsub.SubscriptionConfig{ |
| 207 Topic: topic, | 183 Topic: topic, |
| 208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, | 184 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| 209 AckDeadline: time.Minute * 10, | 185 AckDeadline: time.Minute * 10, |
| 210 } | 186 } |
| 211 » newSub, err := createSubscription(c, subID, subConfig) | 187 » newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig
) |
| 212 if err != nil { | 188 if err != nil { |
| 213 » » if strings.Contains(err.Error(), "The supplied HTTP URL is not r
egistered") { | 189 » » return errors.Annotate(err, "could not create subscription %#v",
sub).Err() |
| 214 » » » registerURL := "https://console.cloud.google.com/apis/cr
edentials/domainverification?project=" + projectID | |
| 215 » » » verifyURL := "https://www.google.com/webmasters/verifica
tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost | |
| 216 » » » logging.WithError(err).Errorf( | |
| 217 » » » » c, "The domain has to be verified and added.\n\n
"+ | |
| 218 » » » » » "1. Go to %s\n"+ | |
| 219 » » » » » "2. Verify the domain\n"+ | |
| 220 » » » » » "3. Go to %s\n"+ | |
| 221 » » » » » "4. Add %s to allowed domains\n\n", | |
| 222 » » » » verifyURL, registerURL, pubsubModuleHost) | |
| 223 » » } else { | |
| 224 » » » logging.WithError(err).Errorf(c, "could not create subsc
ription %#v", sub) | |
| 225 » » } | |
| 226 » » return err | |
| 227 } | 190 } |
| 228 // Success! | 191 // Success! |
| 229 logging.Infof(c, "successfully created subscription %#v", newSub) | 192 logging.Infof(c, "successfully created subscription %#v", newSub) |
| 230 return nil | 193 return nil |
| 231 } | 194 } |
| OLD | NEW |