Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/client/internal/flags/multiflag" | 11 "github.com/luci/luci-go/client/internal/flags/multiflag" |
| 12 "github.com/luci/luci-go/client/internal/logdog/butler/output" | 12 "github.com/luci/luci-go/client/internal/logdog/butler/output" |
| 13 "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub" | 13 "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub" |
| 14 "github.com/luci/luci-go/common/gcloud/gcps" | 14 "github.com/luci/luci-go/common/gcloud/gcps" |
| 15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/retry" | |
| 17 "golang.org/x/net/context" | |
| 18 ) | 16 ) |
| 19 | 17 |
| 20 func init() { | 18 func init() { |
| 21 registerOutputFactory(new(pubsubOutputFactory)) | 19 registerOutputFactory(new(pubsubOutputFactory)) |
| 22 } | 20 } |
| 23 | 21 |
| 24 // pubsubOutputFactory for Google Cloud PubSub. | 22 // pubsubOutputFactory for Google Cloud PubSub. |
| 25 type pubsubOutputFactory struct { | 23 type pubsubOutputFactory struct { |
| 26 topic gcps.Topic | 24 topic gcps.Topic |
| 27 project string | 25 project string |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 52 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err) | 50 return nil, fmt.Errorf("pubsub: invalid topic name: %s", err) |
| 53 } | 51 } |
| 54 | 52 |
| 55 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext, | 53 // Instantiate our Pub/Sub instance. We will use the non-cancelling cont ext, |
| 56 // as we want Pub/Sub system to drain without interruption if the applic ation | 54 // as we want Pub/Sub system to drain without interruption if the applic ation |
| 57 // is otherwise interrupted. | 55 // is otherwise interrupted. |
| 58 ctx := log.SetFields(a.ncCtx, log.Fields{ | 56 ctx := log.SetFields(a.ncCtx, log.Fields{ |
| 59 "topic": f.topic, | 57 "topic": f.topic, |
| 60 "project": f.project, | 58 "project": f.project, |
| 61 }) | 59 }) |
| 62 » ctx, err := a.authenticatedContext(ctx, f.project) | 60 » client, err := a.authenticatedClient(ctx) |
| 63 if err != nil { | 61 if err != nil { |
| 64 return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub con text: %s", err) | 62 return nil, fmt.Errorf("pubsub: failed to initialize Pub/Sub con text: %s", err) |
| 65 } | 63 } |
| 66 » ps := gcps.New(ctx) | 64 » ps := &gcps.Retry{ |
|
dnj (Google)
2016/01/21 04:36:24
Use new gcps.Retry and Pub/Sub instantiation.
| |
| 65 » » PS: gcps.New(client, f.project), | |
| 66 » » C: func(err error, d time.Duration) { | |
| 67 » » » log.Fields{ | |
| 68 » » » » log.ErrorKey: err, | |
| 69 » » » » "delay": d, | |
| 70 » » » }.Warningf(ctx, "Transient error during Pub/Sub operatio n; retrying...") | |
| 71 » » }, | |
| 72 » } | |
| 67 | 73 |
| 68 // Assert that our Topic exists. | 74 // Assert that our Topic exists. |
| 69 » if err := f.assertTopicExists(ctx, ps); err != nil { | 75 » exists, err := ps.TopicExists(ctx, f.topic) |
| 70 » » log.WithError(err).Errorf(ctx, "Topic does not exist.") | 76 » if err != nil { |
| 77 » » log.WithError(err).Errorf(ctx, "Failed to check for topic.") | |
| 71 return nil, err | 78 return nil, err |
| 72 } | 79 } |
| 80 if !exists { | |
| 81 log.Fields{ | |
| 82 "topic": f.topic, | |
| 83 }.Errorf(ctx, "Pub/Sub Topic does not exist.") | |
| 84 return nil, fmt.Errorf("pubsub: topic %q does not exist", f.topi c) | |
| 85 } | |
| 73 | 86 |
| 74 return pubsub.New(ctx, pubsub.Config{ | 87 return pubsub.New(ctx, pubsub.Config{ |
| 75 » » PubSub: ps, | 88 » » Publisher: ps, |
| 76 » » Topic: f.topic, | 89 » » Topic: f.topic, |
| 77 » » Compress: !f.noCompress, | 90 » » Compress: !f.noCompress, |
| 78 }), nil | 91 }), nil |
| 79 } | 92 } |
| 80 | |
| 81 func (f *pubsubOutputFactory) assertTopicExists(ctx context.Context, ps gcps.Pub Sub) error { | |
| 82 log.Infof(ctx, "Checking that Pub/Sub topic exists.") | |
| 83 | |
| 84 exists := false | |
| 85 err := retry.Retry(ctx, retry.TransientOnly(retry.Default()), func() err or { | |
| 86 e, err := ps.TopicExists(f.topic) | |
| 87 if err != nil { | |
| 88 return err | |
| 89 } | |
| 90 exists = e | |
| 91 return nil | |
| 92 }, func(err error, d time.Duration) { | |
| 93 log.Fields{ | |
| 94 log.ErrorKey: err, | |
| 95 "delay": d, | |
| 96 }.Warningf(ctx, "Transient error during topic check; retrying.") | |
| 97 }) | |
| 98 if err != nil { | |
| 99 return fmt.Errorf("pubsub: failed to check for topic: %s", err) | |
| 100 } | |
| 101 if !exists { | |
| 102 return fmt.Errorf("pubsub: topic [%s] does not exist", f.topic) | |
| 103 } | |
| 104 return nil | |
| 105 } | |
| OLD | NEW |