| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 12 matching lines...) Expand all Loading... |
| 23 "github.com/luci/gae/filter/featureBreaker" | 23 "github.com/luci/gae/filter/featureBreaker" |
| 24 ds "github.com/luci/gae/service/datastore" | 24 ds "github.com/luci/gae/service/datastore" |
| 25 "github.com/luci/luci-go/common/proto/google" | 25 "github.com/luci/luci-go/common/proto/google" |
| 26 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 26 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 28 "github.com/luci/luci-go/logdog/api/logpb" | 28 "github.com/luci/luci-go/logdog/api/logpb" |
| 29 "github.com/luci/luci-go/logdog/appengine/coordinator" | 29 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 30 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest
" | 30 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest
" |
| 31 "github.com/luci/luci-go/logdog/common/types" | 31 "github.com/luci/luci-go/logdog/common/types" |
| 32 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 32 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 33 |
| 33 "golang.org/x/net/context" | 34 "golang.org/x/net/context" |
| 34 | 35 |
| 35 . "github.com/luci/luci-go/common/testing/assertions" | 36 . "github.com/luci/luci-go/common/testing/assertions" |
| 36 . "github.com/smartystreets/goconvey/convey" | 37 . "github.com/smartystreets/goconvey/convey" |
| 37 ) | 38 ) |
| 38 | 39 |
| 39 func TestRegisterStream(t *testing.T) { | 40 func TestRegisterStream(t *testing.T) { |
| 40 t.Parallel() | 41 t.Parallel() |
| 41 | 42 |
| 42 Convey(`With a testing configuration`, t, func() { | 43 Convey(`With a testing configuration`, t, func() { |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 96 }, | 97 }, |
| 97 } | 98 } |
| 98 | 99 |
| 99 Convey(`Can register the stream.`, func() { | 100 Convey(`Can register the stream.`, func() { |
| 100 created := ds.RoundTime(env.Clock.Now()) | 101 created := ds.RoundTime(env.Clock.Now()) |
| 101 | 102 |
| 102 resp, err := svr.RegisterStream(c, &req) | 103 resp, err := svr.RegisterStream(c, &req) |
| 103 So(err, ShouldBeRPCOK) | 104 So(err, ShouldBeRPCOK) |
| 104 So(resp, ShouldResemble, expResp) | 105 So(resp, ShouldResemble, expResp) |
| 105 ds.GetTestable(c).CatchupIndexes() | 106 ds.GetTestable(c).CatchupIndexes() |
| 106 » » » » » env.IterateTumbleAll(c) | 107 » » » » » env.RunTaskQueues(c, tls) |
| 107 | 108 |
| 108 So(tls.Get(c), ShouldBeNil) | 109 So(tls.Get(c), ShouldBeNil) |
| 109 | 110 |
| 110 // Registers the log stream. | 111 // Registers the log stream. |
| 111 So(tls.Stream.Created, ShouldResemble, c
reated) | 112 So(tls.Stream.Created, ShouldResemble, c
reated) |
| 112 | 113 |
| 113 // Registers the log stream state. | 114 // Registers the log stream state. |
| 114 So(tls.State.Created, ShouldResemble, cr
eated) | 115 So(tls.State.Created, ShouldResemble, cr
eated) |
| 115 So(tls.State.Updated, ShouldResemble, cr
eated) | 116 So(tls.State.Updated, ShouldResemble, cr
eated) |
| 116 So(tls.State.Secret, ShouldResemble, req
.Secret) | 117 So(tls.State.Secret, ShouldResemble, req
.Secret) |
| (...skipping 19 matching lines...) Expand all Loading... |
| 136 So(ds.Get(c, tls.Stream,
tls.State), ShouldBeNil) | 137 So(ds.Get(c, tls.Stream,
tls.State), ShouldBeNil) |
| 137 }) | 138 }) |
| 138 So(tls.State.Created, ShouldRese
mble, created) | 139 So(tls.State.Created, ShouldRese
mble, created) |
| 139 So(tls.Stream.Created, ShouldRes
emble, created) | 140 So(tls.Stream.Created, ShouldRes
emble, created) |
| 140 | 141 |
| 141 // No archival request yet. | 142 // No archival request yet. |
| 142 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{}) | 143 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{}) |
| 143 | 144 |
| 144 Convey(`Forces an archival reque
st after first archive expiration.`, func() { | 145 Convey(`Forces an archival reque
st after first archive expiration.`, func() { |
| 145 env.Clock.Set(created.Ad
d(time.Hour)) // 1 hour after initial registration. | 146 env.Clock.Set(created.Ad
d(time.Hour)) // 1 hour after initial registration. |
| 146 » » » » » » » env.IterateTumbleAll(c) | 147 » » » » » » » env.RunTaskQueues(c, tls
) |
| 147 | 148 |
| 148 So(env.ArchivalPublisher
.Hashes(), ShouldResemble, []string{string(tls.Stream.ID)}) | 149 So(env.ArchivalPublisher
.Hashes(), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 149 }) | 150 }) |
| 150 }) | 151 }) |
| 151 | 152 |
| 152 Convey(`Will not re-register if secrets
don't match.`, func() { | 153 Convey(`Will not re-register if secrets
don't match.`, func() { |
| 153 req.Secret[0] = 0xAB | 154 req.Secret[0] = 0xAB |
| 154 _, err := svr.RegisterStream(c,
&req) | 155 _, err := svr.RegisterStream(c,
&req) |
| 155 So(err, ShouldBeRPCInvalidArgume
nt, "invalid secret") | 156 So(err, ShouldBeRPCInvalidArgume
nt, "invalid secret") |
| 156 }) | 157 }) |
| 157 }) | 158 }) |
| 158 | 159 |
| 159 Convey(`Can register a terminal stream.`, func()
{ | 160 Convey(`Can register a terminal stream.`, func()
{ |
| 160 created := ds.RoundTime(env.Clock.Now()) | 161 created := ds.RoundTime(env.Clock.Now()) |
| 161 req.TerminalIndex = 1337 | 162 req.TerminalIndex = 1337 |
| 162 expResp.State.TerminalIndex = 1337 | 163 expResp.State.TerminalIndex = 1337 |
| 163 | 164 |
| 164 resp, err := svr.RegisterStream(c, &req) | 165 resp, err := svr.RegisterStream(c, &req) |
| 165 So(err, ShouldBeRPCOK) | 166 So(err, ShouldBeRPCOK) |
| 166 So(resp, ShouldResemble, expResp) | 167 So(resp, ShouldResemble, expResp) |
| 167 ds.GetTestable(c).CatchupIndexes() | 168 ds.GetTestable(c).CatchupIndexes() |
| 168 » » » » » env.IterateTumbleAll(c) | 169 » » » » » env.RunTaskQueues(c, tls) |
| 169 | 170 |
| 170 So(tls.Get(c), ShouldBeNil) | 171 So(tls.Get(c), ShouldBeNil) |
| 171 | 172 |
| 172 // Registers the log stream. | 173 // Registers the log stream. |
| 173 So(tls.Stream.Created, ShouldResemble, c
reated) | 174 So(tls.Stream.Created, ShouldResemble, c
reated) |
| 174 | 175 |
| 175 // Registers the log stream state. | 176 // Registers the log stream state. |
| 176 So(tls.State.Created, ShouldResemble, cr
eated) | 177 So(tls.State.Created, ShouldResemble, cr
eated) |
| 177 So(tls.State.Updated, ShouldResemble, cr
eated) | 178 So(tls.State.Updated, ShouldResemble, cr
eated) |
| 178 So(tls.State.Secret, ShouldResemble, req
.Secret) | 179 So(tls.State.Secret, ShouldResemble, req
.Secret) |
| 179 So(tls.State.TerminalIndex, ShouldEqual,
1337) | 180 So(tls.State.TerminalIndex, ShouldEqual,
1337) |
| 180 So(tls.State.TerminatedTime, ShouldResem
ble, created) | 181 So(tls.State.TerminatedTime, ShouldResem
ble, created) |
| 181 So(tls.State.Terminated(), ShouldBeTrue) | 182 So(tls.State.Terminated(), ShouldBeTrue) |
| 182 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.NotArchived) | 183 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.NotArchived) |
| 183 | 184 |
| 184 // Should also register the log stream P
refix. | 185 // Should also register the log stream P
refix. |
| 185 So(tls.Prefix.Created, ShouldResemble, c
reated) | 186 So(tls.Prefix.Created, ShouldResemble, c
reated) |
| 186 So(tls.Prefix.Secret, ShouldResemble, re
q.Secret) | 187 So(tls.Prefix.Secret, ShouldResemble, re
q.Secret) |
| 187 | 188 |
| 188 // No pending archival requests. | 189 // No pending archival requests. |
| 189 » » » » » env.IterateTumbleAll(c) | 190 » » » » » env.RunTaskQueues(c, tls) |
| 190 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{}) | 191 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{}) |
| 191 | 192 |
| 192 // When we advance to our settle delay,
an archival task is scheduled. | 193 // When we advance to our settle delay,
an archival task is scheduled. |
| 193 env.Clock.Add(10 * time.Minute) | 194 env.Clock.Add(10 * time.Minute) |
| 194 » » » » » env.IterateTumbleAll(c) | 195 » » » » » env.RunTaskQueues(c, tls) |
| 195 | 196 |
| 196 // Has a pending archival request. | 197 // Has a pending archival request. |
| 197 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) | 198 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) |
| 198 }) | 199 }) |
| 199 | 200 |
| 200 Convey(`Will schedule the correct archival expir
ation delay`, func() { | 201 Convey(`Will schedule the correct archival expir
ation delay`, func() { |
| 201 Convey(`When there is no project config
delay.`, func() { | 202 Convey(`When there is no project config
delay.`, func() { |
| 202 env.ModProjectConfig(c, "proj-fo
o", func(pcfg *svcconfig.ProjectConfig) { | 203 env.ModProjectConfig(c, "proj-fo
o", func(pcfg *svcconfig.ProjectConfig) { |
| 203 pcfg.MaxStreamAge = nil | 204 pcfg.MaxStreamAge = nil |
| 204 }) | 205 }) |
| 205 | 206 |
| 206 _, err := svr.RegisterStream(c,
&req) | 207 _, err := svr.RegisterStream(c,
&req) |
| 207 So(err, ShouldBeRPCOK) | 208 So(err, ShouldBeRPCOK) |
| 208 ds.GetTestable(c).CatchupIndexes
() | 209 ds.GetTestable(c).CatchupIndexes
() |
| 209 | 210 |
| 210 // The cleanup archival should b
e scheduled for 24 hours, so advance | 211 // The cleanup archival should b
e scheduled for 24 hours, so advance |
| 211 // 12, confirm no archival, then
advance another 12 and confirm that | 212 // 12, confirm no archival, then
advance another 12 and confirm that |
| 212 // archival was tasked. | 213 // archival was tasked. |
| 213 env.Clock.Add(12 * time.Hour) | 214 env.Clock.Add(12 * time.Hour) |
| 214 » » » » » » env.IterateTumbleAll(c) | 215 » » » » » » env.RunTaskQueues(c, tls) |
| 215 So(env.ArchivalPublisher.Hashes(
), ShouldHaveLength, 0) | 216 So(env.ArchivalPublisher.Hashes(
), ShouldHaveLength, 0) |
| 216 | 217 |
| 217 env.Clock.Add(12 * time.Hour) | 218 env.Clock.Add(12 * time.Hour) |
| 218 » » » » » » env.IterateTumbleAll(c) | 219 » » » » » » env.RunTaskQueues(c, tls) |
| 219 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) | 220 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 220 }) | 221 }) |
| 221 | 222 |
| 222 Convey(`When there is no service or proj
ect config delay.`, func() { | 223 Convey(`When there is no service or proj
ect config delay.`, func() { |
| 223 env.ModServiceConfig(c, func(cfg
*svcconfig.Config) { | 224 env.ModServiceConfig(c, func(cfg
*svcconfig.Config) { |
| 224 cfg.Coordinator.ArchiveD
elayMax = nil | 225 cfg.Coordinator.ArchiveD
elayMax = nil |
| 225 }) | 226 }) |
| 226 env.ModProjectConfig(c, "proj-fo
o", func(pcfg *svcconfig.ProjectConfig) { | 227 env.ModProjectConfig(c, "proj-fo
o", func(pcfg *svcconfig.ProjectConfig) { |
| 227 pcfg.MaxStreamAge = nil | 228 pcfg.MaxStreamAge = nil |
| 228 }) | 229 }) |
| 229 | 230 |
| 230 _, err := svr.RegisterStream(c,
&req) | 231 _, err := svr.RegisterStream(c,
&req) |
| 231 So(err, ShouldBeRPCOK) | 232 So(err, ShouldBeRPCOK) |
| 232 ds.GetTestable(c).CatchupIndexes
() | 233 ds.GetTestable(c).CatchupIndexes
() |
| 233 | 234 |
| 234 // The cleanup archival should b
e scheduled immediately. | 235 // The cleanup archival should b
e scheduled immediately. |
| 235 » » » » » » env.IterateTumbleAll(c) | 236 » » » » » » env.RunTaskQueues(c, tls) |
| 236 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) | 237 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 237 }) | 238 }) |
| 238 }) | 239 }) |
| 239 | 240 |
| 240 Convey(`Returns internal server error if the dat
astore Get() fails.`, func() { | 241 Convey(`Returns internal server error if the dat
astore Get() fails.`, func() { |
| 241 c, fb := featureBreaker.FilterRDS(c, nil
) | 242 c, fb := featureBreaker.FilterRDS(c, nil
) |
| 242 fb.BreakFeatures(errors.New("test error"
), "GetMulti") | 243 fb.BreakFeatures(errors.New("test error"
), "GetMulti") |
| 243 | 244 |
| 244 _, err := svr.RegisterStream(c, &req) | 245 _, err := svr.RegisterStream(c, &req) |
| 245 So(err, ShouldBeRPCInternal) | 246 So(err, ShouldBeRPCInternal) |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 336 Desc: tls.DescBytes(), | 337 Desc: tls.DescBytes(), |
| 337 TerminalIndex: -1, | 338 TerminalIndex: -1, |
| 338 } | 339 } |
| 339 | 340 |
| 340 _, err := svr.RegisterStream(c, &req) | 341 _, err := svr.RegisterStream(c, &req) |
| 341 if err != nil { | 342 if err != nil { |
| 342 b.Fatalf("failed to get OK response (%s)", err) | 343 b.Fatalf("failed to get OK response (%s)", err) |
| 343 } | 344 } |
| 344 } | 345 } |
| 345 } | 346 } |
| OLD | NEW |