| Index: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| index d6a29cc537c987ad9cbac5d932cbceba5f5ad1b9..35aac8d12b5b4cd7ed6ab3c901cacc4e98c17916 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| @@ -10,16 +10,17 @@ import (
|
| "time"
|
|
|
| "github.com/luci/gae/filter/featureBreaker"
|
| - "github.com/luci/gae/impl/memory"
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
|
| + "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
|
| + "github.com/luci/luci-go/appengine/tumble"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| + "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| - "golang.org/x/net/context"
|
|
|
| . "github.com/luci/luci-go/common/testing/assertions"
|
| . "github.com/smartystreets/goconvey/convey"
|
| @@ -29,8 +30,10 @@ func TestTerminateStream(t *testing.T) {
|
| t.Parallel()
|
|
|
| Convey(`With a testing configuration`, t, func() {
|
| - c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
|
| - c = memory.Use(c)
|
| + var tt tumble.Testing
|
| + c := tt.Context()
|
| + tc := clock.Get(c).(testclock.TestClock)
|
| + tt.EnableDelayedMutations(c)
|
|
|
| var tap ct.ArchivalPublisher
|
| svcStub := ct.Services{
|
| @@ -70,9 +73,25 @@ func TestTerminateStream(t *testing.T) {
|
| Convey(`A non-terminal registered stream, "testing/+/foo/bar"`, func() {
|
| So(ds.Get(c).Put(ls), ShouldBeNil)
|
|
|
| + // Create an archival request for Tumble so we can ensure that it is
|
| + // canceled on termination.
|
| + areq := mutations.CreateArchiveTask{
|
| + Path: ls.Path(),
|
| + Expiration: tc.Now().Add(time.Hour),
|
| + }
|
| + arParent, arName := areq.TaskName(ds.Get(c))
|
| + err := tumble.PutNamedMutations(c, arParent, map[string]tumble.Mutation{
|
| + arName: &areq,
|
| + })
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| + ds.Get(c).Testable().CatchupIndexes()
|
| +
|
| Convey(`Can be marked terminal and schedules an archival task.`, func() {
|
| _, err := svr.TerminateStream(c, &req)
|
| So(err, ShouldBeRPCOK)
|
| + ds.Get(c).Testable().CatchupIndexes()
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| @@ -88,6 +107,20 @@ func TestTerminateStream(t *testing.T) {
|
| So(t.CompletePeriod.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration())
|
| }
|
|
|
| + Convey(`Will cancel the expiration archive Tumble task.`, func() {
|
| + // We will test this by reverting the stream to a LSStreaming state
|
| + // so that if the Tumble task gets fired, it will try and schedule
|
| + // another archival task.
|
| + tap.Clear()
|
| +
|
| + ls.State = coordinator.LSStreaming
|
| + So(ds.Get(c).Put(ls), ShouldBeNil)
|
| +
|
| + tc.Add(time.Hour)
|
| + tt.Drain(c)
|
| + So(tap.StreamNames(), ShouldResemble, []string{})
|
| + })
|
| +
|
| Convey(`Can be marked terminal again (idempotent).`, func() {
|
| _, err := svr.TerminateStream(c, &req)
|
| So(err, ShouldBeRPCOK)
|
| @@ -98,7 +131,6 @@ func TestTerminateStream(t *testing.T) {
|
| So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| - So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| })
|
|
|
| Convey(`Will reject attempts to change the terminal index.`, func() {
|
| @@ -112,7 +144,6 @@ func TestTerminateStream(t *testing.T) {
|
| So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| - So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| })
|
|
|
| Convey(`Will reject attempts to clear the terminal index.`, func() {
|
| @@ -126,7 +157,6 @@ func TestTerminateStream(t *testing.T) {
|
| So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| - So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| })
|
| })
|
|
|
|
|