Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Unified Diff: client/internal/logdog/butler/bundler/stream_test.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/stream_test.go
diff --git a/client/internal/logdog/butler/bundler/stream_test.go b/client/internal/logdog/butler/bundler/stream_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..9eb291e4061d4fb242c8f7d18842422cb00dd144
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/stream_test.go
@@ -0,0 +1,454 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+type testParserCommand struct {
+ // data is the data content of this command.
+ data []byte
+ // ts is the timestamp, which is valid if this is a data command.
+ ts time.Time
+ // splitToggle, if not null, causes this command to set the "allowSplit"
+ // parser constraint.
+ splitToggle *bool
+ // closedToggle, if not null, causes this command to set the "closed"
+ // parser constraint.
+ closedToggle *bool
+ // err, if not nil, is returned when this command is encountered.
+ err error
+}
+
+var errTestInduced = errors.New("test error")
+
+// testParser is a parser implementation that allows specifically-configured
+// data to be emitted. It consumes commands, some of which alter its behavior
+// and others of which present data. The resulting state affects how it emits
+// LogEntry records via nextEntry.
+type testParser struct {
+ commands []*testParserCommand
+
+ appendedData []byte
+ truncateOn bool
+ closedOn bool
+ err error
+}
+
+func (p *testParser) addCommand(r *testParserCommand) {
+ p.commands = append(p.commands, r)
+}
+
+func (p *testParser) nextCommand(pop bool) *testParserCommand {
+ if len(p.commands) == 0 {
+ return nil
+ }
+ cmd := p.commands[0]
+ if pop {
+ p.commands = p.commands[1:]
+ }
+ return cmd
+}
+
+func (p *testParser) popData() (r *testParserCommand) {
+ for i, cmd := range p.commands {
+ if cmd.data != nil {
+ p.commands = p.commands[i+1:]
+ return cmd
+ }
+ }
+ return nil
+}
+
+func (p *testParser) tags(ts time.Time, commands ...string) {
+ for _, c := range commands {
+ p.addTag(c, ts)
+ }
+}
+
+func (p *testParser) addError(err error) {
+ p.addCommand(&testParserCommand{
+ err: err,
+ })
+}
+
+func (p *testParser) addTag(tag string, ts time.Time) {
+ p.addData([]byte(tag), ts)
+}
+
+func (p *testParser) addData(d []byte, ts time.Time) {
+ p.addCommand(&testParserCommand{
+ data: d,
+ ts: ts,
+ })
+}
+
+func (p *testParser) setAllowSplit(value bool) {
+ p.addCommand(&testParserCommand{
+ splitToggle: &value,
+ })
+}
+
+func (p *testParser) setClosed(value bool) {
+ p.addCommand(&testParserCommand{
+ closedToggle: &value,
+ })
+}
+
+func (p *testParser) appendData(d Data) {
+ p.addData(d.Bytes(), d.Timestamp())
+}
+
+func (p *testParser) nextEntry(c *constraints) (*protocol.LogEntry, error) {
+ // Process records until we hit data or run out.
+ for p.err == nil {
+ rec := p.nextCommand(false)
+ if rec == nil {
+ return nil, p.err
+ }
+
+ // If this is a data record, process.
+ if rec.data != nil {
+ break
+ }
+
+ // Ingest commands, repeat.
+ if rec.err != nil {
+ p.err = rec.err
+ break
+ }
+
+ if rec.splitToggle != nil {
+ p.truncateOn = *rec.splitToggle
+ }
+ if rec.closedToggle != nil {
+ p.closedOn = *rec.closedToggle
+ }
+ p.nextCommand(true)
+ }
+
+ if p.err != nil {
+ return nil, p.err
+ }
+
+ // This is a data record. If we're configured to not yield it, leave it and
+ // return nil.
+ if p.truncateOn && (!c.allowSplit || (p.closedOn && !c.closed)) {
+ return nil, nil
+ }
+
+ // Consume this record.
+ rec := p.nextCommand(true)
+ return &protocol.LogEntry{
+ Content: &protocol.LogEntry_Text{Text: &protocol.Text{
+ Lines: []*protocol.Text_Line{
+ {Value: string(rec.data)},
+ },
+ }},
+ }, nil
+}
+
+func (p *testParser) bufferedBytes() (r int64) {
+ for _, rec := range p.commands {
+ r += int64(len(rec.data))
+ }
+ return
+}
+
+func (p *testParser) firstChunkTime() (time.Time, bool) {
+ for _, c := range p.commands {
+ if c.data != nil {
+ return c.ts, true
+ }
+ }
+ return time.Time{}, false
+}
+
+func TestStream(t *testing.T) {
+ Convey(`A testing stream config`, t, func() {
+ tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
+ tp := testParser{}
+ c := streamConfig{
+ name: "test",
+ parser: &tp,
+ template: protocol.ButlerLogBundle_Entry{
+ Desc: &protocol.LogStreamDescriptor{
+ Prefix: "test-prefix",
+ Name: "test",
+ },
+ },
+ }
+
+ Convey(`With a 64-byte maximum buffer and 1 second maximum duration`, func() {
+ c.maximumBufferedBytes = 64
+ c.maximumBufferDuration = time.Second
+ s := newStream(c)
+
+ Convey(`Is not drained by default`, func() {
+ So(s.isDrained(), ShouldBeFalse)
+
+ Convey(`When closed, is drained.`, func() {
+ s.Close()
+ So(s.isDrained(), ShouldBeTrue)
+
+ Convey(`When closed again, is still drained.`, func() {
+ s.Close()
+ So(s.isDrained(), ShouldBeTrue)
+ })
+ })
+ })
+
+ Convey(`With no data, has no expiration time.`, func() {
+ _, has := s.expireTime()
+ So(has, ShouldBeFalse)
+ })
+
+ Convey(`Append will ignore a 0-byte chunk.`, func() {
+ d := data(tc.Now())
+ So(s.Append(d), ShouldBeNil)
+ So(d.released, ShouldBeTrue)
+ })
+
+ Convey(`Append will add two 32-byte chunks.`, func() {
+ content := bytes.Repeat([]byte{0xAA}, 32)
+ So(s.Append(data(tc.Now(), content...)), ShouldBeNil)
+ So(s.Append(data(tc.Now(), content...)), ShouldBeNil)
+ })
+
+ Convey(`Append will add a large chunk when there are no other Data blocks.`, func() {
+ d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 128)...)
+ So(s.Append(d), ShouldBeNil)
+
+ Convey(`Will use that data's timestamp as expiration time.`, func() {
+ t, has := s.expireTime()
+ So(has, ShouldBeTrue)
+ So(t.Equal(tc.Now().Add(time.Second)), ShouldBeTrue)
+ })
+ })
+
+ Convey(`Append will block if the chunk exceeds the buffer size.`, func() {
+ signalC := make(chan struct{})
+ s.c.onAppend = func(appended bool) {
+ if !appended {
+ // We're waiting.
+ close(signalC)
+ }
+ }
+
+ // Add one chunk so we don't hit the "only byte" condition.
+ So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xAA}, 34)...)), ShouldBeNil)
+
+ // Wait until we get the signal that Append() will block, then consume
+ // some data and unblock Append().
+ blocked := false
+ go func() {
+ <-signalC
+
+ s.withParserLock(func() {
+ tp.popData()
+ })
+ blocked = true
+ s.signalDataConsumed()
+ }()
+
+ // Add one chunk so we don't hit the "only byte" condition.
+ So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xBB}, 32)...)), ShouldBeNil)
+ So(blocked, ShouldBeTrue)
+ })
+
+ Convey(`Append in an error state`, func() {
+ terr := errors.New("test error")
+
+ Convey(`Will return the error state.`, func() {
+ s.setAppendError(terr)
+
+ d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 32)...)
+ So(s.Append(d), ShouldEqual, terr)
+ So(d.released, ShouldBeTrue)
+ })
+
+ Convey(`Will block if the chunk exceeds buffer size, and return error state.`, func() {
+ signalC := make(chan struct{})
+ s.c.onAppend = func(appended bool) {
+ if !appended {
+ // Waiting, notify our goroutine that we're going to be waiting.
+ close(signalC)
+ }
+ }
+
+ // Add one chunk so we don't hit the "only byte" condition.
+ So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xAA}, 34)...)), ShouldBeNil)
+
+ // Wait until we get the signal that Append() will block, then consume
+ // some data and unblock Append().
+ go func() {
+ <-signalC
+ s.setAppendError(terr)
+ }()
+
+ // Add one chunk so we don't hit the "only byte" condition.
+ for _, sz := range []int{32, 1, 0} {
+ d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, sz)...)
+ So(s.Append(d), ShouldEqual, terr)
+ So(d.released, ShouldBeTrue)
+ }
+ })
+ })
+ })
+
+ Convey(`When building bundle entries`, func() {
+ bb := &builder{
+ size: 1024,
+ }
+ s := newStream(c)
+
+ Convey(`Returns nil with no buffered data.`, func() {
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse)
+ So(bb.bundle(), shouldHaveBundleEntries)
+ })
+
+ Convey(`With a single record, returns that entry.`, func() {
+ tp.tags(tc.Now(), "a", "b")
+
+ So(s.nextBundleEntry(bb, false), ShouldBeTrue)
+ So(bb.bundle(), shouldHaveBundleEntries, "test:a:b")
+ })
+
+ Convey(`When split is allowed, returns nil.`, func() {
+ tp.tags(tc.Now(), "a", "b")
+ tp.setAllowSplit(true)
+ tp.tags(tc.Now(), "c")
+
+ So(s.nextBundleEntry(bb, false), ShouldBeTrue)
+ So(bb.bundle(), shouldHaveBundleEntries, "test:a:b")
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse)
+
+ So(s.nextBundleEntry(bb, true), ShouldBeTrue)
+ So(bb.bundle(), shouldHaveBundleEntries, "test:a:b:c")
+ })
+
+ Convey(`When an error occurs during stream parsing, drains stream.`, func() {
+ So(s.isDrained(), ShouldBeFalse)
+ tp.tags(tc.Now(), "a")
+ tp.addError(errTestInduced)
+ tp.tags(tc.Now(), "b")
+
+ So(s.nextBundleEntry(bb, false), ShouldBeTrue)
+ So(s.isDrained(), ShouldBeTrue)
+ So(bb.bundle(), shouldHaveBundleEntries, "+test:a")
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse)
+ })
+
+ Convey(`With only an error, returns no bundle entry.`, func() {
+ So(s.isDrained(), ShouldBeFalse)
+ tp.addError(errTestInduced)
+ tp.tags(tc.Now(), "a")
+ tp.tags(tc.Now(), "b")
+
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse)
+ So(bb.bundle(), shouldHaveBundleEntries)
+ So(s.isDrained(), ShouldBeTrue)
+ })
+ })
+ })
+}
+
+// TestStreamSmoke tests a Stream in an actual multi-goroutine workflow.
+func TestStreamSmoke(t *testing.T) {
+ Convey(`When running a smoke test`, t, func() {
+ tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
+ tp := testParser{}
+ c := streamConfig{
+ name: "test",
+ parser: &tp,
+ template: protocol.ButlerLogBundle_Entry{
+ Desc: &protocol.LogStreamDescriptor{
+ Prefix: "test-prefix",
+ Name: "test",
+ },
+ },
+ }
+ s := newStream(c)
+
+ // Appender goroutine, constantly appends data.
+ //
+ // This will be inherently throttled by the nextBundle consumption.
+ go func() {
+ defer s.Close()
+
+ for i := 0; i < 512; i++ {
+ s.Append(data(tc.Now(), []byte(fmt.Sprintf("%d", i))...))
+ tc.Add(time.Second)
+ }
+ }()
+
+ // The consumer goroutine will consume bundles from the stream.
+ consumerC := make(chan struct{})
+ bundleC := make(chan *protocol.ButlerLogBundle)
+ for i := 0; i < 32; i++ {
+ go func() {
+ defer func() {
+ consumerC <- struct{}{}
+ }()
+
+ b := (*builder)(nil)
+ for !s.isDrained() {
+ if b == nil {
+ b = &builder{
+ size: 128,
+ }
+ }
+
+ s.nextBundleEntry(b, false)
+ if b.hasContent() {
+ bundleC <- b.bundle()
+ b = nil
+ } else {
+ // No content! Sleep for a second and check again.
+ tc.Sleep(time.Second)
+ }
+ }
+ }()
+ }
+
+ // Collect all bundles.
+ gotIt := map[int]struct{}{}
+ collectDoneC := make(chan struct{})
+ go func() {
+ defer close(collectDoneC)
+
+ for bundle := range bundleC {
+ for _, be := range bundle.Entries {
+ for _, le := range be.Logs {
+ idx, _ := strconv.Atoi(logEntryName(le))
+ gotIt[idx] = struct{}{}
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 32; i++ {
+ <-consumerC
+ }
+ close(bundleC)
+
+ // Did we get them all?
+ <-collectDoneC
+ for i := 0; i < 512; i++ {
+ _, ok := gotIt[i]
+ So(ok, ShouldBeTrue)
+ }
+ })
+}
« no previous file with comments | « client/internal/logdog/butler/bundler/stream.go ('k') | client/internal/logdog/butler/bundler/textParser.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698