| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package engine | 5 package engine |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 "sort" | 9 "sort" |
| 10 "strings" | 10 "strings" |
| 11 | 11 |
| 12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 13 "google.golang.org/api/googleapi" | 13 "google.golang.org/api/googleapi" |
| 14 "google.golang.org/api/pubsub/v1" | 14 "google.golang.org/api/pubsub/v1" |
| 15 | 15 |
| 16 "github.com/luci/luci-go/common/data/stringset" | 16 "github.com/luci/luci-go/common/data/stringset" |
| 17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/retry" |
| 19 "github.com/luci/luci-go/server/auth" | 20 "github.com/luci/luci-go/server/auth" |
| 20 ) | 21 ) |
| 21 | 22 |
| 22 // createPubSubService returns configured instance of pubsub.Service. | 23 // createPubSubService returns configured instance of pubsub.Service. |
| 23 func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service,
error) { | 24 func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service,
error) { |
| 24 // In real mode (not a unit test), use authenticated transport. | 25 // In real mode (not a unit test), use authenticated transport. |
| 25 var transport http.RoundTripper | 26 var transport http.RoundTripper |
| 26 if pubSubURL == "" { | 27 if pubSubURL == "" { |
| 27 var err error | 28 var err error |
| 28 transport, err = auth.GetRPCTransport(c, auth.AsSelf, auth.WithS
copes(pubsub.PubsubScope)) | 29 transport, err = auth.GetRPCTransport(c, auth.AsSelf, auth.WithS
copes(pubsub.PubsubScope)) |
| (...skipping 24 matching lines...) Expand all Loading... |
| 53 service, err := createPubSubService(c, pubSubURL) | 54 service, err := createPubSubService(c, pubSubURL) |
| 54 if err != nil { | 55 if err != nil { |
| 55 return err | 56 return err |
| 56 } | 57 } |
| 57 | 58 |
| 58 // Create the topic. Ignore HTTP 409 (it means the topic already exists)
. | 59 // Create the topic. Ignore HTTP 409 (it means the topic already exists)
. |
| 59 logging.Infof(c, "Ensuring topic %q exists", topic) | 60 logging.Infof(c, "Ensuring topic %q exists", topic) |
| 60 _, err = service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context(
c).Do() | 61 _, err = service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context(
c).Do() |
| 61 if err != nil && !isHTTP409(err) { | 62 if err != nil && !isHTTP409(err) { |
| 62 logging.Errorf(c, "Failed - %s", err) | 63 logging.Errorf(c, "Failed - %s", err) |
| 63 » » return errors.WrapTransient(err) | 64 » » return retry.Tag.Apply(err) |
| 64 } | 65 } |
| 65 | 66 |
| 66 // Create the subscription to this topic. Ignore HTTP 409. | 67 // Create the subscription to this topic. Ignore HTTP 409. |
| 67 logging.Infof(c, "Ensuring subscription %q exists", sub) | 68 logging.Infof(c, "Ensuring subscription %q exists", sub) |
| 68 _, err = service.Projects.Subscriptions.Create(sub, &pubsub.Subscription
{ | 69 _, err = service.Projects.Subscriptions.Create(sub, &pubsub.Subscription
{ |
| 69 Topic: topic, | 70 Topic: topic, |
| 70 AckDeadlineSeconds: 70, // GAE request timeout plus some spare t
ime | 71 AckDeadlineSeconds: 70, // GAE request timeout plus some spare t
ime |
| 71 PushConfig: &pubsub.PushConfig{ | 72 PushConfig: &pubsub.PushConfig{ |
| 72 PushEndpoint: pushURL, // if "", the subscription will b
e pull based | 73 PushEndpoint: pushURL, // if "", the subscription will b
e pull based |
| 73 }, | 74 }, |
| 74 }).Context(c).Do() | 75 }).Context(c).Do() |
| 75 if err != nil && !isHTTP409(err) { | 76 if err != nil && !isHTTP409(err) { |
| 76 logging.Errorf(c, "Failed - %s", err) | 77 logging.Errorf(c, "Failed - %s", err) |
| 77 » » return errors.WrapTransient(err) | 78 » » return retry.Tag.Apply(err) |
| 78 } | 79 } |
| 79 | 80 |
| 80 // Modify topic's IAM policy to allow publisher to publish. | 81 // Modify topic's IAM policy to allow publisher to publish. |
| 81 if strings.HasSuffix(publisher, ".gserviceaccount.com") { | 82 if strings.HasSuffix(publisher, ".gserviceaccount.com") { |
| 82 publisher = "serviceAccount:" + publisher | 83 publisher = "serviceAccount:" + publisher |
| 83 } else { | 84 } else { |
| 84 publisher = "user:" + publisher | 85 publisher = "user:" + publisher |
| 85 } | 86 } |
| 86 logging.Infof(c, "Ensuring %q can publish to the topic", publisher) | 87 logging.Infof(c, "Ensuring %q can publish to the topic", publisher) |
| 87 | 88 |
| 88 // Do two attempts, to account for possible race condition. Two attempts | 89 // Do two attempts, to account for possible race condition. Two attempts |
| 89 // should be enough to handle concurrent calls to 'configureTopic': seco
nd | 90 // should be enough to handle concurrent calls to 'configureTopic': seco
nd |
| 90 // attempt will read already correct IAM policy and will just end right
away. | 91 // attempt will read already correct IAM policy and will just end right
away. |
| 91 for attempt := 0; attempt < 2; attempt++ { | 92 for attempt := 0; attempt < 2; attempt++ { |
| 92 err = modifyTopicIAMPolicy(c, service, topic, func(policy iamPol
icy) error { | 93 err = modifyTopicIAMPolicy(c, service, topic, func(policy iamPol
icy) error { |
| 93 policy.grantRole("roles/pubsub.publisher", publisher) | 94 policy.grantRole("roles/pubsub.publisher", publisher) |
| 94 return nil | 95 return nil |
| 95 }) | 96 }) |
| 96 if err == nil { | 97 if err == nil { |
| 97 return nil | 98 return nil |
| 98 } | 99 } |
| 99 logging.Errorf(c, "Failed - %s", err) | 100 logging.Errorf(c, "Failed - %s", err) |
| 100 } | 101 } |
| 101 » return errors.WrapTransient(err) | 102 » return retry.Tag.Apply(err) |
| 102 } | 103 } |
| 103 | 104 |
| 104 // pullSubcription pulls one message from PubSub subscription. | 105 // pullSubcription pulls one message from PubSub subscription. |
| 105 // | 106 // |
| 106 // Used on dev server only. Returns the message and callback to call to | 107 // Used on dev server only. Returns the message and callback to call to |
| 107 // acknowledge the message. | 108 // acknowledge the message. |
| 108 func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub
.PubsubMessage, func(), error) { | 109 func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub
.PubsubMessage, func(), error) { |
| 109 service, err := createPubSubService(c, pubSubURL) | 110 service, err := createPubSubService(c, pubSubURL) |
| 110 if err != nil { | 111 if err != nil { |
| 111 return nil, nil, err | 112 return nil, nil, err |
| (...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 243 func (p iamPolicy) grantRole(role, principal string) { | 244 func (p iamPolicy) grantRole(role, principal string) { |
| 244 switch existing := p[role]; { | 245 switch existing := p[role]; { |
| 245 case existing != nil && existing.Has(principal): // already there | 246 case existing != nil && existing.Has(principal): // already there |
| 246 return | 247 return |
| 247 case existing != nil: // the role is there, but not the principal | 248 case existing != nil: // the role is there, but not the principal |
| 248 existing.Add(principal) | 249 existing.Add(principal) |
| 249 default: | 250 default: |
| 250 p[role] = stringset.NewFromSlice(principal) | 251 p[role] = stringset.NewFromSlice(principal) |
| 251 } | 252 } |
| 252 } | 253 } |
| OLD | NEW |