OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package coordinator | 5 package coordinator |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "sync" | 9 "sync" |
10 "sync/atomic" | 10 "sync/atomic" |
11 "testing" | 11 "testing" |
12 "time" | 12 "time" |
13 | 13 |
14 "github.com/luci/luci-go/common/clock/testclock" | 14 "github.com/luci/luci-go/common/clock/testclock" |
| 15 "github.com/luci/luci-go/common/config" |
15 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
18 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
19 | 20 |
20 . "github.com/smartystreets/goconvey/convey" | 21 . "github.com/smartystreets/goconvey/convey" |
21 ) | 22 ) |
22 | 23 |
23 // testCoordinator is an implementation of Coordinator that can be used for | 24 // testCoordinator is an implementation of Coordinator that can be used for |
24 // testing. | 25 // testing. |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
69 } | 70 } |
70 | 71 |
71 func TestStreamStateCache(t *testing.T) { | 72 func TestStreamStateCache(t *testing.T) { |
72 t.Parallel() | 73 t.Parallel() |
73 | 74 |
74 Convey(`Using a test configuration`, t, func() { | 75 Convey(`Using a test configuration`, t, func() { |
75 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 76 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
76 tcc := testCoordinator{} | 77 tcc := testCoordinator{} |
77 | 78 |
78 st := LogStreamState{ | 79 st := LogStreamState{ |
| 80 Project: "test-project", |
79 Path: "foo/+/bar", | 81 Path: "foo/+/bar", |
80 TerminalIndex: -1, | 82 TerminalIndex: -1, |
81 } | 83 } |
82 | 84 |
83 // Note: In all of these tests, we check if "proto" field (Proto
Version) | 85 // Note: In all of these tests, we check if "proto" field (Proto
Version) |
84 // is "remote". We use ProtoVersion as a channel between our fak
e remote | 86 // is "remote". We use ProtoVersion as a channel between our fak
e remote |
85 // service. When our fake remote service returns a LogStreamStat
e, it sets | 87 // service. When our fake remote service returns a LogStreamStat
e, it sets |
86 // "remote" to true to differentiate it from the local pushed st
ate. | 88 // "remote" to true to differentiate it from the local pushed st
ate. |
87 // | 89 // |
88 // If a LogStreamState has "remote" set to true, that implies th
at it was | 90 // If a LogStreamState has "remote" set to true, that implies th
at it was |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
159 <-tcc.callC | 161 <-tcc.callC |
160 | 162 |
161 // Set the terminal index. We will use a minimal
LogStreamState. We know | 163 // Set the terminal index. We will use a minimal
LogStreamState. We know |
162 // that this will happen after the streamStateCa
cheEntry is registered | 164 // that this will happen after the streamStateCa
cheEntry is registered |
163 // because both block on the LRU cache's Mutate,
which is atomic, and | 165 // because both block on the LRU cache's Mutate,
which is atomic, and |
164 // RegisterStream must have added the streamStat
eCacheEntry in order for | 166 // RegisterStream must have added the streamStat
eCacheEntry in order for |
165 // the lock to be available for TerminateStream
to proceed. | 167 // the lock to be available for TerminateStream
to proceed. |
166 terminalErrC := make(chan error) | 168 terminalErrC := make(chan error) |
167 go func() { | 169 go func() { |
168 terminalErrC <- ssc.TerminateStream(c, &
LogStreamState{ | 170 terminalErrC <- ssc.TerminateStream(c, &
LogStreamState{ |
| 171 Project: st.Project, |
169 Path: st.Path, | 172 Path: st.Path, |
170 TerminalIndex: 1337, | 173 TerminalIndex: 1337, |
171 }) | 174 }) |
172 }() | 175 }() |
173 | 176 |
174 // Let both requests succeed. | 177 // Let both requests succeed. |
175 <-tcc.callC | 178 <-tcc.callC |
176 tcc.errC <- nil | 179 tcc.errC <- nil |
177 tcc.errC <- nil | 180 tcc.errC <- nil |
178 | 181 |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
238 So(tcc.calls, ShouldEqual, 1) | 241 So(tcc.calls, ShouldEqual, 1) |
239 | 242 |
240 Convey(`A second registration without error will
make a new request.`, func() { | 243 Convey(`A second registration without error will
make a new request.`, func() { |
241 tcc.errC = nil | 244 tcc.errC = nil |
242 | 245 |
243 _, err := ssc.RegisterStream(c, &st, nil
) | 246 _, err := ssc.RegisterStream(c, &st, nil
) |
244 So(err, ShouldBeNil) | 247 So(err, ShouldBeNil) |
245 So(tcc.calls, ShouldEqual, 2) | 248 So(tcc.calls, ShouldEqual, 2) |
246 }) | 249 }) |
247 }) | 250 }) |
| 251 |
| 252 Convey(`Different projects with the sme stream name will
not conflict.`, func() { |
| 253 var projects = []config.ProjectName{"", "foo", "
bar"} |
| 254 |
| 255 for i, p := range projects { |
| 256 st.Project = p |
| 257 s, err := ssc.RegisterStream(c, &st, nil
) |
| 258 So(err, ShouldBeNil) |
| 259 |
| 260 s.TerminalIndex = types.MessageIndex(i) |
| 261 So(ssc.TerminateStream(c, s), ShouldBeNi
l) |
| 262 } |
| 263 So(tcc.calls, ShouldEqual, len(projects)*2) |
| 264 |
| 265 for i, p := range projects { |
| 266 st.Project = p |
| 267 st.TerminalIndex = -1 |
| 268 |
| 269 s, err := ssc.RegisterStream(c, &st, nil
) |
| 270 So(err, ShouldBeNil) |
| 271 So(s.TerminalIndex, ShouldEqual, types.M
essageIndex(i)) |
| 272 } |
| 273 So(tcc.calls, ShouldEqual, len(projects)*2) |
| 274 }) |
248 }) | 275 }) |
249 | 276 |
250 Convey(`A streamStateCache can register multiple streams at once
.`, func() { | 277 Convey(`A streamStateCache can register multiple streams at once
.`, func() { |
251 ssc := NewCache(&tcc, 0, 0) | 278 ssc := NewCache(&tcc, 0, 0) |
252 tcc.callC = make(chan struct{}) | 279 tcc.callC = make(chan struct{}) |
253 tcc.errC = make(chan error) | 280 tcc.errC = make(chan error) |
254 | 281 |
255 count := 2048 | 282 count := 2048 |
256 wg := sync.WaitGroup{} | 283 wg := sync.WaitGroup{} |
257 errs := make(errors.MultiError, count) | 284 errs := make(errors.MultiError, count) |
(...skipping 28 matching lines...) Expand all Loading... |
286 remotes := 0 | 313 remotes := 0 |
287 for i := 0; i < count; i++ { | 314 for i := 0; i < count; i++ { |
288 if state[i].ProtoVersion == "remote" { | 315 if state[i].ProtoVersion == "remote" { |
289 remotes++ | 316 remotes++ |
290 } | 317 } |
291 } | 318 } |
292 So(remotes, ShouldEqual, count) | 319 So(remotes, ShouldEqual, count) |
293 }) | 320 }) |
294 }) | 321 }) |
295 } | 322 } |
OLD | NEW |