Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1010)

Side by Side Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1975683002: LogDog: Implement prefix registration in Butler. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-collector
Patch Set: Rebarse Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cmd/logdog_butler/output_logdog.go ('k') | client/cmd/logdog_butler/subcommand_run.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package main
6
7 import (
8 "fmt"
9 "time"
10
11 "github.com/luci/luci-go/client/internal/logdog/butler/output"
12 out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub "
13 "github.com/luci/luci-go/common/flag/multiflag"
14 ps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/retry"
17 "golang.org/x/net/context"
18 "google.golang.org/cloud"
19 "google.golang.org/cloud/pubsub"
20 )
21
22 func init() {
23 registerOutputFactory(new(pubsubOutputFactory))
24 }
25
26 // pubsubOutputFactory for Google Cloud PubSub.
27 type pubsubOutputFactory struct {
28 topic ps.Topic
29 noCompress bool
30 track bool
31 }
32
33 var _ outputFactory = (*pubsubOutputFactory)(nil)
34
35 func (f *pubsubOutputFactory) option() multiflag.Option {
36 opt := newOutputOption("pubsub", "Output to a Google Cloud PubSub endpoi nt", f)
37
38 flags := opt.Flags()
39 flags.Var(&f.topic, "topic",
40 "The Google Cloud PubSub topic name (projects/<project>/topics/< topic>).")
41 flags.BoolVar(&f.noCompress, "nocompress", false,
42 "Disable compression in published Pub/Sub messages.")
43
44 // TODO(dnj): Default to false when mandatory debugging is finished.
45 flags.BoolVar(&f.track, "track", true,
46 "Track each sent message. This adds CPU/memory overhead.")
47
48 return opt
49 }
50
51 func (f *pubsubOutputFactory) configOutput(a *application) (output.Output, error ) {
52 if err := f.topic.Validate(); err != nil {
53 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err)
54 }
55
56 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext,
57 // as we want Pub/Sub system to drain without interruption if the applic ation
58 // is otherwise interrupted.
59 ctx := log.SetFields(a.ncCtx, log.Fields{
60 "topic": f.topic,
61 })
62 ts, err := a.tokenSource(ctx)
63 if err != nil {
64 return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub tok en source: %s", err)
65 }
66
67 // Split topic into Pub/Sub project and name.
68 project, name := f.topic.Split()
69
70 psClient, err := pubsub.NewClient(ctx, project, cloud.WithTokenSource(ts ))
71 if err != nil {
72 return nil, fmt.Errorf("pubsub: failed to get Pub/Sub client: %s ", err)
73 }
74 psTopic := psClient.Topic(name)
75
76 // Assert that our Topic exists.
77 exists, err := retryTopicExists(ctx, psTopic)
78 if err != nil {
79 log.WithError(err).Errorf(ctx, "Failed to check for topic.")
80 return nil, err
81 }
82 if !exists {
83 log.Fields{
84 "topic": f.topic,
85 }.Errorf(ctx, "Pub/Sub Topic does not exist.")
86 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c)
87 }
88
89 return out.New(ctx, out.Config{
90 Topic: psTopic,
91 Compress: !f.noCompress,
92 Track: f.track,
93 }), nil
94 }
95
96 func retryTopicExists(ctx context.Context, t *pubsub.Topic) (bool, error) {
97 var exists bool
98 err := retry.Retry(ctx, retry.Default, func() (err error) {
99 exists, err = t.Exists(ctx)
100 return
101 }, func(err error, d time.Duration) {
102 log.Fields{
103 log.ErrorKey: err,
104 "delay": d,
105 }.Errorf(ctx, "Failed to check if topic exists; retrying...")
106 })
107 return exists, err
108 }
OLDNEW
« no previous file with comments | « client/cmd/logdog_butler/output_logdog.go ('k') | client/cmd/logdog_butler/subcommand_run.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698