| Index: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| index 8750e9396a8c6b0ac89660fb3490ab2f1727bee1..3bfdb0c29b58e758cdf7aa82213eea23148099dc 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
|
| @@ -12,10 +12,12 @@ import (
|
| "github.com/luci/gae/filter/featureBreaker"
|
| "github.com/luci/gae/impl/memory"
|
| ds "github.com/luci/gae/service/datastore"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| @@ -42,9 +44,24 @@ func TestTerminateStream(t *testing.T) {
|
| TerminalIndex: 1337,
|
| }
|
|
|
| - c = ct.UseConfig(c, &svcconfig.Coordinator{
|
| - ServiceAuthGroup: "test-services",
|
| - })
|
| + ccfg := svcconfig.Coordinator{
|
| + ServiceAuthGroup: "test-services",
|
| + ArchiveTaskQueue: "archive-task-queue",
|
| + ArchiveSettleDelay: google.NewDuration(10 * time.Second),
|
| + ArchiveDelayMax: google.NewDuration(24 * time.Hour),
|
| + }
|
| + c = ct.UseConfig(c, &ccfg)
|
| +
|
| + // Create an archival task queue.
|
| + tq.Get(c).Testable().CreateQueue(ccfg.ArchiveTaskQueue)
|
| + archiveTasks := func() []string {
|
| + tasks, err := ct.GetArchiveTaskStreams(tq.Get(c), ccfg.ArchiveTaskQueue)
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| + return tasks
|
| + }
|
| +
|
| fs := authtest.FakeState{}
|
| c = auth.WithState(c, &fs)
|
|
|
| @@ -57,18 +74,24 @@ func TestTerminateStream(t *testing.T) {
|
| fs.IdentityGroups = []string{"test-services"}
|
|
|
| Convey(`A non-terminal registered stream, "testing/+/foo/bar"`, func() {
|
| - So(ls.Put(ds.Get(c)), ShouldBeNil)
|
| - tc.Add(time.Second)
|
| + So(ds.Get(c).Put(ls), ShouldBeNil)
|
|
|
| - Convey(`Can be marked terminal.`, func() {
|
| + Convey(`Can be marked terminal and schedules an archival task.`, func() {
|
| _, err := be.TerminateStream(c, &req)
|
| So(err, ShouldBeRPCOK)
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| - So(ls.Updated, ShouldResemble, ls.Created.Add(time.Second))
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(archiveTasks(), ShouldResemble, []string{ls.Name})
|
| +
|
| + // Assert that all archive tasks are scheduled ArchiveSettleDelay in
|
| + // the future.
|
| + for _, t := range tq.Get(c).Testable().GetScheduledTasks()[ccfg.ArchiveTaskQueue] {
|
| + So(t.ETA.After(tc.Now()), ShouldBeTrue)
|
| + }
|
|
|
| Convey(`Can be marked terminal again (idempotent).`, func() {
|
| _, err := be.TerminateStream(c, &req)
|
| @@ -76,19 +99,25 @@ func TestTerminateStream(t *testing.T) {
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| +
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| + So(archiveTasks(), ShouldResemble, []string{ls.Name})
|
| })
|
|
|
| Convey(`Will reject attempts to change the terminal index.`, func() {
|
| req.TerminalIndex = 1338
|
| _, err := be.TerminateStream(c, &req)
|
| - So(err, ShouldBeRPCAlreadyExists, "Terminal index is already set")
|
| + So(err, ShouldBeRPCFailedPrecondition, "Log stream is not in streaming state.")
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| +
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| + So(archiveTasks(), ShouldResemble, []string{ls.Name})
|
| })
|
|
|
| Convey(`Will reject attempts to clear the terminal index.`, func() {
|
| @@ -98,8 +127,11 @@ func TestTerminateStream(t *testing.T) {
|
|
|
| // Reload "ls" and confirm.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| - So(ls.State, ShouldEqual, coordinator.LSTerminated)
|
| +
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| + So(archiveTasks(), ShouldResemble, []string{ls.Name})
|
| })
|
| })
|
|
|
|
|