Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1445)

Unified Diff: server/internal/logdog/collector/collector_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/collector_test.go
diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..cb54365f92d6d8b0d71d2dadbbeb90960a5e0906
--- /dev/null
+++ b/server/internal/logdog/collector/collector_test.go
@@ -0,0 +1,266 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "bytes"
+ "fmt"
+ "sync/atomic"
+ "testing"
+
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logdog/butlerproto"
+ "github.com/luci/luci-go/common/logdog/types"
+ "github.com/luci/luci-go/common/proto/logdog/logpb"
+ "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
+ "github.com/luci/luci-go/server/logdog/storage/memory"
+ "golang.org/x/net/context"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+// TestCollector runs through a series of end-to-end Collector workflows and
+// ensures that the Collector behaves appropriately.
+func TestCollector(t *testing.T) {
+ t.Parallel()
+
+ Convey(`Using a test configuration`, t, func() {
+ c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
+
+ tcc := &testCoordinator{}
+ st := &testStorage{Storage: &memory.Storage{}}
+
+ coll := &Collector{
+ Coordinator: tcc,
+ Storage: st,
+ }
+
+ bb := bundleBuilder{
+ Context: c,
+ }
+
+ for _, phrase := range []string{"disabled", "enabled"} {
+ v := phrase == "enabled"
+
+ Convey(fmt.Sprintf(`When caching is %s`, phrase), func() {
+ if v {
+ coll.Coordinator = coordinator.NewCache(coll.Coordinator, 0, 0)
+ }
+
+ Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
+ bb.addFullStream("foo/+/bar", 128)
+ bb.addFullStream("foo/+/baz", 256)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127})
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255)
+ So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255})
+ })
+
+ Convey(`Will return a transient error if a transient error happened while registering.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.WrapTransient(errors.New("test error"))
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`Will return an error if a non-transient error happened while registering.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.New("test error")
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
+ tcc.errC = make(chan error, 2)
+ tcc.errC <- nil // Register
+ tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
+ tcc.errC = make(chan error, 2)
+ tcc.errC <- nil // Register
+ tcc.errC <- errors.New("test error") // Terminate
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeFalse)
+ })
+
+ Convey(`Will return a transient error if an error happened on storage.`, func() {
+ // Single transient error.
+ count := int32(0)
+ st.err = func() error {
+ if atomic.AddInt32(&count, 1) == 1 {
+ return errors.New("test error")
+ }
+ return nil
+ }
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
+ be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8)
+ be.Desc.ContentType = "" // Missing ContentType => invalid.
+
+ bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5)
+ bb.addBundleEntry(be)
+ bb.addFullStream("foo/+/bar", 32)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1)
+ So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5)
+ })
+
+ Convey(`Will drop streams with missing secrets.`, func() {
+ be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2)
+ be.Secret = nil
+ bb.addBundleEntry(be)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127)
+ })
+
+ Convey(`Will drop messages with mismatching secrets.`, func() {
+ bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ // Push another bundle with a different secret.
+ be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
+ be.Secret = bytes.Repeat([]byte{0xAA}, types.StreamSecretLength)
+ be.TerminalIndex = 1337
+ bb.addBundleEntry(be)
+ bb.addFullStream("foo/+/baz", 3)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2})
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2)
+ So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2})
+ })
+
+ Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
+ So(coll.Process(c, []byte{0x00}), ShouldBeNil)
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
+ })
+
+ Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
+ buf := bytes.Buffer{}
+ w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
+ w.Write(&buf, &logpb.ButlerLogBundle{})
+
+ So(coll.Process(c, buf.Bytes()), ShouldBeNil)
+
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
+ })
+
+ Convey(`Will drop records beyond a local terminal index.`, func() {
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ bb.addStreamEntries("foo/+/bar", 3, 3, 5)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
+ })
+
+ Convey(`Will not ingest records beyond a remote terminal index.`, func() {
+ tcc.register(coordinator.LogStreamState{
+ Path: "foo/+/bar",
+ Secret: testSecret,
+ TerminalIndex: 3,
+ })
+
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2)
+ bb.addStreamEntries("foo/+/bar", 3, 3, 5)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
+ })
+
+ Convey(`Will not ingest records if the stream is archived.`, func() {
+ tcc.register(coordinator.LogStreamState{
+ Path: "foo/+/bar",
+ Secret: testSecret,
+ TerminalIndex: -1,
+ Archived: true,
+ })
+
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "foo/+/bar")
+ })
+
+ Convey(`Will not ingest records if the stream is purged.`, func() {
+ tcc.register(coordinator.LogStreamState{
+ Path: "foo/+/bar",
+ Secret: testSecret,
+ TerminalIndex: -1,
+ Purged: true,
+ })
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "foo/+/bar")
+ })
+
+ Convey(`Will not ingest a bundle with no bundle entries.`, func() {
+ So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil)
+ })
+
+ Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
+ be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
+
+ // Add a binary log entry. This does NOT match the text descriptor, and
+ // should fail validation.
+ be.Logs = append(be.Logs, &logpb.LogEntry{
+ StreamIndex: 2,
+ Sequence: 2,
+ Content: &logpb.LogEntry_Binary{
+ &logpb.Binary{
+ Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
+ },
+ },
+ })
+ bb.addBundleEntry(be)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
+ So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4)
+ })
+ })
+ }
+ })
+}
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/coordinator/cache.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698