Chromium Code Reviews| Index: server/internal/logdog/collector/streamstatecache_test.go |
| diff --git a/server/internal/logdog/collector/streamstatecache_test.go b/server/internal/logdog/collector/streamstatecache_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..7e6e4f98d2daf20d1538e3768c526c3fb193f31a |
| --- /dev/null |
| +++ b/server/internal/logdog/collector/streamstatecache_test.go |
| @@ -0,0 +1,248 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package collector |
| + |
| +import ( |
| + "fmt" |
| + "sync" |
| + "testing" |
| + "time" |
| + |
| + "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" |
| + "github.com/luci/luci-go/common/clock/testclock" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| + "golang.org/x/net/context" |
| + |
| + . "github.com/smartystreets/goconvey/convey" |
| +) |
| + |
| +func TestStreamStateCache(t *testing.T) { |
| + t.Parallel() |
| + |
| + FocusConvey(`Using a test configuration`, t, func() { |
|
martiniss
2016/01/26 02:24:18
Leftover?
dnj (Google)
2016/01/26 05:06:05
Oops yep. Removed.
|
| + c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal) |
| + tcc := testCoordinatorClient{} |
| + |
| + o := streamStateCacheOptions{ |
| + coordinator: &tcc, |
| + cacheSize: 4, |
| + expiration: 1 * time.Second, |
| + } |
| + |
| + st := cc.State{ |
| + Path: "foo/+/bar", |
| + State: &service.LogStreamState{ |
| + TerminalIndex: -1, |
| + }, |
| + } |
| + |
| + FocusConvey(`A streamStateCache`, func() { |
| + ssc := newStreamStateCache(o) |
| + |
| + resultC := make(chan *stateProxy) |
| + req := func(s cc.State) { |
| + var res *stateProxy |
| + defer func() { |
| + resultC <- res |
| + }() |
| + |
| + st, err := ssc.getOrRegister(c, &s) |
| + if err == nil { |
| + res = st |
| + } |
| + } |
| + |
| + Convey(`Can register a stream`, func() { |
| + s, err := ssc.getOrRegister(c, &st) |
| + So(err, ShouldBeNil) |
| + So(s.proto, ShouldEqual, "remote") |
| + So(tcc.calls, ShouldEqual, 1) |
| + |
| + Convey(`Will not re-register the same stream.`, func() { |
| + st.ProtoVersion = "" |
|
martiniss
2016/01/26 02:24:18
why are we setting this? Maybe add a comment
dnj (Google)
2016/01/26 05:06:05
Added a comment explaining.
|
| + |
| + s, err := ssc.getOrRegister(c, &st) |
| + So(err, ShouldBeNil) |
| + So(s.proto, ShouldEqual, "remote") |
| + So(tcc.calls, ShouldEqual, 1) |
| + }) |
| + |
| + Convey(`When the registration expires`, func() { |
| + st.ProtoVersion = "" |
| + tc.Add(time.Second) |
| + |
| + Convey(`Will re-register the stream.`, func() { |
| + s, err := ssc.getOrRegister(c, &st) |
| + So(err, ShouldBeNil) |
| + So(s.proto, ShouldEqual, "remote") |
| + So(tcc.calls, ShouldEqual, 2) |
| + }) |
| + }) |
| + |
| + Convey(`Can terminate a registered stream`, func() { |
| + s.terminalIndex = 1337 |
| + So(ssc.setTerminalIndex(c, s), ShouldBeNil) |
| + So(tcc.calls, ShouldEqual, 2) // +1 call |
| + |
| + Convey(`Registering the stream will include the terminal index.`, func() { |
| + // Fill it in with junk to make sure we are getting cached. |
| + st.State.TerminalIndex = 123 |
| + st.ProtoVersion = "" |
| + |
| + s, err := ssc.getOrRegister(c, &st) |
| + So(err, ShouldBeNil) |
| + So(s.proto, ShouldEqual, "remote") |
| + So(s.terminalIndex, ShouldEqual, 1337) |
| + So(tcc.calls, ShouldEqual, 2) // No additional calls. |
| + }) |
| + }) |
| + }) |
| + |
| + FocusConvey(`When the terminal index is set before the fetch finishes, it will be returned.`, func() { |
| + tcc.callC = make(chan struct{}) |
| + tcc.errC = make(chan error) |
| + |
| + go req(st) |
| + |
| + // Wait for our request to block on RegisterStream. |
| + <-tcc.callC |
| + |
| + // Set the terminal index. We will use a minimal stateProxy. We know |
| + // that this will happen after the streamStateCacheEntry is registered |
| + // because both block on the LRU cache's Mutate, which is atomic, and |
| + // getOrRegister must have added the streamStateCacheEntry in order for |
| + // the lock to be available for setTerminalIndex to proceed. |
| + terminalErrC := make(chan error) |
| + go func() { |
| + terminalErrC <- ssc.setTerminalIndex(c, &stateProxy{ |
| + path: st.Path, |
| + terminalIndex: 1337, |
| + }) |
| + }() |
| + |
| + // Let both requests succeed. |
| + <-tcc.callC |
| + tcc.errC <- nil |
| + tcc.errC <- nil |
| + |
| + // Read the stateProxy from our getOrRegister request. |
| + s := <-resultC |
| + So(s, ShouldNotBeNil) |
| + So(s.terminalIndex, ShouldEqual, 1337) |
| + }) |
| + |
| + Convey(`When multiple goroutines register the same stream, it gets registered once.`, func() { |
| + tcc.callC = make(chan struct{}) |
| + tcc.errC = make(chan error) |
| + |
| + errs := make(errors.MultiError, 256) |
| + for i := 0; i < len(errs); i++ { |
| + go req(st) |
| + } |
| + |
| + <-tcc.callC |
| + tcc.errC <- nil |
| + for i := 0; i < len(errs); i++ { |
| + <-resultC |
| + } |
| + |
| + So(errors.SingleError(errs), ShouldBeNil) |
| + So(tcc.calls, ShouldEqual, 1) |
| + }) |
| + |
| + Convey(`Multiple registrations for the same stream will result in two requests if the first expires.`, func() { |
| + tcc.callC = make(chan struct{}) |
| + tcc.errC = make(chan error) |
| + |
| + // First request. |
| + go req(st) |
| + |
| + // Wait for the request to happen, then advance time past the request's |
| + // expiration. |
| + <-tcc.callC |
| + tc.Add(time.Second) |
| + |
| + // Second request. |
| + go req(st) |
| + |
| + // Release both calls and reap the results. |
| + <-tcc.callC |
| + tcc.errC <- nil |
| + tcc.errC <- nil |
| + |
| + r1 := <-resultC |
| + r2 := <-resultC |
| + |
| + So(r1.proto, ShouldEqual, "remote") |
| + So(r2.proto, ShouldEqual, "remote") |
| + So(tcc.calls, ShouldEqual, 2) |
| + }) |
| + |
| + Convey(`A registration error will result in a getOrRegister error.`, func() { |
| + tcc.errC = make(chan error, 1) |
| + tcc.errC <- errors.New("test error") |
| + |
| + _, err := ssc.getOrRegister(c, &st) |
| + So(err, ShouldNotBeNil) |
| + So(tcc.calls, ShouldEqual, 1) |
| + |
| + Convey(`A second registration without error will make a new request.`, func() { |
| + tcc.errC = nil |
| + |
| + _, err := ssc.getOrRegister(c, &st) |
| + So(err, ShouldBeNil) |
| + So(tcc.calls, ShouldEqual, 2) |
| + }) |
| + }) |
| + }) |
| + |
| + Convey(`A streamStateCache can register multiple streams at once.`, func() { |
| + tcc.callC = make(chan struct{}) |
| + tcc.errC = make(chan error) |
| + ssc := newStreamStateCache(o) |
| + |
| + count := 256 |
| + wg := sync.WaitGroup{} |
| + errs := make(errors.MultiError, count) |
| + state := make([]*stateProxy, count) |
| + wg.Add(count) |
| + for i := 0; i < count; i++ { |
| + st := st |
| + st.Path = types.StreamPath(fmt.Sprintf("foo/+/bar%d", i)) |
| + |
| + go func(i int) { |
| + defer wg.Done() |
| + state[i], errs[i] = ssc.getOrRegister(c, &st) |
| + }(i) |
| + } |
| + |
| + // Wait for all of them to simultaneously call. |
| + for i := 0; i < count; i++ { |
| + <-tcc.callC |
| + } |
| + |
| + // They're all blocked on errC; allow them to continue. |
| + for i := 0; i < count; i++ { |
| + tcc.errC <- nil |
| + } |
| + |
| + // Wait for them to finish. |
| + wg.Wait() |
| + |
| + // Confirm that all registered successfully. |
| + So(errors.SingleError(errs), ShouldBeNil) |
| + |
| + remotes := 0 |
| + for i := 0; i < count; i++ { |
| + if state[i].proto == "remote" { |
| + remotes++ |
| + } |
| + } |
| + So(remotes, ShouldEqual, count) |
| + }) |
| + }) |
| +} |