| Index: server/internal/logdog/collector/collector_test.go
|
| diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
|
| index b6949ea273d0fb83c0d4a9aaa32695e54d85ecbd..d9299376b26486ab43cf5fe5523d4408bbc0d0a5 100644
|
| --- a/server/internal/logdog/collector/collector_test.go
|
| +++ b/server/internal/logdog/collector/collector_test.go
|
| @@ -11,6 +11,7 @@ import (
|
| "testing"
|
|
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logdog/butlerproto"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| @@ -25,10 +26,8 @@ import (
|
|
|
| // TestCollector runs through a series of end-to-end Collector workflows and
|
| // ensures that the Collector behaves appropriately.
|
| -func TestCollector(t *testing.T) {
|
| - t.Parallel()
|
| -
|
| - Convey(`Using a test configuration`, t, func() {
|
| +func testCollectorImpl(t *testing.T, caching bool) {
|
| + Convey(fmt.Sprintf(`Using a test configuration with caching == %v`, caching), t, func() {
|
| c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
|
|
|
| tcc := &testCoordinator{}
|
| @@ -44,201 +43,270 @@ func TestCollector(t *testing.T) {
|
| Context: c,
|
| }
|
|
|
| - for _, phrase := range []string{"disabled", "enabled"} {
|
| - v := phrase == "enabled"
|
| + if caching {
|
| + coll.Coordinator = coordinator.NewCache(coll.Coordinator, 0, 0)
|
| + }
|
|
|
| - Convey(fmt.Sprintf(`When caching is %s`, phrase), func() {
|
| - if v {
|
| - coll.Coordinator = coordinator.NewCache(coll.Coordinator, 0, 0)
|
| + Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + bb.addFullStream("foo/+/baz", 256)
|
| +
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 127)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 127})
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 255)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 255})
|
| + })
|
| +
|
| + Convey(`Will return a transient error if a transient error happened while registering.`, func() {
|
| + tcc.errC = make(chan error, 1)
|
| + tcc.errC <- errors.WrapTransient(errors.New("test error"))
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| + })
|
| +
|
| + Convey(`Will return an error if a non-transient error happened while registering.`, func() {
|
| + tcc.errC = make(chan error, 1)
|
| + tcc.errC <- errors.New("test error")
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
|
| + tcc.errC = make(chan error, 2)
|
| + tcc.errC <- nil // Register
|
| + tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| + })
|
| +
|
| + Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
|
| + tcc.errC = make(chan error, 2)
|
| + tcc.errC <- nil // Register
|
| + tcc.errC <- errors.New("test error") // Terminate
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will return a transient error if a transient error happened on storage.`, func() {
|
| + // Single transient error.
|
| + count := int32(0)
|
| + st.err = func() error {
|
| + if atomic.AddInt32(&count, 1) == 1 {
|
| + return errors.WrapTransient(errors.New("test error"))
|
| }
|
| + return nil
|
| + }
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| + })
|
| +
|
| + Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
|
| + be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
|
| + bb.addBundleEntry(be)
|
| +
|
| + bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
|
| + bb.addFullStream("foo/+/bar", 32)
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 32)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 31})
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/trash", 1337)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/trash", 4, 5, 6, 7, 8)
|
| + })
|
| +
|
| + Convey(`Will drop streams with missing (invalid) secrets.`, func() {
|
| + b := bb.genBase()
|
| + b.Secret = nil
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "invalid prefix secret")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will drop messages with mismatching secrets.`, func() {
|
| + bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + // Push another bundle with a different secret.
|
| + b := bb.genBase()
|
| + b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecretLength)
|
| + be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
|
| + be.TerminalIndex = 1337
|
| + bb.addBundleEntry(be)
|
| + bb.addFullStream("foo/+/baz", 3)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 2})
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 2)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 2})
|
| + })
|
| +
|
| + Convey(`With an empty project name`, func() {
|
| + b := bb.genBase()
|
| + b.Project = ""
|
| + bb.addFullStream("foo/+/baz", 3)
|
| +
|
| + // TODO(dnj): Enable this when project name is required.
|
| + SkipConvey(`Will drop the stream.`, func() {
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "invalid project name")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will register the stream.`, func() {
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
|
|
| - Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
|
| - bb.addFullStream("foo/+/bar", 128)
|
| - bb.addFullStream("foo/+/baz", 256)
|
| -
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127})
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255)
|
| - So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255})
|
| - })
|
| -
|
| - Convey(`Will return a transient error if a transient error happened while registering.`, func() {
|
| - tcc.errC = make(chan error, 1)
|
| - tcc.errC <- errors.WrapTransient(errors.New("test error"))
|
| -
|
| - bb.addFullStream("foo/+/bar", 128)
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeTrue)
|
| - })
|
| -
|
| - Convey(`Will return an error if a non-transient error happened while registering.`, func() {
|
| - tcc.errC = make(chan error, 1)
|
| - tcc.errC <- errors.New("test error")
|
| -
|
| - bb.addFullStream("foo/+/bar", 128)
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| - })
|
| -
|
| - Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
|
| - tcc.errC = make(chan error, 2)
|
| - tcc.errC <- nil // Register
|
| - tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
|
| -
|
| - bb.addFullStream("foo/+/bar", 128)
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeTrue)
|
| - })
|
| -
|
| - Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
|
| - tcc.errC = make(chan error, 2)
|
| - tcc.errC <- nil // Register
|
| - tcc.errC <- errors.New("test error") // Terminate
|
| -
|
| - bb.addFullStream("foo/+/bar", 128)
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| - })
|
| -
|
| - Convey(`Will return a transient error if a transient error happened on storage.`, func() {
|
| - // Single transient error.
|
| - count := int32(0)
|
| - st.err = func() error {
|
| - if atomic.AddInt32(&count, 1) == 1 {
|
| - return errors.WrapTransient(errors.New("test error"))
|
| - }
|
| - return nil
|
| - }
|
| -
|
| - bb.addFullStream("foo/+/bar", 128)
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeTrue)
|
| - })
|
| -
|
| - Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
|
| - be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
|
| - bb.addBundleEntry(be)
|
| -
|
| - bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
|
| - bb.addFullStream("foo/+/bar", 32)
|
| -
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldNotBeNil)
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/trash", 1337)
|
| - So(st, shouldHaveStoredStream, "foo/+/trash", 4, 5, 6, 7, 8)
|
| - })
|
| -
|
| - Convey(`Will drop streams with missing secrets.`, func() {
|
| - be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2)
|
| - be.Secret = nil
|
| - bb.addBundleEntry(be)
|
| -
|
| - err := coll.Process(c, bb.bundle())
|
| - So(err, ShouldErrLike, "missing stream secret")
|
| - So(errors.IsTransient(err), ShouldBeFalse)
|
| - So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| - })
|
| -
|
| - Convey(`Will drop messages with mismatching secrets.`, func() {
|
| - bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - // Push another bundle with a different secret.
|
| - be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
|
| - be.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecretLength)
|
| - be.TerminalIndex = 1337
|
| - bb.addBundleEntry(be)
|
| - bb.addFullStream("foo/+/baz", 3)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2})
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2)
|
| - So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2})
|
| - })
|
| -
|
| - Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
|
| - So(coll.Process(c, []byte{0x00}), ShouldBeNil)
|
| - So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| - })
|
| -
|
| - Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
|
| - buf := bytes.Buffer{}
|
| - w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
|
| - w.Write(&buf, &logpb.ButlerLogBundle{})
|
| -
|
| - So(coll.Process(c, buf.Bytes()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| - })
|
| -
|
| - Convey(`Will not ingest records if the stream is archived.`, func() {
|
| - tcc.register(coordinator.LogStreamState{
|
| - Path: "foo/+/bar",
|
| - Secret: testSecret,
|
| - TerminalIndex: -1,
|
| - Archived: true,
|
| - })
|
| -
|
| - bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar")
|
| - })
|
| -
|
| - Convey(`Will not ingest records if the stream is purged.`, func() {
|
| - tcc.register(coordinator.LogStreamState{
|
| - Path: "foo/+/bar",
|
| - Secret: testSecret,
|
| - TerminalIndex: -1,
|
| - Purged: true,
|
| - })
|
| -
|
| - So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| -
|
| - So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| - So(st, shouldHaveStoredStream, "foo/+/bar")
|
| - })
|
| -
|
| - Convey(`Will not ingest a bundle with no bundle entries.`, func() {
|
| - So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil)
|
| - })
|
| -
|
| - Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
|
| - be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
|
| -
|
| - // Add a binary log entry. This does NOT match the text descriptor, and
|
| - // should fail validation.
|
| - be.Logs = append(be.Logs, &logpb.LogEntry{
|
| - StreamIndex: 2,
|
| - Sequence: 2,
|
| - Content: &logpb.LogEntry_Binary{
|
| - &logpb.Binary{
|
| - Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
|
| - },
|
| - },
|
| - })
|
| - bb.addBundleEntry(be)
|
| - So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
|
| -
|
| - So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| - })
|
| + So(tcc, shouldHaveRegisteredStream, "", "foo/+/baz", 2)
|
| + So(st, shouldHaveStoredStream, "", "foo/+/baz", indexRange{0, 2})
|
| })
|
| - }
|
| + })
|
| +
|
| + Convey(`Will drop streams with invalid project names.`, func() {
|
| + b := bb.genBase()
|
| + b.Project = "!!!invalid name!!!"
|
| + So(config.ProjectName(b.Project).Validate(), ShouldNotBeNil)
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "invalid bundle project name")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will drop streams with empty bundle prefixes.`, func() {
|
| + b := bb.genBase()
|
| + b.Prefix = ""
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "invalid bundle prefix")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will drop streams with invalid bundle prefixes.`, func() {
|
| + b := bb.genBase()
|
| + b.Prefix = "!!!invalid prefix!!!"
|
| + So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil)
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "invalid bundle prefix")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
|
| + bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4)
|
| +
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldErrLike, "mismatched bundle and entry prefixes")
|
| + So(errors.IsTransient(err), ShouldBeFalse)
|
| + })
|
| +
|
| + Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
|
| + So(coll.Process(c, []byte{0x00}), ShouldBeNil)
|
| + So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
|
| + buf := bytes.Buffer{}
|
| + w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
|
| + w.Write(&buf, &logpb.ButlerLogBundle{})
|
| +
|
| + So(coll.Process(c, buf.Bytes()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will not ingest records if the stream is archived.`, func() {
|
| + tcc.register(coordinator.LogStreamState{
|
| + Project: "test-project",
|
| + Path: "foo/+/bar",
|
| + Secret: testSecret,
|
| + TerminalIndex: -1,
|
| + Archived: true,
|
| + })
|
| +
|
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will not ingest records if the stream is purged.`, func() {
|
| + tcc.register(coordinator.LogStreamState{
|
| + Project: "test-project",
|
| + Path: "foo/+/bar",
|
| + Secret: testSecret,
|
| + TerminalIndex: -1,
|
| + Purged: true,
|
| + })
|
| +
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
|
| + So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will not ingest a bundle with no bundle entries.`, func() {
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
|
| + be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
|
| +
|
| + // Add a binary log entry. This does NOT match the text descriptor, and
|
| + // should fail validation.
|
| + be.Logs = append(be.Logs, &logpb.LogEntry{
|
| + StreamIndex: 2,
|
| + Sequence: 2,
|
| + Content: &logpb.LogEntry_Binary{
|
| + &logpb.Binary{
|
| + Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
|
| + },
|
| + },
|
| + })
|
| + bb.addBundleEntry(be)
|
| + So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
|
| +
|
| + So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
|
| + })
|
| })
|
| }
|
| +
|
| +// TestCollector runs through a series of end-to-end Collector workflows and
|
| +// ensures that the Collector behaves appropriately.
|
| +func TestCollector(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + testCollectorImpl(t, false)
|
| +}
|
| +
|
| +// TestCollectorWithCaching runs through a series of end-to-end Collector
|
| +// workflows and ensures that the Collector behaves appropriately.
|
| +func TestCollectorWithCaching(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + testCollectorImpl(t, true)
|
| +}
|
|
|