Index: logdog/server/collector/collector_test.go |
diff --git a/logdog/server/collector/collector_test.go b/logdog/server/collector/collector_test.go |
index a692bb9a1eb995330ef6e46f476bdfb301f7e92a..a41a45adcda1304969af5c2b1d550f77c9d9bda6 100644 |
--- a/logdog/server/collector/collector_test.go |
+++ b/logdog/server/collector/collector_test.go |
@@ -12,6 +12,7 @@ import ( |
"github.com/luci/luci-go/common/clock/testclock" |
"github.com/luci/luci-go/common/errors" |
+ "github.com/luci/luci-go/common/retry/transient" |
"github.com/luci/luci-go/logdog/api/logpb" |
"github.com/luci/luci-go/logdog/client/butlerproto" |
"github.com/luci/luci-go/logdog/common/storage/memory" |
@@ -61,12 +62,12 @@ func testCollectorImpl(t *testing.T, caching bool) { |
}) |
Convey(`Will return a transient error if a transient error happened while registering.`, func() { |
- tcc.registerCallback = func(cc.LogStreamState) error { return errors.WrapTransient(errors.New("test error")) } |
+ tcc.registerCallback = func(cc.LogStreamState) error { return errors.New("test error", transient.Tag) } |
bb.addFullStream("foo/+/bar", 128) |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldNotBeNil) |
- So(errors.IsTransient(err), ShouldBeTrue) |
+ So(transient.Tag.In(err), ShouldBeTrue) |
}) |
Convey(`Will return an error if a non-transient error happened while registering.`, func() { |
@@ -75,7 +76,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
bb.addFullStream("foo/+/bar", 128) |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldNotBeNil) |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
// This will happen when one registration request registers non-terminal, |
@@ -98,7 +99,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
}) |
Convey(`Will return a transient error if a transient error happened while terminating.`, func() { |
- tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.WrapTransient(errors.New("test error")) } |
+ tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error", transient.Tag) } |
// Register independently from terminate so we don't bundle RPC. |
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) |
@@ -108,7 +109,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
bb.addStreamEntries("foo/+/bar", 5, 5) |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldNotBeNil) |
- So(errors.IsTransient(err), ShouldBeTrue) |
+ So(transient.Tag.In(err), ShouldBeTrue) |
}) |
Convey(`Will return an error if a non-transient error happened while terminating.`, func() { |
@@ -122,7 +123,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
bb.addStreamEntries("foo/+/bar", 5, 5) |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldNotBeNil) |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will return a transient error if a transient error happened on storage.`, func() { |
@@ -130,7 +131,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
count := int32(0) |
st.err = func() error { |
if atomic.AddInt32(&count, 1) == 1 { |
- return errors.WrapTransient(errors.New("test error")) |
+ return errors.New("test error", transient.Tag) |
} |
return nil |
} |
@@ -138,7 +139,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
bb.addFullStream("foo/+/bar", 128) |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldNotBeNil) |
- So(errors.IsTransient(err), ShouldBeTrue) |
+ So(transient.Tag.In(err), ShouldBeTrue) |
}) |
Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() { |
@@ -150,7 +151,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldNotBeNil) |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 32) |
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 31}) |
@@ -166,7 +167,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldErrLike, "invalid prefix secret") |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will drop messages with mismatching secrets.`, func() { |
@@ -196,7 +197,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldErrLike, "invalid bundle project name") |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will drop streams with invalid project names.`, func() { |
@@ -206,7 +207,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldErrLike, "invalid bundle project name") |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will drop streams with empty bundle prefixes.`, func() { |
@@ -215,7 +216,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldErrLike, "invalid bundle prefix") |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will drop streams with invalid bundle prefixes.`, func() { |
@@ -225,7 +226,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldErrLike, "invalid bundle prefix") |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() { |
@@ -233,7 +234,7 @@ func testCollectorImpl(t *testing.T, caching bool) { |
err := coll.Process(c, bb.bundle()) |
So(err, ShouldErrLike, "mismatched bundle and entry prefixes") |
- So(errors.IsTransient(err), ShouldBeFalse) |
+ So(transient.Tag.In(err), ShouldBeFalse) |
}) |
Convey(`Will return no error if the data has a corrupt bundle header.`, func() { |