OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "testing" | 9 "testing" |
10 "time" | 10 "time" |
11 | 11 |
12 "github.com/luci/gae/filter/featureBreaker" | 12 "github.com/luci/gae/filter/featureBreaker" |
13 "github.com/luci/gae/impl/memory" | |
14 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
15 "github.com/luci/luci-go/appengine/logdog/coordinator" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator" |
16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 17 "github.com/luci/luci-go/appengine/tumble" |
17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 19 "github.com/luci/luci-go/common/clock" |
18 "github.com/luci/luci-go/common/clock/testclock" | 20 "github.com/luci/luci-go/common/clock/testclock" |
19 "github.com/luci/luci-go/common/proto/google" | 21 "github.com/luci/luci-go/common/proto/google" |
20 "github.com/luci/luci-go/server/auth" | 22 "github.com/luci/luci-go/server/auth" |
21 "github.com/luci/luci-go/server/auth/authtest" | 23 "github.com/luci/luci-go/server/auth/authtest" |
22 "golang.org/x/net/context" | |
23 | 24 |
24 . "github.com/luci/luci-go/common/testing/assertions" | 25 . "github.com/luci/luci-go/common/testing/assertions" |
25 . "github.com/smartystreets/goconvey/convey" | 26 . "github.com/smartystreets/goconvey/convey" |
26 ) | 27 ) |
27 | 28 |
28 func TestTerminateStream(t *testing.T) { | 29 func TestTerminateStream(t *testing.T) { |
29 t.Parallel() | 30 t.Parallel() |
30 | 31 |
31 Convey(`With a testing configuration`, t, func() { | 32 Convey(`With a testing configuration`, t, func() { |
32 » » c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) | 33 » » var tt tumble.Testing |
33 » » c = memory.Use(c) | 34 » » c := tt.Context() |
| 35 » » tc := clock.Get(c).(testclock.TestClock) |
| 36 » » tt.EnableDelayedMutations(c) |
34 | 37 |
35 var tap ct.ArchivalPublisher | 38 var tap ct.ArchivalPublisher |
36 svcStub := ct.Services{ | 39 svcStub := ct.Services{ |
37 AP: func() (coordinator.ArchivalPublisher, error) { | 40 AP: func() (coordinator.ArchivalPublisher, error) { |
38 return &tap, nil | 41 return &tap, nil |
39 }, | 42 }, |
40 } | 43 } |
41 svcStub.InitConfig() | 44 svcStub.InitConfig() |
42 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" | 45 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" |
43 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/
topics/archive" | 46 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/
topics/archive" |
(...skipping 19 matching lines...) Expand all Loading... |
63 _, err := svr.TerminateStream(c, &req) | 66 _, err := svr.TerminateStream(c, &req) |
64 So(err, ShouldBeRPCPermissionDenied) | 67 So(err, ShouldBeRPCPermissionDenied) |
65 }) | 68 }) |
66 | 69 |
67 Convey(`When logged in as a service`, func() { | 70 Convey(`When logged in as a service`, func() { |
68 fs.IdentityGroups = []string{"test-services"} | 71 fs.IdentityGroups = []string{"test-services"} |
69 | 72 |
70 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { | 73 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { |
71 So(ds.Get(c).Put(ls), ShouldBeNil) | 74 So(ds.Get(c).Put(ls), ShouldBeNil) |
72 | 75 |
| 76 // Create an archival request for Tumble so we c
an ensure that it is |
| 77 // canceled on termination. |
| 78 areq := mutations.CreateArchiveTask{ |
| 79 Path: ls.Path(), |
| 80 Expiration: tc.Now().Add(time.Hour), |
| 81 } |
| 82 arParent, arName := areq.TaskName(ds.Get(c)) |
| 83 err := tumble.PutNamedMutations(c, arParent, map
[string]tumble.Mutation{ |
| 84 arName: &areq, |
| 85 }) |
| 86 if err != nil { |
| 87 panic(err) |
| 88 } |
| 89 ds.Get(c).Testable().CatchupIndexes() |
| 90 |
73 Convey(`Can be marked terminal and schedules an
archival task.`, func() { | 91 Convey(`Can be marked terminal and schedules an
archival task.`, func() { |
74 _, err := svr.TerminateStream(c, &req) | 92 _, err := svr.TerminateStream(c, &req) |
75 So(err, ShouldBeRPCOK) | 93 So(err, ShouldBeRPCOK) |
| 94 ds.Get(c).Testable().CatchupIndexes() |
76 | 95 |
77 // Reload "ls" and confirm. | 96 // Reload "ls" and confirm. |
78 So(ds.Get(c).Get(ls), ShouldBeNil) | 97 So(ds.Get(c).Get(ls), ShouldBeNil) |
79 So(ls.TerminalIndex, ShouldEqual, 1337) | 98 So(ls.TerminalIndex, ShouldEqual, 1337) |
80 So(ls.State, ShouldEqual, coordinator.LS
ArchiveTasked) | 99 So(ls.State, ShouldEqual, coordinator.LS
ArchiveTasked) |
81 So(ls.Terminated(), ShouldBeTrue) | 100 So(ls.Terminated(), ShouldBeTrue) |
82 So(tap.StreamNames(), ShouldResemble, []
string{ls.Name}) | 101 So(tap.StreamNames(), ShouldResemble, []
string{ls.Name}) |
83 | 102 |
84 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in | 103 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
85 // the future. | 104 // the future. |
86 for _, t := range tap.Tasks() { | 105 for _, t := range tap.Tasks() { |
87 So(t.SettleDelay.Duration(), Sho
uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration()) | 106 So(t.SettleDelay.Duration(), Sho
uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration()) |
88 So(t.CompletePeriod.Duration(),
ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration()) | 107 So(t.CompletePeriod.Duration(),
ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration()) |
89 } | 108 } |
90 | 109 |
| 110 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { |
| 111 // We will test this by revertin
g the stream to a LSStreaming state |
| 112 // so that if the Tumble task ge
ts fired, it will try and schedule |
| 113 // another archival task. |
| 114 tap.Clear() |
| 115 |
| 116 ls.State = coordinator.LSStreami
ng |
| 117 So(ds.Get(c).Put(ls), ShouldBeNi
l) |
| 118 |
| 119 tc.Add(time.Hour) |
| 120 tt.Drain(c) |
| 121 So(tap.StreamNames(), ShouldRese
mble, []string{}) |
| 122 }) |
| 123 |
91 Convey(`Can be marked terminal again (id
empotent).`, func() { | 124 Convey(`Can be marked terminal again (id
empotent).`, func() { |
92 _, err := svr.TerminateStream(c,
&req) | 125 _, err := svr.TerminateStream(c,
&req) |
93 So(err, ShouldBeRPCOK) | 126 So(err, ShouldBeRPCOK) |
94 | 127 |
95 // Reload "ls" and confirm. | 128 // Reload "ls" and confirm. |
96 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 129 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
97 | 130 |
98 So(ls.Terminated(), ShouldBeTrue
) | 131 So(ls.Terminated(), ShouldBeTrue
) |
99 So(ls.TerminalIndex, ShouldEqual
, 1337) | 132 So(ls.TerminalIndex, ShouldEqual
, 1337) |
100 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) | 133 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
101 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) | |
102 }) | 134 }) |
103 | 135 |
104 Convey(`Will reject attempts to change t
he terminal index.`, func() { | 136 Convey(`Will reject attempts to change t
he terminal index.`, func() { |
105 req.TerminalIndex = 1338 | 137 req.TerminalIndex = 1338 |
106 _, err := svr.TerminateStream(c,
&req) | 138 _, err := svr.TerminateStream(c,
&req) |
107 So(err, ShouldBeRPCFailedPrecond
ition, "Log stream is not in streaming state.") | 139 So(err, ShouldBeRPCFailedPrecond
ition, "Log stream is not in streaming state.") |
108 | 140 |
109 // Reload "ls" and confirm. | 141 // Reload "ls" and confirm. |
110 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 142 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
111 | 143 |
112 So(ls.Terminated(), ShouldBeTrue
) | 144 So(ls.Terminated(), ShouldBeTrue
) |
113 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) | 145 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
114 So(ls.TerminalIndex, ShouldEqual
, 1337) | 146 So(ls.TerminalIndex, ShouldEqual
, 1337) |
115 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) | |
116 }) | 147 }) |
117 | 148 |
118 Convey(`Will reject attempts to clear th
e terminal index.`, func() { | 149 Convey(`Will reject attempts to clear th
e terminal index.`, func() { |
119 req.TerminalIndex = -1 | 150 req.TerminalIndex = -1 |
120 _, err := svr.TerminateStream(c,
&req) | 151 _, err := svr.TerminateStream(c,
&req) |
121 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") | 152 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") |
122 | 153 |
123 // Reload "ls" and confirm. | 154 // Reload "ls" and confirm. |
124 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 155 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
125 | 156 |
126 So(ls.Terminated(), ShouldBeTrue
) | 157 So(ls.Terminated(), ShouldBeTrue
) |
127 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) | 158 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
128 So(ls.TerminalIndex, ShouldEqual
, 1337) | 159 So(ls.TerminalIndex, ShouldEqual
, 1337) |
129 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) | |
130 }) | 160 }) |
131 }) | 161 }) |
132 | 162 |
133 Convey(`Will return an internal server error if
Put() fails.`, func() { | 163 Convey(`Will return an internal server error if
Put() fails.`, func() { |
134 c, fb := featureBreaker.FilterRDS(c, nil
) | 164 c, fb := featureBreaker.FilterRDS(c, nil
) |
135 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 165 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
136 _, err := svr.TerminateStream(c, &req) | 166 _, err := svr.TerminateStream(c, &req) |
137 So(err, ShouldBeRPCInternal) | 167 So(err, ShouldBeRPCInternal) |
138 }) | 168 }) |
139 | 169 |
(...skipping 17 matching lines...) Expand all Loading... |
157 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") | 187 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") |
158 }) | 188 }) |
159 | 189 |
160 Convey(`Will fail if the stream is not registered.`, fun
c() { | 190 Convey(`Will fail if the stream is not registered.`, fun
c() { |
161 _, err := svr.TerminateStream(c, &req) | 191 _, err := svr.TerminateStream(c, &req) |
162 So(err, ShouldBeRPCNotFound, "is not registered"
) | 192 So(err, ShouldBeRPCNotFound, "is not registered"
) |
163 }) | 193 }) |
164 }) | 194 }) |
165 }) | 195 }) |
166 } | 196 } |
OLD | NEW |