Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1860)

Side by Side Diff: milo/common/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package common
2
3 import (
4 "encoding/base64"
5 "fmt"
6 "net/url"
7 "strings"
8 "time"
9
10 "cloud.google.com/go/pubsub"
11 "golang.org/x/net/context"
12
13 "github.com/luci/gae/service/info"
14 "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/milo/api/config"
16 )
17
18 type PubSubMessage struct {
19 Attributes map[string]string `json:"attributes"`
20 Data string `json:"data"`
21 MessageID string `json:"message_id"`
22 }
23
24 type PubSubSubscription struct {
25 Message PubSubMessage `json:"message"`
26 Subscription string `json:"subscription"`
27 }
28
29 // GetData returns the expanded form of Data (decoded from base64).
30 func (m *PubSubSubscription) GetData() ([]byte, error) {
31 return base64.StdEncoding.DecodeString(m.Message.Data)
32 }
33
34 // ensurePubSubSubscribed makes sure the following subscriptions are in place:
35 // * buildbucket, via the settings.Buildbucket.Topic setting
36 func ensurePubSubSubscribed(c context.Context, settings *config.Settings) error {
37 if settings.Buildbucket != nil {
38 err := ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct)
39 return err
40 }
41 // TODO(hinoka): Ensure buildbot subscribed.
42 return nil
43 }
44
45 // ensureSubscribed is called by a cron job and ensures that the Milo
46 // instance is properly subscribed to the buildbucket subscription endpoint.
47 func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
48 topicID := "builds"
Ryan Tseng 2017/06/28 22:14:18 This is hardcoded in the buildbucket code
49 if projectID == "" {
50 return fmt.Errorf("missing buildbucket project", projectID)
51 }
52 client, err := pubsub.NewClient(c, projectID)
53 if err != nil {
54 return err
55 }
56 // Check to see if the topic exists first.
57 topic := client.Topic(topicID)
58 if exists, err := topic.Exists(c); err != nil {
59 if strings.Contains(err.Error(), "PermissionDenied") {
60 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID
61 acct, serr := info.ServiceAccount(c)
62 if serr != nil {
63 acct = fmt.Sprintf("Unknown: %s", serr.Error())
64 }
65 // The documentation is incorrect. The following permis sions are needed:
66 // PubSub Viewer - To view Topic (This action)
67 // PubSub Editor - To subscribe to Topics (Below)
68 // PubSub Subscriber does NOT allow viewing topics, and it does NOT
69 // allow for attaching subscriptions to topics.
70 logging.WithError(err).Errorf(
71 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct)
72 } else {
73 logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
74 }
75 return err
76 } else if !exists {
77 err = fmt.Errorf("topic %#v does not exist", topic)
78 logging.Errorf(c, err.Error())
79 return err
80 }
81
82 // Now check to see if the subscription already exists.
83 subID := info.AppID(c)
84 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname()
85 // because it returns a version pinned hostname instead of the default r oute.
86 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
87 sub := client.Subscription(subID)
88 if exists, err := sub.Exists(c); err != nil {
89 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub)
90 return err
91 } else if exists {
92 logging.Infof(c, "subscription %#v exists, no need to update", s ub)
93 return nil
94 }
95 if err != nil {
96 logging.WithError(err).Errorf(c, "could not get hostname for pub sub module")
97 return err
98 }
99 endpointURL := url.URL{
100 Scheme: "https",
101 Host: pubsubModuleHost,
102 Path: "/_ah/push-handlers/buildbucket",
103 }
104 subConfig := pubsub.SubscriptionConfig{
105 Topic: topic,
106 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
107 AckDeadline: time.Minute * 10,
108 }
109 newSub, err := client.CreateSubscription(c, subID, subConfig)
110 if err != nil {
111 if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") {
112 registerURL := "https://pantheon.corp.google.com/apis/cr edentials/domainverification?project=" + projectID
113 verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
114 logging.WithError(err).Errorf(
115 c, "The domain has to be verified and added.\n\n "+
116 "1. Go to %s\n"+
117 "2. Verify the domain\n"+
118 "3. Go to %s\n"+
119 "4. Add %s to allowed domain\n\n",
120 verifyURL, registerURL, pubsubModuleHost)
121 } else {
122 logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub)
123 }
124 return err
125 }
126 // Success!
127 logging.Infof(c, "successfully created subscription %#v", newSub)
128 return nil
129 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698