Chromium Code Reviews| Index: server/internal/logdog/collector/collector_test.go |
| diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..52de53923bd9f072cd18adae2e37e315222cc8f7 |
| --- /dev/null |
| +++ b/server/internal/logdog/collector/collector_test.go |
| @@ -0,0 +1,271 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package collector |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "sync/atomic" |
| + "testing" |
| + |
| + "github.com/luci/luci-go/common/clock/testclock" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logdog/butlerproto" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + "github.com/luci/luci-go/common/proto/logdog/logpb" |
| + "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| + "github.com/luci/luci-go/server/logdog/storage/memory" |
| + "golang.org/x/net/context" |
| + |
| + . "github.com/smartystreets/goconvey/convey" |
| +) |
| + |
| +// TestCollector runs through a series of end-to-end Collector workflows and |
| +// ensures that the Collector behaves appropriately. |
| +func TestCollector(t *testing.T) { |
| + t.Parallel() |
| + |
| + Convey(`Using a test configuration`, t, func() { |
| + c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal) |
| + |
| + tcc := &testCoordinator{} |
| + st := &testStorage{Storage: &memory.Storage{}} |
| + |
| + coll := &Collector{ |
| + Coordinator: tcc, |
| + Storage: st, |
| + } |
| + |
| + bb := bundleBuilder{ |
| + Context: c, |
| + } |
| + |
| + for _, v := range []bool{ |
|
iannucci
2016/02/05 23:41:01
for _, phrase := range []string{"disabled", "enabl
dnj (Google)
2016/02/06 04:10:36
Done.
|
| + false, |
| + true, |
| + } { |
| + phrase := "disabled" |
| + if v { |
| + phrase = "enabled" |
| + } |
| + Convey(fmt.Sprintf(`When caching is %s`, phrase), func() { |
| + if v { |
| + coll.Coordinator = coordinator.NewCache(coll.Coordinator, 0, 0) |
| + } |
| + |
| + Convey(`Can process multiple single full streams from a Butler bundle.`, func() { |
| + bb.addFullStream("foo/+/bar", 128) |
| + bb.addFullStream("foo/+/baz", 256) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127}) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255) |
| + So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255}) |
| + }) |
| + |
| + Convey(`Will return a transient error if a transient error happened while registering.`, func() { |
| + tcc.errC = make(chan error, 1) |
| + tcc.errC <- errors.WrapTransient(errors.New("test error")) |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + err := coll.Process(c, bb.bundle()) |
| + So(err, ShouldNotBeNil) |
| + So(errors.IsTransient(err), ShouldBeTrue) |
| + }) |
| + |
| + Convey(`Will return an error if a non-transient error happened while registering.`, func() { |
| + tcc.errC = make(chan error, 1) |
| + tcc.errC <- errors.New("test error") |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + err := coll.Process(c, bb.bundle()) |
| + So(err, ShouldNotBeNil) |
| + So(errors.IsTransient(err), ShouldBeFalse) |
| + }) |
| + |
| + Convey(`Will return a transient error if a transient error happened while terminating.`, func() { |
| + tcc.errC = make(chan error, 2) |
| + tcc.errC <- nil // Register |
| + tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + err := coll.Process(c, bb.bundle()) |
| + So(err, ShouldNotBeNil) |
| + So(errors.IsTransient(err), ShouldBeTrue) |
| + }) |
| + |
| + Convey(`Will return an error if a non-transient error happened while terminating.`, func() { |
| + tcc.errC = make(chan error, 2) |
| + tcc.errC <- nil // Register |
| + tcc.errC <- errors.New("test error") // Terminate |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + err := coll.Process(c, bb.bundle()) |
| + So(err, ShouldNotBeNil) |
| + So(errors.IsTransient(err), ShouldBeFalse) |
| + }) |
| + |
| + Convey(`Will return a transient error if an error happened on storage.`, func() { |
| + // Single transient error. |
| + count := int32(0) |
| + st.err = func() error { |
| + if atomic.AddInt32(&count, 1) == 1 { |
| + return errors.New("test error") |
| + } |
| + return nil |
| + } |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + err := coll.Process(c, bb.bundle()) |
| + So(err, ShouldNotBeNil) |
| + So(errors.IsTransient(err), ShouldBeTrue) |
| + }) |
| + |
| + Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() { |
| + be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8) |
| + be.Desc.ContentType = "" // Missing ContentType => invalid. |
| + |
| + bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5) |
| + bb.addBundleEntry(be) |
| + bb.addFullStream("foo/+/bar", 32) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31}) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5) |
| + }) |
| + |
| + Convey(`Will drop streams with missing secrets.`, func() { |
| + be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2) |
| + be.Secret = nil |
| + bb.addBundleEntry(be) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127) |
| + }) |
| + |
| + Convey(`Will drop messages with mismatching secrets.`, func() { |
| + bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + // Push another bundle with a different secret. |
| + be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) |
| + be.Secret = bytes.Repeat([]byte{0xAA}, types.StreamSecretLength) |
| + be.TerminalIndex = 1337 |
| + bb.addBundleEntry(be) |
| + bb.addFullStream("foo/+/baz", 3) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2}) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2) |
| + So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2}) |
| + }) |
| + |
| + Convey(`Will return no error if the data has a corrupt bundle header.`, func() { |
| + So(coll.Process(c, []byte{0x00}), ShouldBeNil) |
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will drop bundles with unknown ProtoVersion string.`, func() { |
| + buf := bytes.Buffer{} |
| + w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} |
| + w.Write(&buf, &logpb.ButlerLogBundle{}) |
| + |
| + So(coll.Process(c, buf.Bytes()), ShouldBeNil) |
| + |
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will drop records beyond a local terminal index.`, func() { |
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + bb.addStreamEntries("foo/+/bar", 3, 3, 5) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3}) |
| + }) |
| + |
| + Convey(`Will not ingest records beyond a remote terminal index.`, func() { |
| + tcc.register(coordinator.LogStreamState{ |
| + Path: "foo/+/bar", |
| + Secret: testSecret, |
| + TerminalIndex: 3, |
| + }) |
| + |
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2) |
| + bb.addStreamEntries("foo/+/bar", 3, 3, 5) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3}) |
| + }) |
| + |
| + Convey(`Will not ingest records if the stream is archived.`, func() { |
| + tcc.register(coordinator.LogStreamState{ |
| + Path: "foo/+/bar", |
| + Secret: testSecret, |
| + TerminalIndex: -1, |
| + Archived: true, |
| + }) |
| + |
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will not ingest records if the stream is purged.`, func() { |
| + tcc.register(coordinator.LogStreamState{ |
| + Path: "foo/+/bar", |
| + Secret: testSecret, |
| + TerminalIndex: -1, |
| + Purged: true, |
| + }) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will not ingest a bundle with no bundle entries.`, func() { |
| + So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil) |
| + }) |
| + |
| + Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() { |
| + be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4) |
| + |
| + // Add a binary log entry. This does NOT match the text descriptor, and |
| + // should fail validation. |
| + be.Logs = append(be.Logs, &logpb.LogEntry{ |
| + StreamIndex: 2, |
| + Sequence: 2, |
| + Content: &logpb.LogEntry_Binary{ |
| + &logpb.Binary{ |
| + Data: []byte{0xd0, 0x6f, 0x00, 0xd5}, |
| + }, |
| + }, |
| + }) |
| + bb.addBundleEntry(be) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4) |
| + }) |
| + }) |
| + } |
| + }) |
| +} |