OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package butler | 5 package butler |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "fmt" | 9 "fmt" |
10 "io" | 10 "io" |
11 "sort" | 11 "sort" |
12 "sync" | 12 "sync" |
13 "time" | 13 "time" |
14 | 14 |
15 "github.com/luci/luci-go/client/internal/logdog/butler/bundler" | 15 "github.com/luci/luci-go/client/internal/logdog/butler/bundler" |
16 "github.com/luci/luci-go/client/internal/logdog/butler/output" | 16 "github.com/luci/luci-go/client/internal/logdog/butler/output" |
17 "github.com/luci/luci-go/client/internal/logdog/butler/streamserver" | 17 "github.com/luci/luci-go/client/internal/logdog/butler/streamserver" |
18 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | 18 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
19 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
| 20 "github.com/luci/luci-go/common/config" |
20 "github.com/luci/luci-go/common/iotools" | 21 "github.com/luci/luci-go/common/iotools" |
21 "github.com/luci/luci-go/common/logdog/types" | 22 "github.com/luci/luci-go/common/logdog/types" |
22 log "github.com/luci/luci-go/common/logging" | 23 log "github.com/luci/luci-go/common/logging" |
23 "github.com/luci/luci-go/common/paniccatcher" | 24 "github.com/luci/luci-go/common/paniccatcher" |
24 "github.com/luci/luci-go/common/parallel" | 25 "github.com/luci/luci-go/common/parallel" |
25 "github.com/luci/luci-go/common/proto/google" | 26 "github.com/luci/luci-go/common/proto/google" |
26 "github.com/luci/luci-go/common/stringset" | 27 "github.com/luci/luci-go/common/stringset" |
27 "golang.org/x/net/context" | 28 "golang.org/x/net/context" |
28 ) | 29 ) |
29 | 30 |
30 const ( | 31 const ( |
31 // DefaultMaxBufferAge is the default amount of time that a log entry ma
y | 32 // DefaultMaxBufferAge is the default amount of time that a log entry ma
y |
32 // be buffered before being dispatched. | 33 // be buffered before being dispatched. |
33 DefaultMaxBufferAge = time.Duration(5 * time.Second) | 34 DefaultMaxBufferAge = time.Duration(5 * time.Second) |
34 | 35 |
35 // DefaultOutputWorkers is the default number of output workers to use. | 36 // DefaultOutputWorkers is the default number of output workers to use. |
36 DefaultOutputWorkers = 16 | 37 DefaultOutputWorkers = 16 |
37 | 38 |
38 // streamBufferSize is the maximum amount of stream data to buffer in me
mory. | 39 // streamBufferSize is the maximum amount of stream data to buffer in me
mory. |
39 streamBufferSize = 1024 * 1024 * 5 | 40 streamBufferSize = 1024 * 1024 * 5 |
40 ) | 41 ) |
41 | 42 |
42 // Config is the set of Butler configuration parameters. | 43 // Config is the set of Butler configuration parameters. |
43 type Config struct { | 44 type Config struct { |
44 // Output is the output instance to use for log dispatch. | 45 // Output is the output instance to use for log dispatch. |
45 Output output.Output | 46 Output output.Output |
46 // OutputWorkers is the number of simultaneous goroutines that will be u
sed | 47 // OutputWorkers is the number of simultaneous goroutines that will be u
sed |
47 // to output Butler log data. If zero, DefaultOutputWorkers will be used
. | 48 // to output Butler log data. If zero, DefaultOutputWorkers will be used
. |
48 OutputWorkers int | 49 OutputWorkers int |
| 50 |
| 51 // Project is the project that the log stream will be bound to. |
| 52 Project config.ProjectName |
49 // Prefix is the log stream common prefix value. | 53 // Prefix is the log stream common prefix value. |
50 Prefix types.StreamName | 54 Prefix types.StreamName |
| 55 // Secret is the prefix secret that will be used for streams generated b
y this |
| 56 // Butler. |
| 57 Secret types.PrefixSecret |
51 | 58 |
52 // BufferLogs, if true, instructs the butler to buffer collected log dat
a | 59 // BufferLogs, if true, instructs the butler to buffer collected log dat
a |
53 // before sending it to Output. | 60 // before sending it to Output. |
54 BufferLogs bool | 61 BufferLogs bool |
55 // If buffering logs, this is the maximum amount of time that a log will | 62 // If buffering logs, this is the maximum amount of time that a log will |
56 // be buffered before being marked for dispatch. If this is zero, | 63 // be buffered before being marked for dispatch. If this is zero, |
57 // DefaultMaxBufferAge will be used. | 64 // DefaultMaxBufferAge will be used. |
58 MaxBufferAge time.Duration | 65 MaxBufferAge time.Duration |
59 | 66 |
60 // TeeStdout, if not nil, is the Writer that will be used for streams | 67 // TeeStdout, if not nil, is the Writer that will be used for streams |
61 // requesting STDOUT tee. | 68 // requesting STDOUT tee. |
62 TeeStdout io.Writer | 69 TeeStdout io.Writer |
63 // TeeStderr, if not nil, is the Writer that will be used for streams | 70 // TeeStderr, if not nil, is the Writer that will be used for streams |
64 // requesting STDERR tee. | 71 // requesting STDERR tee. |
65 TeeStderr io.Writer | 72 TeeStderr io.Writer |
66 } | 73 } |
67 | 74 |
68 // Validate validates that the configuration is sufficient to instantiate a | 75 // Validate validates that the configuration is sufficient to instantiate a |
69 // Butler instance. | 76 // Butler instance. |
70 func (c *Config) Validate() error { | 77 func (c *Config) Validate() error { |
71 if c.Output == nil { | 78 if c.Output == nil { |
72 return errors.New("butler: an Output must be supplied") | 79 return errors.New("butler: an Output must be supplied") |
73 } | 80 } |
| 81 // TODO(dnj): Empty project should not validate here once projects are |
| 82 // mandatory. |
| 83 if c.Project != "" { |
| 84 if err := c.Project.Validate(); err != nil { |
| 85 return fmt.Errorf("invalid project: %v", err) |
| 86 } |
| 87 } |
74 if err := c.Prefix.Validate(); err != nil { | 88 if err := c.Prefix.Validate(); err != nil { |
75 » » return errors.New("butler: invalid Prefix") | 89 » » return fmt.Errorf("invalid prefix: %v", err) |
| 90 » } |
| 91 » if err := c.Secret.Validate(); err != nil { |
| 92 » » return fmt.Errorf("invalid secret: %v", err) |
76 } | 93 } |
77 return nil | 94 return nil |
78 } | 95 } |
79 | 96 |
80 // Butler is the Butler application structure. The Butler runs until closed. | 97 // Butler is the Butler application structure. The Butler runs until closed. |
81 // During operation, it acts as a service manager and data router, routing: | 98 // During operation, it acts as a service manager and data router, routing: |
82 // - Messages from Streams to the attached Output. | 99 // - Messages from Streams to the attached Output. |
83 // - Streams from a StreamServer to the Stream list (AddStream). | 100 // - Streams from a StreamServer to the Stream list (AddStream). |
84 type Butler struct { | 101 type Butler struct { |
85 c *Config | 102 c *Config |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
128 if err := config.Validate(); err != nil { | 145 if err := config.Validate(); err != nil { |
129 return nil, err | 146 return nil, err |
130 } | 147 } |
131 | 148 |
132 if config.OutputWorkers <= 0 { | 149 if config.OutputWorkers <= 0 { |
133 config.OutputWorkers = DefaultOutputWorkers | 150 config.OutputWorkers = DefaultOutputWorkers |
134 } | 151 } |
135 | 152 |
136 bc := bundler.Config{ | 153 bc := bundler.Config{ |
137 Clock: clock.Get(ctx), | 154 Clock: clock.Get(ctx), |
| 155 Project: config.Project, |
| 156 Prefix: config.Prefix, |
| 157 Secret: config.Secret, |
138 MaxBufferedBytes: streamBufferSize, | 158 MaxBufferedBytes: streamBufferSize, |
139 MaxBundleSize: config.Output.MaxSize(), | 159 MaxBundleSize: config.Output.MaxSize(), |
140 } | 160 } |
141 if config.BufferLogs { | 161 if config.BufferLogs { |
142 bc.MaxBufferDelay = config.MaxBufferAge | 162 bc.MaxBufferDelay = config.MaxBufferAge |
143 } | 163 } |
144 lb := bundler.New(bc) | 164 lb := bundler.New(bc) |
145 | 165 |
146 b := &Butler{ | 166 b := &Butler{ |
147 c: &config, | 167 c: &config, |
(...skipping 384 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
532 // shutdown prematurely, so this should be reasonably quick. | 552 // shutdown prematurely, so this should be reasonably quick. |
533 b.Activate() | 553 b.Activate() |
534 } | 554 } |
535 | 555 |
536 // Returns the configured Butler error. | 556 // Returns the configured Butler error. |
537 func (b *Butler) getRunErr() error { | 557 func (b *Butler) getRunErr() error { |
538 b.shutdownMu.Lock() | 558 b.shutdownMu.Lock() |
539 defer b.shutdownMu.Unlock() | 559 defer b.shutdownMu.Unlock() |
540 return b.runErr | 560 return b.runErr |
541 } | 561 } |
OLD | NEW |