| Index: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| index 512f2c1226e0f14e7995145f052badb8a6914324..9a4f69d1c110278ae759814f8ec45c4beac58ec1 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
|
| @@ -14,10 +14,12 @@ import (
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| + "github.com/luci/luci-go/common/logging/gologger"
|
| "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/grpc/codes"
|
|
|
| . "github.com/luci/luci-go/common/testing/assertions"
|
| . "github.com/smartystreets/goconvey/convey"
|
| @@ -28,6 +30,7 @@ func TestArchiveStream(t *testing.T) {
|
|
|
| Convey(`With a testing configuration`, t, func() {
|
| c := memory.Use(context.Background())
|
| + c = gologger.Use(c)
|
| be := Server{}
|
|
|
| c = ct.UseConfig(c, &svcconfig.Coordinator{
|
| @@ -68,6 +71,7 @@ func TestArchiveStream(t *testing.T) {
|
|
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| So(ls.Archived(), ShouldBeTrue)
|
| + So(ls.ArchiveState, ShouldEqual, coordinator.Archived)
|
| So(ls.ArchiveWhole, ShouldBeTrue)
|
| So(ls.TerminalIndex, ShouldEqual, 13)
|
| So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
|
| @@ -78,6 +82,25 @@ func TestArchiveStream(t *testing.T) {
|
| So(ls.ArchiveDataSize, ShouldEqual, 30)
|
| })
|
|
|
| + Convey(`Will mark the stream as partially archived if not complete.`, func() {
|
| + req.Complete = false
|
| +
|
| + _, err := be.ArchiveStream(c, req)
|
| + So(err, ShouldBeNil)
|
| +
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Archived(), ShouldBeTrue)
|
| + So(ls.ArchiveState, ShouldEqual, coordinator.ArchivedPartially)
|
| + So(ls.ArchiveWhole, ShouldBeFalse)
|
| + So(ls.TerminalIndex, ShouldEqual, 13)
|
| + So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
|
| + So(ls.ArchiveStreamSize, ShouldEqual, 10)
|
| + So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.index")
|
| + So(ls.ArchiveIndexSize, ShouldEqual, 20)
|
| + So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.data")
|
| + So(ls.ArchiveDataSize, ShouldEqual, 30)
|
| + })
|
| +
|
| Convey(`Will refuse to process an invalid stream path.`, func() {
|
| req.Path = "!!!invalid!!!"
|
| _, err := be.ArchiveStream(c, req)
|
| @@ -100,6 +123,7 @@ func TestArchiveStream(t *testing.T) {
|
|
|
| Convey(`If stream is already archived, will not update and return success.`, func() {
|
| ls.State = coordinator.LSArchived
|
| + ls.ArchiveState = coordinator.Archived
|
| ls.TerminalIndex = 1337
|
| So(ls.Archived(), ShouldBeTrue)
|
| So(ls.Put(ds.Get(c)), ShouldBeNil)
|
| @@ -110,9 +134,40 @@ func TestArchiveStream(t *testing.T) {
|
| ls.TerminalIndex = -1 // To make sure it reloaded.
|
| So(ds.Get(c).Get(ls), ShouldBeNil)
|
| So(ls.Archived(), ShouldBeTrue)
|
| + So(ls.ArchiveState, ShouldEqual, coordinator.Archived)
|
| So(ls.TerminalIndex, ShouldEqual, 1337)
|
| })
|
|
|
| + Convey(`If the archive has failed`, func() {
|
| + req.Error = true
|
| +
|
| + Convey(`If the stream is below error threshold, will increment error count and return FailedPrecondition.`, func() {
|
| + ls.ArchiveErrors = 0
|
| + So(ls.ArchiveErrors, ShouldBeLessThan, maxArchiveErrors)
|
| + So(ls.Put(ds.Get(c)), ShouldBeNil)
|
| +
|
| + _, err := be.ArchiveStream(c, req)
|
| + So(err, ShouldHaveRPCCode, codes.FailedPrecondition)
|
| +
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Archived(), ShouldBeFalse)
|
| + So(ls.ArchiveState, ShouldEqual, coordinator.NotArchived)
|
| + So(ls.ArchiveErrors, ShouldEqual, 1)
|
| + })
|
| +
|
| + Convey(`If the stream is above error threshold, will succeed.`, func() {
|
| + ls.ArchiveErrors = maxArchiveErrors
|
| + So(ls.Put(ds.Get(c)), ShouldBeNil)
|
| +
|
| + _, err := be.ArchiveStream(c, req)
|
| + So(err, ShouldBeNil)
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Archived(), ShouldBeTrue)
|
| + So(ls.ArchiveState, ShouldEqual, coordinator.ArchivedWithErrors)
|
| + So(ls.TerminalIndex, ShouldEqual, 13)
|
| + })
|
| + })
|
| +
|
| Convey(`When datastore Get fails, returns internal error.`, func() {
|
| c, fb := featureBreaker.FilterRDS(c, nil)
|
| fb.BreakFeatures(errors.New("test error"), "GetMulti")
|
|
|