| OLD | NEW |
| (Empty) | |
| 1 package common |
| 2 |
| 3 import ( |
| 4 "encoding/base64" |
| 5 "errors" |
| 6 "fmt" |
| 7 "net/url" |
| 8 "strings" |
| 9 "time" |
| 10 |
| 11 "cloud.google.com/go/pubsub" |
| 12 "golang.org/x/net/context" |
| 13 |
| 14 "github.com/luci/gae/service/info" |
| 15 "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/milo/api/config" |
| 17 ) |
| 18 |
| 19 var pubSubClientKey = "stores a pubsubClient" |
| 20 |
| 21 type PubSubMessage struct { |
| 22 Attributes map[string]string `json:"attributes"` |
| 23 Data string `json:"data"` |
| 24 MessageID string `json:"message_id"` |
| 25 } |
| 26 |
| 27 type PubSubSubscription struct { |
| 28 Message PubSubMessage `json:"message"` |
| 29 Subscription string `json:"subscription"` |
| 30 } |
| 31 |
| 32 var errNotExist = errors.New("does not exist") |
| 33 |
| 34 // GetData returns the expanded form of Data (decoded from base64). |
| 35 func (m *PubSubSubscription) GetData() ([]byte, error) { |
| 36 return base64.StdEncoding.DecodeString(m.Message.Data) |
| 37 } |
| 38 |
| 39 // pubsubClient is an interface representing a pubsub.Client containing only |
| 40 // the functions that Milo calls. Internal use only, can be swapped |
| 41 // out for testing. |
| 42 type pubsubClient interface { |
| 43 // getTopic returns the pubsub topic if it exists, a notExist error if |
| 44 // it does not exist, or an error if there was an error. |
| 45 getTopic(string) (*pubsub.Topic, error) |
| 46 |
| 47 // getSubscription returns the pubsub subscription if it exists, |
| 48 // a notExist error if it does not exist, or an error if there was an er
ror. |
| 49 getSubscription(string) (*pubsub.Subscription, error) |
| 50 |
| 51 createSubscription(string, pubsub.SubscriptionConfig) ( |
| 52 *pubsub.Subscription, error) |
| 53 } |
| 54 |
| 55 // prodPubSubClient is a wrapper around the production pubsub client. |
| 56 type prodPubSubClient struct { |
| 57 ctx context.Context |
| 58 client *pubsub.Client |
| 59 } |
| 60 |
| 61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { |
| 62 topic := pc.client.Topic(id) |
| 63 exists, err := topic.Exists(pc.ctx) |
| 64 switch { |
| 65 case err != nil: |
| 66 return nil, err |
| 67 case !exists: |
| 68 return nil, errNotExist |
| 69 } |
| 70 return topic, nil |
| 71 } |
| 72 |
| 73 func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er
ror) { |
| 74 sub := pc.client.Subscription(id) |
| 75 exists, err := sub.Exists(pc.ctx) |
| 76 switch { |
| 77 case err != nil: |
| 78 return nil, err |
| 79 case !exists: |
| 80 return nil, errNotExist |
| 81 } |
| 82 return sub, nil |
| 83 } |
| 84 |
| 85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio
nConfig) ( |
| 86 *pubsub.Subscription, error) { |
| 87 |
| 88 return pc.client.CreateSubscription(pc.ctx, id, cfg) |
| 89 } |
| 90 |
| 91 // getPubSubClient extracts a debug PubSub client out of the context. |
| 92 func getPubSubClient(c context.Context) (pubsubClient, error) { |
| 93 if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { |
| 94 return client, nil |
| 95 } |
| 96 return nil, errors.New("no pubsub clients installed") |
| 97 } |
| 98 |
| 99 // withClient returns a context with a pubsub client instantiated to the |
| 100 // given project ID |
| 101 func withClient(c context.Context, projectID string) (context.Context, error) { |
| 102 if projectID == "" { |
| 103 return nil, errors.New("missing buildbucket project") |
| 104 } |
| 105 client, err := pubsub.NewClient(c, projectID) |
| 106 if err != nil { |
| 107 return nil, err |
| 108 } |
| 109 return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien
t}), nil |
| 110 } |
| 111 |
| 112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) { |
| 113 client, err := getPubSubClient(c) |
| 114 if err != nil { |
| 115 return nil, err |
| 116 } |
| 117 return client.getTopic(id) |
| 118 } |
| 119 |
| 120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error)
{ |
| 121 client, err := getPubSubClient(c) |
| 122 if err != nil { |
| 123 return nil, err |
| 124 } |
| 125 return client.getSubscription(id) |
| 126 } |
| 127 |
| 128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon
fig) ( |
| 129 *pubsub.Subscription, error) { |
| 130 |
| 131 client, err := getPubSubClient(c) |
| 132 if err != nil { |
| 133 return nil, err |
| 134 } |
| 135 return client.createSubscription(id, cfg) |
| 136 } |
| 137 |
| 138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: |
| 139 // * buildbucket, via the settings.Buildbucket.Topic setting |
| 140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error
{ |
| 141 if settings.Buildbucket != nil { |
| 142 // Install the production pubsub client pointing to the buildbuc
ket project |
| 143 // into the context. |
| 144 c, err := withClient(c, settings.Buildbucket.Project) |
| 145 if err != nil { |
| 146 return err |
| 147 } |
| 148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje
ct) |
| 149 } |
| 150 // TODO(hinoka): Ensure buildbot subscribed. |
| 151 return nil |
| 152 } |
| 153 |
| 154 // ensureSubscribed is called by a cron job and ensures that the Milo |
| 155 // instance is properly subscribed to the buildbucket subscription endpoint. |
| 156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { |
| 157 topicID := "builds" |
| 158 // Check to see if the topic exists first. |
| 159 topic, err := getTopic(c, topicID) |
| 160 switch err { |
| 161 case errNotExist: |
| 162 logging.WithError(err).Errorf(c, "%s does not exist", topicID) |
| 163 return err |
| 164 case nil: |
| 165 // continue |
| 166 default: |
| 167 if strings.Contains(err.Error(), "PermissionDenied") { |
| 168 URL := "https://console.cloud.google.com/iam-admin/iam/p
roject?project=" + projectID |
| 169 acct, serr := info.ServiceAccount(c) |
| 170 if serr != nil { |
| 171 acct = fmt.Sprintf("Unknown: %s", serr.Error()) |
| 172 } |
| 173 // The documentation is incorrect. We need Editor permi
ssion because |
| 174 // the Subscriber permission does NOT permit attaching s
ubscriptions to |
| 175 // topics or to view topics. |
| 176 logging.WithError(err).Errorf( |
| 177 c, "please go to %s and add %s as a Pub/Sub Edit
or", URL, acct) |
| 178 } else { |
| 179 logging.WithError(err).Errorf(c, "could not check topic
%#v", topic) |
| 180 } |
| 181 return err |
| 182 } |
| 183 // Now check to see if the subscription already exists. |
| 184 subID := info.AppID(c) |
| 185 // Get the pubsub module of our app. We do not want to use info.ModuleH
ostname() |
| 186 // because it returns a version pinned hostname instead of the default r
oute. |
| 187 sub, err := getSubscription(c, subID) |
| 188 switch err { |
| 189 case errNotExist: |
| 190 // continue |
| 191 case nil: |
| 192 logging.Infof(c, "subscription %#v exists, no need to update", s
ub) |
| 193 return nil |
| 194 default: |
| 195 logging.WithError(err).Errorf(c, "could not check subscription %
#v", sub) |
| 196 return err |
| 197 } |
| 198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) |
| 199 |
| 200 // No subscription exists, attach a new subscription to the existing top
ic. |
| 201 endpointURL := url.URL{ |
| 202 Scheme: "https", |
| 203 Host: pubsubModuleHost, |
| 204 Path: "/_ah/push-handlers/buildbucket", |
| 205 } |
| 206 subConfig := pubsub.SubscriptionConfig{ |
| 207 Topic: topic, |
| 208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, |
| 209 AckDeadline: time.Minute * 10, |
| 210 } |
| 211 newSub, err := createSubscription(c, subID, subConfig) |
| 212 if err != nil { |
| 213 if strings.Contains(err.Error(), "The supplied HTTP URL is not r
egistered") { |
| 214 registerURL := "https://console.cloud.google.com/apis/cr
edentials/domainverification?project=" + projectID |
| 215 verifyURL := "https://www.google.com/webmasters/verifica
tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost |
| 216 logging.WithError(err).Errorf( |
| 217 c, "The domain has to be verified and added.\n\n
"+ |
| 218 "1. Go to %s\n"+ |
| 219 "2. Verify the domain\n"+ |
| 220 "3. Go to %s\n"+ |
| 221 "4. Add %s to allowed domains\n\n", |
| 222 verifyURL, registerURL, pubsubModuleHost) |
| 223 } else { |
| 224 logging.WithError(err).Errorf(c, "could not create subsc
ription %#v", sub) |
| 225 } |
| 226 return err |
| 227 } |
| 228 // Success! |
| 229 logging.Infof(c, "successfully created subscription %#v", newSub) |
| 230 return nil |
| 231 } |
| OLD | NEW |