Index: common/meter/meter_test.go |
diff --git a/common/meter/meter_test.go b/common/meter/meter_test.go |
deleted file mode 100644 |
index 0db118c23436aaec03cba02f11899b76aa4ae1a9..0000000000000000000000000000000000000000 |
--- a/common/meter/meter_test.go |
+++ /dev/null |
@@ -1,196 +0,0 @@ |
-// Copyright 2015 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-package meter |
- |
-import ( |
- "fmt" |
- "testing" |
- "time" |
- |
- "github.com/luci/luci-go/common/clock/testclock" |
- . "github.com/smartystreets/goconvey/convey" |
- "golang.org/x/net/context" |
-) |
- |
-// goconvey assertion that a slice of int ([]int) has the expected Value |
-// parameters. |
-func shouldHaveWork(actual interface{}, expected ...interface{}) string { |
- sentWork := actual.([]interface{}) |
- |
- expectedWork := make([]int, len(expected)) |
- for idx, exp := range expected { |
- switch t := exp.(type) { |
- case int: |
- expectedWork[idx] = t |
- default: |
- panic("Unsupported work value type.") |
- } |
- } |
- |
- if len(sentWork) != len(expectedWork) { |
- return fmt.Sprintf("Work count (%d) doesn't match expected (%d)", len(sentWork), len(expectedWork)) |
- } |
- |
- for idx, exp := range expectedWork { |
- testSentWork := sentWork[idx].(int) |
- if testSentWork != exp { |
- return fmt.Sprintf("Work #%d content does not match expected: %d != %d", |
- idx, testSentWork, exp) |
- } |
- } |
- return "" |
-} |
- |
-// Test a Meter instance. |
-func TestMeter(t *testing.T) { |
- t.Parallel() |
- |
- Convey(`In a test environment`, t, func() { |
- ctx, tc := testclock.UseTime(context.Background(), time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
- |
- // Setup a channel to acknowledge work for test synchronization. We will use |
- // the work callback to trigger this channel. |
- workC := make(chan interface{}) |
- workAckC := make(chan interface{}) |
- |
- config := Config{ |
- Callback: func(bundle []interface{}) { |
- workC <- bundle |
- }, |
- IngestCallback: func(work interface{}) bool { |
- workAckC <- work |
- return false |
- }, |
- } |
- |
- Convey(`Will panic if no callback is supplied.`, func() { |
- config.Callback = nil |
- So(func() { New(ctx, config) }, ShouldPanic) |
- }) |
- |
- Convey(`Timer: Will buffer work until the timer is signalled.`, func() { |
- config.Delay = 1 * time.Second // Doesn't matter, non-zero will cause timer to be used. |
- m := New(ctx, config) |
- defer m.Stop() |
- |
- // Send three messages. None of them should be forwarded to the underlying |
- // Output. |
- for _, v := range []int{0, 1, 2} { |
- m.AddWait(v) |
- <-workAckC |
- } |
- |
- // Signal our timer. |
- tc.Add(1 * time.Second) |
- // All three messages should be sent to our underlying Output at the same |
- // time. |
- So(<-workC, shouldHaveWork, 0, 1, 2) |
- }) |
- |
- Convey(`Flush: Will buffer messages until a flush is triggered.`, func() { |
- m := New(ctx, config) // Will never auto-flush. |
- defer m.Stop() |
- |
- // Send two messages. Nothing should be forwarded. |
- m.AddWait(0) |
- <-workAckC |
- m.AddWait(1) |
- <-workAckC |
- |
- // Send a third Work unit. We should receive a bundle of {0, 1, 2}. |
- m.AddWait(2) |
- <-workAckC |
- |
- m.Flush() |
- So(<-workC, shouldHaveWork, 0, 1, 2) |
- }) |
- |
- Convey(`Count: Will buffer messages until count is reached.`, func() { |
- config.Count = 3 |
- m := New(ctx, config) |
- defer m.Stop() |
- |
- // Send two messages. Nothing should be forwarded. |
- m.AddWait(0) |
- <-workAckC |
- m.AddWait(1) |
- <-workAckC |
- |
- // Send a third message. Our underlying Output should receive the set of |
- // three. |
- m.AddWait(2) |
- <-workAckC |
- So(<-workC, shouldHaveWork, 0, 1, 2) |
- }) |
- |
- Convey(`WorkCallback: Will buffer messages until flushed via callback.`, func() { |
- count := 0 |
- config.IngestCallback = func(interface{}) bool { |
- count++ |
- return count >= 3 |
- } |
- |
- m := New(ctx, config) |
- defer m.Stop() |
- |
- m.AddWait(0) |
- m.AddWait(1) |
- m.AddWait(2) |
- So(<-workC, shouldHaveWork, 0, 1, 2) |
- }) |
- |
- Convey(`Configured with multiple constraints`, func() { |
- config.Delay = 1 * time.Second // Doesn't matter, non-zero will cause timer to be used. |
- config.Count = 3 |
- m := New(ctx, config) |
- defer m.Stop() |
- |
- // Fill our buckets up to near threshold without dumping messages. |
- m.AddWait(0) |
- <-workAckC |
- m.AddWait(1) |
- <-workAckC |
- |
- // Hit count thresholds and flush at the same time. |
- m.AddWait(2) |
- <-workAckC |
- m.Flush() |
- So(<-workC, shouldHaveWork, 0, 1, 2) |
- |
- // Fill our buckets up to near threshold again. |
- m.AddWait(3) |
- <-workAckC |
- m.AddWait(4) |
- <-workAckC |
- |
- // Hit time threshold. |
- tc.Add(1 * time.Second) |
- So(<-workC, shouldHaveWork, 3, 4) |
- |
- // Hit count threshold. |
- m.AddWait(5) |
- <-workAckC |
- m.AddWait(6) |
- <-workAckC |
- m.AddWait(7) |
- <-workAckC |
- So(<-workC, shouldHaveWork, 5, 6, 7) |
- }) |
- |
- Convey(`When full, will return ErrFull if not blocking.`, func() { |
- m := newImpl(ctx, &config) |
- |
- // Fill up the work channel (do not reap workC). |
- id := 0 |
- for i := 0; i < config.getAddBufferSize(); i++ { |
- So(m.Add(i), ShouldBeNil) |
- id++ |
- } |
- |
- // Add another work unit. |
- So(m.Add(id+1), ShouldEqual, ErrFull) |
- }) |
- }) |
-} |