OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package meter | |
6 | |
7 import ( | |
8 "fmt" | |
9 "testing" | |
10 "time" | |
11 | |
12 "github.com/luci/luci-go/common/clock/testclock" | |
13 . "github.com/smartystreets/goconvey/convey" | |
14 "golang.org/x/net/context" | |
15 ) | |
16 | |
17 // goconvey assertion that a slice of int ([]int) has the expected Value | |
18 // parameters. | |
19 func shouldHaveWork(actual interface{}, expected ...interface{}) string { | |
20 sentWork := actual.([]interface{}) | |
21 | |
22 expectedWork := make([]int, len(expected)) | |
23 for idx, exp := range expected { | |
24 switch t := exp.(type) { | |
25 case int: | |
26 expectedWork[idx] = t | |
27 default: | |
28 panic("Unsupported work value type.") | |
29 } | |
30 } | |
31 | |
32 if len(sentWork) != len(expectedWork) { | |
33 return fmt.Sprintf("Work count (%d) doesn't match expected (%d)"
, len(sentWork), len(expectedWork)) | |
34 } | |
35 | |
36 for idx, exp := range expectedWork { | |
37 testSentWork := sentWork[idx].(int) | |
38 if testSentWork != exp { | |
39 return fmt.Sprintf("Work #%d content does not match expe
cted: %d != %d", | |
40 idx, testSentWork, exp) | |
41 } | |
42 } | |
43 return "" | |
44 } | |
45 | |
46 // Test a Meter instance. | |
47 func TestMeter(t *testing.T) { | |
48 t.Parallel() | |
49 | |
50 Convey(`In a test environment`, t, func() { | |
51 ctx, tc := testclock.UseTime(context.Background(), time.Date(201
5, 1, 1, 0, 0, 0, 0, time.UTC)) | |
52 | |
53 // Setup a channel to acknowledge work for test synchronization.
We will use | |
54 // the work callback to trigger this channel. | |
55 workC := make(chan interface{}) | |
56 workAckC := make(chan interface{}) | |
57 | |
58 config := Config{ | |
59 Callback: func(bundle []interface{}) { | |
60 workC <- bundle | |
61 }, | |
62 IngestCallback: func(work interface{}) bool { | |
63 workAckC <- work | |
64 return false | |
65 }, | |
66 } | |
67 | |
68 Convey(`Will panic if no callback is supplied.`, func() { | |
69 config.Callback = nil | |
70 So(func() { New(ctx, config) }, ShouldPanic) | |
71 }) | |
72 | |
73 Convey(`Timer: Will buffer work until the timer is signalled.`,
func() { | |
74 config.Delay = 1 * time.Second // Doesn't matter, non-ze
ro will cause timer to be used. | |
75 m := New(ctx, config) | |
76 defer m.Stop() | |
77 | |
78 // Send three messages. None of them should be forwarded
to the underlying | |
79 // Output. | |
80 for _, v := range []int{0, 1, 2} { | |
81 m.AddWait(v) | |
82 <-workAckC | |
83 } | |
84 | |
85 // Signal our timer. | |
86 tc.Add(1 * time.Second) | |
87 // All three messages should be sent to our underlying O
utput at the same | |
88 // time. | |
89 So(<-workC, shouldHaveWork, 0, 1, 2) | |
90 }) | |
91 | |
92 Convey(`Flush: Will buffer messages until a flush is triggered.`
, func() { | |
93 m := New(ctx, config) // Will never auto-flush. | |
94 defer m.Stop() | |
95 | |
96 // Send two messages. Nothing should be forwarded. | |
97 m.AddWait(0) | |
98 <-workAckC | |
99 m.AddWait(1) | |
100 <-workAckC | |
101 | |
102 // Send a third Work unit. We should receive a bundle of
{0, 1, 2}. | |
103 m.AddWait(2) | |
104 <-workAckC | |
105 | |
106 m.Flush() | |
107 So(<-workC, shouldHaveWork, 0, 1, 2) | |
108 }) | |
109 | |
110 Convey(`Count: Will buffer messages until count is reached.`, fu
nc() { | |
111 config.Count = 3 | |
112 m := New(ctx, config) | |
113 defer m.Stop() | |
114 | |
115 // Send two messages. Nothing should be forwarded. | |
116 m.AddWait(0) | |
117 <-workAckC | |
118 m.AddWait(1) | |
119 <-workAckC | |
120 | |
121 // Send a third message. Our underlying Output should re
ceive the set of | |
122 // three. | |
123 m.AddWait(2) | |
124 <-workAckC | |
125 So(<-workC, shouldHaveWork, 0, 1, 2) | |
126 }) | |
127 | |
128 Convey(`WorkCallback: Will buffer messages until flushed via cal
lback.`, func() { | |
129 count := 0 | |
130 config.IngestCallback = func(interface{}) bool { | |
131 count++ | |
132 return count >= 3 | |
133 } | |
134 | |
135 m := New(ctx, config) | |
136 defer m.Stop() | |
137 | |
138 m.AddWait(0) | |
139 m.AddWait(1) | |
140 m.AddWait(2) | |
141 So(<-workC, shouldHaveWork, 0, 1, 2) | |
142 }) | |
143 | |
144 Convey(`Configured with multiple constraints`, func() { | |
145 config.Delay = 1 * time.Second // Doesn't matter, non-ze
ro will cause timer to be used. | |
146 config.Count = 3 | |
147 m := New(ctx, config) | |
148 defer m.Stop() | |
149 | |
150 // Fill our buckets up to near threshold without dumping
messages. | |
151 m.AddWait(0) | |
152 <-workAckC | |
153 m.AddWait(1) | |
154 <-workAckC | |
155 | |
156 // Hit count thresholds and flush at the same time. | |
157 m.AddWait(2) | |
158 <-workAckC | |
159 m.Flush() | |
160 So(<-workC, shouldHaveWork, 0, 1, 2) | |
161 | |
162 // Fill our buckets up to near threshold again. | |
163 m.AddWait(3) | |
164 <-workAckC | |
165 m.AddWait(4) | |
166 <-workAckC | |
167 | |
168 // Hit time threshold. | |
169 tc.Add(1 * time.Second) | |
170 So(<-workC, shouldHaveWork, 3, 4) | |
171 | |
172 // Hit count threshold. | |
173 m.AddWait(5) | |
174 <-workAckC | |
175 m.AddWait(6) | |
176 <-workAckC | |
177 m.AddWait(7) | |
178 <-workAckC | |
179 So(<-workC, shouldHaveWork, 5, 6, 7) | |
180 }) | |
181 | |
182 Convey(`When full, will return ErrFull if not blocking.`, func()
{ | |
183 m := newImpl(ctx, &config) | |
184 | |
185 // Fill up the work channel (do not reap workC). | |
186 id := 0 | |
187 for i := 0; i < config.getAddBufferSize(); i++ { | |
188 So(m.Add(i), ShouldBeNil) | |
189 id++ | |
190 } | |
191 | |
192 // Add another work unit. | |
193 So(m.Add(id+1), ShouldEqual, ErrFull) | |
194 }) | |
195 }) | |
196 } | |
OLD | NEW |