| Index: logdog/server/collector/collector_test.go
|
| diff --git a/logdog/server/collector/collector_test.go b/logdog/server/collector/collector_test.go
|
| index a692bb9a1eb995330ef6e46f476bdfb301f7e92a..a41a45adcda1304969af5c2b1d550f77c9d9bda6 100644
|
| --- a/logdog/server/collector/collector_test.go
|
| +++ b/logdog/server/collector/collector_test.go
|
| @@ -12,6 +12,7 @@ import (
|
|
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/retry/transient"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| "github.com/luci/luci-go/logdog/client/butlerproto"
|
| "github.com/luci/luci-go/logdog/common/storage/memory"
|
| @@ -61,12 +62,12 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| })
|
|
|
| Convey(`Will return a transient error if a transient error happened while registering.`, func() {
|
| - tcc.registerCallback = func(cc.LogStreamState) error { return errors.WrapTransient(errors.New("test error")) }
|
| + tcc.registerCallback = func(cc.LogStreamState) error { return errors.New("test error", transient.Tag) }
|
|
|
| bb.addFullStream("foo/+/bar", 128)
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeTrue)
|
| + So(transient.Tag.In(err), ShouldBeTrue)
|
| })
|
|
|
| Convey(`Will return an error if a non-transient error happened while registering.`, func() {
|
| @@ -75,7 +76,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| bb.addFullStream("foo/+/bar", 128)
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| // This will happen when one registration request registers non-terminal,
|
| @@ -98,7 +99,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| })
|
|
|
| Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
|
| - tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.WrapTransient(errors.New("test error")) }
|
| + tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error", transient.Tag) }
|
|
|
| // Register independently from terminate so we don't bundle RPC.
|
| bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
|
| @@ -108,7 +109,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| bb.addStreamEntries("foo/+/bar", 5, 5)
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeTrue)
|
| + So(transient.Tag.In(err), ShouldBeTrue)
|
| })
|
|
|
| Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
|
| @@ -122,7 +123,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| bb.addStreamEntries("foo/+/bar", 5, 5)
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will return a transient error if a transient error happened on storage.`, func() {
|
| @@ -130,7 +131,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| count := int32(0)
|
| st.err = func() error {
|
| if atomic.AddInt32(&count, 1) == 1 {
|
| - return errors.WrapTransient(errors.New("test error"))
|
| + return errors.New("test error", transient.Tag)
|
| }
|
| return nil
|
| }
|
| @@ -138,7 +139,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
| bb.addFullStream("foo/+/bar", 128)
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeTrue)
|
| + So(transient.Tag.In(err), ShouldBeTrue)
|
| })
|
|
|
| Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
|
| @@ -150,7 +151,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
|
|
| So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 32)
|
| So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 31})
|
| @@ -166,7 +167,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldErrLike, "invalid prefix secret")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will drop messages with mismatching secrets.`, func() {
|
| @@ -196,7 +197,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldErrLike, "invalid bundle project name")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will drop streams with invalid project names.`, func() {
|
| @@ -206,7 +207,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldErrLike, "invalid bundle project name")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will drop streams with empty bundle prefixes.`, func() {
|
| @@ -215,7 +216,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldErrLike, "invalid bundle prefix")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will drop streams with invalid bundle prefixes.`, func() {
|
| @@ -225,7 +226,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldErrLike, "invalid bundle prefix")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
|
| @@ -233,7 +234,7 @@ func testCollectorImpl(t *testing.T, caching bool) {
|
|
|
| err := coll.Process(c, bb.bundle())
|
| So(err, ShouldErrLike, "mismatched bundle and entry prefixes")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(transient.Tag.In(err), ShouldBeFalse)
|
| })
|
|
|
| Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
|
|
|