| Index: client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
|
| diff --git a/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go b/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
|
| index 7f5129f834f0f02ef235e3a3e7fa969273d6e5cc..e42e18a84e412eeea4d848294398ae3cc5573b92 100644
|
| --- a/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
|
| +++ b/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
|
| @@ -17,81 +17,50 @@ import (
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| - "github.com/luci/luci-go/common/gcloud/pubsub"
|
| + gcps "github.com/luci/luci-go/common/gcloud/pubsub"
|
| "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| "github.com/luci/luci-go/common/recordio"
|
| . "github.com/smartystreets/goconvey/convey"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/cloud/pubsub"
|
| )
|
|
|
| -type testPublisher struct {
|
| +type testTopic struct {
|
| sync.Mutex
|
|
|
| - err error
|
| - topic pubsub.Topic
|
| + err error
|
|
|
| msgC chan *pubsub.Message
|
| nextMessageID int
|
| }
|
|
|
| -func (ps *testPublisher) Publish(c context.Context, t pubsub.Topic, msgs ...*pubsub.Message) ([]string, error) {
|
| - if ps.err != nil {
|
| - return nil, ps.err
|
| - }
|
| - if t != ps.topic {
|
| - return nil, fmt.Errorf("test: published topic doesn't match configured (%s != %s)", t, ps.topic)
|
| +func (t *testTopic) Name() string { return "test" }
|
| +
|
| +func (t *testTopic) Publish(c context.Context, msgs ...*pubsub.Message) ([]string, error) {
|
| + if t.err != nil {
|
| + return nil, t.err
|
| }
|
|
|
| ids := make([]string, len(msgs))
|
| for i, m := range msgs {
|
| - ps.msgC <- m
|
| - ids[i] = ps.getNextMessageID()
|
| + t.msgC <- m
|
| + ids[i] = t.getNextMessageID()
|
| }
|
| return ids, nil
|
| }
|
|
|
| -func (ps *testPublisher) getNextMessageID() string {
|
| - ps.Lock()
|
| - defer ps.Unlock()
|
| +func (t *testTopic) getNextMessageID() string {
|
| + t.Lock()
|
| + defer t.Unlock()
|
|
|
| - id := ps.nextMessageID
|
| - ps.nextMessageID++
|
| + id := t.nextMessageID
|
| + t.nextMessageID++
|
| return fmt.Sprintf("%d", id)
|
| }
|
|
|
| -func TestConfig(t *testing.T) {
|
| - Convey(`A configuration instance`, t, func() {
|
| - ps := &testPublisher{}
|
| - conf := Config{
|
| - Publisher: ps,
|
| - Topic: pubsub.NewTopic("test-project", "test-topic"),
|
| - }
|
| -
|
| - Convey(`Will successfully validate.`, func() {
|
| - So(conf.Validate(), ShouldBeNil)
|
| - })
|
| -
|
| - Convey(`Will not validate without a PubSub instance.`, func() {
|
| - conf.Publisher = nil
|
| - So(conf.Validate(), ShouldNotBeNil)
|
| - })
|
| -
|
| - Convey(`Will not validate with an empty Topic.`, func() {
|
| - conf.Topic = ""
|
| - So(conf.Validate(), ShouldNotBeNil)
|
| - })
|
| -
|
| - Convey(`Will not validate with an invalid Topic.`, func() {
|
| - conf.Topic = pubsub.NewTopic("test-project", "a!")
|
| - So(conf.Topic.Validate(), ShouldNotBeNil)
|
| - So(conf.Validate(), ShouldNotBeNil)
|
| - })
|
| - })
|
| -}
|
| -
|
| func deconstructMessage(msg *pubsub.Message) (*logpb.ButlerMetadata, *logpb.ButlerLogBundle, error) {
|
| - fr := recordio.NewReader(bytes.NewBuffer(msg.Data), pubsub.MaxPublishSize)
|
| + fr := recordio.NewReader(bytes.NewBuffer(msg.Data), gcps.MaxPublishSize)
|
|
|
| // Validate header frame.
|
| headerBytes, err := fr.ReadFrameAll()
|
| @@ -139,13 +108,11 @@ func deconstructMessage(msg *pubsub.Message) (*logpb.ButlerMetadata, *logpb.Butl
|
| func TestOutput(t *testing.T) {
|
| Convey(`An Output using a test Pub/Sub instance`, t, func() {
|
| ctx, _ := testclock.UseTime(context.Background(), time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
|
| - ps := &testPublisher{
|
| - topic: pubsub.NewTopic("test-project", "test-topic"),
|
| - msgC: make(chan *pubsub.Message, 1),
|
| + tt := &testTopic{
|
| + msgC: make(chan *pubsub.Message, 1),
|
| }
|
| conf := Config{
|
| - Publisher: ps,
|
| - Topic: pubsub.NewTopic("test-project", "test-topic"),
|
| + Topic: tt,
|
| }
|
| o := New(ctx, conf).(*pubSubOutput)
|
| So(o, ShouldNotBeNil)
|
| @@ -162,7 +129,7 @@ func TestOutput(t *testing.T) {
|
| Convey(`Can send/receive a bundle.`, func() {
|
| So(o.SendBundle(bundle), ShouldBeNil)
|
|
|
| - h, b, err := deconstructMessage(<-ps.msgC)
|
| + h, b, err := deconstructMessage(<-tt.msgC)
|
| So(err, ShouldBeNil)
|
| So(h.Compression, ShouldEqual, logpb.ButlerMetadata_NONE)
|
| So(b, ShouldResemble, bundle)
|
| @@ -177,7 +144,7 @@ func TestOutput(t *testing.T) {
|
| })
|
|
|
| Convey(`Will return an error if Publish failed.`, func() {
|
| - ps.err = errors.New("test: error")
|
| + tt.err = errors.New("test: error")
|
| So(o.SendBundle(bundle), ShouldNotBeNil)
|
| })
|
| })
|
|
|