Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(478)

Unified Diff: server/internal/logdog/collector/collector_test.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/collector_test.go
diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
index b6949ea273d0fb83c0d4a9aaa32695e54d85ecbd..d9299376b26486ab43cf5fe5523d4408bbc0d0a5 100644
--- a/server/internal/logdog/collector/collector_test.go
+++ b/server/internal/logdog/collector/collector_test.go
@@ -11,6 +11,7 @@ import (
"testing"
"github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logdog/butlerproto"
"github.com/luci/luci-go/common/logdog/types"
@@ -25,10 +26,8 @@ import (
// TestCollector runs through a series of end-to-end Collector workflows and
// ensures that the Collector behaves appropriately.
-func TestCollector(t *testing.T) {
- t.Parallel()
-
- Convey(`Using a test configuration`, t, func() {
+func testCollectorImpl(t *testing.T, caching bool) {
+ Convey(fmt.Sprintf(`Using a test configuration with caching == %v`, caching), t, func() {
c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
tcc := &testCoordinator{}
@@ -44,201 +43,270 @@ func TestCollector(t *testing.T) {
Context: c,
}
- for _, phrase := range []string{"disabled", "enabled"} {
- v := phrase == "enabled"
+ if caching {
+ coll.Coordinator = coordinator.NewCache(coll.Coordinator, 0, 0)
+ }
- Convey(fmt.Sprintf(`When caching is %s`, phrase), func() {
- if v {
- coll.Coordinator = coordinator.NewCache(coll.Coordinator, 0, 0)
+ Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
+ bb.addFullStream("foo/+/bar", 128)
+ bb.addFullStream("foo/+/baz", 256)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 127)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 127})
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 255)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 255})
+ })
+
+ Convey(`Will return a transient error if a transient error happened while registering.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.WrapTransient(errors.New("test error"))
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`Will return an error if a non-transient error happened while registering.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.New("test error")
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
+ tcc.errC = make(chan error, 2)
+ tcc.errC <- nil // Register
+ tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
+ tcc.errC = make(chan error, 2)
+ tcc.errC <- nil // Register
+ tcc.errC <- errors.New("test error") // Terminate
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will return a transient error if a transient error happened on storage.`, func() {
+ // Single transient error.
+ count := int32(0)
+ st.err = func() error {
+ if atomic.AddInt32(&count, 1) == 1 {
+ return errors.WrapTransient(errors.New("test error"))
}
+ return nil
+ }
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
+ be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
+ bb.addBundleEntry(be)
+
+ bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
+ bb.addFullStream("foo/+/bar", 32)
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeFalse)
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 32)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 31})
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/trash", 1337)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/trash", 4, 5, 6, 7, 8)
+ })
+
+ Convey(`Will drop streams with missing (invalid) secrets.`, func() {
+ b := bb.genBase()
+ b.Secret = nil
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "invalid prefix secret")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will drop messages with mismatching secrets.`, func() {
+ bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ // Push another bundle with a different secret.
+ b := bb.genBase()
+ b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecretLength)
+ be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
+ be.TerminalIndex = 1337
+ bb.addBundleEntry(be)
+ bb.addFullStream("foo/+/baz", 3)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 2})
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 2)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 2})
+ })
+
+ Convey(`With an empty project name`, func() {
+ b := bb.genBase()
+ b.Project = ""
+ bb.addFullStream("foo/+/baz", 3)
+
+ // TODO(dnj): Enable this when project name is required.
+ SkipConvey(`Will drop the stream.`, func() {
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "invalid project name")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will register the stream.`, func() {
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
- Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
- bb.addFullStream("foo/+/bar", 128)
- bb.addFullStream("foo/+/baz", 256)
-
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127)
- So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127})
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255)
- So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255})
- })
-
- Convey(`Will return a transient error if a transient error happened while registering.`, func() {
- tcc.errC = make(chan error, 1)
- tcc.errC <- errors.WrapTransient(errors.New("test error"))
-
- bb.addFullStream("foo/+/bar", 128)
- err := coll.Process(c, bb.bundle())
- So(err, ShouldNotBeNil)
- So(errors.IsTransient(err), ShouldBeTrue)
- })
-
- Convey(`Will return an error if a non-transient error happened while registering.`, func() {
- tcc.errC = make(chan error, 1)
- tcc.errC <- errors.New("test error")
-
- bb.addFullStream("foo/+/bar", 128)
- err := coll.Process(c, bb.bundle())
- So(err, ShouldNotBeNil)
- So(errors.IsTransient(err), ShouldBeFalse)
- })
-
- Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
- tcc.errC = make(chan error, 2)
- tcc.errC <- nil // Register
- tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
-
- bb.addFullStream("foo/+/bar", 128)
- err := coll.Process(c, bb.bundle())
- So(err, ShouldNotBeNil)
- So(errors.IsTransient(err), ShouldBeTrue)
- })
-
- Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
- tcc.errC = make(chan error, 2)
- tcc.errC <- nil // Register
- tcc.errC <- errors.New("test error") // Terminate
-
- bb.addFullStream("foo/+/bar", 128)
- err := coll.Process(c, bb.bundle())
- So(err, ShouldNotBeNil)
- So(errors.IsTransient(err), ShouldBeFalse)
- })
-
- Convey(`Will return a transient error if a transient error happened on storage.`, func() {
- // Single transient error.
- count := int32(0)
- st.err = func() error {
- if atomic.AddInt32(&count, 1) == 1 {
- return errors.WrapTransient(errors.New("test error"))
- }
- return nil
- }
-
- bb.addFullStream("foo/+/bar", 128)
- err := coll.Process(c, bb.bundle())
- So(err, ShouldNotBeNil)
- So(errors.IsTransient(err), ShouldBeTrue)
- })
-
- Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
- be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
- bb.addBundleEntry(be)
-
- bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
- bb.addFullStream("foo/+/bar", 32)
-
- err := coll.Process(c, bb.bundle())
- So(err, ShouldNotBeNil)
- So(errors.IsTransient(err), ShouldBeFalse)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
- So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/trash", 1337)
- So(st, shouldHaveStoredStream, "foo/+/trash", 4, 5, 6, 7, 8)
- })
-
- Convey(`Will drop streams with missing secrets.`, func() {
- be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2)
- be.Secret = nil
- bb.addBundleEntry(be)
-
- err := coll.Process(c, bb.bundle())
- So(err, ShouldErrLike, "missing stream secret")
- So(errors.IsTransient(err), ShouldBeFalse)
- So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
- })
-
- Convey(`Will drop messages with mismatching secrets.`, func() {
- bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- // Push another bundle with a different secret.
- be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
- be.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecretLength)
- be.TerminalIndex = 1337
- bb.addBundleEntry(be)
- bb.addFullStream("foo/+/baz", 3)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
- So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2})
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2)
- So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2})
- })
-
- Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
- So(coll.Process(c, []byte{0x00}), ShouldBeNil)
- So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
- })
-
- Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
- buf := bytes.Buffer{}
- w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
- w.Write(&buf, &logpb.ButlerLogBundle{})
-
- So(coll.Process(c, buf.Bytes()), ShouldBeNil)
-
- So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
- })
-
- Convey(`Will not ingest records if the stream is archived.`, func() {
- tcc.register(coordinator.LogStreamState{
- Path: "foo/+/bar",
- Secret: testSecret,
- TerminalIndex: -1,
- Archived: true,
- })
-
- bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
- So(st, shouldHaveStoredStream, "foo/+/bar")
- })
-
- Convey(`Will not ingest records if the stream is purged.`, func() {
- tcc.register(coordinator.LogStreamState{
- Path: "foo/+/bar",
- Secret: testSecret,
- TerminalIndex: -1,
- Purged: true,
- })
-
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
- So(st, shouldHaveStoredStream, "foo/+/bar")
- })
-
- Convey(`Will not ingest a bundle with no bundle entries.`, func() {
- So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil)
- })
-
- Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
- be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
-
- // Add a binary log entry. This does NOT match the text descriptor, and
- // should fail validation.
- be.Logs = append(be.Logs, &logpb.LogEntry{
- StreamIndex: 2,
- Sequence: 2,
- Content: &logpb.LogEntry_Binary{
- &logpb.Binary{
- Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
- },
- },
- })
- bb.addBundleEntry(be)
- So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
-
- So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
- })
+ So(tcc, shouldHaveRegisteredStream, "", "foo/+/baz", 2)
+ So(st, shouldHaveStoredStream, "", "foo/+/baz", indexRange{0, 2})
})
- }
+ })
+
+ Convey(`Will drop streams with invalid project names.`, func() {
+ b := bb.genBase()
+ b.Project = "!!!invalid name!!!"
+ So(config.ProjectName(b.Project).Validate(), ShouldNotBeNil)
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "invalid bundle project name")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will drop streams with empty bundle prefixes.`, func() {
+ b := bb.genBase()
+ b.Prefix = ""
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "invalid bundle prefix")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will drop streams with invalid bundle prefixes.`, func() {
+ b := bb.genBase()
+ b.Prefix = "!!!invalid prefix!!!"
+ So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil)
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "invalid bundle prefix")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
+ bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4)
+
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "mismatched bundle and entry prefixes")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
+ So(coll.Process(c, []byte{0x00}), ShouldBeNil)
+ So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
+ })
+
+ Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
+ buf := bytes.Buffer{}
+ w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
+ w.Write(&buf, &logpb.ButlerLogBundle{})
+
+ So(coll.Process(c, buf.Bytes()), ShouldBeNil)
+
+ So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
+ })
+
+ Convey(`Will not ingest records if the stream is archived.`, func() {
+ tcc.register(coordinator.LogStreamState{
+ Project: "test-project",
+ Path: "foo/+/bar",
+ Secret: testSecret,
+ TerminalIndex: -1,
+ Archived: true,
+ })
+
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
+ })
+
+ Convey(`Will not ingest records if the stream is purged.`, func() {
+ tcc.register(coordinator.LogStreamState{
+ Project: "test-project",
+ Path: "foo/+/bar",
+ Secret: testSecret,
+ TerminalIndex: -1,
+ Purged: true,
+ })
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
+ })
+
+ Convey(`Will not ingest a bundle with no bundle entries.`, func() {
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ })
+
+ Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
+ be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
+
+ // Add a binary log entry. This does NOT match the text descriptor, and
+ // should fail validation.
+ be.Logs = append(be.Logs, &logpb.LogEntry{
+ StreamIndex: 2,
+ Sequence: 2,
+ Content: &logpb.LogEntry_Binary{
+ &logpb.Binary{
+ Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
+ },
+ },
+ })
+ bb.addBundleEntry(be)
+ So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
+
+ So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
+ })
})
}
+
+// TestCollector runs through a series of end-to-end Collector workflows and
+// ensures that the Collector behaves appropriately.
+func TestCollector(t *testing.T) {
+ t.Parallel()
+
+ testCollectorImpl(t, false)
+}
+
+// TestCollectorWithCaching runs through a series of end-to-end Collector
+// workflows and ensures that the Collector behaves appropriately.
+func TestCollectorWithCaching(t *testing.T) {
+ t.Parallel()
+
+ testCollectorImpl(t, true)
+}
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/coordinator/cache.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698