Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Unified Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1975683002: LogDog: Implement prefix registration in Butler. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-collector
Patch Set: Rebarse Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/cmd/logdog_butler/output_logdog.go ('k') | client/cmd/logdog_butler/subcommand_run.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/cmd/logdog_butler/output_pubsub.go
diff --git a/client/cmd/logdog_butler/output_pubsub.go b/client/cmd/logdog_butler/output_pubsub.go
deleted file mode 100644
index fdeadbd09d7f4c1ce7792d2eb2018d7477edd390..0000000000000000000000000000000000000000
--- a/client/cmd/logdog_butler/output_pubsub.go
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package main
-
-import (
- "fmt"
- "time"
-
- "github.com/luci/luci-go/client/internal/logdog/butler/output"
- out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
- "github.com/luci/luci-go/common/flag/multiflag"
- ps "github.com/luci/luci-go/common/gcloud/pubsub"
- log "github.com/luci/luci-go/common/logging"
- "github.com/luci/luci-go/common/retry"
- "golang.org/x/net/context"
- "google.golang.org/cloud"
- "google.golang.org/cloud/pubsub"
-)
-
-func init() {
- registerOutputFactory(new(pubsubOutputFactory))
-}
-
-// pubsubOutputFactory for Google Cloud PubSub.
-type pubsubOutputFactory struct {
- topic ps.Topic
- noCompress bool
- track bool
-}
-
-var _ outputFactory = (*pubsubOutputFactory)(nil)
-
-func (f *pubsubOutputFactory) option() multiflag.Option {
- opt := newOutputOption("pubsub", "Output to a Google Cloud PubSub endpoint", f)
-
- flags := opt.Flags()
- flags.Var(&f.topic, "topic",
- "The Google Cloud PubSub topic name (projects/<project>/topics/<topic>).")
- flags.BoolVar(&f.noCompress, "nocompress", false,
- "Disable compression in published Pub/Sub messages.")
-
- // TODO(dnj): Default to false when mandatory debugging is finished.
- flags.BoolVar(&f.track, "track", true,
- "Track each sent message. This adds CPU/memory overhead.")
-
- return opt
-}
-
-func (f *pubsubOutputFactory) configOutput(a *application) (output.Output, error) {
- if err := f.topic.Validate(); err != nil {
- return nil, fmt.Errorf("pubsub: invalid topic name: %s", err)
- }
-
- // Instantiate our Pub/Sub instance. We will use the non-cancelling context,
- // as we want Pub/Sub system to drain without interruption if the application
- // is otherwise interrupted.
- ctx := log.SetFields(a.ncCtx, log.Fields{
- "topic": f.topic,
- })
- ts, err := a.tokenSource(ctx)
- if err != nil {
- return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub token source: %s", err)
- }
-
- // Split topic into Pub/Sub project and name.
- project, name := f.topic.Split()
-
- psClient, err := pubsub.NewClient(ctx, project, cloud.WithTokenSource(ts))
- if err != nil {
- return nil, fmt.Errorf("pubsub: failed to get Pub/Sub client: %s", err)
- }
- psTopic := psClient.Topic(name)
-
- // Assert that our Topic exists.
- exists, err := retryTopicExists(ctx, psTopic)
- if err != nil {
- log.WithError(err).Errorf(ctx, "Failed to check for topic.")
- return nil, err
- }
- if !exists {
- log.Fields{
- "topic": f.topic,
- }.Errorf(ctx, "Pub/Sub Topic does not exist.")
- return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topic)
- }
-
- return out.New(ctx, out.Config{
- Topic: psTopic,
- Compress: !f.noCompress,
- Track: f.track,
- }), nil
-}
-
-func retryTopicExists(ctx context.Context, t *pubsub.Topic) (bool, error) {
- var exists bool
- err := retry.Retry(ctx, retry.Default, func() (err error) {
- exists, err = t.Exists(ctx)
- return
- }, func(err error, d time.Duration) {
- log.Fields{
- log.ErrorKey: err,
- "delay": d,
- }.Errorf(ctx, "Failed to check if topic exists; retrying...")
- })
- return exists, err
-}
« no previous file with comments | « client/cmd/logdog_butler/output_logdog.go ('k') | client/cmd/logdog_butler/subcommand_run.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698