Chromium Code Reviews| Index: common/meter/meter.go |
| diff --git a/common/meter/meter.go b/common/meter/meter.go |
| deleted file mode 100644 |
| index cc0da8a676e69dcff4c4297a4ab05838b1175d71..0000000000000000000000000000000000000000 |
| --- a/common/meter/meter.go |
| +++ /dev/null |
| @@ -1,213 +0,0 @@ |
| -// Copyright 2015 The Chromium Authors. All rights reserved. |
|
dnj (Google)
2016/02/10 03:19:03
Lots of LOC removed here. This was originally shar
|
| -// Use of this source code is governed by a BSD-style license that can be |
| -// found in the LICENSE file. |
| - |
| -package meter |
| - |
| -import ( |
| - "errors" |
| - |
| - "github.com/luci/luci-go/common/clock" |
| - log "github.com/luci/luci-go/common/logging" |
| - "golang.org/x/net/context" |
| -) |
| - |
| -var ( |
| - // ErrFull is an error returned by Add if the input queue is full. |
| - ErrFull = errors.New("meter: input queue is full") |
| -) |
| - |
| -// The default size of the work buffer. |
| -const initialWorkBufferSize = 128 |
| - |
| -// An Meter accepts units of Work and clusters them based on a clustering |
| -// configuration. |
| -// |
| -// The meter can cluster work by: |
| -// - Count: When the pool of work reaches a specified amount. |
| -// - Time: After a specified amount of time. |
| -// |
| -// The meter can also be triggered to purge externally via Flush. |
| -// |
| -// A Meter may be configured to discard work when it is being consumed too |
| -// slowly. By default, the Meter will block input Work until output Work is |
| -// consumed. |
| -type Meter interface { |
| - // Add adds new Work to the Meter. If the Meter's ingest channel is full, |
| - // Add will return with ErrFull. |
| - Add(w interface{}) error |
| - |
| - // AddWait adds new Work to the Meter, blocking until the Work has been |
| - // enqueued. |
| - AddWait(w interface{}) |
| - |
| - // Flush externally instructs the meter to dump any buffered work. |
| - Flush() |
| - |
| - // Stop shuts down the Meter, stopping further input processing and |
| - // blocking until the current enqueued work has finished. |
| - Stop() |
| -} |
| - |
| -// meter is a Meter implementation. An instance can be created via New(). |
| -type meter struct { |
| - *Config |
| - |
| - ctx context.Context |
| - workC chan interface{} // Channel to forward work into. |
| - flushNowC chan bool // Structure to trigger a work dump. |
| - closeAckC chan struct{} // Closed when consumeWork() finishes. |
| -} |
| - |
| -var _ Meter = (*meter)(nil) |
| - |
| -// New instantiates and starts a new Meter. |
| -func New(ctx context.Context, config Config) Meter { |
| - m := newImpl(ctx, &config) |
| - |
| - // This will run until "workC" is closed. When it finishes, it will close |
| - // "closeAckC". |
| - go m.consumeWork() |
| - |
| - return m |
| -} |
| - |
| -func newImpl(ctx context.Context, config *Config) *meter { |
| - if config.Callback == nil { |
| - panic("A callback must be provided.") |
| - } |
| - |
| - return &meter{ |
| - Config: config, |
| - |
| - ctx: ctx, |
| - workC: make(chan interface{}, config.getAddBufferSize()), |
| - flushNowC: make(chan bool, 1), |
| - closeAckC: make(chan struct{}), |
| - } |
| -} |
| - |
| -func (m *meter) Add(w interface{}) error { |
| - select { |
| - case m.workC <- w: |
| - return nil |
| - |
| - default: |
| - return ErrFull |
| - } |
| -} |
| - |
| -func (m *meter) AddWait(w interface{}) { |
| - m.workC <- w |
| -} |
| - |
| -func (m *meter) Stop() { |
| - close(m.workC) |
| - <-m.closeAckC |
| -} |
| - |
| -func (m *meter) Flush() { |
| - select { |
| - case m.flushNowC <- true: |
| - break |
| - default: |
| - break |
| - } |
| -} |
| - |
| -// Main buffering function, which runs in a goroutine. |
| -func (m *meter) consumeWork() { |
| - // Acknowledge when this goroutine finishes. |
| - defer close(m.closeAckC) |
| - |
| - timerRunning := false |
| - flushTimer := clock.NewTimer(m.ctx) |
| - defer func() { |
| - flushTimer.Stop() |
| - }() |
| - |
| - flushCount := m.Count |
| - flushTime := m.Delay |
| - |
| - // Build our work buffer. |
| - workBufferSize := initialWorkBufferSize |
| - if flushCount > 0 { |
| - // Will never buffer more than this much Work. |
| - workBufferSize = flushCount |
| - } |
| - bufferedWork := make([]interface{}, 0, workBufferSize) |
| - |
| - log.Debugf(m.ctx, "Starting work loop.") |
| - active := true |
| - for active { |
| - flushRound := false |
| - |
| - select { |
| - case work, ok := <-m.workC: |
| - if !ok { |
| - log.Debugf(m.ctx, "Received instance close signal; terminating.") |
| - |
| - // Don't immediately exit the loop, since there may still be buffered |
| - // Work to flush. |
| - active = false |
| - flushRound = true |
| - break |
| - } |
| - |
| - // Count the work in this group. If we're not using a given metric, try |
| - // and avoid wasting time collecting it. |
| - bufferedWork = append(bufferedWork, work) |
| - |
| - // Handle work count threshold. We do this first, since it's trivial to |
| - // setup/compute. |
| - if flushCount > 0 && len(bufferedWork) >= flushCount { |
| - flushRound = true |
| - } |
| - // Start our flush timer, if it's not already ticking. Only waste time |
| - // doing this if we're not already flushing, since flushing will clear the |
| - // timer. |
| - if !flushRound && flushTime > 0 && !timerRunning { |
| - log.Infof(log.SetFields(m.ctx, log.Fields{ |
| - "flushInterval": flushTime, |
| - }), "Starting flush timer.") |
| - flushTimer.Reset(flushTime) |
| - timerRunning = true |
| - } |
| - |
| - // Invoke work callback, if one is set. |
| - if m.IngestCallback != nil { |
| - flushRound = m.IngestCallback(work) || flushRound |
| - } |
| - |
| - case <-m.flushNowC: |
| - flushRound = true |
| - |
| - case <-flushTimer.GetC(): |
| - log.Infof(m.ctx, "Flush timer has triggered.") |
| - timerRunning = false |
| - flushRound = true |
| - } |
| - |
| - // Should we flush? |
| - if flushRound { |
| - flushTimer.Stop() |
| - timerRunning = false |
| - |
| - if len(bufferedWork) > 0 { |
| - // Clone bufferedWork, since we re-use it. |
| - workToSend := make([]interface{}, len(bufferedWork)) |
| - copy(workToSend, bufferedWork) |
| - |
| - // Clear our Work slice for re-use. This does not resize the underlying |
| - // array, since it's likely to fill again. |
| - for idx := range bufferedWork { |
| - bufferedWork[idx] = nil |
| - } |
| - bufferedWork = bufferedWork[:0] |
| - |
| - // Callback with this work. |
| - m.Callback(workToSend) |
| - } |
| - } |
| - } |
| -} |