OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package engine | 5 package engine |
6 | 6 |
7 import ( | 7 import ( |
8 "net/http" | 8 "net/http" |
9 "sort" | 9 "sort" |
10 "strings" | 10 "strings" |
11 | 11 |
12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
13 "google.golang.org/api/googleapi" | 13 "google.golang.org/api/googleapi" |
14 "google.golang.org/api/pubsub/v1" | 14 "google.golang.org/api/pubsub/v1" |
15 | 15 |
16 "github.com/luci/luci-go/common/data/stringset" | 16 "github.com/luci/luci-go/common/data/stringset" |
17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
18 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/retry/transient" |
19 "github.com/luci/luci-go/server/auth" | 20 "github.com/luci/luci-go/server/auth" |
20 ) | 21 ) |
21 | 22 |
22 // createPubSubService returns configured instance of pubsub.Service. | 23 // createPubSubService returns configured instance of pubsub.Service. |
23 func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service,
error) { | 24 func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service,
error) { |
24 // In real mode (not a unit test), use authenticated transport. | 25 // In real mode (not a unit test), use authenticated transport. |
25 var transport http.RoundTripper | 26 var transport http.RoundTripper |
26 if pubSubURL == "" { | 27 if pubSubURL == "" { |
27 var err error | 28 var err error |
28 transport, err = auth.GetRPCTransport(c, auth.AsSelf, auth.WithS
copes(pubsub.PubsubScope)) | 29 transport, err = auth.GetRPCTransport(c, auth.AsSelf, auth.WithS
copes(pubsub.PubsubScope)) |
(...skipping 24 matching lines...) Expand all Loading... |
53 service, err := createPubSubService(c, pubSubURL) | 54 service, err := createPubSubService(c, pubSubURL) |
54 if err != nil { | 55 if err != nil { |
55 return err | 56 return err |
56 } | 57 } |
57 | 58 |
58 // Create the topic. Ignore HTTP 409 (it means the topic already exists)
. | 59 // Create the topic. Ignore HTTP 409 (it means the topic already exists)
. |
59 logging.Infof(c, "Ensuring topic %q exists", topic) | 60 logging.Infof(c, "Ensuring topic %q exists", topic) |
60 _, err = service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context(
c).Do() | 61 _, err = service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context(
c).Do() |
61 if err != nil && !isHTTP409(err) { | 62 if err != nil && !isHTTP409(err) { |
62 logging.Errorf(c, "Failed - %s", err) | 63 logging.Errorf(c, "Failed - %s", err) |
63 » » return errors.WrapTransient(err) | 64 » » return transient.Tag.Apply(err) |
64 } | 65 } |
65 | 66 |
66 // Create the subscription to this topic. Ignore HTTP 409. | 67 // Create the subscription to this topic. Ignore HTTP 409. |
67 logging.Infof(c, "Ensuring subscription %q exists", sub) | 68 logging.Infof(c, "Ensuring subscription %q exists", sub) |
68 _, err = service.Projects.Subscriptions.Create(sub, &pubsub.Subscription
{ | 69 _, err = service.Projects.Subscriptions.Create(sub, &pubsub.Subscription
{ |
69 Topic: topic, | 70 Topic: topic, |
70 AckDeadlineSeconds: 70, // GAE request timeout plus some spare t
ime | 71 AckDeadlineSeconds: 70, // GAE request timeout plus some spare t
ime |
71 PushConfig: &pubsub.PushConfig{ | 72 PushConfig: &pubsub.PushConfig{ |
72 PushEndpoint: pushURL, // if "", the subscription will b
e pull based | 73 PushEndpoint: pushURL, // if "", the subscription will b
e pull based |
73 }, | 74 }, |
74 }).Context(c).Do() | 75 }).Context(c).Do() |
75 if err != nil && !isHTTP409(err) { | 76 if err != nil && !isHTTP409(err) { |
76 logging.Errorf(c, "Failed - %s", err) | 77 logging.Errorf(c, "Failed - %s", err) |
77 » » return errors.WrapTransient(err) | 78 » » return transient.Tag.Apply(err) |
78 } | 79 } |
79 | 80 |
80 // Modify topic's IAM policy to allow publisher to publish. | 81 // Modify topic's IAM policy to allow publisher to publish. |
81 if strings.HasSuffix(publisher, ".gserviceaccount.com") { | 82 if strings.HasSuffix(publisher, ".gserviceaccount.com") { |
82 publisher = "serviceAccount:" + publisher | 83 publisher = "serviceAccount:" + publisher |
83 } else { | 84 } else { |
84 publisher = "user:" + publisher | 85 publisher = "user:" + publisher |
85 } | 86 } |
86 logging.Infof(c, "Ensuring %q can publish to the topic", publisher) | 87 logging.Infof(c, "Ensuring %q can publish to the topic", publisher) |
87 | 88 |
88 // Do two attempts, to account for possible race condition. Two attempts | 89 // Do two attempts, to account for possible race condition. Two attempts |
89 // should be enough to handle concurrent calls to 'configureTopic': seco
nd | 90 // should be enough to handle concurrent calls to 'configureTopic': seco
nd |
90 // attempt will read already correct IAM policy and will just end right
away. | 91 // attempt will read already correct IAM policy and will just end right
away. |
91 for attempt := 0; attempt < 2; attempt++ { | 92 for attempt := 0; attempt < 2; attempt++ { |
92 err = modifyTopicIAMPolicy(c, service, topic, func(policy iamPol
icy) error { | 93 err = modifyTopicIAMPolicy(c, service, topic, func(policy iamPol
icy) error { |
93 policy.grantRole("roles/pubsub.publisher", publisher) | 94 policy.grantRole("roles/pubsub.publisher", publisher) |
94 return nil | 95 return nil |
95 }) | 96 }) |
96 if err == nil { | 97 if err == nil { |
97 return nil | 98 return nil |
98 } | 99 } |
99 logging.Errorf(c, "Failed - %s", err) | 100 logging.Errorf(c, "Failed - %s", err) |
100 } | 101 } |
101 » return errors.WrapTransient(err) | 102 » return transient.Tag.Apply(err) |
102 } | 103 } |
103 | 104 |
104 // pullSubcription pulls one message from PubSub subscription. | 105 // pullSubcription pulls one message from PubSub subscription. |
105 // | 106 // |
106 // Used on dev server only. Returns the message and callback to call to | 107 // Used on dev server only. Returns the message and callback to call to |
107 // acknowledge the message. | 108 // acknowledge the message. |
108 func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub
.PubsubMessage, func(), error) { | 109 func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub
.PubsubMessage, func(), error) { |
109 service, err := createPubSubService(c, pubSubURL) | 110 service, err := createPubSubService(c, pubSubURL) |
110 if err != nil { | 111 if err != nil { |
111 return nil, nil, err | 112 return nil, nil, err |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
243 func (p iamPolicy) grantRole(role, principal string) { | 244 func (p iamPolicy) grantRole(role, principal string) { |
244 switch existing := p[role]; { | 245 switch existing := p[role]; { |
245 case existing != nil && existing.Has(principal): // already there | 246 case existing != nil && existing.Has(principal): // already there |
246 return | 247 return |
247 case existing != nil: // the role is there, but not the principal | 248 case existing != nil: // the role is there, but not the principal |
248 existing.Add(principal) | 249 existing.Add(principal) |
249 default: | 250 default: |
250 p[role] = stringset.NewFromSlice(principal) | 251 p[role] = stringset.NewFromSlice(principal) |
251 } | 252 } |
252 } | 253 } |
OLD | NEW |