| Index: logdog/client/butler/butler_test.go
|
| diff --git a/logdog/client/butler/butler_test.go b/logdog/client/butler/butler_test.go
|
| index 13b3ffb2b4c03d4e3b0d5ec8594643228d6db88c..be56e25ef9b9a909eae85929d8c1d140c6c4a525 100644
|
| --- a/logdog/client/butler/butler_test.go
|
| +++ b/logdog/client/butler/butler_test.go
|
| @@ -129,14 +129,6 @@ type testStream struct {
|
| properties *streamproto.Properties
|
| }
|
|
|
| -func newTestStream(p streamproto.Properties) *testStream {
|
| - return &testStream{
|
| - inC: make(chan *testStreamData, 16),
|
| - closedC: make(chan struct{}),
|
| - properties: &p,
|
| - }
|
| -}
|
| -
|
| func (ts *testStream) data(d []byte, err error) {
|
| ts.inC <- &testStreamData{
|
| data: d,
|
| @@ -317,31 +309,42 @@ func TestButler(t *testing.T) {
|
| })
|
|
|
| Convey(`Using a generic stream Properties`, func() {
|
| - props := streamproto.Properties{
|
| - LogStreamDescriptor: logpb.LogStreamDescriptor{
|
| - Name: "test",
|
| - StreamType: logpb.StreamType_TEXT,
|
| - ContentType: string(types.ContentTypeText),
|
| - },
|
| + newTestStream := func(setup func(p *streamproto.Properties)) *testStream {
|
| + props := streamproto.Properties{
|
| + LogStreamDescriptor: &logpb.LogStreamDescriptor{
|
| + Name: "test",
|
| + StreamType: logpb.StreamType_TEXT,
|
| + ContentType: string(types.ContentTypeText),
|
| + },
|
| + }
|
| + if setup != nil {
|
| + setup(&props)
|
| + }
|
| +
|
| + return &testStream{
|
| + inC: make(chan *testStreamData, 16),
|
| + closedC: make(chan struct{}),
|
| + properties: &props,
|
| + }
|
| }
|
|
|
| Convey(`Will not add a stream with an invalid configuration.`, func() {
|
| // No content type.
|
| - props.ContentType = ""
|
| -
|
| - s := newTestStream(props)
|
| + s := newTestStream(func(p *streamproto.Properties) {
|
| + p.ContentType = ""
|
| + })
|
| b := mkb(c, conf)
|
| - So(b.AddStream(s, *s.properties), ShouldNotBeNil)
|
| + So(b.AddStream(s, s.properties), ShouldNotBeNil)
|
| })
|
|
|
| Convey(`Will not add a stream with a duplicate stream name.`, func() {
|
| b := mkb(c, conf)
|
|
|
| - s0 := newTestStream(props)
|
| - So(b.AddStream(s0, *s0.properties), ShouldBeNil)
|
| + s0 := newTestStream(nil)
|
| + So(b.AddStream(s0, s0.properties), ShouldBeNil)
|
|
|
| - s1 := newTestStream(props)
|
| - So(b.AddStream(s1, *s1.properties), ShouldErrLike, "a stream has already been registered")
|
| + s1 := newTestStream(nil)
|
| + So(b.AddStream(s1, s1.properties), ShouldErrLike, "a stream has already been registered")
|
| })
|
|
|
| Convey(`Will not accept invalid tee configuration`, func() {
|
| @@ -357,27 +360,29 @@ func TestButler(t *testing.T) {
|
| {streamproto.TeeType(0xFFFFFFFF), "invalid tee value"},
|
| } {
|
| Convey(fmt.Sprintf(`Rejects stream with TeeType [%v], when no tee outputs are configured.`, tc.tee), func() {
|
| - props.Tee = tc.tee
|
| -
|
| b := mkb(c, conf)
|
| - s := newTestStream(props)
|
| - So(b.AddStream(s, *s.properties), ShouldErrLike, tc.err)
|
| + s := newTestStream(func(p *streamproto.Properties) {
|
| + p.Tee = tc.tee
|
| + })
|
| + So(b.AddStream(s, s.properties), ShouldErrLike, tc.err)
|
| })
|
| }
|
| })
|
|
|
| Convey(`When adding a stream configured to tee through STDOUT/STDERR, tees.`, func() {
|
| - props.Name = "stdout"
|
| - props.Tee = streamproto.TeeStdout
|
| - stdout := newTestStream(props)
|
| + stdout := newTestStream(func(p *streamproto.Properties) {
|
| + p.Name = "stdout"
|
| + p.Tee = streamproto.TeeStdout
|
| + })
|
|
|
| - props.Name = "stderr"
|
| - props.Tee = streamproto.TeeStderr
|
| - stderr := newTestStream(props)
|
| + stderr := newTestStream(func(p *streamproto.Properties) {
|
| + p.Name = "stderr"
|
| + p.Tee = streamproto.TeeStderr
|
| + })
|
|
|
| b := mkb(c, conf)
|
| - So(b.AddStream(stdout, *stdout.properties), ShouldBeNil)
|
| - So(b.AddStream(stderr, *stderr.properties), ShouldBeNil)
|
| + So(b.AddStream(stdout, stdout.properties), ShouldBeNil)
|
| + So(b.AddStream(stderr, stderr.properties), ShouldBeNil)
|
|
|
| stdout.data([]byte("Hello, STDOUT"), io.EOF)
|
| stderr.data([]byte("Hello, STDERR"), io.EOF)
|
| @@ -398,12 +403,13 @@ func TestButler(t *testing.T) {
|
| b := mkb(c, conf)
|
| streams := make([]*testStream, 256)
|
| for i := range streams {
|
| - props.Name = fmt.Sprintf("stream%d", i)
|
| - streams[i] = newTestStream(props)
|
| + streams[i] = newTestStream(func(p *streamproto.Properties) {
|
| + p.Name = fmt.Sprintf("stream%d", i)
|
| + })
|
| }
|
|
|
| for _, s := range streams {
|
| - So(b.AddStream(s, *s.properties), ShouldBeNil)
|
| + So(b.AddStream(s, s.properties), ShouldBeNil)
|
| s.data([]byte("stream data 0!\n"), nil)
|
| s.data([]byte("stream data 1!\n"), nil)
|
| }
|
| @@ -428,12 +434,13 @@ func TestButler(t *testing.T) {
|
| b := mkb(c, conf)
|
| streams := make([]*testStream, 256)
|
| for i := range streams {
|
| - props.Name = fmt.Sprintf("stream%d", i)
|
| - streams[i] = newTestStream(props)
|
| + streams[i] = newTestStream(func(p *streamproto.Properties) {
|
| + p.Name = fmt.Sprintf("stream%d", i)
|
| + })
|
| }
|
|
|
| for _, s := range streams {
|
| - So(b.AddStream(s, *s.properties), ShouldBeNil)
|
| + So(b.AddStream(s, s.properties), ShouldBeNil)
|
| s.data([]byte("stream data!\n"), nil)
|
| }
|
|
|
| @@ -461,8 +468,9 @@ func TestButler(t *testing.T) {
|
| for i, tss := range servers {
|
| b.AddStreamServer(tss)
|
|
|
| - props.Name = fmt.Sprintf("stream%d", i)
|
| - s := newTestStream(props)
|
| + s := newTestStream(func(p *streamproto.Properties) {
|
| + p.Name = fmt.Sprintf("stream%d", i)
|
| + })
|
| streams = append(streams, s)
|
| s.data([]byte("test data"), io.EOF)
|
| tss.enqueue(s)
|
| @@ -482,8 +490,9 @@ func TestButler(t *testing.T) {
|
| for i, tss := range servers {
|
| b.AddStreamServer(tss)
|
|
|
| - props.Name = fmt.Sprintf("stream%d", i)
|
| - s := newTestStream(props)
|
| + s := newTestStream(func(p *streamproto.Properties) {
|
| + p.Name = fmt.Sprintf("stream%d", i)
|
| + })
|
| streams = append(streams, s)
|
| s.data([]byte("test data"), io.EOF)
|
| tss.enqueue(s)
|
| @@ -503,11 +512,12 @@ func TestButler(t *testing.T) {
|
| tss := newTestStreamServer()
|
|
|
| // Generate an invalid stream for "tss" to register.
|
| - sGood := newTestStream(props)
|
| + sGood := newTestStream(nil)
|
| sGood.data([]byte("good test data"), io.EOF)
|
|
|
| - props.ContentType = ""
|
| - sBad := newTestStream(props)
|
| + sBad := newTestStream(func(p *streamproto.Properties) {
|
| + p.ContentType = ""
|
| + })
|
| sBad.data([]byte("bad test data"), io.EOF)
|
|
|
| b := mkb(c, conf)
|
| @@ -519,8 +529,8 @@ func TestButler(t *testing.T) {
|
|
|
| So(sBad.isClosed(), ShouldBeTrue)
|
| So(sGood.isClosed(), ShouldBeTrue)
|
| - So(to.logs(props.Name), shouldHaveTextLogs, "good test data")
|
| - So(to.isTerminal(props.Name), ShouldBeTrue)
|
| + So(to.logs("test"), shouldHaveTextLogs, "good test data")
|
| + So(to.isTerminal("test"), ShouldBeTrue)
|
| })
|
| })
|
|
|
|
|