OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "testing" | 9 "testing" |
10 "time" | 10 "time" |
(...skipping 12 matching lines...) Expand all Loading... |
23 . "github.com/luci/luci-go/common/testing/assertions" | 23 . "github.com/luci/luci-go/common/testing/assertions" |
24 . "github.com/smartystreets/goconvey/convey" | 24 . "github.com/smartystreets/goconvey/convey" |
25 ) | 25 ) |
26 | 26 |
27 func TestTerminateStream(t *testing.T) { | 27 func TestTerminateStream(t *testing.T) { |
28 t.Parallel() | 28 t.Parallel() |
29 | 29 |
30 Convey(`With a testing configuration`, t, func() { | 30 Convey(`With a testing configuration`, t, func() { |
31 c, env := ct.Install() | 31 c, env := ct.Install() |
32 | 32 |
33 » » env.ModServiceConfig(c, func(cfg *svcconfig.Coordinator) { | 33 » » // Set our archival delays. The project delay is smaller than th
e service |
34 » » » cfg.ArchiveTopic = "projects/test/topics/archive" | 34 » » // delay, so it should be used. |
35 » » » cfg.ArchiveSettleDelay = google.NewDuration(10 * time.Se
cond) | 35 » » env.ModServiceConfig(c, func(cfg *svcconfig.Config) { |
36 » » » cfg.ArchiveDelayMax = google.NewDuration(24 * time.Hour) | 36 » » » coord := cfg.Coordinator |
| 37 » » » coord.ArchiveTopic = "projects/test/topics/archive" |
| 38 » » » coord.ArchiveSettleDelay = google.NewDuration(10 * time.
Second) |
| 39 » » » coord.ArchiveDelayMax = google.NewDuration(24 * time.Hou
r) |
| 40 » » }) |
| 41 » » env.ModProjectConfig(c, "proj-foo", func(pcfg *svcconfig.Project
Config) { |
| 42 » » » pcfg.MaxStreamAge = google.NewDuration(time.Hour) |
37 }) | 43 }) |
38 | 44 |
39 svr := New() | 45 svr := New() |
40 | 46 |
41 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") | 47 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") |
42 | 48 |
43 req := logdog.TerminateStreamRequest{ | 49 req := logdog.TerminateStreamRequest{ |
44 Project: string(tls.Project), | 50 Project: string(tls.Project), |
45 Id: string(tls.Stream.ID), | 51 Id: string(tls.Stream.ID), |
46 Secret: tls.Prefix.Secret, | 52 Secret: tls.Prefix.Secret, |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 }) | 92 }) |
87 So(tls.State.TerminalIndex, ShouldEqual,
1337) | 93 So(tls.State.TerminalIndex, ShouldEqual,
1337) |
88 So(tls.State.Terminated(), ShouldBeTrue) | 94 So(tls.State.Terminated(), ShouldBeTrue) |
89 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.ArchiveTasked) | 95 So(tls.State.ArchivalState(), ShouldEqua
l, coordinator.ArchiveTasked) |
90 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) | 96 So(env.ArchivalPublisher.Hashes(), Shoul
dResemble, []string{string(tls.Stream.ID)}) |
91 | 97 |
92 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in | 98 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
93 // the future. | 99 // the future. |
94 for _, t := range env.ArchivalPublisher.
Tasks() { | 100 for _, t := range env.ArchivalPublisher.
Tasks() { |
95 So(t.SettleDelay.Duration(), Sho
uldEqual, 10*time.Second) | 101 So(t.SettleDelay.Duration(), Sho
uldEqual, 10*time.Second) |
96 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, 24*time.Hour) | 102 » » » » » » So(t.CompletePeriod.Duration(),
ShouldEqual, time.Hour) |
97 } | 103 } |
98 | 104 |
99 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { | 105 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { |
100 // We will test this by revertin
g the stream to be not terminated | 106 // We will test this by revertin
g the stream to be not terminated |
101 // so that if the Tumble task ge
ts fired, it will try and schedule | 107 // so that if the Tumble task ge
ts fired, it will try and schedule |
102 // another archival task. | 108 // another archival task. |
103 env.ArchivalPublisher.Clear() | 109 env.ArchivalPublisher.Clear() |
104 | 110 |
105 tls.State.TerminalIndex = -1 | 111 tls.State.TerminalIndex = -1 |
106 So(tls.Put(c), ShouldBeNil) | 112 So(tls.Put(c), ShouldBeNil) |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
142 | 148 |
143 // Reload state and confirm. | 149 // Reload state and confirm. |
144 So(tls.Get(c), ShouldBeNil) | 150 So(tls.Get(c), ShouldBeNil) |
145 | 151 |
146 So(tls.State.TerminalIndex, Shou
ldEqual, 1337) | 152 So(tls.State.TerminalIndex, Shou
ldEqual, 1337) |
147 So(tls.State.Terminated(), Shoul
dBeTrue) | 153 So(tls.State.Terminated(), Shoul
dBeTrue) |
148 So(tls.State.ArchivalState(), Sh
ouldEqual, coordinator.ArchiveTasked) | 154 So(tls.State.ArchivalState(), Sh
ouldEqual, coordinator.ArchiveTasked) |
149 }) | 155 }) |
150 }) | 156 }) |
151 | 157 |
| 158 Convey(`Will schedule the correct archival delay
`, func() { |
| 159 |
| 160 Convey(`When there is no project config
delay.`, func() { |
| 161 env.ModProjectConfig(c, "proj-fo
o", func(pcfg *svcconfig.ProjectConfig) { |
| 162 pcfg.MaxStreamAge = nil |
| 163 }) |
| 164 |
| 165 _, err := svr.TerminateStream(c,
&req) |
| 166 So(err, ShouldBeRPCOK) |
| 167 |
| 168 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 169 So(len(env.ArchivalPublisher.Tas
ks()), ShouldEqual, 1) |
| 170 So(env.ArchivalPublisher.Tasks()
[0].CompletePeriod.Duration(), ShouldEqual, 24*time.Hour) |
| 171 }) |
| 172 |
| 173 Convey(`When there is no service or proj
ect config delay.`, func() { |
| 174 env.ModServiceConfig(c, func(cfg
*svcconfig.Config) { |
| 175 cfg.Coordinator.ArchiveD
elayMax = nil |
| 176 }) |
| 177 env.ModProjectConfig(c, "proj-fo
o", func(pcfg *svcconfig.ProjectConfig) { |
| 178 pcfg.MaxStreamAge = nil |
| 179 }) |
| 180 |
| 181 _, err := svr.TerminateStream(c,
&req) |
| 182 So(err, ShouldBeRPCOK) |
| 183 |
| 184 So(env.ArchivalPublisher.Hashes(
), ShouldResemble, []string{string(tls.Stream.ID)}) |
| 185 So(len(env.ArchivalPublisher.Tas
ks()), ShouldEqual, 1) |
| 186 So(env.ArchivalPublisher.Tasks()
[0].CompletePeriod.Duration(), ShouldEqual, 0) |
| 187 }) |
| 188 }) |
| 189 |
152 Convey(`Will return an internal server error if
Put() fails.`, func() { | 190 Convey(`Will return an internal server error if
Put() fails.`, func() { |
153 c, fb := featureBreaker.FilterRDS(c, nil
) | 191 c, fb := featureBreaker.FilterRDS(c, nil
) |
154 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 192 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
155 _, err := svr.TerminateStream(c, &req) | 193 _, err := svr.TerminateStream(c, &req) |
156 So(err, ShouldBeRPCInternal) | 194 So(err, ShouldBeRPCInternal) |
157 }) | 195 }) |
158 | 196 |
159 Convey(`Will return an internal server error if
Get() fails.`, func() { | 197 Convey(`Will return an internal server error if
Get() fails.`, func() { |
160 c, fb := featureBreaker.FilterRDS(c, nil
) | 198 c, fb := featureBreaker.FilterRDS(c, nil
) |
161 fb.BreakFeatures(errors.New("test error"
), "GetMulti") | 199 fb.BreakFeatures(errors.New("test error"
), "GetMulti") |
(...skipping 14 matching lines...) Expand all Loading... |
176 So(err, ShouldBeRPCInvalidArgument, "Invalid ID"
) | 214 So(err, ShouldBeRPCInvalidArgument, "Invalid ID"
) |
177 }) | 215 }) |
178 | 216 |
179 Convey(`Will fail if the stream is not registered.`, fun
c() { | 217 Convey(`Will fail if the stream is not registered.`, fun
c() { |
180 _, err := svr.TerminateStream(c, &req) | 218 _, err := svr.TerminateStream(c, &req) |
181 So(err, ShouldBeRPCNotFound, "is not registered"
) | 219 So(err, ShouldBeRPCNotFound, "is not registered"
) |
182 }) | 220 }) |
183 }) | 221 }) |
184 }) | 222 }) |
185 } | 223 } |
OLD | NEW |