Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(517)

Side by Side Diff: common/meter/meter_test.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Actually upload the patch. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/meter/meter.go ('k') | common/parallel/semaphore.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package meter
6
7 import (
8 "fmt"
9 "testing"
10 "time"
11
12 "github.com/luci/luci-go/common/clock/testclock"
13 . "github.com/smartystreets/goconvey/convey"
14 "golang.org/x/net/context"
15 )
16
17 // goconvey assertion that a slice of int ([]int) has the expected Value
18 // parameters.
19 func shouldHaveWork(actual interface{}, expected ...interface{}) string {
20 sentWork := actual.([]interface{})
21
22 expectedWork := make([]int, len(expected))
23 for idx, exp := range expected {
24 switch t := exp.(type) {
25 case int:
26 expectedWork[idx] = t
27 default:
28 panic("Unsupported work value type.")
29 }
30 }
31
32 if len(sentWork) != len(expectedWork) {
33 return fmt.Sprintf("Work count (%d) doesn't match expected (%d)" , len(sentWork), len(expectedWork))
34 }
35
36 for idx, exp := range expectedWork {
37 testSentWork := sentWork[idx].(int)
38 if testSentWork != exp {
39 return fmt.Sprintf("Work #%d content does not match expe cted: %d != %d",
40 idx, testSentWork, exp)
41 }
42 }
43 return ""
44 }
45
46 // Test a Meter instance.
47 func TestMeter(t *testing.T) {
48 t.Parallel()
49
50 Convey(`In a test environment`, t, func() {
51 ctx, tc := testclock.UseTime(context.Background(), time.Date(201 5, 1, 1, 0, 0, 0, 0, time.UTC))
52
53 // Setup a channel to acknowledge work for test synchronization. We will use
54 // the work callback to trigger this channel.
55 workC := make(chan interface{})
56 workAckC := make(chan interface{})
57
58 config := Config{
59 Callback: func(bundle []interface{}) {
60 workC <- bundle
61 },
62 IngestCallback: func(work interface{}) bool {
63 workAckC <- work
64 return false
65 },
66 }
67
68 Convey(`Will panic if no callback is supplied.`, func() {
69 config.Callback = nil
70 So(func() { New(ctx, config) }, ShouldPanic)
71 })
72
73 Convey(`Timer: Will buffer work until the timer is signalled.`, func() {
74 config.Delay = 1 * time.Second // Doesn't matter, non-ze ro will cause timer to be used.
75 m := New(ctx, config)
76 defer m.Stop()
77
78 // Send three messages. None of them should be forwarded to the underlying
79 // Output.
80 for _, v := range []int{0, 1, 2} {
81 m.AddWait(v)
82 <-workAckC
83 }
84
85 // Signal our timer.
86 tc.Add(1 * time.Second)
87 // All three messages should be sent to our underlying O utput at the same
88 // time.
89 So(<-workC, shouldHaveWork, 0, 1, 2)
90 })
91
92 Convey(`Flush: Will buffer messages until a flush is triggered.` , func() {
93 m := New(ctx, config) // Will never auto-flush.
94 defer m.Stop()
95
96 // Send two messages. Nothing should be forwarded.
97 m.AddWait(0)
98 <-workAckC
99 m.AddWait(1)
100 <-workAckC
101
102 // Send a third Work unit. We should receive a bundle of {0, 1, 2}.
103 m.AddWait(2)
104 <-workAckC
105
106 m.Flush()
107 So(<-workC, shouldHaveWork, 0, 1, 2)
108 })
109
110 Convey(`Count: Will buffer messages until count is reached.`, fu nc() {
111 config.Count = 3
112 m := New(ctx, config)
113 defer m.Stop()
114
115 // Send two messages. Nothing should be forwarded.
116 m.AddWait(0)
117 <-workAckC
118 m.AddWait(1)
119 <-workAckC
120
121 // Send a third message. Our underlying Output should re ceive the set of
122 // three.
123 m.AddWait(2)
124 <-workAckC
125 So(<-workC, shouldHaveWork, 0, 1, 2)
126 })
127
128 Convey(`WorkCallback: Will buffer messages until flushed via cal lback.`, func() {
129 count := 0
130 config.IngestCallback = func(interface{}) bool {
131 count++
132 return count >= 3
133 }
134
135 m := New(ctx, config)
136 defer m.Stop()
137
138 m.AddWait(0)
139 m.AddWait(1)
140 m.AddWait(2)
141 So(<-workC, shouldHaveWork, 0, 1, 2)
142 })
143
144 Convey(`Configured with multiple constraints`, func() {
145 config.Delay = 1 * time.Second // Doesn't matter, non-ze ro will cause timer to be used.
146 config.Count = 3
147 m := New(ctx, config)
148 defer m.Stop()
149
150 // Fill our buckets up to near threshold without dumping messages.
151 m.AddWait(0)
152 <-workAckC
153 m.AddWait(1)
154 <-workAckC
155
156 // Hit count thresholds and flush at the same time.
157 m.AddWait(2)
158 <-workAckC
159 m.Flush()
160 So(<-workC, shouldHaveWork, 0, 1, 2)
161
162 // Fill our buckets up to near threshold again.
163 m.AddWait(3)
164 <-workAckC
165 m.AddWait(4)
166 <-workAckC
167
168 // Hit time threshold.
169 tc.Add(1 * time.Second)
170 So(<-workC, shouldHaveWork, 3, 4)
171
172 // Hit count threshold.
173 m.AddWait(5)
174 <-workAckC
175 m.AddWait(6)
176 <-workAckC
177 m.AddWait(7)
178 <-workAckC
179 So(<-workC, shouldHaveWork, 5, 6, 7)
180 })
181
182 Convey(`When full, will return ErrFull if not blocking.`, func() {
183 m := newImpl(ctx, &config)
184
185 // Fill up the work channel (do not reap workC).
186 id := 0
187 for i := 0; i < config.getAddBufferSize(); i++ {
188 So(m.Add(i), ShouldBeNil)
189 id++
190 }
191
192 // Add another work unit.
193 So(m.Add(id+1), ShouldEqual, ErrFull)
194 })
195 })
196 }
OLDNEW
« no previous file with comments | « common/meter/meter.go ('k') | common/parallel/semaphore.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698