OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "testing" | 9 "testing" |
10 "time" | 10 "time" |
(...skipping 12 matching lines...) Expand all Loading... |
23 . "github.com/luci/luci-go/common/testing/assertions" | 23 . "github.com/luci/luci-go/common/testing/assertions" |
24 . "github.com/smartystreets/goconvey/convey" | 24 . "github.com/smartystreets/goconvey/convey" |
25 ) | 25 ) |
26 | 26 |
27 func TestTerminateStream(t *testing.T) { | 27 func TestTerminateStream(t *testing.T) { |
28 t.Parallel() | 28 t.Parallel() |
29 | 29 |
30 Convey(`With a testing configuration`, t, func() { | 30 Convey(`With a testing configuration`, t, func() { |
31 c, env := ct.Install() | 31 c, env := ct.Install() |
32 | 32 |
| 33 // Set our archival delays. The project delay is smaller than th
e service |
| 34 // delay, so it should be used. |
33 env.ModServiceConfig(c, func(cfg *svcconfig.Coordinator) { | 35 env.ModServiceConfig(c, func(cfg *svcconfig.Coordinator) { |
34 cfg.ArchiveTopic = "projects/test/topics/archive" | 36 cfg.ArchiveTopic = "projects/test/topics/archive" |
35 cfg.ArchiveSettleDelay = google.NewDuration(10 * time.Se
cond) | 37 cfg.ArchiveSettleDelay = google.NewDuration(10 * time.Se
cond) |
36 cfg.ArchiveDelayMax = google.NewDuration(24 * time.Hour) | 38 cfg.ArchiveDelayMax = google.NewDuration(24 * time.Hour) |
37 }) | 39 }) |
| 40 env.ModProjectConfig("proj-foo", func(pcfg *svcconfig.ProjectCon
fig) { |
| 41 pcfg.MaxStreamAge = google.NewDuration(time.Hour) |
| 42 }) |
38 | 43 |
39 svr := New() | 44 svr := New() |
40 | 45 |
41 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") | 46 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") |
42 | 47 |
43 req := logdog.TerminateStreamRequest{ | 48 req := logdog.TerminateStreamRequest{ |
44 Project: string(tls.Project), | 49 Project: string(tls.Project), |
45 Id: string(tls.Stream.ID), | 50 Id: string(tls.Stream.ID), |
46 Secret: tls.Prefix.Secret, | 51 Secret: tls.Prefix.Secret, |
47 TerminalIndex: 1337, | 52 TerminalIndex: 1337, |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 }) | 91 }) |
87 So(tls.State.TerminalIndex, ShouldEqual,
1337) | 92 So(tls.State.TerminalIndex, ShouldEqual,
1337) |
88 So(tls.State.Terminated(), ShouldBeTrue) | 93 So(tls.State.Terminated(), ShouldBeTrue) |
89 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.ArchiveTasked) | 94 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.ArchiveTasked) |
90 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) | 95 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) |
91 | 96 |
92 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in | 97 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
93 // the future. | 98 // the future. |
94 for _, t := range env.ArchivalPublisher.
Tasks() { | 99 for _, t := range env.ArchivalPublisher.
Tasks() { |
95 So(t.SettleDelay.Duration(), Sho
uldEqual, 10*time.Second) | 100 So(t.SettleDelay.Duration(), Sho
uldEqual, 10*time.Second) |
96 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, 24*time.Hour) | 101 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, time.Hour) |
97 } | 102 } |
98 | 103 |
99 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { | 104 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { |
100 // We will test this by revertin
g the stream to be not terminated | 105 // We will test this by revertin
g the stream to be not terminated |
101 // so that if the Tumble task ge
ts fired, it will try and schedule | 106 // so that if the Tumble task ge
ts fired, it will try and schedule |
102 // another archival task. | 107 // another archival task. |
103 env.ArchivalPublisher.Clear() | 108 env.ArchivalPublisher.Clear() |
104 | 109 |
105 tls.State.TerminalIndex = -1 | 110 tls.State.TerminalIndex = -1 |
106 So(tls.Put(c), ShouldBeNil) | 111 So(tls.Put(c), ShouldBeNil) |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
142 | 147 |
143 // Reload state and confirm. | 148 // Reload state and confirm. |
144 So(tls.Get(c), ShouldBeNil) | 149 So(tls.Get(c), ShouldBeNil) |
145 | 150 |
146 So(tls.State.TerminalIndex, Shou
ldEqual, 1337) | 151 So(tls.State.TerminalIndex, Shou
ldEqual, 1337) |
147 So(tls.State.Terminated(), Shoul
dBeTrue) | 152 So(tls.State.Terminated(), Shoul
dBeTrue) |
148 So(tls.State.ArchivalState(), Sh
ouldEqual, coordinator.ArchiveTasked) | 153 So(tls.State.ArchivalState(), Sh
ouldEqual, coordinator.ArchiveTasked) |
149 }) | 154 }) |
150 }) | 155 }) |
151 | 156 |
| 157 Convey(`Will schedule the correct archival delay
`, func() { |
| 158 |
| 159 Convey(`When there is no project config
delay.`, func() { |
| 160 env.ModProjectConfig("proj-foo",
func(pcfg *svcconfig.ProjectConfig) { |
| 161 pcfg.MaxStreamAge = nil |
| 162 }) |
| 163 |
| 164 _, err := svr.TerminateStream(c,
&req) |
| 165 So(err, ShouldBeRPCOK) |
| 166 |
| 167 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 168 So(len(env.ArchivalPublisher.Tas
ks()), ShouldEqual, 1) |
| 169 So(env.ArchivalPublisher.Tasks()
[0].CompletePeriod.Duration(), ShouldEqual, 24*time.Hour) |
| 170 }) |
| 171 |
| 172 Convey(`When there is no service or proj
ect config delay.`, func() { |
| 173 env.ModServiceConfig(c, func(cfg
*svcconfig.Coordinator) { |
| 174 cfg.ArchiveDelayMax = ni
l |
| 175 }) |
| 176 env.ModProjectConfig("proj-foo",
func(pcfg *svcconfig.ProjectConfig) { |
| 177 pcfg.MaxStreamAge = nil |
| 178 }) |
| 179 |
| 180 _, err := svr.TerminateStream(c,
&req) |
| 181 So(err, ShouldBeRPCOK) |
| 182 |
| 183 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 184 So(len(env.ArchivalPublisher.Tas
ks()), ShouldEqual, 1) |
| 185 So(env.ArchivalPublisher.Tasks()
[0].CompletePeriod.Duration(), ShouldEqual, 0) |
| 186 }) |
| 187 }) |
| 188 |
152 Convey(`Will return an internal server error if
Put() fails.`, func() { | 189 Convey(`Will return an internal server error if
Put() fails.`, func() { |
153 c, fb := featureBreaker.FilterRDS(c, nil
) | 190 c, fb := featureBreaker.FilterRDS(c, nil
) |
154 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 191 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
155 _, err := svr.TerminateStream(c, &req) | 192 _, err := svr.TerminateStream(c, &req) |
156 So(err, ShouldBeRPCInternal) | 193 So(err, ShouldBeRPCInternal) |
157 }) | 194 }) |
158 | 195 |
159 Convey(`Will return an internal server error if
Get() fails.`, func() { | 196 Convey(`Will return an internal server error if
Get() fails.`, func() { |
160 c, fb := featureBreaker.FilterRDS(c, nil
) | 197 c, fb := featureBreaker.FilterRDS(c, nil
) |
161 fb.BreakFeatures(errors.New("test error"
), "GetMulti") | 198 fb.BreakFeatures(errors.New("test error"
), "GetMulti") |
(...skipping 14 matching lines...) Expand all Loading... |
176 So(err, ShouldBeRPCInvalidArgument, "Invalid ID"
) | 213 So(err, ShouldBeRPCInvalidArgument, "Invalid ID"
) |
177 }) | 214 }) |
178 | 215 |
179 Convey(`Will fail if the stream is not registered.`, fun
c() { | 216 Convey(`Will fail if the stream is not registered.`, fun
c() { |
180 _, err := svr.TerminateStream(c, &req) | 217 _, err := svr.TerminateStream(c, &req) |
181 So(err, ShouldBeRPCNotFound, "is not registered"
) | 218 So(err, ShouldBeRPCNotFound, "is not registered"
) |
182 }) | 219 }) |
183 }) | 220 }) |
184 }) | 221 }) |
185 } | 222 } |
OLD | NEW |