Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Side by Side Diff: client/cmd/logdog_butler/main.go

Issue 1975683002: LogDog: Implement prefix registration in Butler. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-collector
Patch Set: Rebarse Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | client/cmd/logdog_butler/output.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "flag" 9 "flag"
10 "fmt" 10 "fmt"
11 "net/http" 11 "net/http"
12 "os" 12 "os"
13 "os/signal" 13 "os/signal"
14 "runtime/pprof" 14 "runtime/pprof"
15 "sort" 15 "sort"
16 "strings" 16 "strings"
17 "time" 17 "time"
18 18
19 "github.com/maruel/subcommands" 19 "github.com/maruel/subcommands"
20 "golang.org/x/net/context" 20 "golang.org/x/net/context"
21 "golang.org/x/oauth2"
22 21
23 "github.com/luci/luci-go/client/authcli" 22 "github.com/luci/luci-go/client/authcli"
24 "github.com/luci/luci-go/client/internal/logdog/butler" 23 "github.com/luci/luci-go/client/internal/logdog/butler"
25 "github.com/luci/luci-go/client/internal/logdog/butler/output" 24 "github.com/luci/luci-go/client/internal/logdog/butler/output"
26 "github.com/luci/luci-go/common/auth" 25 "github.com/luci/luci-go/common/auth"
27 "github.com/luci/luci-go/common/cli" 26 "github.com/luci/luci-go/common/cli"
28 "github.com/luci/luci-go/common/clock/clockflag" 27 "github.com/luci/luci-go/common/clock/clockflag"
28 "github.com/luci/luci-go/common/config"
29 "github.com/luci/luci-go/common/flag/multiflag" 29 "github.com/luci/luci-go/common/flag/multiflag"
30 "github.com/luci/luci-go/common/gcloud/pubsub"
31 "github.com/luci/luci-go/common/logdog/types" 30 "github.com/luci/luci-go/common/logdog/types"
32 log "github.com/luci/luci-go/common/logging" 31 log "github.com/luci/luci-go/common/logging"
33 "github.com/luci/luci-go/common/logging/gologger" 32 "github.com/luci/luci-go/common/logging/gologger"
34 "github.com/luci/luci-go/common/paniccatcher" 33 "github.com/luci/luci-go/common/paniccatcher"
35 ) 34 )
36 35
37 const ( 36 const (
38 // flagErrorReturnCode is returned when there is an error with the Butle r's 37 // flagErrorReturnCode is returned when there is an error with the Butle r's
39 // command-line configuration. 38 // command-line configuration.
40 configErrorReturnCode = 2 39 configErrorReturnCode = 2
41 40
42 // runtimeErrorReturnCode is returned when the execution fails due to a Butler 41 // runtimeErrorReturnCode is returned when the execution fails due to a Butler
43 // error. This is intended to help differentiate Butler errors from 42 // error. This is intended to help differentiate Butler errors from
44 // passthrough bootstrapped subprocess errors. 43 // passthrough bootstrapped subprocess errors.
45 // 44 //
46 // This will only be returned for runtime errors. If there is a flag err or 45 // This will only be returned for runtime errors. If there is a flag err or
47 // or a configuration error, standard Butler return codes (likely to ove rlap 46 // or a configuration error, standard Butler return codes (likely to ove rlap
48 // with standard process return codes) will be used. 47 // with standard process return codes) will be used.
49 runtimeErrorReturnCode = 250 48 runtimeErrorReturnCode = 250
50 ) 49 )
51 50
52 // buildScopes consumes a series of independent OAuth2 scope strings and
53 // combines them into a single deduplicated list.
54 func buildScopes(parts ...[]string) []string {
55 result := []string{}
56 seen := make(map[string]bool)
57 for _, scopes := range parts {
58 for _, s := range scopes {
59 if _, ok := seen[s]; ok {
60 continue
61 }
62 result = append(result, s)
63 seen[s] = true
64 }
65 }
66 return result
67 }
68
69 // application is the Butler application instance and its runtime configuration 51 // application is the Butler application instance and its runtime configuration
70 // and state. 52 // and state.
71 type application struct { 53 type application struct {
72 cli.Application 54 cli.Application
73 context.Context 55 context.Context
74 56
75 » butler butler.Config 57 » project config.ProjectName
76 » outputConfig outputConfigFlag 58 » prefix types.StreamName
59 » outputWorkers int
60 » outputConfig outputConfigFlag
77 61
78 authFlags authcli.Flags 62 authFlags authcli.Flags
79 63
80 maxBufferAge clockflag.Duration 64 maxBufferAge clockflag.Duration
81 noBufferLogs bool 65 noBufferLogs bool
82 66
83 cpuProfile string 67 cpuProfile string
84 68
85 client *http.Client 69 client *http.Client
86 70
87 // ncCtx is a context that will not be cancelled when cancelFunc is call ed. 71 // ncCtx is a context that will not be cancelled when cancelFunc is call ed.
88 ncCtx context.Context 72 ncCtx context.Context
89 cancelFunc func() 73 cancelFunc func()
90
91 output output.Output
92 } 74 }
93 75
94 func (a *application) addFlags(fs *flag.FlagSet) { 76 func (a *application) addFlags(fs *flag.FlagSet) {
95 a.outputConfig.Output = os.Stdout 77 a.outputConfig.Output = os.Stdout
96 a.outputConfig.Description = "Select and configure message output adapte r." 78 a.outputConfig.Description = "Select and configure message output adapte r."
97 a.outputConfig.Options = []multiflag.Option{ 79 a.outputConfig.Options = []multiflag.Option{
98 multiflag.HelpOption(&a.outputConfig.MultiFlag), 80 multiflag.HelpOption(&a.outputConfig.MultiFlag),
99 } 81 }
100 82
101 // Add registered conditional (build tag) options. 83 // Add registered conditional (build tag) options.
102 for _, f := range getOutputFactories() { 84 for _, f := range getOutputFactories() {
103 a.outputConfig.AddFactory(f) 85 a.outputConfig.AddFactory(f)
104 } 86 }
105 87
106 a.maxBufferAge = clockflag.Duration(butler.DefaultMaxBufferAge) 88 a.maxBufferAge = clockflag.Duration(butler.DefaultMaxBufferAge)
107 89
108 » fs.Var(&a.butler.Project, "project", 90 » fs.Var(&a.project, "project",
109 "The log prefix's project name (required).") 91 "The log prefix's project name (required).")
110 » fs.Var(&a.butler.Prefix, "prefix", 92 » fs.Var(&a.prefix, "prefix",
111 "Prefix to apply to all stream names.") 93 "Prefix to apply to all stream names.")
112 fs.Var(&a.outputConfig, "output", 94 fs.Var(&a.outputConfig, "output",
113 "The output name and configuration. Specify 'help' for more info rmation.") 95 "The output name and configuration. Specify 'help' for more info rmation.")
114 fs.StringVar(&a.cpuProfile, 96 fs.StringVar(&a.cpuProfile,
115 "cpuprofile", "", "If specified, enables CPU profiling and profi les to the specified path.") 97 "cpuprofile", "", "If specified, enables CPU profiling and profi les to the specified path.")
116 » fs.IntVar(&a.butler.OutputWorkers, "output-workers", butler.DefaultOutpu tWorkers, 98 » fs.IntVar(&a.outputWorkers, "output-workers", butler.DefaultOutputWorker s,
117 "The maximum number of parallel output dispatches.") 99 "The maximum number of parallel output dispatches.")
118 fs.Var(&a.maxBufferAge, "output-max-buffer-age", 100 fs.Var(&a.maxBufferAge, "output-max-buffer-age",
119 "Send buffered messages if they've been held for longer than thi s period.") 101 "Send buffered messages if they've been held for longer than thi s period.")
120 fs.BoolVar(&a.noBufferLogs, "output-no-buffer", false, 102 fs.BoolVar(&a.noBufferLogs, "output-no-buffer", false,
121 "If true, dispatch logs immediately. Setting this flag simplifie s output at the expense "+ 103 "If true, dispatch logs immediately. Setting this flag simplifie s output at the expense "+
122 "of wire-format efficiency.") 104 "of wire-format efficiency.")
123 } 105 }
124 106
125 func (a *application) authenticator(ctx context.Context) (*auth.Authenticator, e rror) { 107 func (a *application) authenticator(ctx context.Context) (*auth.Authenticator, e rror) {
126 opts, err := a.authFlags.Options() 108 opts, err := a.authFlags.Options()
127 if err != nil { 109 if err != nil {
128 return nil, err 110 return nil, err
129 } 111 }
130 return auth.NewAuthenticator(ctx, auth.SilentLogin, opts), nil 112 return auth.NewAuthenticator(ctx, auth.SilentLogin, opts), nil
131 } 113 }
132 114
133 func (a *application) authenticatedClient(ctx context.Context) (*http.Client, er ror) {
134 if a.client == nil {
135 authenticator, err := a.authenticator(ctx)
136 if err != nil {
137 return nil, err
138 }
139
140 client, err := authenticator.Client()
141 if err != nil {
142 return nil, err
143 }
144 a.client = client
145 }
146 return a.client, nil
147 }
148
149 func (a *application) tokenSource(ctx context.Context) (oauth2.TokenSource, erro r) {
150 authenticator, err := a.authenticator(ctx)
151 if err != nil {
152 return nil, err
153 }
154 return authenticator.TokenSource(), nil
155 }
156
157 func (a *application) configOutput() (output.Output, error) { 115 func (a *application) configOutput() (output.Output, error) {
158 factory := a.outputConfig.getFactory() 116 factory := a.outputConfig.getFactory()
159 if factory == nil { 117 if factory == nil {
160 return nil, errors.New("main: No output is configured") 118 return nil, errors.New("main: No output is configured")
161 } 119 }
162 120
163 output, err := factory.configOutput(a) 121 output, err := factory.configOutput(a)
164 if err != nil { 122 if err != nil {
165 return nil, err 123 return nil, err
166 } 124 }
167 125
168 return output, nil 126 return output, nil
169 } 127 }
170 128
171 // An execution harness that adds application-level management to a Butler run. 129 // runWithButler is an execution harness that adds application-level management
172 func (a *application) Main(runFunc func(b *butler.Butler) error) error { 130 // to a Butler run.
131 func (a *application) runWithButler(out output.Output, runFunc func(b *butler.Bu tler) error) error {
173 // Enable CPU profiling if specified 132 // Enable CPU profiling if specified
174 if a.cpuProfile != "" { 133 if a.cpuProfile != "" {
175 f, err := os.Create(a.cpuProfile) 134 f, err := os.Create(a.cpuProfile)
176 if err != nil { 135 if err != nil {
177 return fmt.Errorf("failed to create CPU profile output: %v", err) 136 return fmt.Errorf("failed to create CPU profile output: %v", err)
178 } 137 }
179 pprof.StartCPUProfile(f) 138 pprof.StartCPUProfile(f)
180 defer pprof.StopCPUProfile() 139 defer pprof.StopCPUProfile()
181 } 140 }
182 141
183 » // Generate a prefix secret for this Butler session. 142 » // Instantiate our Butler.
184 » var err error 143 » butlerOpts := butler.Config{
185 » if a.butler.Secret, err = types.NewPrefixSecret(); err != nil { 144 » » Project: a.project,
186 » » return fmt.Errorf("failed to generate prefix secret: %s", err) 145 » » Prefix: a.prefix,
146 » » MaxBufferAge: time.Duration(a.maxBufferAge),
147 » » BufferLogs: !a.noBufferLogs,
148 » » Output: out,
149 » » OutputWorkers: a.outputWorkers,
150 » » TeeStdout: os.Stdout,
151 » » TeeStderr: os.Stderr,
187 } 152 }
188 153 » b, err := butler.New(a, butlerOpts)
189 » // Instantiate our Butler.
190 » a.butler.MaxBufferAge = time.Duration(a.maxBufferAge)
191 » a.butler.BufferLogs = !a.noBufferLogs
192 » a.butler.Output = a.output
193 » a.butler.TeeStdout = os.Stdout
194 » a.butler.TeeStderr = os.Stderr
195 » if err := a.butler.Validate(); err != nil {
196 » » return err
197 » }
198
199 » b, err := butler.New(a, a.butler)
200 if err != nil { 154 if err != nil {
201 return err 155 return err
202 } 156 }
203 157
204 // Log the Butler's emitted streams. 158 // Log the Butler's emitted streams.
205 defer func() { 159 defer func() {
206 » » if r := a.output.Record(); r != nil { 160 » » if r := out.Record(); r != nil {
207 // Log detail stream record. 161 // Log detail stream record.
208 streams := make([]string, 0, len(r.Streams)) 162 streams := make([]string, 0, len(r.Streams))
209 for k := range r.Streams { 163 for k := range r.Streams {
210 streams = append(streams, string(k)) 164 streams = append(streams, string(k))
211 } 165 }
212 sort.Strings(streams) 166 sort.Strings(streams)
213 167
214 for i, stream := range streams { 168 for i, stream := range streams {
215 rec := r.Streams[types.StreamPath(stream)] 169 rec := r.Streams[types.StreamPath(stream)]
216 170
217 ranges := make([]string, len(rec.Ranges)) 171 ranges := make([]string, len(rec.Ranges))
218 for i, rng := range rec.Ranges { 172 for i, rng := range rec.Ranges {
219 ranges[i] = rng.String() 173 ranges[i] = rng.String()
220 } 174 }
221 log.Infof(a, "%d) Stream [%s]: %s", i, stream, s trings.Join(ranges, " ")) 175 log.Infof(a, "%d) Stream [%s]: %s", i, stream, s trings.Join(ranges, " "))
222 } 176 }
223 } else { 177 } else {
224 // No record; display stream overview. 178 // No record; display stream overview.
225 s := b.Streams() 179 s := b.Streams()
226 paths := make([]types.StreamPath, len(s)) 180 paths := make([]types.StreamPath, len(s))
227 for i, sn := range s { 181 for i, sn := range s {
228 » » » » paths[i] = a.butler.Prefix.Join(sn) 182 » » » » paths[i] = a.prefix.Join(sn)
229 } 183 }
230 log.Fields{ 184 log.Fields{
231 "count": len(paths), 185 "count": len(paths),
232 "streams": paths, 186 "streams": paths,
233 }.Infof(a, "Butler emitted %d stream(s).", len(paths)) 187 }.Infof(a, "Butler emitted %d stream(s).", len(paths))
234 } 188 }
235 }() 189 }()
236 190
237 // Execute our Butler run function with the instantiated Butler. 191 // Execute our Butler run function with the instantiated Butler.
238 if err := runFunc(b); err != nil { 192 if err := runFunc(b); err != nil {
239 log.Fields{ 193 log.Fields{
240 log.ErrorKey: err, 194 log.ErrorKey: err,
241 }.Errorf(a, "Butler terminated with error.") 195 }.Errorf(a, "Butler terminated with error.")
242 a.cancelFunc() 196 a.cancelFunc()
243 } 197 }
244 198
245 return b.Wait() 199 return b.Wait()
246 } 200 }
247 201
248 func mainImpl(ctx context.Context, argv []string) int { 202 func mainImpl(ctx context.Context, argv []string) int {
249 authOptions := auth.Options{ 203 authOptions := auth.Options{
250 » » Scopes: buildScopes( 204 » » Scopes: allOutputScopes(),
251 » » » []string{auth.OAuthScopeEmail},
252 » » » pubsub.PublisherScopes,
253 » » ),
254 } 205 }
255 206
256 a := &application{ 207 a := &application{
257 Context: ctx, 208 Context: ctx,
258 Application: cli.Application{ 209 Application: cli.Application{
259 Name: "butler", 210 Name: "butler",
260 Title: "Log collection and streaming bootstrap.", 211 Title: "Log collection and streaming bootstrap.",
261 Context: func(context.Context) context.Context { return ctx }, 212 Context: func(context.Context) context.Context { return ctx },
262 Commands: []*subcommands.Command{ 213 Commands: []*subcommands.Command{
263 subcommands.CmdHelp, 214 subcommands.CmdHelp,
(...skipping 18 matching lines...) Expand all
282 233
283 // Parse the top-level flag set. 234 // Parse the top-level flag set.
284 if err := flags.Parse(argv); err != nil { 235 if err := flags.Parse(argv); err != nil {
285 log.WithError(err).Errorf(a, "Failed to parse command-line.") 236 log.WithError(err).Errorf(a, "Failed to parse command-line.")
286 return configErrorReturnCode 237 return configErrorReturnCode
287 } 238 }
288 239
289 a.Context = logConfig.Set(a.Context) 240 a.Context = logConfig.Set(a.Context)
290 241
291 // TODO(dnj): Force all invocations to supply a Project. 242 // TODO(dnj): Force all invocations to supply a Project.
292 » if a.butler.Project != "" { 243 » if a.project != "" {
293 » » if err := a.butler.Project.Validate(); err != nil { 244 » » if err := a.project.Validate(); err != nil {
294 log.WithError(err).Errorf(a, "Invalid project (-project) .") 245 log.WithError(err).Errorf(a, "Invalid project (-project) .")
295 return configErrorReturnCode 246 return configErrorReturnCode
296 } 247 }
297 } 248 }
298 249
299 // Validate our Prefix; generate a user prefix if one was not supplied. 250 // Validate our Prefix; generate a user prefix if one was not supplied.
300 » prefix := a.butler.Prefix 251 » prefix := a.prefix
301 if prefix == "" { 252 if prefix == "" {
302 // Auto-generate a prefix. 253 // Auto-generate a prefix.
303 prefix, err := generateRandomUserPrefix(a) 254 prefix, err := generateRandomUserPrefix(a)
304 if err != nil { 255 if err != nil {
305 log.WithError(err).Errorf(a, "Failed to generate user pr efix.") 256 log.WithError(err).Errorf(a, "Failed to generate user pr efix.")
306 return configErrorReturnCode 257 return configErrorReturnCode
307 } 258 }
308 » » a.butler.Prefix = prefix 259 » » a.prefix = prefix
309 } 260 }
310 261
311 // Signal handler to catch 'Control-C'. This will gracefully shutdown th e 262 // Signal handler to catch 'Control-C'. This will gracefully shutdown th e
312 // butler the first time a signal is received. It will die abruptly if t he 263 // butler the first time a signal is received. It will die abruptly if t he
313 // signal continues to be received. 264 // signal continues to be received.
314 a.ncCtx = a.Context 265 a.ncCtx = a.Context
315 a.Context, a.cancelFunc = context.WithCancel(a.Context) 266 a.Context, a.cancelFunc = context.WithCancel(a.Context)
316 signalC := make(chan os.Signal, 1) 267 signalC := make(chan os.Signal, 1)
317 signal.Notify(signalC, os.Interrupt) 268 signal.Notify(signalC, os.Interrupt)
318 go func() { 269 go func() {
(...skipping 10 matching lines...) Expand all
329 os.Exit(1) 280 os.Exit(1)
330 } 281 }
331 } 282 }
332 }() 283 }()
333 defer func() { 284 defer func() {
334 signal.Stop(signalC) 285 signal.Stop(signalC)
335 close(signalC) 286 close(signalC)
336 }() 287 }()
337 288
338 log.Fields{ 289 log.Fields{
339 » » "prefix": a.butler.Prefix, 290 » » "prefix": a.prefix,
340 }.Infof(a, "Using session prefix.") 291 }.Infof(a, "Using session prefix.")
341 » if err := a.butler.Prefix.Validate(); err != nil { 292 » if err := a.prefix.Validate(); err != nil {
342 log.WithError(err).Errorf(a, "Invalid session prefix.") 293 log.WithError(err).Errorf(a, "Invalid session prefix.")
343 return configErrorReturnCode 294 return configErrorReturnCode
344 } 295 }
345 296
346 // Configure our Butler Output.
347 var err error
348 a.output, err = a.configOutput()
349 if err != nil {
350 log.WithError(err).Errorf(a, "Failed to create output instance." )
351 return runtimeErrorReturnCode
352 }
353 defer a.output.Close()
354
355 // Run our subcommand (and parse subcommand flags). 297 // Run our subcommand (and parse subcommand flags).
356 return subcommands.Run(a, flags.Args()) 298 return subcommands.Run(a, flags.Args())
357 } 299 }
358 300
359 // Main execution function. This immediately jumps to 'mainImpl' and uses its 301 // Main execution function. This immediately jumps to 'mainImpl' and uses its
360 // result as an exit code. 302 // result as an exit code.
361 func main() { 303 func main() {
362 ctx := context.Background() 304 ctx := context.Background()
363 ctx = gologger.StdConfig.Use(ctx) 305 ctx = gologger.StdConfig.Use(ctx)
364 306
365 // Exit with the specified return code. 307 // Exit with the specified return code.
366 rc := 0 308 rc := 0
367 defer func() { 309 defer func() {
368 log.Infof(log.SetField(ctx, "returnCode", rc), "Terminating.") 310 log.Infof(log.SetField(ctx, "returnCode", rc), "Terminating.")
369 os.Exit(rc) 311 os.Exit(rc)
370 }() 312 }()
371 313
372 paniccatcher.Do(func() { 314 paniccatcher.Do(func() {
373 rc = mainImpl(ctx, os.Args[1:]) 315 rc = mainImpl(ctx, os.Args[1:])
374 }, func(p *paniccatcher.Panic) { 316 }, func(p *paniccatcher.Panic) {
375 log.Fields{ 317 log.Fields{
376 "panic.error": p.Reason, 318 "panic.error": p.Reason,
377 }.Errorf(ctx, "Panic caught in main:\n%s", p.Stack) 319 }.Errorf(ctx, "Panic caught in main:\n%s", p.Stack)
378 rc = runtimeErrorReturnCode 320 rc = runtimeErrorReturnCode
379 }) 321 })
380 } 322 }
OLDNEW
« no previous file with comments | « no previous file | client/cmd/logdog_butler/output.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698