Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(592)

Side by Side Diff: milo/common/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: rebase Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/common/config_test.go ('k') | milo/common/pubsub_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package common
2
3 import (
4 "encoding/base64"
5 "errors"
6 "fmt"
7 "net/url"
8 "strings"
9 "time"
10
11 "cloud.google.com/go/pubsub"
12 "golang.org/x/net/context"
13
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/milo/api/config"
17 )
18
19 var pubSubClientKey = "stores a pubsubClient"
20
21 type PubSubMessage struct {
22 Attributes map[string]string `json:"attributes"`
23 Data string `json:"data"`
24 MessageID string `json:"message_id"`
25 }
26
27 type PubSubSubscription struct {
28 Message PubSubMessage `json:"message"`
29 Subscription string `json:"subscription"`
30 }
31
32 var errNotExist = errors.New("does not exist")
33
34 // GetData returns the expanded form of Data (decoded from base64).
35 func (m *PubSubSubscription) GetData() ([]byte, error) {
36 return base64.StdEncoding.DecodeString(m.Message.Data)
37 }
38
39 // pubsubClient is an interface representing a pubsub.Client containing only
40 // the functions that Milo calls. Internal use only, can be swapped
41 // out for testing.
42 type pubsubClient interface {
43 // getTopic returns the pubsub topic if it exists, a notExist error if
44 // it does not exist, or an error if there was an error.
45 getTopic(string) (*pubsub.Topic, error)
46
47 // getSubscription returns the pubsub subscription if it exists,
48 // a notExist error if it does not exist, or an error if there was an er ror.
49 getSubscription(string) (*pubsub.Subscription, error)
50
51 createSubscription(string, pubsub.SubscriptionConfig) (
52 *pubsub.Subscription, error)
53 }
54
55 // prodPubSubClient is a wrapper around the production pubsub client.
56 type prodPubSubClient struct {
57 ctx context.Context
58 client *pubsub.Client
59 }
60
61 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
62 topic := pc.client.Topic(id)
63 exists, err := topic.Exists(pc.ctx)
64 switch {
65 case err != nil:
66 return nil, err
67 case !exists:
68 return nil, errNotExist
69 }
70 return topic, nil
71 }
72
73 func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, er ror) {
74 sub := pc.client.Subscription(id)
75 exists, err := sub.Exists(pc.ctx)
76 switch {
77 case err != nil:
78 return nil, err
79 case !exists:
80 return nil, errNotExist
81 }
82 return sub, nil
83 }
84
85 func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.Subscriptio nConfig) (
86 *pubsub.Subscription, error) {
87
88 return pc.client.CreateSubscription(pc.ctx, id, cfg)
89 }
90
91 // getPubSubClient extracts a debug PubSub client out of the context.
92 func getPubSubClient(c context.Context) (pubsubClient, error) {
93 if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
94 return client, nil
95 }
96 return nil, errors.New("no pubsub clients installed")
97 }
98
99 // withClient returns a context with a pubsub client instantiated to the
100 // given project ID
101 func withClient(c context.Context, projectID string) (context.Context, error) {
102 if projectID == "" {
103 return nil, errors.New("missing buildbucket project")
104 }
105 client, err := pubsub.NewClient(c, projectID)
106 if err != nil {
107 return nil, err
108 }
109 return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, clien t}), nil
110 }
111
112 func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
113 client, err := getPubSubClient(c)
114 if err != nil {
115 return nil, err
116 }
117 return client.getTopic(id)
118 }
119
120 func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
121 client, err := getPubSubClient(c)
122 if err != nil {
123 return nil, err
124 }
125 return client.getSubscription(id)
126 }
127
128 func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionCon fig) (
129 *pubsub.Subscription, error) {
130
131 client, err := getPubSubClient(c)
132 if err != nil {
133 return nil, err
134 }
135 return client.createSubscription(id, cfg)
136 }
137
138 // EnsurePubSubSubscribed makes sure the following subscriptions are in place:
139 // * buildbucket, via the settings.Buildbucket.Topic setting
140 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
141 if settings.Buildbucket != nil {
142 // Install the production pubsub client pointing to the buildbuc ket project
143 // into the context.
144 c, err := withClient(c, settings.Buildbucket.Project)
145 if err != nil {
146 return err
147 }
148 return ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct)
149 }
150 // TODO(hinoka): Ensure buildbot subscribed.
151 return nil
152 }
153
154 // ensureSubscribed is called by a cron job and ensures that the Milo
155 // instance is properly subscribed to the buildbucket subscription endpoint.
156 func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
157 topicID := "builds"
158 // Check to see if the topic exists first.
159 topic, err := getTopic(c, topicID)
160 switch err {
161 case errNotExist:
162 logging.WithError(err).Errorf(c, "%s does not exist", topicID)
163 return err
164 case nil:
165 // continue
166 default:
167 if strings.Contains(err.Error(), "PermissionDenied") {
168 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID
169 acct, serr := info.ServiceAccount(c)
170 if serr != nil {
171 acct = fmt.Sprintf("Unknown: %s", serr.Error())
172 }
173 // The documentation is incorrect. We need Editor permi ssion because
174 // the Subscriber permission does NOT permit attaching s ubscriptions to
175 // topics or to view topics.
176 logging.WithError(err).Errorf(
177 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct)
178 } else {
179 logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
180 }
181 return err
182 }
183 // Now check to see if the subscription already exists.
184 subID := info.AppID(c)
185 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname()
186 // because it returns a version pinned hostname instead of the default r oute.
187 sub, err := getSubscription(c, subID)
188 switch err {
189 case errNotExist:
190 // continue
191 case nil:
192 logging.Infof(c, "subscription %#v exists, no need to update", s ub)
193 return nil
194 default:
195 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub)
196 return err
197 }
198 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
199
200 // No subscription exists, attach a new subscription to the existing top ic.
201 endpointURL := url.URL{
202 Scheme: "https",
203 Host: pubsubModuleHost,
204 Path: "/_ah/push-handlers/buildbucket",
205 }
206 subConfig := pubsub.SubscriptionConfig{
207 Topic: topic,
208 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
209 AckDeadline: time.Minute * 10,
210 }
211 newSub, err := createSubscription(c, subID, subConfig)
212 if err != nil {
213 if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") {
214 registerURL := "https://console.cloud.google.com/apis/cr edentials/domainverification?project=" + projectID
215 verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
216 logging.WithError(err).Errorf(
217 c, "The domain has to be verified and added.\n\n "+
218 "1. Go to %s\n"+
219 "2. Verify the domain\n"+
220 "3. Go to %s\n"+
221 "4. Add %s to allowed domains\n\n",
222 verifyURL, registerURL, pubsubModuleHost)
223 } else {
224 logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub)
225 }
226 return err
227 }
228 // Success!
229 logging.Infof(c, "successfully created subscription %#v", newSub)
230 return nil
231 }
OLDNEW
« no previous file with comments | « milo/common/config_test.go ('k') | milo/common/pubsub_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698