| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "sync" | 9 "sync" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 "testing" | 11 "testing" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/luci/luci-go/common/clock/testclock" | 14 "github.com/luci/luci-go/common/clock/testclock" |
| 15 "github.com/luci/luci-go/common/config" |
| 15 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
| 19 | 20 |
| 20 . "github.com/smartystreets/goconvey/convey" | 21 . "github.com/smartystreets/goconvey/convey" |
| 21 ) | 22 ) |
| 22 | 23 |
| 23 // testCoordinator is an implementation of Coordinator that can be used for | 24 // testCoordinator is an implementation of Coordinator that can be used for |
| 24 // testing. | 25 // testing. |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 69 } | 70 } |
| 70 | 71 |
| 71 func TestStreamStateCache(t *testing.T) { | 72 func TestStreamStateCache(t *testing.T) { |
| 72 t.Parallel() | 73 t.Parallel() |
| 73 | 74 |
| 74 Convey(`Using a test configuration`, t, func() { | 75 Convey(`Using a test configuration`, t, func() { |
| 75 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 76 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 76 tcc := testCoordinator{} | 77 tcc := testCoordinator{} |
| 77 | 78 |
| 78 st := LogStreamState{ | 79 st := LogStreamState{ |
| 80 Project: "test-project", |
| 79 Path: "foo/+/bar", | 81 Path: "foo/+/bar", |
| 80 TerminalIndex: -1, | 82 TerminalIndex: -1, |
| 81 } | 83 } |
| 82 | 84 |
| 83 // Note: In all of these tests, we check if "proto" field (Proto
Version) | 85 // Note: In all of these tests, we check if "proto" field (Proto
Version) |
| 84 // is "remote". We use ProtoVersion as a channel between our fak
e remote | 86 // is "remote". We use ProtoVersion as a channel between our fak
e remote |
| 85 // service. When our fake remote service returns a LogStreamStat
e, it sets | 87 // service. When our fake remote service returns a LogStreamStat
e, it sets |
| 86 // "remote" to true to differentiate it from the local pushed st
ate. | 88 // "remote" to true to differentiate it from the local pushed st
ate. |
| 87 // | 89 // |
| 88 // If a LogStreamState has "remote" set to true, that implies th
at it was | 90 // If a LogStreamState has "remote" set to true, that implies th
at it was |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 159 <-tcc.callC | 161 <-tcc.callC |
| 160 | 162 |
| 161 // Set the terminal index. We will use a minimal
LogStreamState. We know | 163 // Set the terminal index. We will use a minimal
LogStreamState. We know |
| 162 // that this will happen after the streamStateCa
cheEntry is registered | 164 // that this will happen after the streamStateCa
cheEntry is registered |
| 163 // because both block on the LRU cache's Mutate,
which is atomic, and | 165 // because both block on the LRU cache's Mutate,
which is atomic, and |
| 164 // RegisterStream must have added the streamStat
eCacheEntry in order for | 166 // RegisterStream must have added the streamStat
eCacheEntry in order for |
| 165 // the lock to be available for TerminateStream
to proceed. | 167 // the lock to be available for TerminateStream
to proceed. |
| 166 terminalErrC := make(chan error) | 168 terminalErrC := make(chan error) |
| 167 go func() { | 169 go func() { |
| 168 terminalErrC <- ssc.TerminateStream(c, &
LogStreamState{ | 170 terminalErrC <- ssc.TerminateStream(c, &
LogStreamState{ |
| 171 Project: st.Project, |
| 169 Path: st.Path, | 172 Path: st.Path, |
| 170 TerminalIndex: 1337, | 173 TerminalIndex: 1337, |
| 171 }) | 174 }) |
| 172 }() | 175 }() |
| 173 | 176 |
| 174 // Let both requests succeed. | 177 // Let both requests succeed. |
| 175 <-tcc.callC | 178 <-tcc.callC |
| 176 tcc.errC <- nil | 179 tcc.errC <- nil |
| 177 tcc.errC <- nil | 180 tcc.errC <- nil |
| 178 | 181 |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 238 So(tcc.calls, ShouldEqual, 1) | 241 So(tcc.calls, ShouldEqual, 1) |
| 239 | 242 |
| 240 Convey(`A second registration without error will
make a new request.`, func() { | 243 Convey(`A second registration without error will
make a new request.`, func() { |
| 241 tcc.errC = nil | 244 tcc.errC = nil |
| 242 | 245 |
| 243 _, err := ssc.RegisterStream(c, &st, nil
) | 246 _, err := ssc.RegisterStream(c, &st, nil
) |
| 244 So(err, ShouldBeNil) | 247 So(err, ShouldBeNil) |
| 245 So(tcc.calls, ShouldEqual, 2) | 248 So(tcc.calls, ShouldEqual, 2) |
| 246 }) | 249 }) |
| 247 }) | 250 }) |
| 251 |
| 252 Convey(`Different projects with the sme stream name will
not conflict.`, func() { |
| 253 var projects = []config.ProjectName{"", "foo", "
bar"} |
| 254 |
| 255 for i, p := range projects { |
| 256 st.Project = p |
| 257 s, err := ssc.RegisterStream(c, &st, nil
) |
| 258 So(err, ShouldBeNil) |
| 259 |
| 260 s.TerminalIndex = types.MessageIndex(i) |
| 261 So(ssc.TerminateStream(c, s), ShouldBeNi
l) |
| 262 } |
| 263 So(tcc.calls, ShouldEqual, len(projects)*2) |
| 264 |
| 265 for i, p := range projects { |
| 266 st.Project = p |
| 267 st.TerminalIndex = -1 |
| 268 |
| 269 s, err := ssc.RegisterStream(c, &st, nil
) |
| 270 So(err, ShouldBeNil) |
| 271 So(s.TerminalIndex, ShouldEqual, types.M
essageIndex(i)) |
| 272 } |
| 273 So(tcc.calls, ShouldEqual, len(projects)*2) |
| 274 }) |
| 248 }) | 275 }) |
| 249 | 276 |
| 250 Convey(`A streamStateCache can register multiple streams at once
.`, func() { | 277 Convey(`A streamStateCache can register multiple streams at once
.`, func() { |
| 251 ssc := NewCache(&tcc, 0, 0) | 278 ssc := NewCache(&tcc, 0, 0) |
| 252 tcc.callC = make(chan struct{}) | 279 tcc.callC = make(chan struct{}) |
| 253 tcc.errC = make(chan error) | 280 tcc.errC = make(chan error) |
| 254 | 281 |
| 255 count := 2048 | 282 count := 2048 |
| 256 wg := sync.WaitGroup{} | 283 wg := sync.WaitGroup{} |
| 257 errs := make(errors.MultiError, count) | 284 errs := make(errors.MultiError, count) |
| (...skipping 28 matching lines...) Expand all Loading... |
| 286 remotes := 0 | 313 remotes := 0 |
| 287 for i := 0; i < count; i++ { | 314 for i := 0; i < count; i++ { |
| 288 if state[i].ProtoVersion == "remote" { | 315 if state[i].ProtoVersion == "remote" { |
| 289 remotes++ | 316 remotes++ |
| 290 } | 317 } |
| 291 } | 318 } |
| 292 So(remotes, ShouldEqual, count) | 319 So(remotes, ShouldEqual, count) |
| 293 }) | 320 }) |
| 294 }) | 321 }) |
| 295 } | 322 } |
| OLD | NEW |