Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: server/internal/logdog/collector/streamstatecache_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "fmt"
9 "sync"
10 "testing"
11 "time"
12
13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
14 "github.com/luci/luci-go/common/clock/testclock"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/logdog/types"
17 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
18 "golang.org/x/net/context"
19
20 . "github.com/smartystreets/goconvey/convey"
21 )
22
23 func TestStreamStateCache(t *testing.T) {
24 t.Parallel()
25
26 FocusConvey(`Using a test configuration`, t, func() {
27 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
28 tcc := testCoordinatorClient{}
29
30 o := streamStateCacheOptions{
31 coordinator: &tcc,
32 cacheSize: 4,
33 expiration: 1 * time.Second,
34 }
35
36 st := cc.State{
37 Path: "foo/+/bar",
38 State: &service.LogStreamState{
39 TerminalIndex: -1,
40 },
41 }
42
43 FocusConvey(`A streamStateCache`, func() {
44 ssc := newStreamStateCache(o)
45
46 resultC := make(chan *stateProxy)
47 req := func(s cc.State) {
48 var res *stateProxy
49 defer func() {
50 resultC <- res
51 }()
52
53 st, err := ssc.getOrRegister(c, &s)
54 if err == nil {
55 res = st
56 }
57 }
58
59 Convey(`Can register a stream`, func() {
60 s, err := ssc.getOrRegister(c, &st)
61 So(err, ShouldBeNil)
62 So(s.proto, ShouldEqual, "remote")
63 So(tcc.calls, ShouldEqual, 1)
64
65 Convey(`Will not re-register the same stream.`, func() {
66 st.ProtoVersion = ""
67
68 s, err := ssc.getOrRegister(c, &st)
69 So(err, ShouldBeNil)
70 So(s.proto, ShouldEqual, "remote")
71 So(tcc.calls, ShouldEqual, 1)
72 })
73
74 Convey(`When the registration expires`, func() {
75 st.ProtoVersion = ""
76 tc.Add(time.Second)
77
78 Convey(`Will re-register the stream.`, f unc() {
79 s, err := ssc.getOrRegister(c, & st)
80 So(err, ShouldBeNil)
81 So(s.proto, ShouldEqual, "remote ")
82 So(tcc.calls, ShouldEqual, 2)
83 })
84 })
85
86 Convey(`Can terminate a registered stream`, func () {
87 s.terminalIndex = 1337
88 So(ssc.setTerminalIndex(c, s), ShouldBeN il)
89 So(tcc.calls, ShouldEqual, 2) // +1 call
90
91 Convey(`Registering the stream will incl ude the terminal index.`, func() {
92 // Fill it in with junk to make sure we are getting cached.
93 st.State.TerminalIndex = 123
94 st.ProtoVersion = ""
95
96 s, err := ssc.getOrRegister(c, & st)
97 So(err, ShouldBeNil)
98 So(s.proto, ShouldEqual, "remote ")
99 So(s.terminalIndex, ShouldEqual, 1337)
100 So(tcc.calls, ShouldEqual, 2) // No additional calls.
101 })
102 })
103 })
104
105 FocusConvey(`When the terminal index is set before the f etch finishes, it will be returned.`, func() {
106 tcc.callC = make(chan struct{})
107 tcc.errC = make(chan error)
108
109 go req(st)
110
111 // Wait for our request to block on RegisterStre am.
112 <-tcc.callC
113
114 // Set the terminal index. We will use a minimal stateProxy. We know
115 // that this will happen after the streamStateCa cheEntry is registered
116 // because both block on the LRU cache's Mutate, which is atomic, and
117 // getOrRegister must have added the streamState CacheEntry in order for
118 // the lock to be available for setTerminalIndex to proceed.
119 terminalErrC := make(chan error)
120 go func() {
121 terminalErrC <- ssc.setTerminalIndex(c, &stateProxy{
122 path: st.Path,
123 terminalIndex: 1337,
124 })
125 }()
126
127 // Let both requests succeed.
128 <-tcc.callC
129 tcc.errC <- nil
130 tcc.errC <- nil
131
132 // Read the stateProxy from our getOrRegister re quest.
133 s := <-resultC
134 So(s, ShouldNotBeNil)
135 So(s.terminalIndex, ShouldEqual, 1337)
136 })
137
138 Convey(`When multiple goroutines register the same strea m, it gets registered once.`, func() {
139 tcc.callC = make(chan struct{})
140 tcc.errC = make(chan error)
141
142 errs := make(errors.MultiError, 256)
143 for i := 0; i < len(errs); i++ {
144 go req(st)
145 }
146
147 <-tcc.callC
148 tcc.errC <- nil
149 for i := 0; i < len(errs); i++ {
150 <-resultC
151 }
152
153 So(errors.SingleError(errs), ShouldBeNil)
154 So(tcc.calls, ShouldEqual, 1)
155 })
156
157 Convey(`Multiple registrations for the same stream will result in two requests if the first expires.`, func() {
158 tcc.callC = make(chan struct{})
159 tcc.errC = make(chan error)
160
161 // First request.
162 go req(st)
163
164 // Wait for the request to happen, then advance time past the request's
165 // expiration.
166 <-tcc.callC
167 tc.Add(time.Second)
168
169 // Second request.
170 go req(st)
171
172 // Release both calls and reap the results.
173 <-tcc.callC
174 tcc.errC <- nil
175 tcc.errC <- nil
176
177 r1 := <-resultC
178 r2 := <-resultC
179
180 So(r1.proto, ShouldEqual, "remote")
181 So(r2.proto, ShouldEqual, "remote")
182 So(tcc.calls, ShouldEqual, 2)
183 })
184
185 Convey(`A registration error will result in a getOrRegis ter error.`, func() {
186 tcc.errC = make(chan error, 1)
187 tcc.errC <- errors.New("test error")
188
189 _, err := ssc.getOrRegister(c, &st)
190 So(err, ShouldNotBeNil)
191 So(tcc.calls, ShouldEqual, 1)
192
193 Convey(`A second registration without error will make a new request.`, func() {
194 tcc.errC = nil
195
196 _, err := ssc.getOrRegister(c, &st)
197 So(err, ShouldBeNil)
198 So(tcc.calls, ShouldEqual, 2)
199 })
200 })
201 })
202
203 Convey(`A streamStateCache can register multiple streams at once .`, func() {
204 tcc.callC = make(chan struct{})
205 tcc.errC = make(chan error)
206 ssc := newStreamStateCache(o)
207
208 count := 256
209 wg := sync.WaitGroup{}
210 errs := make(errors.MultiError, count)
211 state := make([]*stateProxy, count)
212 wg.Add(count)
213 for i := 0; i < count; i++ {
214 st := st
215 st.Path = types.StreamPath(fmt.Sprintf("foo/+/ba r%d", i))
216
217 go func(i int) {
218 defer wg.Done()
219 state[i], errs[i] = ssc.getOrRegister(c, &st)
220 }(i)
221 }
222
223 // Wait for all of them to simultaneously call.
224 for i := 0; i < count; i++ {
225 <-tcc.callC
226 }
227
228 // They're all blocked on errC; allow them to continue.
229 for i := 0; i < count; i++ {
230 tcc.errC <- nil
231 }
232
233 // Wait for them to finish.
234 wg.Wait()
235
236 // Confirm that all registered successfully.
237 So(errors.SingleError(errs), ShouldBeNil)
238
239 remotes := 0
240 for i := 0; i < count; i++ {
241 if state[i].proto == "remote" {
242 remotes++
243 }
244 }
245 So(remotes, ShouldEqual, count)
246 })
247 })
248 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698