Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(365)

Side by Side Diff: common/meter/meter.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Much more invasive, cancel by default, remove meter package. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
dnj (Google) 2016/02/10 03:19:03 Lots of LOC removed here. This was originally shar
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package meter
6
7 import (
8 "errors"
9
10 "github.com/luci/luci-go/common/clock"
11 log "github.com/luci/luci-go/common/logging"
12 "golang.org/x/net/context"
13 )
14
15 var (
16 // ErrFull is an error returned by Add if the input queue is full.
17 ErrFull = errors.New("meter: input queue is full")
18 )
19
20 // The default size of the work buffer.
21 const initialWorkBufferSize = 128
22
23 // An Meter accepts units of Work and clusters them based on a clustering
24 // configuration.
25 //
26 // The meter can cluster work by:
27 // - Count: When the pool of work reaches a specified amount.
28 // - Time: After a specified amount of time.
29 //
30 // The meter can also be triggered to purge externally via Flush.
31 //
32 // A Meter may be configured to discard work when it is being consumed too
33 // slowly. By default, the Meter will block input Work until output Work is
34 // consumed.
35 type Meter interface {
36 // Add adds new Work to the Meter. If the Meter's ingest channel is full ,
37 // Add will return with ErrFull.
38 Add(w interface{}) error
39
40 // AddWait adds new Work to the Meter, blocking until the Work has been
41 // enqueued.
42 AddWait(w interface{})
43
44 // Flush externally instructs the meter to dump any buffered work.
45 Flush()
46
47 // Stop shuts down the Meter, stopping further input processing and
48 // blocking until the current enqueued work has finished.
49 Stop()
50 }
51
52 // meter is a Meter implementation. An instance can be created via New().
53 type meter struct {
54 *Config
55
56 ctx context.Context
57 workC chan interface{} // Channel to forward work into.
58 flushNowC chan bool // Structure to trigger a work dump.
59 closeAckC chan struct{} // Closed when consumeWork() finishes.
60 }
61
62 var _ Meter = (*meter)(nil)
63
64 // New instantiates and starts a new Meter.
65 func New(ctx context.Context, config Config) Meter {
66 m := newImpl(ctx, &config)
67
68 // This will run until "workC" is closed. When it finishes, it will clos e
69 // "closeAckC".
70 go m.consumeWork()
71
72 return m
73 }
74
75 func newImpl(ctx context.Context, config *Config) *meter {
76 if config.Callback == nil {
77 panic("A callback must be provided.")
78 }
79
80 return &meter{
81 Config: config,
82
83 ctx: ctx,
84 workC: make(chan interface{}, config.getAddBufferSize()),
85 flushNowC: make(chan bool, 1),
86 closeAckC: make(chan struct{}),
87 }
88 }
89
90 func (m *meter) Add(w interface{}) error {
91 select {
92 case m.workC <- w:
93 return nil
94
95 default:
96 return ErrFull
97 }
98 }
99
100 func (m *meter) AddWait(w interface{}) {
101 m.workC <- w
102 }
103
104 func (m *meter) Stop() {
105 close(m.workC)
106 <-m.closeAckC
107 }
108
109 func (m *meter) Flush() {
110 select {
111 case m.flushNowC <- true:
112 break
113 default:
114 break
115 }
116 }
117
118 // Main buffering function, which runs in a goroutine.
119 func (m *meter) consumeWork() {
120 // Acknowledge when this goroutine finishes.
121 defer close(m.closeAckC)
122
123 timerRunning := false
124 flushTimer := clock.NewTimer(m.ctx)
125 defer func() {
126 flushTimer.Stop()
127 }()
128
129 flushCount := m.Count
130 flushTime := m.Delay
131
132 // Build our work buffer.
133 workBufferSize := initialWorkBufferSize
134 if flushCount > 0 {
135 // Will never buffer more than this much Work.
136 workBufferSize = flushCount
137 }
138 bufferedWork := make([]interface{}, 0, workBufferSize)
139
140 log.Debugf(m.ctx, "Starting work loop.")
141 active := true
142 for active {
143 flushRound := false
144
145 select {
146 case work, ok := <-m.workC:
147 if !ok {
148 log.Debugf(m.ctx, "Received instance close signa l; terminating.")
149
150 // Don't immediately exit the loop, since there may still be buffered
151 // Work to flush.
152 active = false
153 flushRound = true
154 break
155 }
156
157 // Count the work in this group. If we're not using a gi ven metric, try
158 // and avoid wasting time collecting it.
159 bufferedWork = append(bufferedWork, work)
160
161 // Handle work count threshold. We do this first, since it's trivial to
162 // setup/compute.
163 if flushCount > 0 && len(bufferedWork) >= flushCount {
164 flushRound = true
165 }
166 // Start our flush timer, if it's not already ticking. O nly waste time
167 // doing this if we're not already flushing, since flush ing will clear the
168 // timer.
169 if !flushRound && flushTime > 0 && !timerRunning {
170 log.Infof(log.SetFields(m.ctx, log.Fields{
171 "flushInterval": flushTime,
172 }), "Starting flush timer.")
173 flushTimer.Reset(flushTime)
174 timerRunning = true
175 }
176
177 // Invoke work callback, if one is set.
178 if m.IngestCallback != nil {
179 flushRound = m.IngestCallback(work) || flushRoun d
180 }
181
182 case <-m.flushNowC:
183 flushRound = true
184
185 case <-flushTimer.GetC():
186 log.Infof(m.ctx, "Flush timer has triggered.")
187 timerRunning = false
188 flushRound = true
189 }
190
191 // Should we flush?
192 if flushRound {
193 flushTimer.Stop()
194 timerRunning = false
195
196 if len(bufferedWork) > 0 {
197 // Clone bufferedWork, since we re-use it.
198 workToSend := make([]interface{}, len(bufferedWo rk))
199 copy(workToSend, bufferedWork)
200
201 // Clear our Work slice for re-use. This does no t resize the underlying
202 // array, since it's likely to fill again.
203 for idx := range bufferedWork {
204 bufferedWork[idx] = nil
205 }
206 bufferedWork = bufferedWork[:0]
207
208 // Callback with this work.
209 m.Callback(workToSend)
210 }
211 }
212 }
213 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698