Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(520)

Side by Side Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/client/internal/logdog/butler/output" 11 "github.com/luci/luci-go/client/internal/logdog/butler/output"
12 » "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub" 12 » out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub "
13 "github.com/luci/luci-go/common/flag/multiflag" 13 "github.com/luci/luci-go/common/flag/multiflag"
14 ps "github.com/luci/luci-go/common/gcloud/pubsub" 14 ps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/retry"
17 "golang.org/x/net/context"
18 "google.golang.org/cloud"
19 "google.golang.org/cloud/pubsub"
16 ) 20 )
17 21
18 func init() { 22 func init() {
19 registerOutputFactory(new(pubsubOutputFactory)) 23 registerOutputFactory(new(pubsubOutputFactory))
20 } 24 }
21 25
22 // pubsubOutputFactory for Google Cloud PubSub. 26 // pubsubOutputFactory for Google Cloud PubSub.
23 type pubsubOutputFactory struct { 27 type pubsubOutputFactory struct {
24 topic ps.Topic 28 topic ps.Topic
25 noCompress bool 29 noCompress bool
(...skipping 22 matching lines...) Expand all
48 if err := f.topic.Validate(); err != nil { 52 if err := f.topic.Validate(); err != nil {
49 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err) 53 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err)
50 } 54 }
51 55
52 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext, 56 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext,
53 // as we want Pub/Sub system to drain without interruption if the applic ation 57 // as we want Pub/Sub system to drain without interruption if the applic ation
54 // is otherwise interrupted. 58 // is otherwise interrupted.
55 ctx := log.SetFields(a.ncCtx, log.Fields{ 59 ctx := log.SetFields(a.ncCtx, log.Fields{
56 "topic": f.topic, 60 "topic": f.topic,
57 }) 61 })
58 » client, err := a.authenticatedClient(ctx) 62 » ts, err := a.tokenSource(ctx)
59 if err != nil { 63 if err != nil {
60 » » return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub con text: %s", err) 64 » » return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub tok en source: %s", err)
61 » }
62 » psConn := &ps.Retry{
63 » » Connection: ps.NewConnection(client),
64 » » Callback: func(err error, d time.Duration) {
65 » » » log.Fields{
66 » » » » log.ErrorKey: err,
67 » » » » "delay": d,
68 » » » }.Warningf(ctx, "Transient error during Pub/Sub operatio n; retrying...")
69 » » },
70 } 65 }
71 66
67 // Split topic into Pub/Sub project and name.
68 project, name := f.topic.Split()
69
70 psClient, err := pubsub.NewClient(ctx, project, cloud.WithTokenSource(ts ))
71 if err != nil {
72 return nil, fmt.Errorf("pubsub: failed to get Pub/Sub client: %s ", err)
73 }
74 psTopic := psClient.Topic(name)
75
72 // Assert that our Topic exists. 76 // Assert that our Topic exists.
73 » exists, err := psConn.TopicExists(ctx, f.topic) 77 » exists, err := retryTopicExists(ctx, psTopic)
74 if err != nil { 78 if err != nil {
75 log.WithError(err).Errorf(ctx, "Failed to check for topic.") 79 log.WithError(err).Errorf(ctx, "Failed to check for topic.")
76 return nil, err 80 return nil, err
77 } 81 }
78 if !exists { 82 if !exists {
79 log.Fields{ 83 log.Fields{
80 "topic": f.topic, 84 "topic": f.topic,
81 }.Errorf(ctx, "Pub/Sub Topic does not exist.") 85 }.Errorf(ctx, "Pub/Sub Topic does not exist.")
82 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c) 86 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c)
83 } 87 }
84 88
85 » return pubsub.New(ctx, pubsub.Config{ 89 » return out.New(ctx, out.Config{
86 » » Publisher: psConn, 90 » » Topic: psTopic,
87 » » Topic: f.topic, 91 » » Compress: !f.noCompress,
88 » » Compress: !f.noCompress, 92 » » Track: f.track,
89 » » Track: f.track,
90 }), nil 93 }), nil
91 } 94 }
95
96 func retryTopicExists(ctx context.Context, t *pubsub.TopicHandle) (bool, error) {
97 var exists bool
98 err := retry.Retry(ctx, retry.Default, func() (err error) {
99 exists, err = t.Exists(ctx)
100 return
101 }, func(err error, d time.Duration) {
102 log.Fields{
103 log.ErrorKey: err,
104 "delay": d,
105 }.Errorf(ctx, "Failed to check if topic exists; retrying...")
106 })
107 return exists, err
108 }
OLDNEW
« no previous file with comments | « client/cmd/logdog_butler/main.go ('k') | client/internal/logdog/butler/output/pubsub/pubsubOutput.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698