Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Side by Side Diff: client/cmd/logdog_butler/output_pubsub.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/client/internal/flags/multiflag" 11 "github.com/luci/luci-go/client/internal/flags/multiflag"
12 "github.com/luci/luci-go/client/internal/logdog/butler/output" 12 "github.com/luci/luci-go/client/internal/logdog/butler/output"
13 "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub" 13 "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
14 "github.com/luci/luci-go/common/gcloud/gcps" 14 "github.com/luci/luci-go/common/gcloud/gcps"
15 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/retry"
17 "golang.org/x/net/context"
18 ) 16 )
19 17
20 func init() { 18 func init() {
21 registerOutputFactory(new(pubsubOutputFactory)) 19 registerOutputFactory(new(pubsubOutputFactory))
22 } 20 }
23 21
24 // pubsubOutputFactory for Google Cloud PubSub. 22 // pubsubOutputFactory for Google Cloud PubSub.
25 type pubsubOutputFactory struct { 23 type pubsubOutputFactory struct {
26 topic gcps.Topic 24 topic gcps.Topic
27 project string 25 project string
(...skipping 24 matching lines...) Expand all
52 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err) 50 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err)
53 } 51 }
54 52
55 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext, 53 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext,
56 // as we want Pub/Sub system to drain without interruption if the applic ation 54 // as we want Pub/Sub system to drain without interruption if the applic ation
57 // is otherwise interrupted. 55 // is otherwise interrupted.
58 ctx := log.SetFields(a.ncCtx, log.Fields{ 56 ctx := log.SetFields(a.ncCtx, log.Fields{
59 "topic": f.topic, 57 "topic": f.topic,
60 "project": f.project, 58 "project": f.project,
61 }) 59 })
62 » ctx, err := a.authenticatedContext(ctx, f.project) 60 » client, err := a.authenticatedClient(ctx)
63 if err != nil { 61 if err != nil {
64 return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub con text: %s", err) 62 return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub con text: %s", err)
65 } 63 }
66 » ps := gcps.New(ctx) 64 » ps := &gcps.Retry{
dnj (Google) 2016/01/21 04:36:24 Use new gcps.Retry and Pub/Sub instantiation.
65 » » PS: gcps.New(client, f.project),
66 » » C: func(err error, d time.Duration) {
67 » » » log.Fields{
68 » » » » log.ErrorKey: err,
69 » » » » "delay": d,
70 » » » }.Warningf(ctx, "Transient error during Pub/Sub operatio n; retrying...")
71 » » },
72 » }
67 73
68 // Assert that our Topic exists. 74 // Assert that our Topic exists.
69 » if err := f.assertTopicExists(ctx, ps); err != nil { 75 » exists, err := ps.TopicExists(ctx, f.topic)
70 » » log.WithError(err).Errorf(ctx, "Topic does not exist.") 76 » if err != nil {
77 » » log.WithError(err).Errorf(ctx, "Failed to check for topic.")
71 return nil, err 78 return nil, err
72 } 79 }
80 if !exists {
81 log.Fields{
82 "topic": f.topic,
83 }.Errorf(ctx, "Pub/Sub Topic does not exist.")
84 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c)
85 }
73 86
74 return pubsub.New(ctx, pubsub.Config{ 87 return pubsub.New(ctx, pubsub.Config{
75 » » PubSub: ps, 88 » » Publisher: ps,
76 » » Topic: f.topic, 89 » » Topic: f.topic,
77 » » Compress: !f.noCompress, 90 » » Compress: !f.noCompress,
78 }), nil 91 }), nil
79 } 92 }
80
81 func (f *pubsubOutputFactory) assertTopicExists(ctx context.Context, ps gcps.Pub Sub) error {
82 log.Infof(ctx, "Checking that Pub/Sub topic exists.")
83
84 exists := false
85 err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() err or {
86 e, err := ps.TopicExists(f.topic)
87 if err != nil {
88 return err
89 }
90 exists = e
91 return nil
92 }, func(err error, d time.Duration) {
93 log.Fields{
94 log.ErrorKey: err,
95 "delay": d,
96 }.Warningf(ctx, "Transient error during topic check; retrying.")
97 })
98 if err != nil {
99 return fmt.Errorf("pubsub: failed to check for topic: %s", err)
100 }
101 if !exists {
102 return fmt.Errorf("pubsub: topic [%s] does not exist", f.topic)
103 }
104 return nil
105 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698