Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
|
dnj (Google)
2016/02/10 03:19:03
Lots of LOC removed here. This was originally shar
| |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package meter | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 | |
| 10 "github.com/luci/luci-go/common/clock" | |
| 11 log "github.com/luci/luci-go/common/logging" | |
| 12 "golang.org/x/net/context" | |
| 13 ) | |
| 14 | |
| 15 var ( | |
| 16 // ErrFull is an error returned by Add if the input queue is full. | |
| 17 ErrFull = errors.New("meter: input queue is full") | |
| 18 ) | |
| 19 | |
| 20 // The default size of the work buffer. | |
| 21 const initialWorkBufferSize = 128 | |
| 22 | |
| 23 // An Meter accepts units of Work and clusters them based on a clustering | |
| 24 // configuration. | |
| 25 // | |
| 26 // The meter can cluster work by: | |
| 27 // - Count: When the pool of work reaches a specified amount. | |
| 28 // - Time: After a specified amount of time. | |
| 29 // | |
| 30 // The meter can also be triggered to purge externally via Flush. | |
| 31 // | |
| 32 // A Meter may be configured to discard work when it is being consumed too | |
| 33 // slowly. By default, the Meter will block input Work until output Work is | |
| 34 // consumed. | |
| 35 type Meter interface { | |
| 36 // Add adds new Work to the Meter. If the Meter's ingest channel is full , | |
| 37 // Add will return with ErrFull. | |
| 38 Add(w interface{}) error | |
| 39 | |
| 40 // AddWait adds new Work to the Meter, blocking until the Work has been | |
| 41 // enqueued. | |
| 42 AddWait(w interface{}) | |
| 43 | |
| 44 // Flush externally instructs the meter to dump any buffered work. | |
| 45 Flush() | |
| 46 | |
| 47 // Stop shuts down the Meter, stopping further input processing and | |
| 48 // blocking until the current enqueued work has finished. | |
| 49 Stop() | |
| 50 } | |
| 51 | |
| 52 // meter is a Meter implementation. An instance can be created via New(). | |
| 53 type meter struct { | |
| 54 *Config | |
| 55 | |
| 56 ctx context.Context | |
| 57 workC chan interface{} // Channel to forward work into. | |
| 58 flushNowC chan bool // Structure to trigger a work dump. | |
| 59 closeAckC chan struct{} // Closed when consumeWork() finishes. | |
| 60 } | |
| 61 | |
| 62 var _ Meter = (*meter)(nil) | |
| 63 | |
| 64 // New instantiates and starts a new Meter. | |
| 65 func New(ctx context.Context, config Config) Meter { | |
| 66 m := newImpl(ctx, &config) | |
| 67 | |
| 68 // This will run until "workC" is closed. When it finishes, it will clos e | |
| 69 // "closeAckC". | |
| 70 go m.consumeWork() | |
| 71 | |
| 72 return m | |
| 73 } | |
| 74 | |
| 75 func newImpl(ctx context.Context, config *Config) *meter { | |
| 76 if config.Callback == nil { | |
| 77 panic("A callback must be provided.") | |
| 78 } | |
| 79 | |
| 80 return &meter{ | |
| 81 Config: config, | |
| 82 | |
| 83 ctx: ctx, | |
| 84 workC: make(chan interface{}, config.getAddBufferSize()), | |
| 85 flushNowC: make(chan bool, 1), | |
| 86 closeAckC: make(chan struct{}), | |
| 87 } | |
| 88 } | |
| 89 | |
| 90 func (m *meter) Add(w interface{}) error { | |
| 91 select { | |
| 92 case m.workC <- w: | |
| 93 return nil | |
| 94 | |
| 95 default: | |
| 96 return ErrFull | |
| 97 } | |
| 98 } | |
| 99 | |
| 100 func (m *meter) AddWait(w interface{}) { | |
| 101 m.workC <- w | |
| 102 } | |
| 103 | |
| 104 func (m *meter) Stop() { | |
| 105 close(m.workC) | |
| 106 <-m.closeAckC | |
| 107 } | |
| 108 | |
| 109 func (m *meter) Flush() { | |
| 110 select { | |
| 111 case m.flushNowC <- true: | |
| 112 break | |
| 113 default: | |
| 114 break | |
| 115 } | |
| 116 } | |
| 117 | |
| 118 // Main buffering function, which runs in a goroutine. | |
| 119 func (m *meter) consumeWork() { | |
| 120 // Acknowledge when this goroutine finishes. | |
| 121 defer close(m.closeAckC) | |
| 122 | |
| 123 timerRunning := false | |
| 124 flushTimer := clock.NewTimer(m.ctx) | |
| 125 defer func() { | |
| 126 flushTimer.Stop() | |
| 127 }() | |
| 128 | |
| 129 flushCount := m.Count | |
| 130 flushTime := m.Delay | |
| 131 | |
| 132 // Build our work buffer. | |
| 133 workBufferSize := initialWorkBufferSize | |
| 134 if flushCount > 0 { | |
| 135 // Will never buffer more than this much Work. | |
| 136 workBufferSize = flushCount | |
| 137 } | |
| 138 bufferedWork := make([]interface{}, 0, workBufferSize) | |
| 139 | |
| 140 log.Debugf(m.ctx, "Starting work loop.") | |
| 141 active := true | |
| 142 for active { | |
| 143 flushRound := false | |
| 144 | |
| 145 select { | |
| 146 case work, ok := <-m.workC: | |
| 147 if !ok { | |
| 148 log.Debugf(m.ctx, "Received instance close signa l; terminating.") | |
| 149 | |
| 150 // Don't immediately exit the loop, since there may still be buffered | |
| 151 // Work to flush. | |
| 152 active = false | |
| 153 flushRound = true | |
| 154 break | |
| 155 } | |
| 156 | |
| 157 // Count the work in this group. If we're not using a gi ven metric, try | |
| 158 // and avoid wasting time collecting it. | |
| 159 bufferedWork = append(bufferedWork, work) | |
| 160 | |
| 161 // Handle work count threshold. We do this first, since it's trivial to | |
| 162 // setup/compute. | |
| 163 if flushCount > 0 && len(bufferedWork) >= flushCount { | |
| 164 flushRound = true | |
| 165 } | |
| 166 // Start our flush timer, if it's not already ticking. O nly waste time | |
| 167 // doing this if we're not already flushing, since flush ing will clear the | |
| 168 // timer. | |
| 169 if !flushRound && flushTime > 0 && !timerRunning { | |
| 170 log.Infof(log.SetFields(m.ctx, log.Fields{ | |
| 171 "flushInterval": flushTime, | |
| 172 }), "Starting flush timer.") | |
| 173 flushTimer.Reset(flushTime) | |
| 174 timerRunning = true | |
| 175 } | |
| 176 | |
| 177 // Invoke work callback, if one is set. | |
| 178 if m.IngestCallback != nil { | |
| 179 flushRound = m.IngestCallback(work) || flushRoun d | |
| 180 } | |
| 181 | |
| 182 case <-m.flushNowC: | |
| 183 flushRound = true | |
| 184 | |
| 185 case <-flushTimer.GetC(): | |
| 186 log.Infof(m.ctx, "Flush timer has triggered.") | |
| 187 timerRunning = false | |
| 188 flushRound = true | |
| 189 } | |
| 190 | |
| 191 // Should we flush? | |
| 192 if flushRound { | |
| 193 flushTimer.Stop() | |
| 194 timerRunning = false | |
| 195 | |
| 196 if len(bufferedWork) > 0 { | |
| 197 // Clone bufferedWork, since we re-use it. | |
| 198 workToSend := make([]interface{}, len(bufferedWo rk)) | |
| 199 copy(workToSend, bufferedWork) | |
| 200 | |
| 201 // Clear our Work slice for re-use. This does no t resize the underlying | |
| 202 // array, since it's likely to fill again. | |
| 203 for idx := range bufferedWork { | |
| 204 bufferedWork[idx] = nil | |
| 205 } | |
| 206 bufferedWork = bufferedWork[:0] | |
| 207 | |
| 208 // Callback with this work. | |
| 209 m.Callback(workToSend) | |
| 210 } | |
| 211 } | |
| 212 } | |
| 213 } | |
| OLD | NEW |