Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package collector | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "sync" | |
| 10 "testing" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" | |
| 14 "github.com/luci/luci-go/common/clock/testclock" | |
| 15 "github.com/luci/luci-go/common/errors" | |
| 16 "github.com/luci/luci-go/common/logdog/types" | |
| 17 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" | |
| 18 "golang.org/x/net/context" | |
| 19 | |
| 20 . "github.com/smartystreets/goconvey/convey" | |
| 21 ) | |
| 22 | |
| 23 func TestStreamStateCache(t *testing.T) { | |
| 24 t.Parallel() | |
| 25 | |
| 26 FocusConvey(`Using a test configuration`, t, func() { | |
|
martiniss
2016/01/26 02:24:18
Leftover?
dnj (Google)
2016/01/26 05:06:05
Oops yep. Removed.
| |
| 27 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) | |
| 28 tcc := testCoordinatorClient{} | |
| 29 | |
| 30 o := streamStateCacheOptions{ | |
| 31 coordinator: &tcc, | |
| 32 cacheSize: 4, | |
| 33 expiration: 1 * time.Second, | |
| 34 } | |
| 35 | |
| 36 st := cc.State{ | |
| 37 Path: "foo/+/bar", | |
| 38 State: &service.LogStreamState{ | |
| 39 TerminalIndex: -1, | |
| 40 }, | |
| 41 } | |
| 42 | |
| 43 FocusConvey(`A streamStateCache`, func() { | |
| 44 ssc := newStreamStateCache(o) | |
| 45 | |
| 46 resultC := make(chan *stateProxy) | |
| 47 req := func(s cc.State) { | |
| 48 var res *stateProxy | |
| 49 defer func() { | |
| 50 resultC <- res | |
| 51 }() | |
| 52 | |
| 53 st, err := ssc.getOrRegister(c, &s) | |
| 54 if err == nil { | |
| 55 res = st | |
| 56 } | |
| 57 } | |
| 58 | |
| 59 Convey(`Can register a stream`, func() { | |
| 60 s, err := ssc.getOrRegister(c, &st) | |
| 61 So(err, ShouldBeNil) | |
| 62 So(s.proto, ShouldEqual, "remote") | |
| 63 So(tcc.calls, ShouldEqual, 1) | |
| 64 | |
| 65 Convey(`Will not re-register the same stream.`, func() { | |
| 66 st.ProtoVersion = "" | |
|
martiniss
2016/01/26 02:24:18
why are we setting this? Maybe add a comment
dnj (Google)
2016/01/26 05:06:05
Added a comment explaining.
| |
| 67 | |
| 68 s, err := ssc.getOrRegister(c, &st) | |
| 69 So(err, ShouldBeNil) | |
| 70 So(s.proto, ShouldEqual, "remote") | |
| 71 So(tcc.calls, ShouldEqual, 1) | |
| 72 }) | |
| 73 | |
| 74 Convey(`When the registration expires`, func() { | |
| 75 st.ProtoVersion = "" | |
| 76 tc.Add(time.Second) | |
| 77 | |
| 78 Convey(`Will re-register the stream.`, f unc() { | |
| 79 s, err := ssc.getOrRegister(c, & st) | |
| 80 So(err, ShouldBeNil) | |
| 81 So(s.proto, ShouldEqual, "remote ") | |
| 82 So(tcc.calls, ShouldEqual, 2) | |
| 83 }) | |
| 84 }) | |
| 85 | |
| 86 Convey(`Can terminate a registered stream`, func () { | |
| 87 s.terminalIndex = 1337 | |
| 88 So(ssc.setTerminalIndex(c, s), ShouldBeN il) | |
| 89 So(tcc.calls, ShouldEqual, 2) // +1 call | |
| 90 | |
| 91 Convey(`Registering the stream will incl ude the terminal index.`, func() { | |
| 92 // Fill it in with junk to make sure we are getting cached. | |
| 93 st.State.TerminalIndex = 123 | |
| 94 st.ProtoVersion = "" | |
| 95 | |
| 96 s, err := ssc.getOrRegister(c, & st) | |
| 97 So(err, ShouldBeNil) | |
| 98 So(s.proto, ShouldEqual, "remote ") | |
| 99 So(s.terminalIndex, ShouldEqual, 1337) | |
| 100 So(tcc.calls, ShouldEqual, 2) // No additional calls. | |
| 101 }) | |
| 102 }) | |
| 103 }) | |
| 104 | |
| 105 FocusConvey(`When the terminal index is set before the f etch finishes, it will be returned.`, func() { | |
| 106 tcc.callC = make(chan struct{}) | |
| 107 tcc.errC = make(chan error) | |
| 108 | |
| 109 go req(st) | |
| 110 | |
| 111 // Wait for our request to block on RegisterStre am. | |
| 112 <-tcc.callC | |
| 113 | |
| 114 // Set the terminal index. We will use a minimal stateProxy. We know | |
| 115 // that this will happen after the streamStateCa cheEntry is registered | |
| 116 // because both block on the LRU cache's Mutate, which is atomic, and | |
| 117 // getOrRegister must have added the streamState CacheEntry in order for | |
| 118 // the lock to be available for setTerminalIndex to proceed. | |
| 119 terminalErrC := make(chan error) | |
| 120 go func() { | |
| 121 terminalErrC <- ssc.setTerminalIndex(c, &stateProxy{ | |
| 122 path: st.Path, | |
| 123 terminalIndex: 1337, | |
| 124 }) | |
| 125 }() | |
| 126 | |
| 127 // Let both requests succeed. | |
| 128 <-tcc.callC | |
| 129 tcc.errC <- nil | |
| 130 tcc.errC <- nil | |
| 131 | |
| 132 // Read the stateProxy from our getOrRegister re quest. | |
| 133 s := <-resultC | |
| 134 So(s, ShouldNotBeNil) | |
| 135 So(s.terminalIndex, ShouldEqual, 1337) | |
| 136 }) | |
| 137 | |
| 138 Convey(`When multiple goroutines register the same strea m, it gets registered once.`, func() { | |
| 139 tcc.callC = make(chan struct{}) | |
| 140 tcc.errC = make(chan error) | |
| 141 | |
| 142 errs := make(errors.MultiError, 256) | |
| 143 for i := 0; i < len(errs); i++ { | |
| 144 go req(st) | |
| 145 } | |
| 146 | |
| 147 <-tcc.callC | |
| 148 tcc.errC <- nil | |
| 149 for i := 0; i < len(errs); i++ { | |
| 150 <-resultC | |
| 151 } | |
| 152 | |
| 153 So(errors.SingleError(errs), ShouldBeNil) | |
| 154 So(tcc.calls, ShouldEqual, 1) | |
| 155 }) | |
| 156 | |
| 157 Convey(`Multiple registrations for the same stream will result in two requests if the first expires.`, func() { | |
| 158 tcc.callC = make(chan struct{}) | |
| 159 tcc.errC = make(chan error) | |
| 160 | |
| 161 // First request. | |
| 162 go req(st) | |
| 163 | |
| 164 // Wait for the request to happen, then advance time past the request's | |
| 165 // expiration. | |
| 166 <-tcc.callC | |
| 167 tc.Add(time.Second) | |
| 168 | |
| 169 // Second request. | |
| 170 go req(st) | |
| 171 | |
| 172 // Release both calls and reap the results. | |
| 173 <-tcc.callC | |
| 174 tcc.errC <- nil | |
| 175 tcc.errC <- nil | |
| 176 | |
| 177 r1 := <-resultC | |
| 178 r2 := <-resultC | |
| 179 | |
| 180 So(r1.proto, ShouldEqual, "remote") | |
| 181 So(r2.proto, ShouldEqual, "remote") | |
| 182 So(tcc.calls, ShouldEqual, 2) | |
| 183 }) | |
| 184 | |
| 185 Convey(`A registration error will result in a getOrRegis ter error.`, func() { | |
| 186 tcc.errC = make(chan error, 1) | |
| 187 tcc.errC <- errors.New("test error") | |
| 188 | |
| 189 _, err := ssc.getOrRegister(c, &st) | |
| 190 So(err, ShouldNotBeNil) | |
| 191 So(tcc.calls, ShouldEqual, 1) | |
| 192 | |
| 193 Convey(`A second registration without error will make a new request.`, func() { | |
| 194 tcc.errC = nil | |
| 195 | |
| 196 _, err := ssc.getOrRegister(c, &st) | |
| 197 So(err, ShouldBeNil) | |
| 198 So(tcc.calls, ShouldEqual, 2) | |
| 199 }) | |
| 200 }) | |
| 201 }) | |
| 202 | |
| 203 Convey(`A streamStateCache can register multiple streams at once .`, func() { | |
| 204 tcc.callC = make(chan struct{}) | |
| 205 tcc.errC = make(chan error) | |
| 206 ssc := newStreamStateCache(o) | |
| 207 | |
| 208 count := 256 | |
| 209 wg := sync.WaitGroup{} | |
| 210 errs := make(errors.MultiError, count) | |
| 211 state := make([]*stateProxy, count) | |
| 212 wg.Add(count) | |
| 213 for i := 0; i < count; i++ { | |
| 214 st := st | |
| 215 st.Path = types.StreamPath(fmt.Sprintf("foo/+/ba r%d", i)) | |
| 216 | |
| 217 go func(i int) { | |
| 218 defer wg.Done() | |
| 219 state[i], errs[i] = ssc.getOrRegister(c, &st) | |
| 220 }(i) | |
| 221 } | |
| 222 | |
| 223 // Wait for all of them to simultaneously call. | |
| 224 for i := 0; i < count; i++ { | |
| 225 <-tcc.callC | |
| 226 } | |
| 227 | |
| 228 // They're all blocked on errC; allow them to continue. | |
| 229 for i := 0; i < count; i++ { | |
| 230 tcc.errC <- nil | |
| 231 } | |
| 232 | |
| 233 // Wait for them to finish. | |
| 234 wg.Wait() | |
| 235 | |
| 236 // Confirm that all registered successfully. | |
| 237 So(errors.SingleError(errs), ShouldBeNil) | |
| 238 | |
| 239 remotes := 0 | |
| 240 for i := 0; i < count; i++ { | |
| 241 if state[i].proto == "remote" { | |
| 242 remotes++ | |
| 243 } | |
| 244 } | |
| 245 So(remotes, ShouldEqual, count) | |
| 246 }) | |
| 247 }) | |
| 248 } | |
| OLD | NEW |