Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 package common | |
| 2 | |
| 3 import ( | |
| 4 "encoding/base64" | |
| 5 "errors" | |
| 6 "fmt" | |
| 7 "net/url" | |
| 8 "strings" | |
| 9 "time" | |
| 10 | |
| 11 "cloud.google.com/go/pubsub" | |
| 12 "golang.org/x/net/context" | |
| 13 | |
| 14 "github.com/luci/gae/service/info" | |
| 15 "github.com/luci/luci-go/common/logging" | |
| 16 "github.com/luci/luci-go/milo/api/config" | |
| 17 ) | |
| 18 | |
| 19 var pubSubClientKey = "stores a pubsubClient" | |
| 20 | |
| 21 type PubSubMessage struct { | |
| 22 Attributes map[string]string `json:"attributes"` | |
| 23 Data string `json:"data"` | |
| 24 MessageID string `json:"message_id"` | |
| 25 } | |
| 26 | |
| 27 type PubSubSubscription struct { | |
| 28 Message PubSubMessage `json:"message"` | |
| 29 Subscription string `json:"subscription"` | |
| 30 } | |
| 31 | |
| 32 var errNotExist = errors.New("does not exist") | |
| 33 | |
| 34 // GetData returns the expanded form of Data (decoded from base64). | |
| 35 func (m *PubSubSubscription) GetData() ([]byte, error) { | |
| 36 return base64.StdEncoding.DecodeString(m.Message.Data) | |
| 37 } | |
| 38 | |
| 39 // pubsubClient is an interface representing a pubsub.Client containing only | |
| 40 // the functions that Milo calls. Internal use only, can be swapped | |
| 41 // out for testing. | |
| 42 type pubsubClient interface { | |
| 43 getTopic(string) (*pubsub.Topic, error) | |
| 44 getSubscription(string) (*pubsub.Subscription, error) | |
| 45 createSubscription(string, pubsub.SubscriptionConfig) ( | |
| 46 *pubsub.Subscription, error) | |
| 47 } | |
| 48 | |
| 49 // prodPubSubClient is a wrapper around the production pubsub client. | |
| 50 type prodPubSubClient struct { | |
| 51 ctx context.Context | |
| 52 client *pubsub.Client | |
| 53 } | |
| 54 | |
| 55 // getTopic returns the pubsub topic if it exists, a notExist error if | |
| 56 // it does not exist, or an error if there was an error. | |
|
iannucci
2017/07/08 00:05:25
move the docstrings to the interface
Ryan Tseng
2017/07/08 01:47:34
Done.
| |
| 57 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) { | |
| 58 topic := pc.client.Topic(id) | |
| 59 exists, err := topic.Exists(pc.ctx) | |
| 60 switch { | |
| 61 case err != nil: | |
| 62 return nil, err | |
| 63 case !exists: | |
| 64 return nil, errNotExist | |
| 65 } | |
| 66 return topic, nil | |
| 67 } | |
| 68 | |
| 69 // getSubscription returns the pubsub subscriptionif it exists, | |
|
iannucci
2017/07/08 00:05:25
nit: missing space
Ryan Tseng
2017/07/08 01:47:34
Done.
| |
| 70 // a notExist error if it does not exist, or an error if there was an error. | |
| 71 func (pc *prodPubSubClient) getSubscription(id string) ( | |
| 72 *pubsub.Subscription, error) { | |
|
iannucci
2017/07/08 00:05:25
no newline... 80 cols is not a thing in go
Ryan Tseng
2017/07/08 01:47:34
Done.
| |
| 73 sub := pc.client.Subscription(id) | |
| 74 exists, err := sub.Exists(pc.ctx) | |
| 75 switch { | |
| 76 case err != nil: | |
| 77 return nil, err | |
| 78 case !exists: | |
| 79 return nil, errNotExist | |
| 80 } | |
| 81 return sub, nil | |
| 82 } | |
| 83 | |
| 84 func (pc *prodPubSubClient) createSubscription( | |
| 85 id string, cfg pubsub.SubscriptionConfig) ( | |
|
iannucci
2017/07/08 00:05:25
nix newlines
Ryan Tseng
2017/07/08 01:47:34
Done.
| |
| 86 *pubsub.Subscription, error) { | |
| 87 | |
| 88 return pc.client.CreateSubscription(pc.ctx, id, cfg) | |
| 89 } | |
| 90 | |
| 91 // getPubSubClient tries to extract a debug PubSub client out of the context. | |
|
iannucci
2017/07/08 00:05:25
I would consider instead just having GetTopic(cont
Ryan Tseng
2017/07/08 01:47:34
Done.
| |
| 92 // if none exists, then return a prod client. | |
| 93 func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) { | |
| 94 if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok { | |
| 95 return client, nil | |
| 96 } | |
| 97 client, err := pubsub.NewClient(c, projectID) | |
| 98 return &prodPubSubClient{c, client}, err | |
| 99 } | |
| 100 | |
| 101 // EnsurePubSubSubscribed makes sure the following subscriptions are in place: | |
| 102 // * buildbucket, via the settings.Buildbucket.Topic setting | |
| 103 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error { | |
| 104 if settings.Buildbucket != nil { | |
| 105 err := ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) | |
| 106 return err | |
| 107 } | |
| 108 // TODO(hinoka): Ensure buildbot subscribed. | |
| 109 return nil | |
| 110 } | |
| 111 | |
| 112 // ensureSubscribed is called by a cron job and ensures that the Milo | |
| 113 // instance is properly subscribed to the buildbucket subscription endpoint. | |
| 114 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { | |
| 115 topicID := "builds" | |
| 116 if projectID == "" { | |
| 117 return fmt.Errorf("missing buildbucket project") | |
|
iannucci
2017/07/08 00:05:25
errors.New
Ryan Tseng
2017/07/08 01:47:33
Done.
| |
| 118 } | |
| 119 client, err := getPubSubClient(c, projectID) | |
| 120 if err != nil { | |
| 121 return err | |
| 122 } | |
| 123 // Check to see if the topic exists first. | |
| 124 topic, err := client.getTopic(topicID) | |
| 125 switch err { | |
| 126 case errNotExist: | |
| 127 err = fmt.Errorf("topic %#v does not exist", topic) | |
|
iannucci
2017/07/08 00:05:25
why not have getTopic do this?
Ryan Tseng
2017/07/08 01:47:34
Done.
| |
| 128 logging.Errorf(c, err.Error()) | |
| 129 return err | |
| 130 case nil: | |
| 131 // continue | |
| 132 default: | |
| 133 if strings.Contains(err.Error(), "PermissionDenied") { | |
|
iannucci
2017/07/08 00:05:25
oof, is this the best way?
Ryan Tseng
2017/07/08 01:47:34
As far as I can tell :(
| |
| 134 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID | |
|
iannucci
2017/07/08 00:05:25
may be worth figuring out the gcloud sdk command f
Ryan Tseng
2017/07/08 01:47:34
gcloud doesn't appear to be able to add/manipulate
| |
| 135 acct, serr := info.ServiceAccount(c) | |
| 136 if serr != nil { | |
| 137 acct = fmt.Sprintf("Unknown: %s", serr.Error()) | |
| 138 } | |
| 139 // The documentation is incorrect. We need Editor permi ssion because | |
| 140 // the Subscriber permission does NOT permit attaching s ubscriptions to | |
| 141 // topics or to view topics. | |
| 142 logging.WithError(err).Errorf( | |
| 143 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) | |
| 144 } else { | |
| 145 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) | |
| 146 } | |
| 147 return err | |
| 148 } | |
| 149 // Now check to see if the subscription already exists. | |
| 150 subID := info.AppID(c) | |
| 151 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | |
| 152 // because it returns a version pinned hostname instead of the default r oute. | |
| 153 sub, err := client.getSubscription(subID) | |
| 154 switch err { | |
| 155 case errNotExist: | |
| 156 // continue | |
| 157 case nil: | |
| 158 logging.Infof(c, "subscription %#v exists, no need to update", s ub) | |
| 159 return nil | |
| 160 default: | |
| 161 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) | |
| 162 return err | |
| 163 } | |
| 164 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) | |
| 165 | |
| 166 // No subscription exists, attach a new subscription to the existing top ic. | |
| 167 endpointURL := url.URL{ | |
| 168 Scheme: "https", | |
| 169 Host: pubsubModuleHost, | |
| 170 Path: "/_ah/push-handlers/buildbucket", | |
| 171 } | |
| 172 subConfig := pubsub.SubscriptionConfig{ | |
| 173 Topic: topic, | |
| 174 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, | |
| 175 AckDeadline: time.Minute * 10, | |
| 176 } | |
| 177 newSub, err := client.createSubscription(subID, subConfig) | |
| 178 if err != nil { | |
| 179 if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { | |
| 180 registerURL := "https://pantheon.corp.google.com/apis/cr edentials/domainverification?project=" + projectID | |
|
iannucci
2017/07/08 00:05:25
this should be a console url
Ryan Tseng
2017/07/08 01:47:34
derp. everytime i go to console.cloud.google.com
| |
| 181 verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost | |
| 182 logging.WithError(err).Errorf( | |
| 183 c, "The domain has to be verified and added.\n\n "+ | |
| 184 "1. Go to %s\n"+ | |
| 185 "2. Verify the domain\n"+ | |
| 186 "3. Go to %s\n"+ | |
| 187 "4. Add %s to allowed domains\n\n", | |
| 188 verifyURL, registerURL, pubsubModuleHost) | |
| 189 } else { | |
| 190 logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub) | |
| 191 } | |
| 192 return err | |
| 193 } | |
| 194 // Success! | |
| 195 logging.Infof(c, "successfully created subscription %#v", newSub) | |
| 196 return nil | |
| 197 } | |
| OLD | NEW |