Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(267)

Unified Diff: common/meter/meter.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Actually upload the patch. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/meter/doc.go ('k') | common/meter/meter_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/meter/meter.go
diff --git a/common/meter/meter.go b/common/meter/meter.go
deleted file mode 100644
index cc0da8a676e69dcff4c4297a4ab05838b1175d71..0000000000000000000000000000000000000000
--- a/common/meter/meter.go
+++ /dev/null
@@ -1,213 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package meter
-
-import (
- "errors"
-
- "github.com/luci/luci-go/common/clock"
- log "github.com/luci/luci-go/common/logging"
- "golang.org/x/net/context"
-)
-
-var (
- // ErrFull is an error returned by Add if the input queue is full.
- ErrFull = errors.New("meter: input queue is full")
-)
-
-// The default size of the work buffer.
-const initialWorkBufferSize = 128
-
-// An Meter accepts units of Work and clusters them based on a clustering
-// configuration.
-//
-// The meter can cluster work by:
-// - Count: When the pool of work reaches a specified amount.
-// - Time: After a specified amount of time.
-//
-// The meter can also be triggered to purge externally via Flush.
-//
-// A Meter may be configured to discard work when it is being consumed too
-// slowly. By default, the Meter will block input Work until output Work is
-// consumed.
-type Meter interface {
- // Add adds new Work to the Meter. If the Meter's ingest channel is full,
- // Add will return with ErrFull.
- Add(w interface{}) error
-
- // AddWait adds new Work to the Meter, blocking until the Work has been
- // enqueued.
- AddWait(w interface{})
-
- // Flush externally instructs the meter to dump any buffered work.
- Flush()
-
- // Stop shuts down the Meter, stopping further input processing and
- // blocking until the current enqueued work has finished.
- Stop()
-}
-
-// meter is a Meter implementation. An instance can be created via New().
-type meter struct {
- *Config
-
- ctx context.Context
- workC chan interface{} // Channel to forward work into.
- flushNowC chan bool // Structure to trigger a work dump.
- closeAckC chan struct{} // Closed when consumeWork() finishes.
-}
-
-var _ Meter = (*meter)(nil)
-
-// New instantiates and starts a new Meter.
-func New(ctx context.Context, config Config) Meter {
- m := newImpl(ctx, &config)
-
- // This will run until "workC" is closed. When it finishes, it will close
- // "closeAckC".
- go m.consumeWork()
-
- return m
-}
-
-func newImpl(ctx context.Context, config *Config) *meter {
- if config.Callback == nil {
- panic("A callback must be provided.")
- }
-
- return &meter{
- Config: config,
-
- ctx: ctx,
- workC: make(chan interface{}, config.getAddBufferSize()),
- flushNowC: make(chan bool, 1),
- closeAckC: make(chan struct{}),
- }
-}
-
-func (m *meter) Add(w interface{}) error {
- select {
- case m.workC <- w:
- return nil
-
- default:
- return ErrFull
- }
-}
-
-func (m *meter) AddWait(w interface{}) {
- m.workC <- w
-}
-
-func (m *meter) Stop() {
- close(m.workC)
- <-m.closeAckC
-}
-
-func (m *meter) Flush() {
- select {
- case m.flushNowC <- true:
- break
- default:
- break
- }
-}
-
-// Main buffering function, which runs in a goroutine.
-func (m *meter) consumeWork() {
- // Acknowledge when this goroutine finishes.
- defer close(m.closeAckC)
-
- timerRunning := false
- flushTimer := clock.NewTimer(m.ctx)
- defer func() {
- flushTimer.Stop()
- }()
-
- flushCount := m.Count
- flushTime := m.Delay
-
- // Build our work buffer.
- workBufferSize := initialWorkBufferSize
- if flushCount > 0 {
- // Will never buffer more than this much Work.
- workBufferSize = flushCount
- }
- bufferedWork := make([]interface{}, 0, workBufferSize)
-
- log.Debugf(m.ctx, "Starting work loop.")
- active := true
- for active {
- flushRound := false
-
- select {
- case work, ok := <-m.workC:
- if !ok {
- log.Debugf(m.ctx, "Received instance close signal; terminating.")
-
- // Don't immediately exit the loop, since there may still be buffered
- // Work to flush.
- active = false
- flushRound = true
- break
- }
-
- // Count the work in this group. If we're not using a given metric, try
- // and avoid wasting time collecting it.
- bufferedWork = append(bufferedWork, work)
-
- // Handle work count threshold. We do this first, since it's trivial to
- // setup/compute.
- if flushCount > 0 && len(bufferedWork) >= flushCount {
- flushRound = true
- }
- // Start our flush timer, if it's not already ticking. Only waste time
- // doing this if we're not already flushing, since flushing will clear the
- // timer.
- if !flushRound && flushTime > 0 && !timerRunning {
- log.Infof(log.SetFields(m.ctx, log.Fields{
- "flushInterval": flushTime,
- }), "Starting flush timer.")
- flushTimer.Reset(flushTime)
- timerRunning = true
- }
-
- // Invoke work callback, if one is set.
- if m.IngestCallback != nil {
- flushRound = m.IngestCallback(work) || flushRound
- }
-
- case <-m.flushNowC:
- flushRound = true
-
- case <-flushTimer.GetC():
- log.Infof(m.ctx, "Flush timer has triggered.")
- timerRunning = false
- flushRound = true
- }
-
- // Should we flush?
- if flushRound {
- flushTimer.Stop()
- timerRunning = false
-
- if len(bufferedWork) > 0 {
- // Clone bufferedWork, since we re-use it.
- workToSend := make([]interface{}, len(bufferedWork))
- copy(workToSend, bufferedWork)
-
- // Clear our Work slice for re-use. This does not resize the underlying
- // array, since it's likely to fill again.
- for idx := range bufferedWork {
- bufferedWork[idx] = nil
- }
- bufferedWork = bufferedWork[:0]
-
- // Callback with this work.
- m.Callback(workToSend)
- }
- }
- }
-}
« no previous file with comments | « common/meter/doc.go ('k') | common/meter/meter_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698