Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: milo/common/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: Add tests Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package common
2
3 import (
4 "encoding/base64"
5 "errors"
6 "fmt"
7 "net/url"
8 "strings"
9 "time"
10
11 "cloud.google.com/go/pubsub"
12 "golang.org/x/net/context"
13
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/milo/api/config"
17 )
18
19 var pubSubClientKey = "stores a pubsubClient"
20
21 type PubSubMessage struct {
22 Attributes map[string]string `json:"attributes"`
23 Data string `json:"data"`
24 MessageID string `json:"message_id"`
25 }
26
27 type PubSubSubscription struct {
28 Message PubSubMessage `json:"message"`
29 Subscription string `json:"subscription"`
30 }
31
32 var errNotExist = errors.New("does not exist")
33
34 // GetData returns the expanded form of Data (decoded from base64).
35 func (m *PubSubSubscription) GetData() ([]byte, error) {
36 return base64.StdEncoding.DecodeString(m.Message.Data)
37 }
38
39 // pubsubClient is an interface representing a pubsub.Client containing only
40 // the functions that Milo calls. Internal use only, can be swapped
41 // out for testing.
42 type pubsubClient interface {
43 getTopic(string) (*pubsub.Topic, error)
44 getSubscription(string) (*pubsub.Subscription, error)
45 createSubscription(string, pubsub.SubscriptionConfig) (
46 *pubsub.Subscription, error)
47 }
48
49 // prodPubSubClient is a wrapper around the production pubsub client.
50 type prodPubSubClient struct {
51 ctx context.Context
52 client *pubsub.Client
53 }
54
55 // getTopic returns the pubsub topic if it exists, a notExist error if
56 // it does not exist, or an error if there was an error.
iannucci 2017/07/08 00:05:25 move the docstrings to the interface
Ryan Tseng 2017/07/08 01:47:34 Done.
57 func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
58 topic := pc.client.Topic(id)
59 exists, err := topic.Exists(pc.ctx)
60 switch {
61 case err != nil:
62 return nil, err
63 case !exists:
64 return nil, errNotExist
65 }
66 return topic, nil
67 }
68
69 // getSubscription returns the pubsub subscriptionif it exists,
iannucci 2017/07/08 00:05:25 nit: missing space
Ryan Tseng 2017/07/08 01:47:34 Done.
70 // a notExist error if it does not exist, or an error if there was an error.
71 func (pc *prodPubSubClient) getSubscription(id string) (
72 *pubsub.Subscription, error) {
iannucci 2017/07/08 00:05:25 no newline... 80 cols is not a thing in go
Ryan Tseng 2017/07/08 01:47:34 Done.
73 sub := pc.client.Subscription(id)
74 exists, err := sub.Exists(pc.ctx)
75 switch {
76 case err != nil:
77 return nil, err
78 case !exists:
79 return nil, errNotExist
80 }
81 return sub, nil
82 }
83
84 func (pc *prodPubSubClient) createSubscription(
85 id string, cfg pubsub.SubscriptionConfig) (
iannucci 2017/07/08 00:05:25 nix newlines
Ryan Tseng 2017/07/08 01:47:34 Done.
86 *pubsub.Subscription, error) {
87
88 return pc.client.CreateSubscription(pc.ctx, id, cfg)
89 }
90
91 // getPubSubClient tries to extract a debug PubSub client out of the context.
iannucci 2017/07/08 00:05:25 I would consider instead just having GetTopic(cont
Ryan Tseng 2017/07/08 01:47:34 Done.
92 // if none exists, then return a prod client.
93 func getPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
94 if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
95 return client, nil
96 }
97 client, err := pubsub.NewClient(c, projectID)
98 return &prodPubSubClient{c, client}, err
99 }
100
101 // EnsurePubSubSubscribed makes sure the following subscriptions are in place:
102 // * buildbucket, via the settings.Buildbucket.Topic setting
103 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
104 if settings.Buildbucket != nil {
105 err := ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct)
106 return err
107 }
108 // TODO(hinoka): Ensure buildbot subscribed.
109 return nil
110 }
111
112 // ensureSubscribed is called by a cron job and ensures that the Milo
113 // instance is properly subscribed to the buildbucket subscription endpoint.
114 func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
115 topicID := "builds"
116 if projectID == "" {
117 return fmt.Errorf("missing buildbucket project")
iannucci 2017/07/08 00:05:25 errors.New
Ryan Tseng 2017/07/08 01:47:33 Done.
118 }
119 client, err := getPubSubClient(c, projectID)
120 if err != nil {
121 return err
122 }
123 // Check to see if the topic exists first.
124 topic, err := client.getTopic(topicID)
125 switch err {
126 case errNotExist:
127 err = fmt.Errorf("topic %#v does not exist", topic)
iannucci 2017/07/08 00:05:25 why not have getTopic do this?
Ryan Tseng 2017/07/08 01:47:34 Done.
128 logging.Errorf(c, err.Error())
129 return err
130 case nil:
131 // continue
132 default:
133 if strings.Contains(err.Error(), "PermissionDenied") {
iannucci 2017/07/08 00:05:25 oof, is this the best way?
Ryan Tseng 2017/07/08 01:47:34 As far as I can tell :(
134 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID
iannucci 2017/07/08 00:05:25 may be worth figuring out the gcloud sdk command f
Ryan Tseng 2017/07/08 01:47:34 gcloud doesn't appear to be able to add/manipulate
135 acct, serr := info.ServiceAccount(c)
136 if serr != nil {
137 acct = fmt.Sprintf("Unknown: %s", serr.Error())
138 }
139 // The documentation is incorrect. We need Editor permi ssion because
140 // the Subscriber permission does NOT permit attaching s ubscriptions to
141 // topics or to view topics.
142 logging.WithError(err).Errorf(
143 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct)
144 } else {
145 logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
146 }
147 return err
148 }
149 // Now check to see if the subscription already exists.
150 subID := info.AppID(c)
151 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname()
152 // because it returns a version pinned hostname instead of the default r oute.
153 sub, err := client.getSubscription(subID)
154 switch err {
155 case errNotExist:
156 // continue
157 case nil:
158 logging.Infof(c, "subscription %#v exists, no need to update", s ub)
159 return nil
160 default:
161 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub)
162 return err
163 }
164 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
165
166 // No subscription exists, attach a new subscription to the existing top ic.
167 endpointURL := url.URL{
168 Scheme: "https",
169 Host: pubsubModuleHost,
170 Path: "/_ah/push-handlers/buildbucket",
171 }
172 subConfig := pubsub.SubscriptionConfig{
173 Topic: topic,
174 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
175 AckDeadline: time.Minute * 10,
176 }
177 newSub, err := client.createSubscription(subID, subConfig)
178 if err != nil {
179 if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") {
180 registerURL := "https://pantheon.corp.google.com/apis/cr edentials/domainverification?project=" + projectID
iannucci 2017/07/08 00:05:25 this should be a console url
Ryan Tseng 2017/07/08 01:47:34 derp. everytime i go to console.cloud.google.com
181 verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
182 logging.WithError(err).Errorf(
183 c, "The domain has to be verified and added.\n\n "+
184 "1. Go to %s\n"+
185 "2. Verify the domain\n"+
186 "3. Go to %s\n"+
187 "4. Add %s to allowed domains\n\n",
188 verifyURL, registerURL, pubsubModuleHost)
189 } else {
190 logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub)
191 }
192 return err
193 }
194 // Success!
195 logging.Infof(c, "successfully created subscription %#v", newSub)
196 return nil
197 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698