Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(127)

Side by Side Diff: logdog/appengine/coordinator/endpoints/services/registerStream_test.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 12 matching lines...) Expand all
23 "github.com/luci/gae/filter/featureBreaker" 23 "github.com/luci/gae/filter/featureBreaker"
24 ds "github.com/luci/gae/service/datastore" 24 ds "github.com/luci/gae/service/datastore"
25 "github.com/luci/luci-go/common/proto/google" 25 "github.com/luci/luci-go/common/proto/google"
26 "github.com/luci/luci-go/logdog/api/config/svcconfig" 26 "github.com/luci/luci-go/logdog/api/config/svcconfig"
27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
28 "github.com/luci/luci-go/logdog/api/logpb" 28 "github.com/luci/luci-go/logdog/api/logpb"
29 "github.com/luci/luci-go/logdog/appengine/coordinator" 29 "github.com/luci/luci-go/logdog/appengine/coordinator"
30 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest " 30 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest "
31 "github.com/luci/luci-go/logdog/common/types" 31 "github.com/luci/luci-go/logdog/common/types"
32 "github.com/luci/luci-go/luci_config/common/cfgtypes" 32 "github.com/luci/luci-go/luci_config/common/cfgtypes"
33
33 "golang.org/x/net/context" 34 "golang.org/x/net/context"
34 35
35 . "github.com/luci/luci-go/common/testing/assertions" 36 . "github.com/luci/luci-go/common/testing/assertions"
36 . "github.com/smartystreets/goconvey/convey" 37 . "github.com/smartystreets/goconvey/convey"
37 ) 38 )
38 39
39 func TestRegisterStream(t *testing.T) { 40 func TestRegisterStream(t *testing.T) {
40 t.Parallel() 41 t.Parallel()
41 42
42 Convey(`With a testing configuration`, t, func() { 43 Convey(`With a testing configuration`, t, func() {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
96 }, 97 },
97 } 98 }
98 99
99 Convey(`Can register the stream.`, func() { 100 Convey(`Can register the stream.`, func() {
100 created := ds.RoundTime(env.Clock.Now()) 101 created := ds.RoundTime(env.Clock.Now())
101 102
102 resp, err := svr.RegisterStream(c, &req) 103 resp, err := svr.RegisterStream(c, &req)
103 So(err, ShouldBeRPCOK) 104 So(err, ShouldBeRPCOK)
104 So(resp, ShouldResemble, expResp) 105 So(resp, ShouldResemble, expResp)
105 ds.GetTestable(c).CatchupIndexes() 106 ds.GetTestable(c).CatchupIndexes()
106 » » » » » env.IterateTumbleAll(c) 107 » » » » » env.RunTaskQueues(c, tls)
107 108
108 So(tls.Get(c), ShouldBeNil) 109 So(tls.Get(c), ShouldBeNil)
109 110
110 // Registers the log stream. 111 // Registers the log stream.
111 So(tls.Stream.Created, ShouldResemble, c reated) 112 So(tls.Stream.Created, ShouldResemble, c reated)
112 113
113 // Registers the log stream state. 114 // Registers the log stream state.
114 So(tls.State.Created, ShouldResemble, cr eated) 115 So(tls.State.Created, ShouldResemble, cr eated)
115 So(tls.State.Updated, ShouldResemble, cr eated) 116 So(tls.State.Updated, ShouldResemble, cr eated)
116 So(tls.State.Secret, ShouldResemble, req .Secret) 117 So(tls.State.Secret, ShouldResemble, req .Secret)
(...skipping 19 matching lines...) Expand all
136 So(ds.Get(c, tls.Stream, tls.State), ShouldBeNil) 137 So(ds.Get(c, tls.Stream, tls.State), ShouldBeNil)
137 }) 138 })
138 So(tls.State.Created, ShouldRese mble, created) 139 So(tls.State.Created, ShouldRese mble, created)
139 So(tls.Stream.Created, ShouldRes emble, created) 140 So(tls.Stream.Created, ShouldRes emble, created)
140 141
141 // No archival request yet. 142 // No archival request yet.
142 So(env.ArchivalPublisher.Hashes( ), ShouldResemble, []string{}) 143 So(env.ArchivalPublisher.Hashes( ), ShouldResemble, []string{})
143 144
144 Convey(`Forces an archival reque st after first archive expiration.`, func() { 145 Convey(`Forces an archival reque st after first archive expiration.`, func() {
145 env.Clock.Set(created.Ad d(time.Hour)) // 1 hour after initial registration. 146 env.Clock.Set(created.Ad d(time.Hour)) // 1 hour after initial registration.
146 » » » » » » » env.IterateTumbleAll(c) 147 » » » » » » » env.RunTaskQueues(c, tls )
147 148
148 So(env.ArchivalPublisher .Hashes(), ShouldResemble, []string{string(tls.Stream.ID)}) 149 So(env.ArchivalPublisher .Hashes(), ShouldResemble, []string{string(tls.Stream.ID)})
149 }) 150 })
150 }) 151 })
151 152
152 Convey(`Will not re-register if secrets don't match.`, func() { 153 Convey(`Will not re-register if secrets don't match.`, func() {
153 req.Secret[0] = 0xAB 154 req.Secret[0] = 0xAB
154 _, err := svr.RegisterStream(c, &req) 155 _, err := svr.RegisterStream(c, &req)
155 So(err, ShouldBeRPCInvalidArgume nt, "invalid secret") 156 So(err, ShouldBeRPCInvalidArgume nt, "invalid secret")
156 }) 157 })
157 }) 158 })
158 159
159 Convey(`Can register a terminal stream.`, func() { 160 Convey(`Can register a terminal stream.`, func() {
160 created := ds.RoundTime(env.Clock.Now()) 161 created := ds.RoundTime(env.Clock.Now())
161 req.TerminalIndex = 1337 162 req.TerminalIndex = 1337
162 expResp.State.TerminalIndex = 1337 163 expResp.State.TerminalIndex = 1337
163 164
164 resp, err := svr.RegisterStream(c, &req) 165 resp, err := svr.RegisterStream(c, &req)
165 So(err, ShouldBeRPCOK) 166 So(err, ShouldBeRPCOK)
166 So(resp, ShouldResemble, expResp) 167 So(resp, ShouldResemble, expResp)
167 ds.GetTestable(c).CatchupIndexes() 168 ds.GetTestable(c).CatchupIndexes()
168 » » » » » env.IterateTumbleAll(c) 169 » » » » » env.RunTaskQueues(c, tls)
169 170
170 So(tls.Get(c), ShouldBeNil) 171 So(tls.Get(c), ShouldBeNil)
171 172
172 // Registers the log stream. 173 // Registers the log stream.
173 So(tls.Stream.Created, ShouldResemble, c reated) 174 So(tls.Stream.Created, ShouldResemble, c reated)
174 175
175 // Registers the log stream state. 176 // Registers the log stream state.
176 So(tls.State.Created, ShouldResemble, cr eated) 177 So(tls.State.Created, ShouldResemble, cr eated)
177 So(tls.State.Updated, ShouldResemble, cr eated) 178 So(tls.State.Updated, ShouldResemble, cr eated)
178 So(tls.State.Secret, ShouldResemble, req .Secret) 179 So(tls.State.Secret, ShouldResemble, req .Secret)
179 So(tls.State.TerminalIndex, ShouldEqual, 1337) 180 So(tls.State.TerminalIndex, ShouldEqual, 1337)
180 So(tls.State.TerminatedTime, ShouldResem ble, created) 181 So(tls.State.TerminatedTime, ShouldResem ble, created)
181 So(tls.State.Terminated(), ShouldBeTrue) 182 So(tls.State.Terminated(), ShouldBeTrue)
182 So(tls.State.ArchivalState(), ShouldEqua l, coordinator.NotArchived) 183 So(tls.State.ArchivalState(), ShouldEqua l, coordinator.NotArchived)
183 184
184 // Should also register the log stream P refix. 185 // Should also register the log stream P refix.
185 So(tls.Prefix.Created, ShouldResemble, c reated) 186 So(tls.Prefix.Created, ShouldResemble, c reated)
186 So(tls.Prefix.Secret, ShouldResemble, re q.Secret) 187 So(tls.Prefix.Secret, ShouldResemble, re q.Secret)
187 188
188 // No pending archival requests. 189 // No pending archival requests.
189 » » » » » env.IterateTumbleAll(c) 190 » » » » » env.RunTaskQueues(c, tls)
190 So(env.ArchivalPublisher.Hashes(), Shoul dResemble, []string{}) 191 So(env.ArchivalPublisher.Hashes(), Shoul dResemble, []string{})
191 192
192 // When we advance to our settle delay, an archival task is scheduled. 193 // When we advance to our settle delay, an archival task is scheduled.
193 env.Clock.Add(10 * time.Minute) 194 env.Clock.Add(10 * time.Minute)
194 » » » » » env.IterateTumbleAll(c) 195 » » » » » env.RunTaskQueues(c, tls)
195 196
196 // Has a pending archival request. 197 // Has a pending archival request.
197 So(env.ArchivalPublisher.Hashes(), Shoul dResemble, []string{string(tls.Stream.ID)}) 198 So(env.ArchivalPublisher.Hashes(), Shoul dResemble, []string{string(tls.Stream.ID)})
198 }) 199 })
199 200
200 Convey(`Will schedule the correct archival expir ation delay`, func() { 201 Convey(`Will schedule the correct archival expir ation delay`, func() {
201 Convey(`When there is no project config delay.`, func() { 202 Convey(`When there is no project config delay.`, func() {
202 env.ModProjectConfig(c, "proj-fo o", func(pcfg *svcconfig.ProjectConfig) { 203 env.ModProjectConfig(c, "proj-fo o", func(pcfg *svcconfig.ProjectConfig) {
203 pcfg.MaxStreamAge = nil 204 pcfg.MaxStreamAge = nil
204 }) 205 })
205 206
206 _, err := svr.RegisterStream(c, &req) 207 _, err := svr.RegisterStream(c, &req)
207 So(err, ShouldBeRPCOK) 208 So(err, ShouldBeRPCOK)
208 ds.GetTestable(c).CatchupIndexes () 209 ds.GetTestable(c).CatchupIndexes ()
209 210
210 // The cleanup archival should b e scheduled for 24 hours, so advance 211 // The cleanup archival should b e scheduled for 24 hours, so advance
211 // 12, confirm no archival, then advance another 12 and confirm that 212 // 12, confirm no archival, then advance another 12 and confirm that
212 // archival was tasked. 213 // archival was tasked.
213 env.Clock.Add(12 * time.Hour) 214 env.Clock.Add(12 * time.Hour)
214 » » » » » » env.IterateTumbleAll(c) 215 » » » » » » env.RunTaskQueues(c, tls)
215 So(env.ArchivalPublisher.Hashes( ), ShouldHaveLength, 0) 216 So(env.ArchivalPublisher.Hashes( ), ShouldHaveLength, 0)
216 217
217 env.Clock.Add(12 * time.Hour) 218 env.Clock.Add(12 * time.Hour)
218 » » » » » » env.IterateTumbleAll(c) 219 » » » » » » env.RunTaskQueues(c, tls)
219 So(env.ArchivalPublisher.Hashes( ), ShouldResemble, []string{string(tls.Stream.ID)}) 220 So(env.ArchivalPublisher.Hashes( ), ShouldResemble, []string{string(tls.Stream.ID)})
220 }) 221 })
221 222
222 Convey(`When there is no service or proj ect config delay.`, func() { 223 Convey(`When there is no service or proj ect config delay.`, func() {
223 env.ModServiceConfig(c, func(cfg *svcconfig.Config) { 224 env.ModServiceConfig(c, func(cfg *svcconfig.Config) {
224 cfg.Coordinator.ArchiveD elayMax = nil 225 cfg.Coordinator.ArchiveD elayMax = nil
225 }) 226 })
226 env.ModProjectConfig(c, "proj-fo o", func(pcfg *svcconfig.ProjectConfig) { 227 env.ModProjectConfig(c, "proj-fo o", func(pcfg *svcconfig.ProjectConfig) {
227 pcfg.MaxStreamAge = nil 228 pcfg.MaxStreamAge = nil
228 }) 229 })
229 230
230 _, err := svr.RegisterStream(c, &req) 231 _, err := svr.RegisterStream(c, &req)
231 So(err, ShouldBeRPCOK) 232 So(err, ShouldBeRPCOK)
232 ds.GetTestable(c).CatchupIndexes () 233 ds.GetTestable(c).CatchupIndexes ()
233 234
234 // The cleanup archival should b e scheduled immediately. 235 // The cleanup archival should b e scheduled immediately.
235 » » » » » » env.IterateTumbleAll(c) 236 » » » » » » env.RunTaskQueues(c, tls)
236 So(env.ArchivalPublisher.Hashes( ), ShouldResemble, []string{string(tls.Stream.ID)}) 237 So(env.ArchivalPublisher.Hashes( ), ShouldResemble, []string{string(tls.Stream.ID)})
237 }) 238 })
238 }) 239 })
239 240
240 Convey(`Returns internal server error if the dat astore Get() fails.`, func() { 241 Convey(`Returns internal server error if the dat astore Get() fails.`, func() {
241 c, fb := featureBreaker.FilterRDS(c, nil ) 242 c, fb := featureBreaker.FilterRDS(c, nil )
242 fb.BreakFeatures(errors.New("test error" ), "GetMulti") 243 fb.BreakFeatures(errors.New("test error" ), "GetMulti")
243 244
244 _, err := svr.RegisterStream(c, &req) 245 _, err := svr.RegisterStream(c, &req)
245 So(err, ShouldBeRPCInternal) 246 So(err, ShouldBeRPCInternal)
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 Desc: tls.DescBytes(), 337 Desc: tls.DescBytes(),
337 TerminalIndex: -1, 338 TerminalIndex: -1,
338 } 339 }
339 340
340 _, err := svr.RegisterStream(c, &req) 341 _, err := svr.RegisterStream(c, &req)
341 if err != nil { 342 if err != nil {
342 b.Fatalf("failed to get OK response (%s)", err) 343 b.Fatalf("failed to get OK response (%s)", err)
343 } 344 }
344 } 345 }
345 } 346 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698