Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3305)

Unified Diff: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
index 512f2c1226e0f14e7995145f052badb8a6914324..1bc711e52c7b820ec16f81cdf23c250e9f3721a1 100644
--- a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
+++ b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
@@ -14,7 +14,7 @@ import (
"github.com/luci/luci-go/appengine/logdog/coordinator"
ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
- "github.com/luci/luci-go/common/proto/logdog/svcconfig"
+ "github.com/luci/luci-go/common/clock/testclock"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/auth/authtest"
"golang.org/x/net/context"
@@ -27,25 +27,34 @@ func TestArchiveStream(t *testing.T) {
t.Parallel()
Convey(`With a testing configuration`, t, func() {
- c := memory.Use(context.Background())
- be := Server{}
+ c, tc := testclock.UseTime(context.Background(), testclock.TestTimeUTC)
+ c = memory.Use(c)
+
+ svcStub := ct.Services{}
+ svcStub.InitConfig()
+ svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-services"
+
+ be := Server{
+ ServiceBase: coordinator.ServiceBase{&svcStub},
+ }
+
+ now := ds.RoundTime(tc.Now().UTC())
- c = ct.UseConfig(c, &svcconfig.Coordinator{
- ServiceAuthGroup: "test-services",
- })
fs := authtest.FakeState{}
c = auth.WithState(c, &fs)
- // Register a testing log stream (not archived).
+ // Register a testing log stream with an archive tasked.
ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo"))
- if err := ls.Put(ds.Get(c)); err != nil {
+ ls.State = coordinator.LSArchiveTasked
+ ls.ArchivalKey = []byte("archival key")
+ if err := ds.Get(c).Put(ls); err != nil {
panic(err)
}
req := &logdog.ArchiveStreamRequest{
Path: string(ls.Path()),
- Complete: true,
TerminalIndex: 13,
+ LogEntryCount: 14,
StreamUrl: "gs://fake.stream",
StreamSize: 10,
IndexUrl: "gs://fake.index",
@@ -67,9 +76,41 @@ func TestArchiveStream(t *testing.T) {
So(err, ShouldBeNil)
So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Terminated(), ShouldBeTrue)
So(ls.Archived(), ShouldBeTrue)
- So(ls.ArchiveWhole, ShouldBeTrue)
+ So(ls.ArchiveComplete(), ShouldBeTrue)
+
+ So(ls.State, ShouldEqual, coordinator.LSArchived)
+ So(ls.ArchivalKey, ShouldBeNil)
+ So(ls.TerminatedTime, ShouldResemble, now)
+ So(ls.ArchivedTime, ShouldResemble, now)
So(ls.TerminalIndex, ShouldEqual, 13)
+ So(ls.ArchiveLogEntryCount, ShouldEqual, 14)
+ So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
+ So(ls.ArchiveStreamSize, ShouldEqual, 10)
+ So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.index")
+ So(ls.ArchiveIndexSize, ShouldEqual, 20)
+ So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.data")
+ So(ls.ArchiveDataSize, ShouldEqual, 30)
+ })
+
+ Convey(`Will mark the stream as partially archived if not complete.`, func() {
+ req.LogEntryCount = 13
+
+ _, err := be.ArchiveStream(c, req)
+ So(err, ShouldBeNil)
+
+ So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Terminated(), ShouldBeTrue)
+ So(ls.Archived(), ShouldBeTrue)
+ So(ls.ArchiveComplete(), ShouldBeFalse)
+
+ So(ls.State, ShouldEqual, coordinator.LSArchived)
+ So(ls.ArchivalKey, ShouldBeNil)
+ So(ls.TerminatedTime, ShouldResemble, now)
+ So(ls.ArchivedTime, ShouldResemble, now)
+ So(ls.TerminalIndex, ShouldEqual, 13)
+ So(ls.ArchiveLogEntryCount, ShouldEqual, 13)
So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
So(ls.ArchiveStreamSize, ShouldEqual, 10)
So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.index")
@@ -101,16 +142,38 @@ func TestArchiveStream(t *testing.T) {
Convey(`If stream is already archived, will not update and return success.`, func() {
ls.State = coordinator.LSArchived
ls.TerminalIndex = 1337
+ ls.ArchiveLogEntryCount = 42
+ ls.ArchivedTime = now
+ ls.TerminatedTime = now
+ So(ds.Get(c).Put(ls), ShouldBeNil)
+ So(ls.Terminated(), ShouldBeTrue)
So(ls.Archived(), ShouldBeTrue)
- So(ls.Put(ds.Get(c)), ShouldBeNil)
_, err := be.ArchiveStream(c, req)
So(err, ShouldBeNil)
ls.TerminalIndex = -1 // To make sure it reloaded.
So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Terminated(), ShouldBeTrue)
So(ls.Archived(), ShouldBeTrue)
+
+ So(ls.State, ShouldEqual, coordinator.LSArchived)
So(ls.TerminalIndex, ShouldEqual, 1337)
+ So(ls.ArchiveLogEntryCount, ShouldEqual, 42)
+ })
+
+ Convey(`If the archive has failed, it is archived as an empty stream.`, func() {
+ req.Error = "archive error"
+
+ _, err := be.ArchiveStream(c, req)
+ So(err, ShouldBeNil)
+ So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Archived(), ShouldBeTrue)
+
+ So(ls.State, ShouldEqual, coordinator.LSArchived)
+ So(ls.ArchivalKey, ShouldBeNil)
+ So(ls.TerminalIndex, ShouldEqual, -1)
+ So(ls.ArchiveLogEntryCount, ShouldEqual, 0)
})
Convey(`When datastore Get fails, returns internal error.`, func() {

Powered by Google App Engine
This is Rietveld 408576698