Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(143)

Side by Side Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/client/internal/logdog/butler/output" 11 "github.com/luci/luci-go/client/internal/logdog/butler/output"
12 » "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub" 12 » out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub "
13 "github.com/luci/luci-go/common/flag/multiflag" 13 "github.com/luci/luci-go/common/flag/multiflag"
14 ps "github.com/luci/luci-go/common/gcloud/pubsub" 14 ps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/retry"
17 "golang.org/x/net/context"
18 "google.golang.org/cloud"
19 "google.golang.org/cloud/pubsub"
16 ) 20 )
17 21
18 func init() { 22 func init() {
19 registerOutputFactory(new(pubsubOutputFactory)) 23 registerOutputFactory(new(pubsubOutputFactory))
20 } 24 }
21 25
22 // pubsubOutputFactory for Google Cloud PubSub. 26 // pubsubOutputFactory for Google Cloud PubSub.
23 type pubsubOutputFactory struct { 27 type pubsubOutputFactory struct {
24 topic ps.Topic 28 topic ps.Topic
25 noCompress bool 29 noCompress bool
(...skipping 22 matching lines...) Expand all
48 if err := f.topic.Validate(); err != nil { 52 if err := f.topic.Validate(); err != nil {
49 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err) 53 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err)
50 } 54 }
51 55
52 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext, 56 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext,
53 // as we want Pub/Sub system to drain without interruption if the applic ation 57 // as we want Pub/Sub system to drain without interruption if the applic ation
54 // is otherwise interrupted. 58 // is otherwise interrupted.
55 ctx := log.SetFields(a.ncCtx, log.Fields{ 59 ctx := log.SetFields(a.ncCtx, log.Fields{
56 "topic": f.topic, 60 "topic": f.topic,
57 }) 61 })
58 » client, err := a.authenticatedClient(ctx) 62 » ts, err := a.authenticatedTokenSource(ctx)
59 if err != nil { 63 if err != nil {
60 » » return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub con text: %s", err) 64 » » return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub tok en source: %s", err)
61 } 65 }
62 » psConn := &ps.Retry{ 66
63 » » Connection: ps.NewConnection(client), 67 » // Split topic into Pub/Sub project and name.
64 » » Callback: func(err error, d time.Duration) { 68 » project, name := f.topic.Split()
65 » » » log.Fields{ 69
66 » » » » log.ErrorKey: err, 70 » psClient, err := pubsub.NewClient(ctx, project, cloud.WithTokenSource(ts ))
67 » » » » "delay": d, 71 » if err != nil {
68 » » » }.Warningf(ctx, "Transient error during Pub/Sub operatio n; retrying...") 72 » » return nil, fmt.Errorf("pubsub: failed to get Pub/Sub client: %s ", err)
69 » » }, 73 » }
74 » psTopic, err := psClient.NewTopic(ctx, name)
75 » if err != nil {
76 » » return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub top ic: %s", err)
70 } 77 }
71 78
72 // Assert that our Topic exists. 79 // Assert that our Topic exists.
73 » exists, err := psConn.TopicExists(ctx, f.topic) 80 » exists, err := retryTopicExists(ctx, psTopic)
74 if err != nil { 81 if err != nil {
75 log.WithError(err).Errorf(ctx, "Failed to check for topic.") 82 log.WithError(err).Errorf(ctx, "Failed to check for topic.")
76 return nil, err 83 return nil, err
77 } 84 }
78 if !exists { 85 if !exists {
79 log.Fields{ 86 log.Fields{
80 "topic": f.topic, 87 "topic": f.topic,
81 }.Errorf(ctx, "Pub/Sub Topic does not exist.") 88 }.Errorf(ctx, "Pub/Sub Topic does not exist.")
82 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c) 89 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c)
83 } 90 }
84 91
85 » return pubsub.New(ctx, pubsub.Config{ 92 » return out.New(ctx, out.Config{
86 » » Publisher: psConn, 93 » » Topic: psTopic,
87 » » Topic: f.topic, 94 » » Compress: !f.noCompress,
88 » » Compress: !f.noCompress, 95 » » Track: f.track,
89 » » Track: f.track,
90 }), nil 96 }), nil
91 } 97 }
98
99 func retryTopicExists(ctx context.Context, t *pubsub.TopicHandle) (bool, error) {
100 var exists bool
101 err := retry.Retry(ctx, retry.Default, func() (err error) {
102 exists, err = t.Exists(ctx)
103 return
104 }, func(err error, d time.Duration) {
105 log.Fields{
106 log.ErrorKey: err,
107 "delay": d,
108 }.Errorf(ctx, "Failed to check if topic exists; retrying...")
109 })
110 return exists, err
111 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698