Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8626)

Unified Diff: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
index 8750e9396a8c6b0ac89660fb3490ab2f1727bee1..3bfdb0c29b58e758cdf7aa82213eea23148099dc 100644
--- a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
+++ b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
@@ -12,10 +12,12 @@ import (
"github.com/luci/gae/filter/featureBreaker"
"github.com/luci/gae/impl/memory"
ds "github.com/luci/gae/service/datastore"
+ tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/appengine/logdog/coordinator"
ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/proto/google"
"github.com/luci/luci-go/common/proto/logdog/svcconfig"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/auth/authtest"
@@ -42,9 +44,24 @@ func TestTerminateStream(t *testing.T) {
TerminalIndex: 1337,
}
- c = ct.UseConfig(c, &svcconfig.Coordinator{
- ServiceAuthGroup: "test-services",
- })
+ ccfg := svcconfig.Coordinator{
+ ServiceAuthGroup: "test-services",
+ ArchiveTaskQueue: "archive-task-queue",
+ ArchiveSettleDelay: google.NewDuration(10 * time.Second),
+ ArchiveDelayMax: google.NewDuration(24 * time.Hour),
+ }
+ c = ct.UseConfig(c, &ccfg)
+
+ // Create an archival task queue.
+ tq.Get(c).Testable().CreateQueue(ccfg.ArchiveTaskQueue)
+ archiveTasks := func() []string {
+ tasks, err := ct.GetArchiveTaskStreams(tq.Get(c), ccfg.ArchiveTaskQueue)
+ if err != nil {
+ panic(err)
+ }
+ return tasks
+ }
+
fs := authtest.FakeState{}
c = auth.WithState(c, &fs)
@@ -57,18 +74,24 @@ func TestTerminateStream(t *testing.T) {
fs.IdentityGroups = []string{"test-services"}
Convey(`A non-terminal registered stream, "testing/+/foo/bar"`, func() {
- So(ls.Put(ds.Get(c)), ShouldBeNil)
- tc.Add(time.Second)
+ So(ds.Get(c).Put(ls), ShouldBeNil)
- Convey(`Can be marked terminal.`, func() {
+ Convey(`Can be marked terminal and schedules an archival task.`, func() {
_, err := be.TerminateStream(c, &req)
So(err, ShouldBeRPCOK)
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
So(ls.TerminalIndex, ShouldEqual, 1337)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
- So(ls.Updated, ShouldResemble, ls.Created.Add(time.Second))
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
+ So(ls.Terminated(), ShouldBeTrue)
+ So(archiveTasks(), ShouldResemble, []string{ls.Name})
+
+ // Assert that all archive tasks are scheduled ArchiveSettleDelay in
+ // the future.
+ for _, t := range tq.Get(c).Testable().GetScheduledTasks()[ccfg.ArchiveTaskQueue] {
+ So(t.ETA.After(tc.Now()), ShouldBeTrue)
+ }
Convey(`Can be marked terminal again (idempotent).`, func() {
_, err := be.TerminateStream(c, &req)
@@ -76,19 +99,25 @@ func TestTerminateStream(t *testing.T) {
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
+
+ So(ls.Terminated(), ShouldBeTrue)
So(ls.TerminalIndex, ShouldEqual, 1337)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
+ So(archiveTasks(), ShouldResemble, []string{ls.Name})
})
Convey(`Will reject attempts to change the terminal index.`, func() {
req.TerminalIndex = 1338
_, err := be.TerminateStream(c, &req)
- So(err, ShouldBeRPCAlreadyExists, "Terminal index is already set")
+ So(err, ShouldBeRPCFailedPrecondition, "Log stream is not in streaming state.")
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
+
+ So(ls.Terminated(), ShouldBeTrue)
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
So(ls.TerminalIndex, ShouldEqual, 1337)
+ So(archiveTasks(), ShouldResemble, []string{ls.Name})
})
Convey(`Will reject attempts to clear the terminal index.`, func() {
@@ -98,8 +127,11 @@ func TestTerminateStream(t *testing.T) {
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
+
+ So(ls.Terminated(), ShouldBeTrue)
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
So(ls.TerminalIndex, ShouldEqual, 1337)
+ So(archiveTasks(), ShouldResemble, []string{ls.Name})
})
})

Powered by Google App Engine
This is Rietveld 408576698