Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 "time" 10 "time"
11 11
12 "github.com/luci/gae/filter/featureBreaker" 12 "github.com/luci/gae/filter/featureBreaker"
13 "github.com/luci/gae/impl/memory"
14 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/luci-go/appengine/logdog/coordinator" 14 "github.com/luci/luci-go/appengine/logdog/coordinator"
16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
16 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
17 "github.com/luci/luci-go/appengine/tumble"
17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
19 "github.com/luci/luci-go/common/clock"
18 "github.com/luci/luci-go/common/clock/testclock" 20 "github.com/luci/luci-go/common/clock/testclock"
19 "github.com/luci/luci-go/common/proto/google" 21 "github.com/luci/luci-go/common/proto/google"
20 "github.com/luci/luci-go/server/auth" 22 "github.com/luci/luci-go/server/auth"
21 "github.com/luci/luci-go/server/auth/authtest" 23 "github.com/luci/luci-go/server/auth/authtest"
22 "golang.org/x/net/context"
23 24
24 . "github.com/luci/luci-go/common/testing/assertions" 25 . "github.com/luci/luci-go/common/testing/assertions"
25 . "github.com/smartystreets/goconvey/convey" 26 . "github.com/smartystreets/goconvey/convey"
26 ) 27 )
27 28
28 func TestTerminateStream(t *testing.T) { 29 func TestTerminateStream(t *testing.T) {
29 t.Parallel() 30 t.Parallel()
30 31
31 Convey(`With a testing configuration`, t, func() { 32 Convey(`With a testing configuration`, t, func() {
32 » » c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal) 33 » » var tt tumble.Testing
33 » » c = memory.Use(c) 34 » » c := tt.Context()
35 » » tc := clock.Get(c).(testclock.TestClock)
36 » » tt.EnableDelayedMutations(c)
34 37
35 var tap ct.ArchivalPublisher 38 var tap ct.ArchivalPublisher
36 svcStub := ct.Services{ 39 svcStub := ct.Services{
37 AP: func() (coordinator.ArchivalPublisher, error) { 40 AP: func() (coordinator.ArchivalPublisher, error) {
38 return &tap, nil 41 return &tap, nil
39 }, 42 },
40 } 43 }
41 svcStub.InitConfig() 44 svcStub.InitConfig()
42 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi ces" 45 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi ces"
43 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/ topics/archive" 46 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/ topics/archive"
(...skipping 19 matching lines...) Expand all
63 _, err := svr.TerminateStream(c, &req) 66 _, err := svr.TerminateStream(c, &req)
64 So(err, ShouldBeRPCPermissionDenied) 67 So(err, ShouldBeRPCPermissionDenied)
65 }) 68 })
66 69
67 Convey(`When logged in as a service`, func() { 70 Convey(`When logged in as a service`, func() {
68 fs.IdentityGroups = []string{"test-services"} 71 fs.IdentityGroups = []string{"test-services"}
69 72
70 Convey(`A non-terminal registered stream, "testing/+/foo /bar"`, func() { 73 Convey(`A non-terminal registered stream, "testing/+/foo /bar"`, func() {
71 So(ds.Get(c).Put(ls), ShouldBeNil) 74 So(ds.Get(c).Put(ls), ShouldBeNil)
72 75
76 // Create an archival request for Tumble so we c an ensure that it is
77 // canceled on termination.
78 areq := mutations.CreateArchiveTask{
79 Path: ls.Path(),
80 Expiration: tc.Now().Add(time.Hour),
81 }
82 arParent, arName := areq.TaskName(ds.Get(c))
83 err := tumble.PutNamedMutations(c, arParent, map [string]tumble.Mutation{
84 arName: &areq,
85 })
86 if err != nil {
87 panic(err)
88 }
89 ds.Get(c).Testable().CatchupIndexes()
90
73 Convey(`Can be marked terminal and schedules an archival task.`, func() { 91 Convey(`Can be marked terminal and schedules an archival task.`, func() {
74 _, err := svr.TerminateStream(c, &req) 92 _, err := svr.TerminateStream(c, &req)
75 So(err, ShouldBeRPCOK) 93 So(err, ShouldBeRPCOK)
94 ds.Get(c).Testable().CatchupIndexes()
76 95
77 // Reload "ls" and confirm. 96 // Reload "ls" and confirm.
78 So(ds.Get(c).Get(ls), ShouldBeNil) 97 So(ds.Get(c).Get(ls), ShouldBeNil)
79 So(ls.TerminalIndex, ShouldEqual, 1337) 98 So(ls.TerminalIndex, ShouldEqual, 1337)
80 So(ls.State, ShouldEqual, coordinator.LS ArchiveTasked) 99 So(ls.State, ShouldEqual, coordinator.LS ArchiveTasked)
81 So(ls.Terminated(), ShouldBeTrue) 100 So(ls.Terminated(), ShouldBeTrue)
82 So(tap.StreamNames(), ShouldResemble, [] string{ls.Name}) 101 So(tap.StreamNames(), ShouldResemble, [] string{ls.Name})
83 102
84 // Assert that all archive tasks are sch eduled ArchiveSettleDelay in 103 // Assert that all archive tasks are sch eduled ArchiveSettleDelay in
85 // the future. 104 // the future.
86 for _, t := range tap.Tasks() { 105 for _, t := range tap.Tasks() {
87 So(t.SettleDelay.Duration(), Sho uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration()) 106 So(t.SettleDelay.Duration(), Sho uldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration())
88 So(t.CompletePeriod.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration()) 107 So(t.CompletePeriod.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration())
89 } 108 }
90 109
110 Convey(`Will cancel the expiration archi ve Tumble task.`, func() {
111 // We will test this by revertin g the stream to a LSStreaming state
112 // so that if the Tumble task ge ts fired, it will try and schedule
113 // another archival task.
114 tap.Clear()
115
116 ls.State = coordinator.LSStreami ng
117 So(ds.Get(c).Put(ls), ShouldBeNi l)
118
119 tc.Add(time.Hour)
120 tt.Drain(c)
121 So(tap.StreamNames(), ShouldRese mble, []string{})
122 })
123
91 Convey(`Can be marked terminal again (id empotent).`, func() { 124 Convey(`Can be marked terminal again (id empotent).`, func() {
92 _, err := svr.TerminateStream(c, &req) 125 _, err := svr.TerminateStream(c, &req)
93 So(err, ShouldBeRPCOK) 126 So(err, ShouldBeRPCOK)
94 127
95 // Reload "ls" and confirm. 128 // Reload "ls" and confirm.
96 So(ds.Get(c).Get(ls), ShouldBeNi l) 129 So(ds.Get(c).Get(ls), ShouldBeNi l)
97 130
98 So(ls.Terminated(), ShouldBeTrue ) 131 So(ls.Terminated(), ShouldBeTrue )
99 So(ls.TerminalIndex, ShouldEqual , 1337) 132 So(ls.TerminalIndex, ShouldEqual , 1337)
100 So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked) 133 So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
101 So(tap.StreamNames(), ShouldRese mble, []string{ls.Name})
102 }) 134 })
103 135
104 Convey(`Will reject attempts to change t he terminal index.`, func() { 136 Convey(`Will reject attempts to change t he terminal index.`, func() {
105 req.TerminalIndex = 1338 137 req.TerminalIndex = 1338
106 _, err := svr.TerminateStream(c, &req) 138 _, err := svr.TerminateStream(c, &req)
107 So(err, ShouldBeRPCFailedPrecond ition, "Log stream is not in streaming state.") 139 So(err, ShouldBeRPCFailedPrecond ition, "Log stream is not in streaming state.")
108 140
109 // Reload "ls" and confirm. 141 // Reload "ls" and confirm.
110 So(ds.Get(c).Get(ls), ShouldBeNi l) 142 So(ds.Get(c).Get(ls), ShouldBeNi l)
111 143
112 So(ls.Terminated(), ShouldBeTrue ) 144 So(ls.Terminated(), ShouldBeTrue )
113 So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked) 145 So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
114 So(ls.TerminalIndex, ShouldEqual , 1337) 146 So(ls.TerminalIndex, ShouldEqual , 1337)
115 So(tap.StreamNames(), ShouldRese mble, []string{ls.Name})
116 }) 147 })
117 148
118 Convey(`Will reject attempts to clear th e terminal index.`, func() { 149 Convey(`Will reject attempts to clear th e terminal index.`, func() {
119 req.TerminalIndex = -1 150 req.TerminalIndex = -1
120 _, err := svr.TerminateStream(c, &req) 151 _, err := svr.TerminateStream(c, &req)
121 So(err, ShouldBeRPCInvalidArgume nt, "Negative terminal index.") 152 So(err, ShouldBeRPCInvalidArgume nt, "Negative terminal index.")
122 153
123 // Reload "ls" and confirm. 154 // Reload "ls" and confirm.
124 So(ds.Get(c).Get(ls), ShouldBeNi l) 155 So(ds.Get(c).Get(ls), ShouldBeNi l)
125 156
126 So(ls.Terminated(), ShouldBeTrue ) 157 So(ls.Terminated(), ShouldBeTrue )
127 So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked) 158 So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
128 So(ls.TerminalIndex, ShouldEqual , 1337) 159 So(ls.TerminalIndex, ShouldEqual , 1337)
129 So(tap.StreamNames(), ShouldRese mble, []string{ls.Name})
130 }) 160 })
131 }) 161 })
132 162
133 Convey(`Will return an internal server error if Put() fails.`, func() { 163 Convey(`Will return an internal server error if Put() fails.`, func() {
134 c, fb := featureBreaker.FilterRDS(c, nil ) 164 c, fb := featureBreaker.FilterRDS(c, nil )
135 fb.BreakFeatures(errors.New("test error" ), "PutMulti") 165 fb.BreakFeatures(errors.New("test error" ), "PutMulti")
136 _, err := svr.TerminateStream(c, &req) 166 _, err := svr.TerminateStream(c, &req)
137 So(err, ShouldBeRPCInternal) 167 So(err, ShouldBeRPCInternal)
138 }) 168 })
139 169
(...skipping 17 matching lines...) Expand all
157 So(err, ShouldBeRPCInvalidArgument, "Invalid pat h") 187 So(err, ShouldBeRPCInvalidArgument, "Invalid pat h")
158 }) 188 })
159 189
160 Convey(`Will fail if the stream is not registered.`, fun c() { 190 Convey(`Will fail if the stream is not registered.`, fun c() {
161 _, err := svr.TerminateStream(c, &req) 191 _, err := svr.TerminateStream(c, &req)
162 So(err, ShouldBeRPCNotFound, "is not registered" ) 192 So(err, ShouldBeRPCNotFound, "is not registered" )
163 }) 193 })
164 }) 194 })
165 }) 195 })
166 } 196 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698