Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1950)

Unified Diff: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
index 8750e9396a8c6b0ac89660fb3490ab2f1727bee1..7dcae214899ad78fb2d470dacc0a8469f6b16551 100644
--- a/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
+++ b/appengine/logdog/coordinator/endpoints/services/terminateStream_test.go
@@ -16,7 +16,7 @@ import (
ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock/testclock"
- "github.com/luci/luci-go/common/proto/logdog/svcconfig"
+ "github.com/luci/luci-go/common/proto/google"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/auth/authtest"
"golang.org/x/net/context"
@@ -29,9 +29,24 @@ func TestTerminateStream(t *testing.T) {
t.Parallel()
Convey(`With a testing configuration`, t, func() {
- c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
+ c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
c = memory.Use(c)
- be := Server{}
+
+ var tap ct.ArchivalPublisher
+ svcStub := ct.Services{
+ AP: func() (coordinator.ArchivalPublisher, error) {
+ return &tap, nil
+ },
+ }
+ svcStub.InitConfig()
+ svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-services"
+ svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/topics/archive"
+ svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.NewDuration(10 * time.Second)
+ svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDuration(24 * time.Hour)
+
+ be := Server{
+ ServiceBase: coordinator.ServiceBase{&svcStub},
+ }
desc := ct.TestLogStreamDescriptor(c, "foo/bar")
ls := ct.TestLogStream(c, desc)
@@ -42,9 +57,6 @@ func TestTerminateStream(t *testing.T) {
TerminalIndex: 1337,
}
- c = ct.UseConfig(c, &svcconfig.Coordinator{
- ServiceAuthGroup: "test-services",
- })
fs := authtest.FakeState{}
c = auth.WithState(c, &fs)
@@ -57,18 +69,25 @@ func TestTerminateStream(t *testing.T) {
fs.IdentityGroups = []string{"test-services"}
Convey(`A non-terminal registered stream, "testing/+/foo/bar"`, func() {
- So(ls.Put(ds.Get(c)), ShouldBeNil)
- tc.Add(time.Second)
+ So(ds.Get(c).Put(ls), ShouldBeNil)
- Convey(`Can be marked terminal.`, func() {
+ Convey(`Can be marked terminal and schedules an archival task.`, func() {
_, err := be.TerminateStream(c, &req)
So(err, ShouldBeRPCOK)
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
So(ls.TerminalIndex, ShouldEqual, 1337)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
- So(ls.Updated, ShouldResemble, ls.Created.Add(time.Second))
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
+ So(ls.Terminated(), ShouldBeTrue)
+ So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
+
+ // Assert that all archive tasks are scheduled ArchiveSettleDelay in
+ // the future.
+ for _, t := range tap.Tasks() {
+ So(t.SettleDelay.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay.Duration())
+ So(t.CompletePeriod.Duration(), ShouldEqual, svcStub.ServiceConfig.Coordinator.ArchiveDelayMax.Duration())
+ }
Convey(`Can be marked terminal again (idempotent).`, func() {
_, err := be.TerminateStream(c, &req)
@@ -76,19 +95,25 @@ func TestTerminateStream(t *testing.T) {
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
+
+ So(ls.Terminated(), ShouldBeTrue)
So(ls.TerminalIndex, ShouldEqual, 1337)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
+ So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
})
Convey(`Will reject attempts to change the terminal index.`, func() {
req.TerminalIndex = 1338
_, err := be.TerminateStream(c, &req)
- So(err, ShouldBeRPCAlreadyExists, "Terminal index is already set")
+ So(err, ShouldBeRPCFailedPrecondition, "Log stream is not in streaming state.")
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
+
+ So(ls.Terminated(), ShouldBeTrue)
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
So(ls.TerminalIndex, ShouldEqual, 1337)
+ So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
})
Convey(`Will reject attempts to clear the terminal index.`, func() {
@@ -98,8 +123,11 @@ func TestTerminateStream(t *testing.T) {
// Reload "ls" and confirm.
So(ds.Get(c).Get(ls), ShouldBeNil)
- So(ls.State, ShouldEqual, coordinator.LSTerminated)
+
+ So(ls.Terminated(), ShouldBeTrue)
+ So(ls.State, ShouldEqual, coordinator.LSArchiveTasked)
So(ls.TerminalIndex, ShouldEqual, 1337)
+ So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
})
})
« no previous file with comments | « appengine/logdog/coordinator/endpoints/services/terminateStream.go ('k') | appengine/logdog/coordinator/logStream.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698