Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(438)

Side by Side Diff: client/cmd/logdog_butler/output_logdog.go

Issue 1975683002: LogDog: Implement prefix registration in Butler. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-collector
Patch Set: Rebarse Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cmd/logdog_butler/output_log.go ('k') | client/cmd/logdog_butler/output_pubsub.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package main
6
7 import (
8 "errors"
9 "fmt"
10 "runtime"
11 "strings"
12 "time"
13
14 "github.com/luci/luci-go/client/internal/logdog/butler/output"
15 out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub "
16 api "github.com/luci/luci-go/common/api/logdog_coordinator/registration/ v1"
17 "github.com/luci/luci-go/common/auth"
18 "github.com/luci/luci-go/common/clock/clockflag"
19 "github.com/luci/luci-go/common/flag/multiflag"
20 ps "github.com/luci/luci-go/common/gcloud/pubsub"
21 log "github.com/luci/luci-go/common/logging"
22 "github.com/luci/luci-go/common/proto/google"
23 "github.com/luci/luci-go/common/prpc"
24 "github.com/luci/luci-go/common/retry"
25 "golang.org/x/net/context"
26 "google.golang.org/cloud"
27 "google.golang.org/cloud/pubsub"
28 )
29
30 func init() {
31 registerOutputFactory(new(logdogOutputFactory))
32 }
33
34 // logdogOutputFactory for publishing logs using a LogDog Coordinator host.
35 type logdogOutputFactory struct {
36 host string
37 prefixExpiration clockflag.Duration
38
39 track bool
40 }
41
42 var _ outputFactory = (*logdogOutputFactory)(nil)
43
44 func (f *logdogOutputFactory) option() multiflag.Option {
45 opt := newOutputOption("logdog", "Output to a LogDog Coordinator instanc e.", f)
46
47 flags := opt.Flags()
48 flags.StringVar(&f.host, "host", "",
49 "The LogDog Coordinator host name.")
50 flags.Var(&f.prefixExpiration, "prefix-expiration",
51 "Amount of time after registration that the prefix will be activ e. If omitted, the service "+
52 "default will be used. This should exceed the expected l ifetime of the job by a fair margin.")
53
54 // TODO(dnj): Default to false when mandatory debugging is finished.
55 flags.BoolVar(&f.track, "track", true,
56 "Track each sent message. This adds CPU/memory overhead.")
57
58 return opt
59 }
60
61 func (f *logdogOutputFactory) configOutput(a *application) (output.Output, error ) {
62 // Open a pRPC client to our Coordinator instance.
63 authenticator, err := a.authenticator(a)
64 if err != nil {
65 log.WithError(err).Errorf(a, "Failed to get authenticator.")
66 return nil, err
67 }
68 httpClient, err := authenticator.Client()
69 if err != nil {
70 log.WithError(err).Errorf(a, "Failed to get authenticated HTTP c lient.")
71 return nil, err
72 }
73
74 // Configure our pRPC client.
75 client := prpc.Client{
76 C: httpClient,
77 Host: f.host,
78 Options: prpc.DefaultOptions(),
79 }
80
81 // If our host begins with "localhost", set insecure option automaticall y.
82 if isLocalHost(f.host) {
83 log.Infof(a, "Detected localhost; enabling insecure RPC connecti on.")
84 client.Options.Insecure = true
85 }
86
87 // Register our Prefix with the Coordinator.
88 log.Fields{
89 "prefix": a.prefix,
90 "host": f.host,
91 }.Debugf(a, "Registering prefix space with Coordinator service.")
92
93 svc := api.NewRegistrationPRPCClient(&client)
94 resp, err := svc.RegisterPrefix(a, &api.RegisterPrefixRequest{
95 Project: string(a.project),
96 Prefix: string(a.prefix),
97 SourceInfo: []string{
98 "LogDog Butler",
99 fmt.Sprintf("GOARCH=%s", runtime.GOARCH),
100 fmt.Sprintf("GOOS=%s", runtime.GOOS),
101 },
102 Expiration: google.NewDuration(time.Duration(f.prefixExpiration) ),
103 })
104 if err != nil {
105 log.WithError(err).Errorf(a, "Failed to register prefix with Coo rdinator service.")
106 return nil, err
107 }
108 log.Fields{
109 "prefix": a.prefix,
110 "bundleTopic": resp.LogBundleTopic,
111 }.Debugf(a, "Successfully registered log stream prefix.")
112
113 // Validate the response topic.
114 fullTopic := ps.Topic(resp.LogBundleTopic)
115 if err := fullTopic.Validate(); err != nil {
116 log.Fields{
117 log.ErrorKey: err,
118 "fullTopic": fullTopic,
119 }.Errorf(a, "Coordinator returned invalid Pub/Sub topic.")
120 return nil, err
121 }
122
123 // Split our topic into project and topic name. This must succeed, since we
124 // just finished validating the topic.
125 proj, topic := fullTopic.Split()
126
127 // Instantiate our Pub/Sub instance.
128 //
129 // We will use the non-cancelling context, for all Pub/Sub calls, as we want
130 // the Pub/Sub system to drain without interruption if the application i s
131 // otherwise canceled.
132 psClient, err := pubsub.NewClient(a.ncCtx, proj, cloud.WithTokenSource(a uthenticator.TokenSource()))
133 if err != nil {
134 log.Fields{
135 log.ErrorKey: err,
136 "project": proj,
137 }.Errorf(a, "Failed to create Pub/Sub client.")
138 return nil, errors.New("failed to get Pub/Sub client")
139 }
140 psTopic := psClient.Topic(topic)
141
142 // Assert that our Topic exists.
143 exists, err := retryTopicExists(a, psTopic)
144 if err != nil {
145 log.Fields{
146 log.ErrorKey: err,
147 "project": proj,
148 "topic": topic,
149 }.Errorf(a, "Failed to check for Pub/Sub topic.")
150 return nil, errors.New("failed to check for Pub/Sub topic")
151 }
152 if !exists {
153 log.Fields{
154 "fullTopic": fullTopic,
155 }.Errorf(a, "Pub/Sub Topic does not exist.")
156 return nil, errors.New("PubSub topic does not exist")
157 }
158
159 // We own the prefix and all verifiable parameters have been validated.
160 // Successfully return our Output instance.
161 //
162 // Note that we use our non-cancelling context here.
163 return out.New(a.ncCtx, out.Config{
164 Topic: psTopic,
165 Secret: resp.Secret,
166 Compress: true,
167 Track: f.track,
168 }), nil
169 }
170
171 func (f *logdogOutputFactory) scopes() []string {
172 // E-mail scope needed for Coordinator authentication.
173 scopes := []string{auth.OAuthScopeEmail}
174 // Publisher scope needed to publish to Pub/Sub transport.
175 scopes = append(scopes, ps.PublisherScopes...)
176 return scopes
177 }
178
179 func retryTopicExists(ctx context.Context, t *pubsub.Topic) (bool, error) {
180 var exists bool
181 err := retry.Retry(ctx, retry.Default, func() (err error) {
182 exists, err = t.Exists(ctx)
183 return
184 }, func(err error, d time.Duration) {
185 log.Fields{
186 log.ErrorKey: err,
187 "delay": d,
188 }.Errorf(ctx, "Failed to check if topic exists; retrying...")
189 })
190 return exists, err
191 }
192
193 func isLocalHost(host string) bool {
194 switch {
195 case host == "localhost", strings.HasPrefix(host, "localhost:"):
196 case host == "127.0.0.1", strings.HasPrefix(host, "127.0.0.1:"):
197 case host == "[::1]", strings.HasPrefix(host, "[::1]:"):
198 case strings.HasPrefix(host, ":"):
199
200 default:
201 return false
202 }
203 return true
204 }
OLDNEW
« no previous file with comments | « client/cmd/logdog_butler/output_log.go ('k') | client/cmd/logdog_butler/output_pubsub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698