| Index: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| index 512f2c1226e0f14e7995145f052badb8a6914324..1bc711e52c7b820ec16f81cdf23c250e9f3721a1 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| @@ -14,7 +14,7 @@ import (
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| - "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| + "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| "golang.org/x/net/context"
|
| @@ -27,25 +27,34 @@ func TestArchiveStream(t *testing.T) {
|
| t.Parallel()
|
|
|
| Convey(`With a testing configuration`, t, func() {
|
| - c := memory.Use(context.Background())
|
| - be := Server{}
|
| + c, tc := testclock.UseTime(context.Background(), testclock.TestTimeUTC)
|
| + c = memory.Use(c)
|
| +
|
| + svcStub := ct.Services{}
|
| + svcStub.InitConfig()
|
| + svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-services"
|
| +
|
| + be := Server{
|
| + ServiceBase: coordinator.ServiceBase{&svcStub},
|
| + }
|
| +
|
| + now := ds.RoundTime(tc.Now().UTC())
|
|
|
| - c = ct.UseConfig(c, &svcconfig.Coordinator{
|
| - ServiceAuthGroup: "test-services",
|
| - })
|
| fs := authtest.FakeState{}
|
| c = auth.WithState(c, &fs)
|
|
|
| - // Register a testing log stream (not archived).
|
| + // Register a testing log stream with an archive tasked.
|
| ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo"))
|
| - if err := ls.Put(ds.Get(c)); err != nil {
|
| + ls.State = coordinator.LSArchiveTasked
|
| + ls.ArchivalKey = []byte("archival key")
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| panic(err)
|
| }
|
|
|
| req := &logdog.ArchiveStreamRequest{
|
| Path: string(ls.Path()),
|
| - Complete: true,
|
| TerminalIndex: 13,
|
| + LogEntryCount: 14,
|
| StreamUrl: "gs://fake.stream",
|
| StreamSize: 10,
|
| IndexUrl: "gs://fake.index",
|
| @@ -67,9 +76,41 @@ func TestArchiveStream(t *testing.T) {
|
| So(err, ShouldBeNil)
|
|
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.Archived(), ShouldBeTrue)
|
| - So(ls.ArchiveWhole, ShouldBeTrue)
|
| + So(ls.ArchiveComplete(), ShouldBeTrue)
|
| +
|
| + So(ls.State, ShouldEqual, coordinator.LSArchived)
|
| + So(ls.ArchivalKey, ShouldBeNil)
|
| + So(ls.TerminatedTime, ShouldResemble, now)
|
| + So(ls.ArchivedTime, ShouldResemble, now)
|
| So(ls.TerminalIndex, ShouldEqual, 13)
|
| + So(ls.ArchiveLogEntryCount, ShouldEqual, 14)
|
| + So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
|
| + So(ls.ArchiveStreamSize, ShouldEqual, 10)
|
| + So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.index")
|
| + So(ls.ArchiveIndexSize, ShouldEqual, 20)
|
| + So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.data")
|
| + So(ls.ArchiveDataSize, ShouldEqual, 30)
|
| + })
|
| +
|
| + Convey(`Will mark the stream as partially archived if not complete.`, func() {
|
| + req.LogEntryCount = 13
|
| +
|
| + _, err := be.ArchiveStream(c, req)
|
| + So(err, ShouldBeNil)
|
| +
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| + So(ls.Archived(), ShouldBeTrue)
|
| + So(ls.ArchiveComplete(), ShouldBeFalse)
|
| +
|
| + So(ls.State, ShouldEqual, coordinator.LSArchived)
|
| + So(ls.ArchivalKey, ShouldBeNil)
|
| + So(ls.TerminatedTime, ShouldResemble, now)
|
| + So(ls.ArchivedTime, ShouldResemble, now)
|
| + So(ls.TerminalIndex, ShouldEqual, 13)
|
| + So(ls.ArchiveLogEntryCount, ShouldEqual, 13)
|
| So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
|
| So(ls.ArchiveStreamSize, ShouldEqual, 10)
|
| So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.index")
|
| @@ -101,16 +142,38 @@ func TestArchiveStream(t *testing.T) {
|
| Convey(`If stream is already archived, will not update and return success.`, func() {
|
| ls.State = coordinator.LSArchived
|
| ls.TerminalIndex = 1337
|
| + ls.ArchiveLogEntryCount = 42
|
| + ls.ArchivedTime = now
|
| + ls.TerminatedTime = now
|
| + So(ds.Get(c).Put(ls), ShouldBeNil)
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.Archived(), ShouldBeTrue)
|
| - So(ls.Put(ds.Get(c)), ShouldBeNil)
|
|
|
| _, err := be.ArchiveStream(c, req)
|
| So(err, ShouldBeNil)
|
|
|
| ls.TerminalIndex = -1 // To make sure it reloaded.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Terminated(), ShouldBeTrue)
|
| So(ls.Archived(), ShouldBeTrue)
|
| +
|
| + So(ls.State, ShouldEqual, coordinator.LSArchived)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| + So(ls.ArchiveLogEntryCount, ShouldEqual, 42)
|
| + })
|
| +
|
| + Convey(`If the archive has failed, it is archived as an empty stream.`, func() {
|
| + req.Error = "archive error"
|
| +
|
| + _, err := be.ArchiveStream(c, req)
|
| + So(err, ShouldBeNil)
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Archived(), ShouldBeTrue)
|
| +
|
| + So(ls.State, ShouldEqual, coordinator.LSArchived)
|
| + So(ls.ArchivalKey, ShouldBeNil)
|
| + So(ls.TerminalIndex, ShouldEqual, -1)
|
| + So(ls.ArchiveLogEntryCount, ShouldEqual, 0)
|
| })
|
|
|
| Convey(`When datastore Get fails, returns internal error.`, func() {
|
|
|