| Index: server/internal/logdog/collector/collector_test.go
|
| diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
|
| index e23b5a862041301d21875c6eeab32434b2f6fb58..eb1e8e0e32e7931d41899bc10c52ac37c05f77d0 100644
|
| --- a/server/internal/logdog/collector/collector_test.go
|
| +++ b/server/internal/logdog/collector/collector_test.go
|
| @@ -19,6 +19,7 @@ import (
|
| "github.com/luci/luci-go/server/logdog/storage/memory"
|
| "golang.org/x/net/context"
|
|
|
| + . "github.com/luci/luci-go/common/testing/assertions"
|
| . "github.com/smartystreets/goconvey/convey"
|
| )
|
|
|
| @@ -106,12 +107,12 @@ func TestCollector(t *testing.T) {
|
| So(errors.IsTransient(err), ShouldBeFalse)
|
| })
|
|
|
| - Convey(`Will return a transient error if an error happened on storage.`, func() {
|
| + Convey(`Will return a transient error if a transient error happened on storage.`, func() {
|
| // Single transient error.
|
| count := int32(0)
|
| st.err = func() error {
|
| if atomic.AddInt32(&count, 1) == 1 {
|
| - return errors.New("test error")
|
| + return errors.WrapTransient(errors.New("test error"))
|
| }
|
| return nil
|
| }
|
| @@ -123,20 +124,21 @@ func TestCollector(t *testing.T) {
|
| })
|
|
|
| Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
|
| - be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8)
|
| - be.Desc.ContentType = "" // Missing ContentType => invalid.
|
| -
|
| - bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5)
|
| + be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
|
| bb.addBundleEntry(be)
|
| +
|
| + bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
|
| bb.addFullStream("foo/+/bar", 32)
|
|
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
|
|
| So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
|
| So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
|
|
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1)
|
| - So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5)
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/trash", 1337)
|
| + So(st, shouldHaveStoredStream, "foo/+/trash", 4, 5, 6, 7, 8)
|
| })
|
|
|
| Convey(`Will drop streams with missing secrets.`, func() {
|
| @@ -144,8 +146,10 @@ func TestCollector(t *testing.T) {
|
| be.Secret = nil
|
| bb.addBundleEntry(be)
|
|
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| - So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "missing stream secret")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| })
|
|
|
| Convey(`Will drop messages with mismatching secrets.`, func() {
|
| @@ -182,32 +186,6 @@ func TestCollector(t *testing.T) {
|
| So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| })
|
|
|
| - Convey(`Will drop records beyond a local terminal index.`, func() {
|
| - bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - bb.addStreamEntries("foo/+/bar", 3, 3, 5)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
|
| - })
|
| -
|
| - Convey(`Will not ingest records beyond a remote terminal index.`, func() {
|
| - tcc.register(coordinator.LogStreamState{
|
| - Path: "foo/+/bar",
|
| - Secret: testSecret,
|
| - TerminalIndex: 3,
|
| - })
|
| -
|
| - bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2)
|
| - bb.addStreamEntries("foo/+/bar", 3, 3, 5)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
|
| - })
|
| -
|
| Convey(`Will not ingest records if the stream is archived.`, func() {
|
| tcc.register(coordinator.LogStreamState{
|
| Path: "foo/+/bar",
|
| @@ -216,7 +194,7 @@ func TestCollector(t *testing.T) {
|
| Archived: true,
|
| })
|
|
|
| - bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
|
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
|
| So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
|
|
| So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| @@ -242,7 +220,7 @@ func TestCollector(t *testing.T) {
|
| })
|
|
|
| Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
|
| - be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
|
| + be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
|
|
|
| // Add a binary log entry. This does NOT match the text descriptor, and
|
| // should fail validation.
|
| @@ -256,10 +234,9 @@ func TestCollector(t *testing.T) {
|
| },
|
| })
|
| bb.addBundleEntry(be)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
|
|
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4)
|
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| })
|
| })
|
| }
|
|
|