Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Unified Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go

Issue 1211053004: LogDog: Add Butler Output package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Relocate butlerproto to common, document. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
diff --git a/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go b/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..17aa10f4d4483bf21473ad933ec8485efc508dea
--- /dev/null
+++ b/client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go
@@ -0,0 +1,190 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package pubsub
+
+import (
+ "bytes"
+ "compress/zlib"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/gcloud/gcps"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/recordio"
+ . "github.com/smartystreets/goconvey/convey"
+ "golang.org/x/net/context"
+ "google.golang.org/cloud/pubsub"
+)
+
+type testPubSub struct {
+ sync.Mutex
+
+ err error
+ topic gcps.Topic
+
+ msgC chan *pubsub.Message
+ nextMessageID int
+}
+
+func (*testPubSub) TopicExists(gcps.Topic) (bool, error) { panic("not implemented") }
+func (*testPubSub) SubExists(gcps.Subscription) (bool, error) { panic("not implemented") }
+func (*testPubSub) Pull(gcps.Subscription, int) ([]*pubsub.Message, error) { panic("not implemented") }
+func (*testPubSub) Ack(gcps.Subscription, ...string) error { panic("not implemented") }
+
+func (ps *testPubSub) Publish(t gcps.Topic, msgs ...*pubsub.Message) ([]string, error) {
+ if ps.err != nil {
+ return nil, ps.err
+ }
+ if t != ps.topic {
+ return nil, fmt.Errorf("test: published topic doesn't match configured (%s != %s)", t, ps.topic)
+ }
+
+ ids := make([]string, len(msgs))
+ for i, m := range msgs {
+ ps.msgC <- m
+ ids[i] = ps.getNextMessageID()
+ }
+ return ids, nil
+}
+
+func (ps *testPubSub) getNextMessageID() string {
+ ps.Lock()
+ defer ps.Unlock()
+
+ id := ps.nextMessageID
+ ps.nextMessageID++
+ return fmt.Sprintf("%d", id)
+}
+
+func TestConfig(t *testing.T) {
+ Convey(`A configuration instance`, t, func() {
+ ps := &testPubSub{}
+ conf := Config{
+ PubSub: ps,
+ Topic: gcps.Topic("test-topic"),
+ }
+
+ Convey(`Will successfully validate.`, func() {
+ So(conf.Validate(), ShouldBeNil)
+ })
+
+ Convey(`Will not validate without a PubSub instance.`, func() {
+ conf.PubSub = nil
+ So(conf.Validate(), ShouldNotBeNil)
+ })
+
+ Convey(`Will not validate with an empty Topic.`, func() {
+ conf.Topic = ""
+ So(conf.Validate(), ShouldNotBeNil)
+ })
+
+ Convey(`Will not validate with an invalid Topic.`, func() {
+ conf.Topic = gcps.Topic("a!")
+ So(conf.Topic.Validate(), ShouldNotBeNil)
+ So(conf.Validate(), ShouldNotBeNil)
+ })
+ })
+}
+
+func deconstructMessage(msg *pubsub.Message) (*protocol.ButlerMetadata, *protocol.ButlerLogBundle, error) {
+ fr := recordio.NewReader(bytes.NewBuffer(msg.Data), gcps.MaxPublishSize)
+
+ // Validate header frame.
+ headerBytes, err := fr.ReadFrameAll()
+ if err != nil {
+ return nil, nil, fmt.Errorf("test: failed to read header frame: %s", err)
+ }
+
+ header := protocol.ButlerMetadata{}
+ if err := proto.Unmarshal(headerBytes, &header); err != nil {
+ return nil, nil, fmt.Errorf("test: failed to unmarshal header: %s", err)
+ }
+
+ if header.Type != protocol.ButlerMetadata_ButlerLogBundle {
+ return nil, nil, fmt.Errorf("test: unknown frame data type: %v", header.Type)
+ }
+
+ // Validate data frame.
+ data, err := fr.ReadFrameAll()
+ if err != nil {
+ return nil, nil, fmt.Errorf("test: failed to read data frame: %s", err)
+ }
+
+ switch header.Compression {
+ case protocol.ButlerMetadata_ZLIB:
+ r, err := zlib.NewReader(bytes.NewReader(data))
+ if err != nil {
+ return nil, nil, fmt.Errorf("test: failed to create zlib reader: %s", err)
+ }
+ defer r.Close()
+
+ data, err = ioutil.ReadAll(r)
+ if err != nil {
+ return nil, nil, fmt.Errorf("test: failed to read compressed data: %s", err)
+ }
+ }
+
+ dataBundle := protocol.ButlerLogBundle{}
+ if err := proto.Unmarshal(data, &dataBundle); err != nil {
+ return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: %s", err)
+ }
+
+ return &header, &dataBundle, nil
+}
+
+func TestOutput(t *testing.T) {
+ Convey(`An Output using a test Pub/Sub instance`, t, func() {
+ ctx, _ := testclock.UseTime(context.Background(), time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
+ ps := &testPubSub{
+ topic: gcps.Topic("test-topic"),
+ msgC: make(chan *pubsub.Message, 1),
+ }
+ conf := Config{
+ PubSub: ps,
+ Topic: gcps.Topic("test-topic"),
+ }
+ o := New(ctx, conf).(*gcpsOutput)
+ So(o, ShouldNotBeNil)
+ defer o.Close()
+
+ bundle := &protocol.ButlerLogBundle{
+ Source: "GCPS Test",
+ Timestamp: google.NewTimestamp(clock.Now(ctx)),
+ Entries: []*protocol.ButlerLogBundle_Entry{
+ {},
+ },
+ }
+
+ Convey(`Can send/receive a bundle.`, func() {
+ So(o.SendBundle(bundle), ShouldBeNil)
+
+ h, b, err := deconstructMessage(<-ps.msgC)
+ So(err, ShouldBeNil)
+ So(h.Compression, ShouldEqual, protocol.ButlerMetadata_NONE)
+ So(b, ShouldResemble, bundle)
+
+ Convey(`And records stats.`, func() {
+ st := o.Stats()
+ So(st.Errors(), ShouldEqual, 0)
+ So(st.SentBytes(), ShouldBeGreaterThan, 0)
+ So(st.SentMessages(), ShouldEqual, 1)
+ So(st.DiscardedMessages(), ShouldEqual, 0)
+ })
+ })
+
+ Convey(`Will return an error if Publish failed.`, func() {
+ ps.err = errors.New("test: error")
+ So(o.SendBundle(bundle), ShouldNotBeNil)
+ })
+ })
+}
« no previous file with comments | « client/internal/logdog/butler/output/pubsub/pubsubOutput.go ('k') | client/internal/logdog/butler/output/retry.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698