Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Side by Side Diff: server/internal/logdog/collector/coordinator/cache_test.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "sync" 9 "sync"
10 "sync/atomic" 10 "sync/atomic"
11 "testing" 11 "testing"
12 "time" 12 "time"
13 13
14 "github.com/luci/luci-go/common/clock/testclock" 14 "github.com/luci/luci-go/common/clock/testclock"
15 "github.com/luci/luci-go/common/config"
15 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "golang.org/x/net/context" 19 "golang.org/x/net/context"
19 20
20 . "github.com/smartystreets/goconvey/convey" 21 . "github.com/smartystreets/goconvey/convey"
21 ) 22 )
22 23
23 // testCoordinator is an implementation of Coordinator that can be used for 24 // testCoordinator is an implementation of Coordinator that can be used for
24 // testing. 25 // testing.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
69 } 70 }
70 71
71 func TestStreamStateCache(t *testing.T) { 72 func TestStreamStateCache(t *testing.T) {
72 t.Parallel() 73 t.Parallel()
73 74
74 Convey(`Using a test configuration`, t, func() { 75 Convey(`Using a test configuration`, t, func() {
75 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 76 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
76 tcc := testCoordinator{} 77 tcc := testCoordinator{}
77 78
78 st := LogStreamState{ 79 st := LogStreamState{
80 Project: "test-project",
79 Path: "foo/+/bar", 81 Path: "foo/+/bar",
80 TerminalIndex: -1, 82 TerminalIndex: -1,
81 } 83 }
82 84
83 // Note: In all of these tests, we check if "proto" field (Proto Version) 85 // Note: In all of these tests, we check if "proto" field (Proto Version)
84 // is "remote". We use ProtoVersion as a channel between our fak e remote 86 // is "remote". We use ProtoVersion as a channel between our fak e remote
85 // service. When our fake remote service returns a LogStreamStat e, it sets 87 // service. When our fake remote service returns a LogStreamStat e, it sets
86 // "remote" to true to differentiate it from the local pushed st ate. 88 // "remote" to true to differentiate it from the local pushed st ate.
87 // 89 //
88 // If a LogStreamState has "remote" set to true, that implies th at it was 90 // If a LogStreamState has "remote" set to true, that implies th at it was
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
159 <-tcc.callC 161 <-tcc.callC
160 162
161 // Set the terminal index. We will use a minimal LogStreamState. We know 163 // Set the terminal index. We will use a minimal LogStreamState. We know
162 // that this will happen after the streamStateCa cheEntry is registered 164 // that this will happen after the streamStateCa cheEntry is registered
163 // because both block on the LRU cache's Mutate, which is atomic, and 165 // because both block on the LRU cache's Mutate, which is atomic, and
164 // RegisterStream must have added the streamStat eCacheEntry in order for 166 // RegisterStream must have added the streamStat eCacheEntry in order for
165 // the lock to be available for TerminateStream to proceed. 167 // the lock to be available for TerminateStream to proceed.
166 terminalErrC := make(chan error) 168 terminalErrC := make(chan error)
167 go func() { 169 go func() {
168 terminalErrC <- ssc.TerminateStream(c, & LogStreamState{ 170 terminalErrC <- ssc.TerminateStream(c, & LogStreamState{
171 Project: st.Project,
169 Path: st.Path, 172 Path: st.Path,
170 TerminalIndex: 1337, 173 TerminalIndex: 1337,
171 }) 174 })
172 }() 175 }()
173 176
174 // Let both requests succeed. 177 // Let both requests succeed.
175 <-tcc.callC 178 <-tcc.callC
176 tcc.errC <- nil 179 tcc.errC <- nil
177 tcc.errC <- nil 180 tcc.errC <- nil
178 181
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 So(tcc.calls, ShouldEqual, 1) 241 So(tcc.calls, ShouldEqual, 1)
239 242
240 Convey(`A second registration without error will make a new request.`, func() { 243 Convey(`A second registration without error will make a new request.`, func() {
241 tcc.errC = nil 244 tcc.errC = nil
242 245
243 _, err := ssc.RegisterStream(c, &st, nil ) 246 _, err := ssc.RegisterStream(c, &st, nil )
244 So(err, ShouldBeNil) 247 So(err, ShouldBeNil)
245 So(tcc.calls, ShouldEqual, 2) 248 So(tcc.calls, ShouldEqual, 2)
246 }) 249 })
247 }) 250 })
251
252 Convey(`Different projects with the sme stream name will not conflict.`, func() {
253 var projects = []config.ProjectName{"", "foo", " bar"}
254
255 for i, p := range projects {
256 st.Project = p
257 s, err := ssc.RegisterStream(c, &st, nil )
258 So(err, ShouldBeNil)
259
260 s.TerminalIndex = types.MessageIndex(i)
261 So(ssc.TerminateStream(c, s), ShouldBeNi l)
262 }
263 So(tcc.calls, ShouldEqual, len(projects)*2)
264
265 for i, p := range projects {
266 st.Project = p
267 st.TerminalIndex = -1
268
269 s, err := ssc.RegisterStream(c, &st, nil )
270 So(err, ShouldBeNil)
271 So(s.TerminalIndex, ShouldEqual, types.M essageIndex(i))
272 }
273 So(tcc.calls, ShouldEqual, len(projects)*2)
274 })
248 }) 275 })
249 276
250 Convey(`A streamStateCache can register multiple streams at once .`, func() { 277 Convey(`A streamStateCache can register multiple streams at once .`, func() {
251 ssc := NewCache(&tcc, 0, 0) 278 ssc := NewCache(&tcc, 0, 0)
252 tcc.callC = make(chan struct{}) 279 tcc.callC = make(chan struct{})
253 tcc.errC = make(chan error) 280 tcc.errC = make(chan error)
254 281
255 count := 2048 282 count := 2048
256 wg := sync.WaitGroup{} 283 wg := sync.WaitGroup{}
257 errs := make(errors.MultiError, count) 284 errs := make(errors.MultiError, count)
(...skipping 28 matching lines...) Expand all
286 remotes := 0 313 remotes := 0
287 for i := 0; i < count; i++ { 314 for i := 0; i < count; i++ {
288 if state[i].ProtoVersion == "remote" { 315 if state[i].ProtoVersion == "remote" {
289 remotes++ 316 remotes++
290 } 317 }
291 } 318 }
292 So(remotes, ShouldEqual, count) 319 So(remotes, ShouldEqual, count)
293 }) 320 })
294 }) 321 })
295 } 322 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/coordinator/cache.go ('k') | server/internal/logdog/collector/coordinator/coordinator.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698