| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "testing" | 9 "testing" |
| 10 "time" | 10 "time" |
| (...skipping 12 matching lines...) Expand all Loading... |
| 23 . "github.com/luci/luci-go/common/testing/assertions" | 23 . "github.com/luci/luci-go/common/testing/assertions" |
| 24 . "github.com/smartystreets/goconvey/convey" | 24 . "github.com/smartystreets/goconvey/convey" |
| 25 ) | 25 ) |
| 26 | 26 |
| 27 func TestTerminateStream(t *testing.T) { | 27 func TestTerminateStream(t *testing.T) { |
| 28 t.Parallel() | 28 t.Parallel() |
| 29 | 29 |
| 30 Convey(`With a testing configuration`, t, func() { | 30 Convey(`With a testing configuration`, t, func() { |
| 31 c, env := ct.Install() | 31 c, env := ct.Install() |
| 32 | 32 |
| 33 // Set our archival delays. The project delay is smaller than th
e service |
| 34 // delay, so it should be used. |
| 33 env.ModServiceConfig(c, func(cfg *svcconfig.Coordinator) { | 35 env.ModServiceConfig(c, func(cfg *svcconfig.Coordinator) { |
| 34 cfg.ArchiveTopic = "projects/test/topics/archive" | 36 cfg.ArchiveTopic = "projects/test/topics/archive" |
| 35 cfg.ArchiveSettleDelay = google.NewDuration(10 * time.Se
cond) | 37 cfg.ArchiveSettleDelay = google.NewDuration(10 * time.Se
cond) |
| 36 cfg.ArchiveDelayMax = google.NewDuration(24 * time.Hour) | 38 cfg.ArchiveDelayMax = google.NewDuration(24 * time.Hour) |
| 37 }) | 39 }) |
| 40 env.ModProjectConfig("proj-foo", func(pcfg *svcconfig.ProjectCon
fig) { |
| 41 pcfg.MaxStreamAge = google.NewDuration(time.Hour) |
| 42 }) |
| 38 | 43 |
| 39 svr := New() | 44 svr := New() |
| 40 | 45 |
| 41 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") | 46 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") |
| 42 | 47 |
| 43 req := logdog.TerminateStreamRequest{ | 48 req := logdog.TerminateStreamRequest{ |
| 44 Project: string(tls.Project), | 49 Project: string(tls.Project), |
| 45 Id: string(tls.Stream.ID), | 50 Id: string(tls.Stream.ID), |
| 46 Secret: tls.Prefix.Secret, | 51 Secret: tls.Prefix.Secret, |
| 47 TerminalIndex: 1337, | 52 TerminalIndex: 1337, |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 86 }) | 91 }) |
| 87 So(tls.State.TerminalIndex, ShouldEqual,
1337) | 92 So(tls.State.TerminalIndex, ShouldEqual,
1337) |
| 88 So(tls.State.Terminated(), ShouldBeTrue) | 93 So(tls.State.Terminated(), ShouldBeTrue) |
| 89 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.ArchiveTasked) | 94 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.ArchiveTasked) |
| 90 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) | 95 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) |
| 91 | 96 |
| 92 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in | 97 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
| 93 // the future. | 98 // the future. |
| 94 for _, t := range env.ArchivalPublisher.
Tasks() { | 99 for _, t := range env.ArchivalPublisher.
Tasks() { |
| 95 So(t.SettleDelay.Duration(), Sho
uldEqual, 10*time.Second) | 100 So(t.SettleDelay.Duration(), Sho
uldEqual, 10*time.Second) |
| 96 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, 24*time.Hour) | 101 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, time.Hour) |
| 97 } | 102 } |
| 98 | 103 |
| 99 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { | 104 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { |
| 100 // We will test this by revertin
g the stream to be not terminated | 105 // We will test this by revertin
g the stream to be not terminated |
| 101 // so that if the Tumble task ge
ts fired, it will try and schedule | 106 // so that if the Tumble task ge
ts fired, it will try and schedule |
| 102 // another archival task. | 107 // another archival task. |
| 103 env.ArchivalPublisher.Clear() | 108 env.ArchivalPublisher.Clear() |
| 104 | 109 |
| 105 tls.State.TerminalIndex = -1 | 110 tls.State.TerminalIndex = -1 |
| 106 So(tls.Put(c), ShouldBeNil) | 111 So(tls.Put(c), ShouldBeNil) |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 142 | 147 |
| 143 // Reload state and confirm. | 148 // Reload state and confirm. |
| 144 So(tls.Get(c), ShouldBeNil) | 149 So(tls.Get(c), ShouldBeNil) |
| 145 | 150 |
| 146 So(tls.State.TerminalIndex, Shou
ldEqual, 1337) | 151 So(tls.State.TerminalIndex, Shou
ldEqual, 1337) |
| 147 So(tls.State.Terminated(), Shoul
dBeTrue) | 152 So(tls.State.Terminated(), Shoul
dBeTrue) |
| 148 So(tls.State.ArchivalState(), Sh
ouldEqual, coordinator.ArchiveTasked) | 153 So(tls.State.ArchivalState(), Sh
ouldEqual, coordinator.ArchiveTasked) |
| 149 }) | 154 }) |
| 150 }) | 155 }) |
| 151 | 156 |
| 157 Convey(`Will schedule the correct archival delay
`, func() { |
| 158 |
| 159 Convey(`When there is no project config
delay.`, func() { |
| 160 env.ModProjectConfig("proj-foo",
func(pcfg *svcconfig.ProjectConfig) { |
| 161 pcfg.MaxStreamAge = nil |
| 162 }) |
| 163 |
| 164 _, err := svr.TerminateStream(c,
&req) |
| 165 So(err, ShouldBeRPCOK) |
| 166 |
| 167 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 168 So(len(env.ArchivalPublisher.Tas
ks()), ShouldEqual, 1) |
| 169 So(env.ArchivalPublisher.Tasks()
[0].CompletePeriod.Duration(), ShouldEqual, 24*time.Hour) |
| 170 }) |
| 171 |
| 172 Convey(`When there is no service or proj
ect config delay.`, func() { |
| 173 env.ModServiceConfig(c, func(cfg
*svcconfig.Coordinator) { |
| 174 cfg.ArchiveDelayMax = ni
l |
| 175 }) |
| 176 env.ModProjectConfig("proj-foo",
func(pcfg *svcconfig.ProjectConfig) { |
| 177 pcfg.MaxStreamAge = nil |
| 178 }) |
| 179 |
| 180 _, err := svr.TerminateStream(c,
&req) |
| 181 So(err, ShouldBeRPCOK) |
| 182 |
| 183 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 184 So(len(env.ArchivalPublisher.Tas
ks()), ShouldEqual, 1) |
| 185 So(env.ArchivalPublisher.Tasks()
[0].CompletePeriod.Duration(), ShouldEqual, 0) |
| 186 }) |
| 187 }) |
| 188 |
| 152 Convey(`Will return an internal server error if
Put() fails.`, func() { | 189 Convey(`Will return an internal server error if
Put() fails.`, func() { |
| 153 c, fb := featureBreaker.FilterRDS(c, nil
) | 190 c, fb := featureBreaker.FilterRDS(c, nil
) |
| 154 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 191 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
| 155 _, err := svr.TerminateStream(c, &req) | 192 _, err := svr.TerminateStream(c, &req) |
| 156 So(err, ShouldBeRPCInternal) | 193 So(err, ShouldBeRPCInternal) |
| 157 }) | 194 }) |
| 158 | 195 |
| 159 Convey(`Will return an internal server error if
Get() fails.`, func() { | 196 Convey(`Will return an internal server error if
Get() fails.`, func() { |
| 160 c, fb := featureBreaker.FilterRDS(c, nil
) | 197 c, fb := featureBreaker.FilterRDS(c, nil
) |
| 161 fb.BreakFeatures(errors.New("test error"
), "GetMulti") | 198 fb.BreakFeatures(errors.New("test error"
), "GetMulti") |
| (...skipping 14 matching lines...) Expand all Loading... |
| 176 So(err, ShouldBeRPCInvalidArgument, "Invalid ID"
) | 213 So(err, ShouldBeRPCInvalidArgument, "Invalid ID"
) |
| 177 }) | 214 }) |
| 178 | 215 |
| 179 Convey(`Will fail if the stream is not registered.`, fun
c() { | 216 Convey(`Will fail if the stream is not registered.`, fun
c() { |
| 180 _, err := svr.TerminateStream(c, &req) | 217 _, err := svr.TerminateStream(c, &req) |
| 181 So(err, ShouldBeRPCNotFound, "is not registered"
) | 218 So(err, ShouldBeRPCNotFound, "is not registered"
) |
| 182 }) | 219 }) |
| 183 }) | 220 }) |
| 184 }) | 221 }) |
| 185 } | 222 } |
| OLD | NEW |