Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 package common | |
| 2 | |
| 3 import ( | |
| 4 "encoding/base64" | |
| 5 "fmt" | |
| 6 "net/url" | |
| 7 "strings" | |
| 8 "time" | |
| 9 | |
| 10 "cloud.google.com/go/pubsub" | |
| 11 "golang.org/x/net/context" | |
| 12 | |
| 13 "github.com/luci/gae/service/info" | |
| 14 "github.com/luci/luci-go/common/logging" | |
| 15 "github.com/luci/luci-go/milo/api/config" | |
| 16 ) | |
| 17 | |
| 18 type PubSubMessage struct { | |
| 19 Attributes map[string]string `json:"attributes"` | |
| 20 Data string `json:"data"` | |
| 21 MessageID string `json:"message_id"` | |
| 22 } | |
| 23 | |
| 24 type PubSubSubscription struct { | |
| 25 Message PubSubMessage `json:"message"` | |
| 26 Subscription string `json:"subscription"` | |
| 27 } | |
| 28 | |
| 29 // GetData returns the expanded form of Data (decoded from base64). | |
| 30 func (m *PubSubSubscription) GetData() ([]byte, error) { | |
| 31 return base64.StdEncoding.DecodeString(m.Message.Data) | |
| 32 } | |
| 33 | |
| 34 // ensurePubSubSubscribed makes sure the following subscriptions are in place: | |
| 35 // * buildbucket, via the settings.Buildbucket.Topic setting | |
| 36 func ensurePubSubSubscribed(c context.Context, settings *config.Settings) error { | |
| 37 if settings.Buildbucket != nil { | |
| 38 err := ensureBuildbucketSubscribed(c, settings.Buildbucket.Proje ct) | |
| 39 return err | |
| 40 } | |
| 41 // TODO(hinoka): Ensure buildbot subscribed. | |
| 42 return nil | |
| 43 } | |
| 44 | |
| 45 // ensureSubscribed is called by a cron job and ensures that the Milo | |
| 46 // instance is properly subscribed to the buildbucket subscription endpoint. | |
| 47 func ensureBuildbucketSubscribed(c context.Context, projectID string) error { | |
| 48 topicID := "builds" | |
|
Ryan Tseng
2017/06/28 22:14:18
This is hardcoded in the buildbucket code
| |
| 49 if projectID == "" { | |
| 50 return fmt.Errorf("missing buildbucket project", projectID) | |
| 51 } | |
| 52 client, err := pubsub.NewClient(c, projectID) | |
| 53 if err != nil { | |
| 54 return err | |
| 55 } | |
| 56 // Check to see if the topic exists first. | |
| 57 topic := client.Topic(topicID) | |
| 58 if exists, err := topic.Exists(c); err != nil { | |
| 59 if strings.Contains(err.Error(), "PermissionDenied") { | |
| 60 URL := "https://console.cloud.google.com/iam-admin/iam/p roject?project=" + projectID | |
| 61 acct, serr := info.ServiceAccount(c) | |
| 62 if serr != nil { | |
| 63 acct = fmt.Sprintf("Unknown: %s", serr.Error()) | |
| 64 } | |
| 65 // The documentation is incorrect. The following permis sions are needed: | |
| 66 // PubSub Viewer - To view Topic (This action) | |
| 67 // PubSub Editor - To subscribe to Topics (Below) | |
| 68 // PubSub Subscriber does NOT allow viewing topics, and it does NOT | |
| 69 // allow for attaching subscriptions to topics. | |
| 70 logging.WithError(err).Errorf( | |
| 71 c, "please go to %s and add %s as a Pub/Sub Edit or", URL, acct) | |
| 72 } else { | |
| 73 logging.WithError(err).Errorf(c, "could not check topic %#v", topic) | |
| 74 } | |
| 75 return err | |
| 76 } else if !exists { | |
| 77 err = fmt.Errorf("topic %#v does not exist", topic) | |
| 78 logging.Errorf(c, err.Error()) | |
| 79 return err | |
| 80 } | |
| 81 | |
| 82 // Now check to see if the subscription already exists. | |
| 83 subID := info.AppID(c) | |
| 84 // Get the pubsub module of our app. We do not want to use info.ModuleH ostname() | |
| 85 // because it returns a version pinned hostname instead of the default r oute. | |
| 86 pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c) | |
| 87 sub := client.Subscription(subID) | |
| 88 if exists, err := sub.Exists(c); err != nil { | |
| 89 logging.WithError(err).Errorf(c, "could not check subscription % #v", sub) | |
| 90 return err | |
| 91 } else if exists { | |
| 92 logging.Infof(c, "subscription %#v exists, no need to update", s ub) | |
| 93 return nil | |
| 94 } | |
| 95 if err != nil { | |
| 96 logging.WithError(err).Errorf(c, "could not get hostname for pub sub module") | |
| 97 return err | |
| 98 } | |
| 99 endpointURL := url.URL{ | |
| 100 Scheme: "https", | |
| 101 Host: pubsubModuleHost, | |
| 102 Path: "/_ah/push-handlers/buildbucket", | |
| 103 } | |
| 104 subConfig := pubsub.SubscriptionConfig{ | |
| 105 Topic: topic, | |
| 106 PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()}, | |
| 107 AckDeadline: time.Minute * 10, | |
| 108 } | |
| 109 newSub, err := client.CreateSubscription(c, subID, subConfig) | |
| 110 if err != nil { | |
| 111 if strings.Contains(err.Error(), "The supplied HTTP URL is not r egistered") { | |
| 112 registerURL := "https://pantheon.corp.google.com/apis/cr edentials/domainverification?project=" + projectID | |
| 113 verifyURL := "https://www.google.com/webmasters/verifica tion/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost | |
| 114 logging.WithError(err).Errorf( | |
| 115 c, "The domain has to be verified and added.\n\n "+ | |
| 116 "1. Go to %s\n"+ | |
| 117 "2. Verify the domain\n"+ | |
| 118 "3. Go to %s\n"+ | |
| 119 "4. Add %s to allowed domain\n\n", | |
| 120 verifyURL, registerURL, pubsubModuleHost) | |
| 121 } else { | |
| 122 logging.WithError(err).Errorf(c, "could not create subsc ription %#v", sub) | |
| 123 } | |
| 124 return err | |
| 125 } | |
| 126 // Success! | |
| 127 logging.Infof(c, "successfully created subscription %#v", newSub) | |
| 128 return nil | |
| 129 } | |
| OLD | NEW |