OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package meter | |
6 | |
7 import ( | |
8 "errors" | |
9 | |
10 "github.com/luci/luci-go/common/clock" | |
11 log "github.com/luci/luci-go/common/logging" | |
12 "golang.org/x/net/context" | |
13 ) | |
14 | |
15 var ( | |
16 // ErrFull is an error returned by Add if the input queue is full. | |
17 ErrFull = errors.New("meter: input queue is full") | |
18 ) | |
19 | |
20 // The default size of the work buffer. | |
21 const initialWorkBufferSize = 128 | |
22 | |
23 // An Meter accepts units of Work and clusters them based on a clustering | |
24 // configuration. | |
25 // | |
26 // The meter can cluster work by: | |
27 // - Count: When the pool of work reaches a specified amount. | |
28 // - Time: After a specified amount of time. | |
29 // | |
30 // The meter can also be triggered to purge externally via Flush. | |
31 // | |
32 // A Meter may be configured to discard work when it is being consumed too | |
33 // slowly. By default, the Meter will block input Work until output Work is | |
34 // consumed. | |
35 type Meter interface { | |
36 // Add adds new Work to the Meter. If the Meter's ingest channel is full
, | |
37 // Add will return with ErrFull. | |
38 Add(w interface{}) error | |
39 | |
40 // AddWait adds new Work to the Meter, blocking until the Work has been | |
41 // enqueued. | |
42 AddWait(w interface{}) | |
43 | |
44 // Flush externally instructs the meter to dump any buffered work. | |
45 Flush() | |
46 | |
47 // Stop shuts down the Meter, stopping further input processing and | |
48 // blocking until the current enqueued work has finished. | |
49 Stop() | |
50 } | |
51 | |
52 // meter is a Meter implementation. An instance can be created via New(). | |
53 type meter struct { | |
54 *Config | |
55 | |
56 ctx context.Context | |
57 workC chan interface{} // Channel to forward work into. | |
58 flushNowC chan bool // Structure to trigger a work dump. | |
59 closeAckC chan struct{} // Closed when consumeWork() finishes. | |
60 } | |
61 | |
62 var _ Meter = (*meter)(nil) | |
63 | |
64 // New instantiates and starts a new Meter. | |
65 func New(ctx context.Context, config Config) Meter { | |
66 m := newImpl(ctx, &config) | |
67 | |
68 // This will run until "workC" is closed. When it finishes, it will clos
e | |
69 // "closeAckC". | |
70 go m.consumeWork() | |
71 | |
72 return m | |
73 } | |
74 | |
75 func newImpl(ctx context.Context, config *Config) *meter { | |
76 if config.Callback == nil { | |
77 panic("A callback must be provided.") | |
78 } | |
79 | |
80 return &meter{ | |
81 Config: config, | |
82 | |
83 ctx: ctx, | |
84 workC: make(chan interface{}, config.getAddBufferSize()), | |
85 flushNowC: make(chan bool, 1), | |
86 closeAckC: make(chan struct{}), | |
87 } | |
88 } | |
89 | |
90 func (m *meter) Add(w interface{}) error { | |
91 select { | |
92 case m.workC <- w: | |
93 return nil | |
94 | |
95 default: | |
96 return ErrFull | |
97 } | |
98 } | |
99 | |
100 func (m *meter) AddWait(w interface{}) { | |
101 m.workC <- w | |
102 } | |
103 | |
104 func (m *meter) Stop() { | |
105 close(m.workC) | |
106 <-m.closeAckC | |
107 } | |
108 | |
109 func (m *meter) Flush() { | |
110 select { | |
111 case m.flushNowC <- true: | |
112 break | |
113 default: | |
114 break | |
115 } | |
116 } | |
117 | |
118 // Main buffering function, which runs in a goroutine. | |
119 func (m *meter) consumeWork() { | |
120 // Acknowledge when this goroutine finishes. | |
121 defer close(m.closeAckC) | |
122 | |
123 timerRunning := false | |
124 flushTimer := clock.NewTimer(m.ctx) | |
125 defer func() { | |
126 flushTimer.Stop() | |
127 }() | |
128 | |
129 flushCount := m.Count | |
130 flushTime := m.Delay | |
131 | |
132 // Build our work buffer. | |
133 workBufferSize := initialWorkBufferSize | |
134 if flushCount > 0 { | |
135 // Will never buffer more than this much Work. | |
136 workBufferSize = flushCount | |
137 } | |
138 bufferedWork := make([]interface{}, 0, workBufferSize) | |
139 | |
140 log.Debugf(m.ctx, "Starting work loop.") | |
141 active := true | |
142 for active { | |
143 flushRound := false | |
144 | |
145 select { | |
146 case work, ok := <-m.workC: | |
147 if !ok { | |
148 log.Debugf(m.ctx, "Received instance close signa
l; terminating.") | |
149 | |
150 // Don't immediately exit the loop, since there
may still be buffered | |
151 // Work to flush. | |
152 active = false | |
153 flushRound = true | |
154 break | |
155 } | |
156 | |
157 // Count the work in this group. If we're not using a gi
ven metric, try | |
158 // and avoid wasting time collecting it. | |
159 bufferedWork = append(bufferedWork, work) | |
160 | |
161 // Handle work count threshold. We do this first, since
it's trivial to | |
162 // setup/compute. | |
163 if flushCount > 0 && len(bufferedWork) >= flushCount { | |
164 flushRound = true | |
165 } | |
166 // Start our flush timer, if it's not already ticking. O
nly waste time | |
167 // doing this if we're not already flushing, since flush
ing will clear the | |
168 // timer. | |
169 if !flushRound && flushTime > 0 && !timerRunning { | |
170 log.Infof(log.SetFields(m.ctx, log.Fields{ | |
171 "flushInterval": flushTime, | |
172 }), "Starting flush timer.") | |
173 flushTimer.Reset(flushTime) | |
174 timerRunning = true | |
175 } | |
176 | |
177 // Invoke work callback, if one is set. | |
178 if m.IngestCallback != nil { | |
179 flushRound = m.IngestCallback(work) || flushRoun
d | |
180 } | |
181 | |
182 case <-m.flushNowC: | |
183 flushRound = true | |
184 | |
185 case <-flushTimer.GetC(): | |
186 log.Infof(m.ctx, "Flush timer has triggered.") | |
187 timerRunning = false | |
188 flushRound = true | |
189 } | |
190 | |
191 // Should we flush? | |
192 if flushRound { | |
193 flushTimer.Stop() | |
194 timerRunning = false | |
195 | |
196 if len(bufferedWork) > 0 { | |
197 // Clone bufferedWork, since we re-use it. | |
198 workToSend := make([]interface{}, len(bufferedWo
rk)) | |
199 copy(workToSend, bufferedWork) | |
200 | |
201 // Clear our Work slice for re-use. This does no
t resize the underlying | |
202 // array, since it's likely to fill again. | |
203 for idx := range bufferedWork { | |
204 bufferedWork[idx] = nil | |
205 } | |
206 bufferedWork = bufferedWork[:0] | |
207 | |
208 // Callback with this work. | |
209 m.Callback(workToSend) | |
210 } | |
211 } | |
212 } | |
213 } | |
OLD | NEW |