| Index: server/internal/logdog/collector/collector_test.go
|
| diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fccdccc79ac055d03314c47c690b567b848ef862
|
| --- /dev/null
|
| +++ b/server/internal/logdog/collector/collector_test.go
|
| @@ -0,0 +1,490 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package collector
|
| +
|
| +import (
|
| + "bytes"
|
| + "fmt"
|
| + "sort"
|
| + "strings"
|
| + "sync/atomic"
|
| + "testing"
|
| + "time"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/logdog/butlerproto"
|
| + "github.com/luci/luci-go/common/logdog/protocol"
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| + "github.com/luci/luci-go/server/logdog/storage"
|
| + "github.com/luci/luci-go/server/logdog/storage/memory"
|
| + "golang.org/x/net/context"
|
| +
|
| + . "github.com/smartystreets/goconvey/convey"
|
| +)
|
| +
|
| +var testSecret = bytes.Repeat([]byte{0x55}, types.StreamSecretLength)
|
| +
|
| +type bundleBuilder struct {
|
| + context.Context
|
| +
|
| + base time.Time
|
| + entries []*protocol.ButlerLogBundle_Entry
|
| +}
|
| +
|
| +func (b *bundleBuilder) addBundleEntry(be *protocol.ButlerLogBundle_Entry) {
|
| + if b.base.IsZero() {
|
| + b.base = clock.Now(b)
|
| + }
|
| +
|
| + b.entries = append(b.entries, be)
|
| +}
|
| +
|
| +func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *protocol.ButlerLogBundle_Entry {
|
| + p, n := types.StreamPath(name).Split()
|
| + be := protocol.ButlerLogBundle_Entry{
|
| + Secret: testSecret,
|
| + Desc: &protocol.LogStreamDescriptor{
|
| + Prefix: string(p),
|
| + Name: string(n),
|
| + ContentType: "application/test-message",
|
| + StreamType: protocol.LogStreamDescriptor_TEXT,
|
| + Timestamp: google.NewTimestamp(clock.Now(b)),
|
| + },
|
| + }
|
| +
|
| + if len(idxs) > 0 {
|
| + be.Logs = make([]*protocol.LogEntry, len(idxs))
|
| + for i, idx := range idxs {
|
| + be.Logs[i] = b.logEntry(idx)
|
| + }
|
| + if tidx >= 0 {
|
| + be.Terminal = true
|
| + be.TerminalIndex = uint64(tidx)
|
| + }
|
| + }
|
| +
|
| + return &be
|
| +}
|
| +
|
| +func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) {
|
| + b.addBundleEntry(b.genBundleEntry(name, term, idxs...))
|
| +}
|
| +
|
| +func (b *bundleBuilder) addFullStream(name string, count int) {
|
| + idxs := make([]int, count)
|
| + for i := range idxs {
|
| + idxs[i] = i
|
| + }
|
| + b.addStreamEntries(name, count-1, idxs...)
|
| +}
|
| +
|
| +func (b *bundleBuilder) logEntry(idx int) *protocol.LogEntry {
|
| + return &protocol.LogEntry{
|
| + StreamIndex: uint64(idx),
|
| + Sequence: uint64(idx),
|
| + Content: &protocol.LogEntry_Text{
|
| + Text: &protocol.Text{
|
| + Lines: []*protocol.Text_Line{
|
| + {
|
| + Value: fmt.Sprintf("Line #%d", idx),
|
| + Delimiter: "\n",
|
| + },
|
| + },
|
| + },
|
| + },
|
| + }
|
| +}
|
| +
|
| +func (b *bundleBuilder) bundle() []byte {
|
| + bytes := b.bundleWithEntries(b.entries...)
|
| + b.entries = nil
|
| + return bytes
|
| +}
|
| +
|
| +func (b *bundleBuilder) bundleWithEntries(e ...*protocol.ButlerLogBundle_Entry) []byte {
|
| + bundle := protocol.ButlerLogBundle{
|
| + Source: "test stream",
|
| + Timestamp: google.NewTimestamp(clock.Now(b)),
|
| + Entries: e,
|
| + }
|
| +
|
| + buf := bytes.Buffer{}
|
| + w := butlerproto.Writer{Compress: true}
|
| + if err := w.Write(&buf, &bundle); err != nil {
|
| + panic(err)
|
| + }
|
| + return buf.Bytes()
|
| +}
|
| +
|
| +type indexRange struct {
|
| + start int
|
| + end int
|
| +}
|
| +
|
| +func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r.end) }
|
| +
|
| +// shouldHaveRegisteredStream asserts that a testCoordinatorClient has
|
| +// registered a stream (string) and its terminal index (int).
|
| +func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
|
| + tcc := actual.(*testCoordinatorClient)
|
| + name := expected[0].(string)
|
| + tidx := expected[1].(int)
|
| +
|
| + cur, ok := tcc.stream(name)
|
| + if !ok {
|
| + return fmt.Sprintf("stream %q is not registered", name)
|
| + }
|
| + if tidx >= 0 && cur < 0 {
|
| + return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name)
|
| + }
|
| + if cur >= 0 && tidx < 0 {
|
| + return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name)
|
| + }
|
| + return ""
|
| +}
|
| +
|
| +// shoudNotHaveRegisteredStream asserts that a testCoordinatorClient has not
|
| +// registered a stream (string).
|
| +func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
|
| + tcc := actual.(*testCoordinatorClient)
|
| + name := expected[0].(string)
|
| +
|
| + if _, ok := tcc.stream(name); ok {
|
| + return fmt.Sprintf("stream %q is registered, but it shoult NOT be.", name)
|
| + }
|
| + return ""
|
| +}
|
| +
|
| +// shouldHaveStoredStream asserts that a storage.Storage instance has contiguous
|
| +// stream records in it.
|
| +//
|
| +// actual is the storage.Storage instance. expected is a stream name (string)
|
| +// followed by a a series of records to assert. This can either be a specific
|
| +// integer index or an intexRange marking a closed range of indices.
|
| +func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
|
| + st := actual.(storage.Storage)
|
| + name := expected[0].(string)
|
| +
|
| + // Load all entries for this stream.
|
| + req := storage.GetRequest{
|
| + Path: types.StreamPath(name),
|
| + }
|
| +
|
| + entries := make(map[int]*protocol.LogEntry)
|
| + var ierr error
|
| + err := st.Get(&req, func(idx types.MessageIndex, d []byte) bool {
|
| + le := protocol.LogEntry{}
|
| + if ierr = proto.Unmarshal(d, &le); ierr != nil {
|
| + return false
|
| + }
|
| + entries[int(idx)] = &le
|
| + return true
|
| + })
|
| + if ierr != nil {
|
| + err = ierr
|
| + }
|
| + if err != nil && err != storage.ErrDoesNotExist {
|
| + return fmt.Sprintf("error: %v", err)
|
| + }
|
| +
|
| + assertLogEntry := func(i int) string {
|
| + le := entries[i]
|
| + if le == nil {
|
| + return fmt.Sprintf("%d", i)
|
| + }
|
| + delete(entries, i)
|
| +
|
| + if le.StreamIndex != uint64(i) {
|
| + return fmt.Sprintf("*%d", i)
|
| + }
|
| + return ""
|
| + }
|
| +
|
| + var failed []string
|
| + for _, exp := range expected[1:] {
|
| + switch e := exp.(type) {
|
| + case int:
|
| + if err := assertLogEntry(e); err != "" {
|
| + failed = append(failed, err)
|
| + }
|
| +
|
| + case indexRange:
|
| + var errs []string
|
| + for i := e.start; i <= e.end; i++ {
|
| + if err := assertLogEntry(i); err != "" {
|
| + errs = append(errs, err)
|
| + }
|
| + }
|
| + if len(errs) > 0 {
|
| + failed = append(failed, fmt.Sprintf("%s{%s}", e.String(), strings.Join(errs, ",")))
|
| + }
|
| +
|
| + default:
|
| + panic(fmt.Errorf("unknown expected type %T", e))
|
| + }
|
| + }
|
| +
|
| + // Extras?
|
| + if len(entries) > 0 {
|
| + idxs := make([]int, 0, len(entries))
|
| + for i := range entries {
|
| + idxs = append(idxs, i)
|
| + }
|
| + sort.Ints(idxs)
|
| +
|
| + extra := make([]string, len(idxs))
|
| + for i, idx := range idxs {
|
| + extra[i] = fmt.Sprintf("%d", idx)
|
| + }
|
| + failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(extra, ",")))
|
| + }
|
| +
|
| + if len(failed) > 0 {
|
| + return strings.Join(failed, ", ")
|
| + }
|
| + return ""
|
| +}
|
| +
|
| +// TestCollector runs through a series of end-to-end Collector workflows and
|
| +// ensures that the Collector behaves appropriately.
|
| +func TestCollector(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + Convey(`Using a test configuration`, t, func() {
|
| + c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
|
| +
|
| + tcc := &testCoordinatorClient{}
|
| + st := &testStorage{Storage: &memory.Storage{}}
|
| +
|
| + coll := New(Options{
|
| + Storage: st,
|
| + Coordinator: tcc,
|
| + })
|
| +
|
| + bb := bundleBuilder{
|
| + Context: c,
|
| + }
|
| +
|
| + Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + bb.addFullStream("foo/+/baz", 256)
|
| +
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127})
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255)
|
| + So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255})
|
| + })
|
| +
|
| + Convey(`Will return an error if a transient error happened while registering.`, func() {
|
| + tcc.errC = make(chan error, 1)
|
| + tcc.errC <- errors.WrapTransient(errors.New("test error"))
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + err := coll.Process(c, bb.bundle())
|
| + So(err, ShouldNotBeNil)
|
| + })
|
| +
|
| + Convey(`Will not return an error if a non-transient error happened while registering.`, func() {
|
| + tcc.errC = make(chan error, 1)
|
| + tcc.errC <- errors.New("test error")
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will return an error if a transient error happened while terminating.`, func() {
|
| + tcc.errC = make(chan error, 2)
|
| + tcc.errC <- nil // Register
|
| + tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + So(coll.Process(c, bb.bundle()), ShouldNotBeNil)
|
| + })
|
| +
|
| + Convey(`Will not return an error if a non-transient error happened while terminating.`, func() {
|
| + tcc.errC = make(chan error, 2)
|
| + tcc.errC <- nil // Register
|
| + tcc.errC <- errors.New("test error") // Terminate
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will return an error if a transient error happened on storage.`, func() {
|
| + // Single transient error.
|
| + count := int32(0)
|
| + st.err = func() error {
|
| + if atomic.AddInt32(&count, 1) == 1 {
|
| + return errors.WrapTransient(errors.New("test error"))
|
| + }
|
| + return nil
|
| + }
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + So(coll.Process(c, bb.bundle()), ShouldNotBeNil)
|
| + })
|
| +
|
| + Convey(`Will not return an error if a non-transient error happened on storage.`, func() {
|
| + // Single non-transient error.
|
| + count := int32(0)
|
| + st.err = func() error {
|
| + if atomic.AddInt32(&count, 1) == 1 {
|
| + return errors.New("test error")
|
| + }
|
| + return nil
|
| + }
|
| +
|
| + bb.addFullStream("foo/+/bar", 128)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
|
| + be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8)
|
| + be.Desc.ContentType = "" // Missing ContentType => invalid.
|
| +
|
| + bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5)
|
| + bb.addBundleEntry(be)
|
| + bb.addFullStream("foo/+/bar", 32)
|
| +
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1)
|
| + So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5)
|
| + })
|
| +
|
| + Convey(`Will drop streams with missing secrets.`, func() {
|
| + be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2)
|
| + be.Secret = nil
|
| + bb.addBundleEntry(be)
|
| +
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127)
|
| + })
|
| +
|
| + Convey(`Will drop messages with mismatching secrets.`, func() {
|
| + bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + // Push another bundle with a different secret.
|
| + be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
|
| + be.Secret = bytes.Repeat([]byte{0xAA}, types.StreamSecretLength)
|
| + be.TerminalIndex = 1337
|
| + bb.addBundleEntry(be)
|
| + bb.addFullStream("foo/+/baz", 3)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2})
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2)
|
| + So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2})
|
| + })
|
| +
|
| + Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
|
| + So(coll.Process(c, []byte{0x00}), ShouldBeNil)
|
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
|
| + buf := bytes.Buffer{}
|
| + w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
|
| + w.Write(&buf, &protocol.ButlerLogBundle{})
|
| +
|
| + So(coll.Process(c, buf.Bytes()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will drop records beyond a local terminal index.`, func() {
|
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + bb.addStreamEntries("foo/+/bar", 4, 3, 5)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
|
| + })
|
| +
|
| + Convey(`Will not ingest records beyond a remote terminal index.`, func() {
|
| + tcc.register(stateProxy{
|
| + path: "foo/+/bar",
|
| + secret: testSecret,
|
| + terminalIndex: 3,
|
| + })
|
| +
|
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2)
|
| + bb.addStreamEntries("foo/+/bar", 4, 3, 5)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
|
| + })
|
| +
|
| + Convey(`Will not ingest records if the stream is archived.`, func() {
|
| + tcc.register(stateProxy{
|
| + path: "foo/+/bar",
|
| + secret: testSecret,
|
| + terminalIndex: -1,
|
| + archived: true,
|
| + })
|
| +
|
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will not ingest records if the stream is purged.`, func() {
|
| + tcc.register(stateProxy{
|
| + path: "foo/+/bar",
|
| + secret: testSecret,
|
| + terminalIndex: -1,
|
| + purged: true,
|
| + })
|
| +
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar")
|
| + })
|
| +
|
| + Convey(`Will not ingest a bundle with no bundle entries.`, func() {
|
| + So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
|
| + be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
|
| +
|
| + // Add a binary log entry. This does NOT match the text descriptor, and
|
| + // should fail validation.
|
| + be.Logs = append(be.Logs, &protocol.LogEntry{
|
| + StreamIndex: 2,
|
| + Sequence: 2,
|
| + Content: &protocol.LogEntry_Binary{
|
| + &protocol.Binary{
|
| + Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
|
| + },
|
| + },
|
| + })
|
| + bb.addBundleEntry(be)
|
| + So(coll.Process(c, bb.bundle()), ShouldBeNil)
|
| +
|
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
|
| + So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4)
|
| + })
|
| + })
|
| +}
|
|
|