Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1460)

Side by Side Diff: scheduler/appengine/engine/pubsub.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package engine 5 package engine
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 "sort" 9 "sort"
10 "strings" 10 "strings"
11 11
12 "golang.org/x/net/context" 12 "golang.org/x/net/context"
13 "google.golang.org/api/googleapi" 13 "google.golang.org/api/googleapi"
14 "google.golang.org/api/pubsub/v1" 14 "google.golang.org/api/pubsub/v1"
15 15
16 "github.com/luci/luci-go/common/data/stringset" 16 "github.com/luci/luci-go/common/data/stringset"
17 "github.com/luci/luci-go/common/errors" 17 "github.com/luci/luci-go/common/errors"
18 "github.com/luci/luci-go/common/logging" 18 "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/retry/transient"
19 "github.com/luci/luci-go/server/auth" 20 "github.com/luci/luci-go/server/auth"
20 ) 21 )
21 22
22 // createPubSubService returns configured instance of pubsub.Service. 23 // createPubSubService returns configured instance of pubsub.Service.
23 func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service, error) { 24 func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service, error) {
24 // In real mode (not a unit test), use authenticated transport. 25 // In real mode (not a unit test), use authenticated transport.
25 var transport http.RoundTripper 26 var transport http.RoundTripper
26 if pubSubURL == "" { 27 if pubSubURL == "" {
27 var err error 28 var err error
28 transport, err = auth.GetRPCTransport(c, auth.AsSelf, auth.WithS copes(pubsub.PubsubScope)) 29 transport, err = auth.GetRPCTransport(c, auth.AsSelf, auth.WithS copes(pubsub.PubsubScope))
(...skipping 24 matching lines...) Expand all
53 service, err := createPubSubService(c, pubSubURL) 54 service, err := createPubSubService(c, pubSubURL)
54 if err != nil { 55 if err != nil {
55 return err 56 return err
56 } 57 }
57 58
58 // Create the topic. Ignore HTTP 409 (it means the topic already exists) . 59 // Create the topic. Ignore HTTP 409 (it means the topic already exists) .
59 logging.Infof(c, "Ensuring topic %q exists", topic) 60 logging.Infof(c, "Ensuring topic %q exists", topic)
60 _, err = service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context( c).Do() 61 _, err = service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context( c).Do()
61 if err != nil && !isHTTP409(err) { 62 if err != nil && !isHTTP409(err) {
62 logging.Errorf(c, "Failed - %s", err) 63 logging.Errorf(c, "Failed - %s", err)
63 » » return errors.WrapTransient(err) 64 » » return transient.Tag.Apply(err)
64 } 65 }
65 66
66 // Create the subscription to this topic. Ignore HTTP 409. 67 // Create the subscription to this topic. Ignore HTTP 409.
67 logging.Infof(c, "Ensuring subscription %q exists", sub) 68 logging.Infof(c, "Ensuring subscription %q exists", sub)
68 _, err = service.Projects.Subscriptions.Create(sub, &pubsub.Subscription { 69 _, err = service.Projects.Subscriptions.Create(sub, &pubsub.Subscription {
69 Topic: topic, 70 Topic: topic,
70 AckDeadlineSeconds: 70, // GAE request timeout plus some spare t ime 71 AckDeadlineSeconds: 70, // GAE request timeout plus some spare t ime
71 PushConfig: &pubsub.PushConfig{ 72 PushConfig: &pubsub.PushConfig{
72 PushEndpoint: pushURL, // if "", the subscription will b e pull based 73 PushEndpoint: pushURL, // if "", the subscription will b e pull based
73 }, 74 },
74 }).Context(c).Do() 75 }).Context(c).Do()
75 if err != nil && !isHTTP409(err) { 76 if err != nil && !isHTTP409(err) {
76 logging.Errorf(c, "Failed - %s", err) 77 logging.Errorf(c, "Failed - %s", err)
77 » » return errors.WrapTransient(err) 78 » » return transient.Tag.Apply(err)
78 } 79 }
79 80
80 // Modify topic's IAM policy to allow publisher to publish. 81 // Modify topic's IAM policy to allow publisher to publish.
81 if strings.HasSuffix(publisher, ".gserviceaccount.com") { 82 if strings.HasSuffix(publisher, ".gserviceaccount.com") {
82 publisher = "serviceAccount:" + publisher 83 publisher = "serviceAccount:" + publisher
83 } else { 84 } else {
84 publisher = "user:" + publisher 85 publisher = "user:" + publisher
85 } 86 }
86 logging.Infof(c, "Ensuring %q can publish to the topic", publisher) 87 logging.Infof(c, "Ensuring %q can publish to the topic", publisher)
87 88
88 // Do two attempts, to account for possible race condition. Two attempts 89 // Do two attempts, to account for possible race condition. Two attempts
89 // should be enough to handle concurrent calls to 'configureTopic': seco nd 90 // should be enough to handle concurrent calls to 'configureTopic': seco nd
90 // attempt will read already correct IAM policy and will just end right away. 91 // attempt will read already correct IAM policy and will just end right away.
91 for attempt := 0; attempt < 2; attempt++ { 92 for attempt := 0; attempt < 2; attempt++ {
92 err = modifyTopicIAMPolicy(c, service, topic, func(policy iamPol icy) error { 93 err = modifyTopicIAMPolicy(c, service, topic, func(policy iamPol icy) error {
93 policy.grantRole("roles/pubsub.publisher", publisher) 94 policy.grantRole("roles/pubsub.publisher", publisher)
94 return nil 95 return nil
95 }) 96 })
96 if err == nil { 97 if err == nil {
97 return nil 98 return nil
98 } 99 }
99 logging.Errorf(c, "Failed - %s", err) 100 logging.Errorf(c, "Failed - %s", err)
100 } 101 }
101 » return errors.WrapTransient(err) 102 » return transient.Tag.Apply(err)
102 } 103 }
103 104
104 // pullSubcription pulls one message from PubSub subscription. 105 // pullSubcription pulls one message from PubSub subscription.
105 // 106 //
106 // Used on dev server only. Returns the message and callback to call to 107 // Used on dev server only. Returns the message and callback to call to
107 // acknowledge the message. 108 // acknowledge the message.
108 func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub .PubsubMessage, func(), error) { 109 func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub .PubsubMessage, func(), error) {
109 service, err := createPubSubService(c, pubSubURL) 110 service, err := createPubSubService(c, pubSubURL)
110 if err != nil { 111 if err != nil {
111 return nil, nil, err 112 return nil, nil, err
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
243 func (p iamPolicy) grantRole(role, principal string) { 244 func (p iamPolicy) grantRole(role, principal string) {
244 switch existing := p[role]; { 245 switch existing := p[role]; {
245 case existing != nil && existing.Has(principal): // already there 246 case existing != nil && existing.Has(principal): // already there
246 return 247 return
247 case existing != nil: // the role is there, but not the principal 248 case existing != nil: // the role is there, but not the principal
248 existing.Add(principal) 249 existing.Add(principal)
249 default: 250 default:
250 p[role] = stringset.NewFromSlice(principal) 251 p[role] = stringset.NewFromSlice(principal)
251 } 252 }
252 } 253 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698