| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "testing" | 9 "testing" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "github.com/luci/gae/filter/featureBreaker" | 12 "github.com/luci/gae/filter/featureBreaker" |
| 13 "github.com/luci/gae/impl/memory" | |
| 14 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 15 "github.com/luci/luci-go/appengine/logdog/coordinator" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 17 "github.com/luci/luci-go/appengine/tumble" |
| 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 19 "github.com/luci/luci-go/common/clock" |
| 18 "github.com/luci/luci-go/common/clock/testclock" | 20 "github.com/luci/luci-go/common/clock/testclock" |
| 19 "github.com/luci/luci-go/common/proto/google" | 21 "github.com/luci/luci-go/common/proto/google" |
| 20 "github.com/luci/luci-go/server/auth" | 22 "github.com/luci/luci-go/server/auth" |
| 21 "github.com/luci/luci-go/server/auth/authtest" | 23 "github.com/luci/luci-go/server/auth/authtest" |
| 22 "golang.org/x/net/context" | |
| 23 | 24 |
| 24 . "github.com/luci/luci-go/common/testing/assertions" | 25 . "github.com/luci/luci-go/common/testing/assertions" |
| 25 . "github.com/smartystreets/goconvey/convey" | 26 . "github.com/smartystreets/goconvey/convey" |
| 26 ) | 27 ) |
| 27 | 28 |
| 28 func TestTerminateStream(t *testing.T) { | 29 func TestTerminateStream(t *testing.T) { |
| 29 t.Parallel() | 30 t.Parallel() |
| 30 | 31 |
| 31 Convey(`With a testing configuration`, t, func() { | 32 Convey(`With a testing configuration`, t, func() { |
| 32 » » c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) | 33 » » var tt tumble.Testing |
| 33 » » c = memory.Use(c) | 34 » » c := tt.Context() |
| 35 » » tc := clock.Get(c).(testclock.TestClock) |
| 36 » » tt.EnableDelayedMutations(c) |
| 34 | 37 |
| 35 var tap ct.ArchivalPublisher | 38 var tap ct.ArchivalPublisher |
| 36 svcStub := ct.Services{ | 39 svcStub := ct.Services{ |
| 37 AP: func() (coordinator.ArchivalPublisher, error) { | 40 AP: func() (coordinator.ArchivalPublisher, error) { |
| 38 return &tap, nil | 41 return &tap, nil |
| 39 }, | 42 }, |
| 40 } | 43 } |
| 41 svcStub.InitConfig() | 44 svcStub.InitConfig() |
| 42 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" | 45 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" |
| 43 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/
topics/archive" | 46 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/
topics/archive" |
| (...skipping 19 matching lines...) Expand all Loading... |
| 63 _, err := svr.TerminateStream(c, &req) | 66 _, err := svr.TerminateStream(c, &req) |
| 64 So(err, ShouldBeRPCPermissionDenied) | 67 So(err, ShouldBeRPCPermissionDenied) |
| 65 }) | 68 }) |
| 66 | 69 |
| 67 Convey(`When logged in as a service`, func() { | 70 Convey(`When logged in as a service`, func() { |
| 68 fs.IdentityGroups = []string{"test-services"} | 71 fs.IdentityGroups = []string{"test-services"} |
| 69 | 72 |
| 70 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { | 73 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { |
| 71 So(ds.Get(c).Put(ls), ShouldBeNil) | 74 So(ds.Get(c).Put(ls), ShouldBeNil) |
| 72 | 75 |
| 76 // Create an archival request for Tumble so we c
an ensure that it is |
| 77 // canceled on termination. |
| 78 areq := mutations.CreateArchiveTask{ |
| 79 Path: ls.Path(), |
| 80 Expiration: tc.Now().Add(time.Hour), |
| 81 } |
| 82 arParent, arName := areq.TaskName(ds.Get(c)) |
| 83 err := tumble.PutNamedMutations(c, arParent, map
[string]tumble.Mutation{ |
| 84 arName: &areq, |
| 85 }) |
| 86 if err != nil { |
| 87 panic(err) |
| 88 } |
| 89 ds.Get(c).Testable().CatchupIndexes() |
| 90 |
| 73 Convey(`Can be marked terminal and schedules an
archival task.`, func() { | 91 Convey(`Can be marked terminal and schedules an
archival task.`, func() { |
| 74 _, err := svr.TerminateStream(c, &req) | 92 _, err := svr.TerminateStream(c, &req) |
| 75 So(err, ShouldBeRPCOK) | 93 So(err, ShouldBeRPCOK) |
| 94 ds.Get(c).Testable().CatchupIndexes() |
| 76 | 95 |
| 77 // Reload "ls" and confirm. | 96 // Reload "ls" and confirm. |
| 78 So(ds.Get(c).Get(ls), ShouldBeNil) | 97 So(ds.Get(c).Get(ls), ShouldBeNil) |
| 79 So(ls.TerminalIndex, ShouldEqual, 1337) | 98 So(ls.TerminalIndex, ShouldEqual, 1337) |
| 80 So(ls.State, ShouldEqual, coordinator.LS
ArchiveTasked) | 99 So(ls.State, ShouldEqual, coordinator.LS
ArchiveTasked) |
| 81 So(ls.Terminated(), ShouldBeTrue) | 100 So(ls.Terminated(), ShouldBeTrue) |
| 82 So(tap.StreamNames(), ShouldResemble, []
string{ls.Name}) | 101 So(tap.StreamNames(), ShouldResemble, []
string{ls.Name}) |
| 83 | 102 |
| 84 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in | 103 // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
| 85 // the future. | 104 // the future. |
| 86 for _, t := range tap.Tasks() { | 105 for _, t := range tap.Tasks() { |
| 87 So(t.SettleDelay.Duration(), Sho
uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration()) | 106 So(t.SettleDelay.Duration(), Sho
uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration()) |
| 88 So(t.CompletePeriod.Duration(),
ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration()) | 107 So(t.CompletePeriod.Duration(),
ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration()) |
| 89 } | 108 } |
| 90 | 109 |
| 110 Convey(`Will cancel the expiration archi
ve Tumble task.`, func() { |
| 111 // We will test this by revertin
g the stream to a LSStreaming state |
| 112 // so that if the Tumble task ge
ts fired, it will try and schedule |
| 113 // another archival task. |
| 114 tap.Clear() |
| 115 |
| 116 ls.State = coordinator.LSStreami
ng |
| 117 So(ds.Get(c).Put(ls), ShouldBeNi
l) |
| 118 |
| 119 tc.Add(time.Hour) |
| 120 tt.Drain(c) |
| 121 So(tap.StreamNames(), ShouldRese
mble, []string{}) |
| 122 }) |
| 123 |
| 91 Convey(`Can be marked terminal again (id
empotent).`, func() { | 124 Convey(`Can be marked terminal again (id
empotent).`, func() { |
| 92 _, err := svr.TerminateStream(c,
&req) | 125 _, err := svr.TerminateStream(c,
&req) |
| 93 So(err, ShouldBeRPCOK) | 126 So(err, ShouldBeRPCOK) |
| 94 | 127 |
| 95 // Reload "ls" and confirm. | 128 // Reload "ls" and confirm. |
| 96 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 129 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 97 | 130 |
| 98 So(ls.Terminated(), ShouldBeTrue
) | 131 So(ls.Terminated(), ShouldBeTrue
) |
| 99 So(ls.TerminalIndex, ShouldEqual
, 1337) | 132 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 100 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) | 133 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 101 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) | |
| 102 }) | 134 }) |
| 103 | 135 |
| 104 Convey(`Will reject attempts to change t
he terminal index.`, func() { | 136 Convey(`Will reject attempts to change t
he terminal index.`, func() { |
| 105 req.TerminalIndex = 1338 | 137 req.TerminalIndex = 1338 |
| 106 _, err := svr.TerminateStream(c,
&req) | 138 _, err := svr.TerminateStream(c,
&req) |
| 107 So(err, ShouldBeRPCFailedPrecond
ition, "Log stream is not in streaming state.") | 139 So(err, ShouldBeRPCFailedPrecond
ition, "Log stream is not in streaming state.") |
| 108 | 140 |
| 109 // Reload "ls" and confirm. | 141 // Reload "ls" and confirm. |
| 110 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 142 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 111 | 143 |
| 112 So(ls.Terminated(), ShouldBeTrue
) | 144 So(ls.Terminated(), ShouldBeTrue
) |
| 113 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) | 145 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 114 So(ls.TerminalIndex, ShouldEqual
, 1337) | 146 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 115 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) | |
| 116 }) | 147 }) |
| 117 | 148 |
| 118 Convey(`Will reject attempts to clear th
e terminal index.`, func() { | 149 Convey(`Will reject attempts to clear th
e terminal index.`, func() { |
| 119 req.TerminalIndex = -1 | 150 req.TerminalIndex = -1 |
| 120 _, err := svr.TerminateStream(c,
&req) | 151 _, err := svr.TerminateStream(c,
&req) |
| 121 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") | 152 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") |
| 122 | 153 |
| 123 // Reload "ls" and confirm. | 154 // Reload "ls" and confirm. |
| 124 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 155 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 125 | 156 |
| 126 So(ls.Terminated(), ShouldBeTrue
) | 157 So(ls.Terminated(), ShouldBeTrue
) |
| 127 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) | 158 So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 128 So(ls.TerminalIndex, ShouldEqual
, 1337) | 159 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 129 So(tap.StreamNames(), ShouldRese
mble, []string{ls.Name}) | |
| 130 }) | 160 }) |
| 131 }) | 161 }) |
| 132 | 162 |
| 133 Convey(`Will return an internal server error if
Put() fails.`, func() { | 163 Convey(`Will return an internal server error if
Put() fails.`, func() { |
| 134 c, fb := featureBreaker.FilterRDS(c, nil
) | 164 c, fb := featureBreaker.FilterRDS(c, nil
) |
| 135 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 165 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
| 136 _, err := svr.TerminateStream(c, &req) | 166 _, err := svr.TerminateStream(c, &req) |
| 137 So(err, ShouldBeRPCInternal) | 167 So(err, ShouldBeRPCInternal) |
| 138 }) | 168 }) |
| 139 | 169 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 157 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") | 187 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") |
| 158 }) | 188 }) |
| 159 | 189 |
| 160 Convey(`Will fail if the stream is not registered.`, fun
c() { | 190 Convey(`Will fail if the stream is not registered.`, fun
c() { |
| 161 _, err := svr.TerminateStream(c, &req) | 191 _, err := svr.TerminateStream(c, &req) |
| 162 So(err, ShouldBeRPCNotFound, "is not registered"
) | 192 So(err, ShouldBeRPCNotFound, "is not registered"
) |
| 163 }) | 193 }) |
| 164 }) | 194 }) |
| 165 }) | 195 }) |
| 166 } | 196 } |
| OLD | NEW |