| Index: common/meter/meter.go
|
| diff --git a/common/meter/meter.go b/common/meter/meter.go
|
| deleted file mode 100644
|
| index cc0da8a676e69dcff4c4297a4ab05838b1175d71..0000000000000000000000000000000000000000
|
| --- a/common/meter/meter.go
|
| +++ /dev/null
|
| @@ -1,213 +0,0 @@
|
| -// Copyright 2015 The Chromium Authors. All rights reserved.
|
| -// Use of this source code is governed by a BSD-style license that can be
|
| -// found in the LICENSE file.
|
| -
|
| -package meter
|
| -
|
| -import (
|
| - "errors"
|
| -
|
| - "github.com/luci/luci-go/common/clock"
|
| - log "github.com/luci/luci-go/common/logging"
|
| - "golang.org/x/net/context"
|
| -)
|
| -
|
| -var (
|
| - // ErrFull is an error returned by Add if the input queue is full.
|
| - ErrFull = errors.New("meter: input queue is full")
|
| -)
|
| -
|
| -// The default size of the work buffer.
|
| -const initialWorkBufferSize = 128
|
| -
|
| -// An Meter accepts units of Work and clusters them based on a clustering
|
| -// configuration.
|
| -//
|
| -// The meter can cluster work by:
|
| -// - Count: When the pool of work reaches a specified amount.
|
| -// - Time: After a specified amount of time.
|
| -//
|
| -// The meter can also be triggered to purge externally via Flush.
|
| -//
|
| -// A Meter may be configured to discard work when it is being consumed too
|
| -// slowly. By default, the Meter will block input Work until output Work is
|
| -// consumed.
|
| -type Meter interface {
|
| - // Add adds new Work to the Meter. If the Meter's ingest channel is full,
|
| - // Add will return with ErrFull.
|
| - Add(w interface{}) error
|
| -
|
| - // AddWait adds new Work to the Meter, blocking until the Work has been
|
| - // enqueued.
|
| - AddWait(w interface{})
|
| -
|
| - // Flush externally instructs the meter to dump any buffered work.
|
| - Flush()
|
| -
|
| - // Stop shuts down the Meter, stopping further input processing and
|
| - // blocking until the current enqueued work has finished.
|
| - Stop()
|
| -}
|
| -
|
| -// meter is a Meter implementation. An instance can be created via New().
|
| -type meter struct {
|
| - *Config
|
| -
|
| - ctx context.Context
|
| - workC chan interface{} // Channel to forward work into.
|
| - flushNowC chan bool // Structure to trigger a work dump.
|
| - closeAckC chan struct{} // Closed when consumeWork() finishes.
|
| -}
|
| -
|
| -var _ Meter = (*meter)(nil)
|
| -
|
| -// New instantiates and starts a new Meter.
|
| -func New(ctx context.Context, config Config) Meter {
|
| - m := newImpl(ctx, &config)
|
| -
|
| - // This will run until "workC" is closed. When it finishes, it will close
|
| - // "closeAckC".
|
| - go m.consumeWork()
|
| -
|
| - return m
|
| -}
|
| -
|
| -func newImpl(ctx context.Context, config *Config) *meter {
|
| - if config.Callback == nil {
|
| - panic("A callback must be provided.")
|
| - }
|
| -
|
| - return &meter{
|
| - Config: config,
|
| -
|
| - ctx: ctx,
|
| - workC: make(chan interface{}, config.getAddBufferSize()),
|
| - flushNowC: make(chan bool, 1),
|
| - closeAckC: make(chan struct{}),
|
| - }
|
| -}
|
| -
|
| -func (m *meter) Add(w interface{}) error {
|
| - select {
|
| - case m.workC <- w:
|
| - return nil
|
| -
|
| - default:
|
| - return ErrFull
|
| - }
|
| -}
|
| -
|
| -func (m *meter) AddWait(w interface{}) {
|
| - m.workC <- w
|
| -}
|
| -
|
| -func (m *meter) Stop() {
|
| - close(m.workC)
|
| - <-m.closeAckC
|
| -}
|
| -
|
| -func (m *meter) Flush() {
|
| - select {
|
| - case m.flushNowC <- true:
|
| - break
|
| - default:
|
| - break
|
| - }
|
| -}
|
| -
|
| -// Main buffering function, which runs in a goroutine.
|
| -func (m *meter) consumeWork() {
|
| - // Acknowledge when this goroutine finishes.
|
| - defer close(m.closeAckC)
|
| -
|
| - timerRunning := false
|
| - flushTimer := clock.NewTimer(m.ctx)
|
| - defer func() {
|
| - flushTimer.Stop()
|
| - }()
|
| -
|
| - flushCount := m.Count
|
| - flushTime := m.Delay
|
| -
|
| - // Build our work buffer.
|
| - workBufferSize := initialWorkBufferSize
|
| - if flushCount > 0 {
|
| - // Will never buffer more than this much Work.
|
| - workBufferSize = flushCount
|
| - }
|
| - bufferedWork := make([]interface{}, 0, workBufferSize)
|
| -
|
| - log.Debugf(m.ctx, "Starting work loop.")
|
| - active := true
|
| - for active {
|
| - flushRound := false
|
| -
|
| - select {
|
| - case work, ok := <-m.workC:
|
| - if !ok {
|
| - log.Debugf(m.ctx, "Received instance close signal; terminating.")
|
| -
|
| - // Don't immediately exit the loop, since there may still be buffered
|
| - // Work to flush.
|
| - active = false
|
| - flushRound = true
|
| - break
|
| - }
|
| -
|
| - // Count the work in this group. If we're not using a given metric, try
|
| - // and avoid wasting time collecting it.
|
| - bufferedWork = append(bufferedWork, work)
|
| -
|
| - // Handle work count threshold. We do this first, since it's trivial to
|
| - // setup/compute.
|
| - if flushCount > 0 && len(bufferedWork) >= flushCount {
|
| - flushRound = true
|
| - }
|
| - // Start our flush timer, if it's not already ticking. Only waste time
|
| - // doing this if we're not already flushing, since flushing will clear the
|
| - // timer.
|
| - if !flushRound && flushTime > 0 && !timerRunning {
|
| - log.Infof(log.SetFields(m.ctx, log.Fields{
|
| - "flushInterval": flushTime,
|
| - }), "Starting flush timer.")
|
| - flushTimer.Reset(flushTime)
|
| - timerRunning = true
|
| - }
|
| -
|
| - // Invoke work callback, if one is set.
|
| - if m.IngestCallback != nil {
|
| - flushRound = m.IngestCallback(work) || flushRound
|
| - }
|
| -
|
| - case <-m.flushNowC:
|
| - flushRound = true
|
| -
|
| - case <-flushTimer.GetC():
|
| - log.Infof(m.ctx, "Flush timer has triggered.")
|
| - timerRunning = false
|
| - flushRound = true
|
| - }
|
| -
|
| - // Should we flush?
|
| - if flushRound {
|
| - flushTimer.Stop()
|
| - timerRunning = false
|
| -
|
| - if len(bufferedWork) > 0 {
|
| - // Clone bufferedWork, since we re-use it.
|
| - workToSend := make([]interface{}, len(bufferedWork))
|
| - copy(workToSend, bufferedWork)
|
| -
|
| - // Clear our Work slice for re-use. This does not resize the underlying
|
| - // array, since it's likely to fill again.
|
| - for idx := range bufferedWork {
|
| - bufferedWork[idx] = nil
|
| - }
|
| - bufferedWork = bufferedWork[:0]
|
| -
|
| - // Callback with this work.
|
| - m.Callback(workToSend)
|
| - }
|
| - }
|
| - }
|
| -}
|
|
|