Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3162)

Unified Diff: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Regenerate protobufs. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
index 512f2c1226e0f14e7995145f052badb8a6914324..9a4f69d1c110278ae759814f8ec45c4beac58ec1 100644
--- a/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
+++ b/appengine/logdog/coordinator/endpoints/services/archiveStream_test.go
@@ -14,10 +14,12 @@ import (
"github.com/luci/luci-go/appengine/logdog/coordinator"
ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ "github.com/luci/luci-go/common/logging/gologger"
"github.com/luci/luci-go/common/proto/logdog/svcconfig"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/auth/authtest"
"golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
. "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
@@ -28,6 +30,7 @@ func TestArchiveStream(t *testing.T) {
Convey(`With a testing configuration`, t, func() {
c := memory.Use(context.Background())
+ c = gologger.Use(c)
be := Server{}
c = ct.UseConfig(c, &svcconfig.Coordinator{
@@ -68,6 +71,7 @@ func TestArchiveStream(t *testing.T) {
So(ds.Get(c).Get(ls), ShouldBeNil)
So(ls.Archived(), ShouldBeTrue)
+ So(ls.ArchiveState, ShouldEqual, coordinator.Archived)
So(ls.ArchiveWhole, ShouldBeTrue)
So(ls.TerminalIndex, ShouldEqual, 13)
So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
@@ -78,6 +82,25 @@ func TestArchiveStream(t *testing.T) {
So(ls.ArchiveDataSize, ShouldEqual, 30)
})
+ Convey(`Will mark the stream as partially archived if not complete.`, func() {
+ req.Complete = false
+
+ _, err := be.ArchiveStream(c, req)
+ So(err, ShouldBeNil)
+
+ So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Archived(), ShouldBeTrue)
+ So(ls.ArchiveState, ShouldEqual, coordinator.ArchivedPartially)
+ So(ls.ArchiveWhole, ShouldBeFalse)
+ So(ls.TerminalIndex, ShouldEqual, 13)
+ So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake.stream")
+ So(ls.ArchiveStreamSize, ShouldEqual, 10)
+ So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.index")
+ So(ls.ArchiveIndexSize, ShouldEqual, 20)
+ So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.data")
+ So(ls.ArchiveDataSize, ShouldEqual, 30)
+ })
+
Convey(`Will refuse to process an invalid stream path.`, func() {
req.Path = "!!!invalid!!!"
_, err := be.ArchiveStream(c, req)
@@ -100,6 +123,7 @@ func TestArchiveStream(t *testing.T) {
Convey(`If stream is already archived, will not update and return success.`, func() {
ls.State = coordinator.LSArchived
+ ls.ArchiveState = coordinator.Archived
ls.TerminalIndex = 1337
So(ls.Archived(), ShouldBeTrue)
So(ls.Put(ds.Get(c)), ShouldBeNil)
@@ -110,9 +134,40 @@ func TestArchiveStream(t *testing.T) {
ls.TerminalIndex = -1 // To make sure it reloaded.
So(ds.Get(c).Get(ls), ShouldBeNil)
So(ls.Archived(), ShouldBeTrue)
+ So(ls.ArchiveState, ShouldEqual, coordinator.Archived)
So(ls.TerminalIndex, ShouldEqual, 1337)
})
+ Convey(`If the archive has failed`, func() {
+ req.Error = true
+
+ Convey(`If the stream is below error threshold, will increment error count and return FailedPrecondition.`, func() {
+ ls.ArchiveErrors = 0
+ So(ls.ArchiveErrors, ShouldBeLessThan, maxArchiveErrors)
+ So(ls.Put(ds.Get(c)), ShouldBeNil)
+
+ _, err := be.ArchiveStream(c, req)
+ So(err, ShouldHaveRPCCode, codes.FailedPrecondition)
+
+ So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Archived(), ShouldBeFalse)
+ So(ls.ArchiveState, ShouldEqual, coordinator.NotArchived)
+ So(ls.ArchiveErrors, ShouldEqual, 1)
+ })
+
+ Convey(`If the stream is above error threshold, will succeed.`, func() {
+ ls.ArchiveErrors = maxArchiveErrors
+ So(ls.Put(ds.Get(c)), ShouldBeNil)
+
+ _, err := be.ArchiveStream(c, req)
+ So(err, ShouldBeNil)
+ So(ds.Get(c).Get(ls), ShouldBeNil)
+ So(ls.Archived(), ShouldBeTrue)
+ So(ls.ArchiveState, ShouldEqual, coordinator.ArchivedWithErrors)
+ So(ls.TerminalIndex, ShouldEqual, 13)
+ })
+ })
+
Convey(`When datastore Get fails, returns internal error.`, func() {
c, fb := featureBreaker.FilterRDS(c, nil)
fb.BreakFeatures(errors.New("test error"), "GetMulti")
« no previous file with comments | « appengine/logdog/coordinator/endpoints/services/archiveStream.go ('k') | appengine/logdog/coordinator/logStream.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698