Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go

Issue 1211053004: LogDog: Add Butler Output package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Relocate butlerproto to common, document. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package pubsub
6
7 import (
8 "bytes"
9 "compress/zlib"
10 "errors"
11 "fmt"
12 "io/ioutil"
13 "sync"
14 "testing"
15 "time"
16
17 "github.com/golang/protobuf/proto"
18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/clock/testclock"
20 "github.com/luci/luci-go/common/gcloud/gcps"
21 "github.com/luci/luci-go/common/logdog/protocol"
22 "github.com/luci/luci-go/common/proto/google"
23 "github.com/luci/luci-go/common/recordio"
24 . "github.com/smartystreets/goconvey/convey"
25 "golang.org/x/net/context"
26 "google.golang.org/cloud/pubsub"
27 )
28
29 type testPubSub struct {
30 sync.Mutex
31
32 err error
33 topic gcps.Topic
34
35 msgC chan *pubsub.Message
36 nextMessageID int
37 }
38
39 func (*testPubSub) TopicExists(gcps.Topic) (bool, error) { pan ic("not implemented") }
40 func (*testPubSub) SubExists(gcps.Subscription) (bool, error) { pan ic("not implemented") }
41 func (*testPubSub) Pull(gcps.Subscription, int) ([]*pubsub.Message, error) { pan ic("not implemented") }
42 func (*testPubSub) Ack(gcps.Subscription, ...string) error { pan ic("not implemented") }
43
44 func (ps *testPubSub) Publish(t gcps.Topic, msgs ...*pubsub.Message) ([]string, error) {
45 if ps.err != nil {
46 return nil, ps.err
47 }
48 if t != ps.topic {
49 return nil, fmt.Errorf("test: published topic doesn't match conf igured (%s != %s)", t, ps.topic)
50 }
51
52 ids := make([]string, len(msgs))
53 for i, m := range msgs {
54 ps.msgC <- m
55 ids[i] = ps.getNextMessageID()
56 }
57 return ids, nil
58 }
59
60 func (ps *testPubSub) getNextMessageID() string {
61 ps.Lock()
62 defer ps.Unlock()
63
64 id := ps.nextMessageID
65 ps.nextMessageID++
66 return fmt.Sprintf("%d", id)
67 }
68
69 func TestConfig(t *testing.T) {
70 Convey(`A configuration instance`, t, func() {
71 ps := &testPubSub{}
72 conf := Config{
73 PubSub: ps,
74 Topic: gcps.Topic("test-topic"),
75 }
76
77 Convey(`Will successfully validate.`, func() {
78 So(conf.Validate(), ShouldBeNil)
79 })
80
81 Convey(`Will not validate without a PubSub instance.`, func() {
82 conf.PubSub = nil
83 So(conf.Validate(), ShouldNotBeNil)
84 })
85
86 Convey(`Will not validate with an empty Topic.`, func() {
87 conf.Topic = ""
88 So(conf.Validate(), ShouldNotBeNil)
89 })
90
91 Convey(`Will not validate with an invalid Topic.`, func() {
92 conf.Topic = gcps.Topic("a!")
93 So(conf.Topic.Validate(), ShouldNotBeNil)
94 So(conf.Validate(), ShouldNotBeNil)
95 })
96 })
97 }
98
99 func deconstructMessage(msg *pubsub.Message) (*protocol.ButlerMetadata, *protoco l.ButlerLogBundle, error) {
100 fr := recordio.NewReader(bytes.NewBuffer(msg.Data), gcps.MaxPublishSize)
101
102 // Validate header frame.
103 headerBytes, err := fr.ReadFrameAll()
104 if err != nil {
105 return nil, nil, fmt.Errorf("test: failed to read header frame: %s", err)
106 }
107
108 header := protocol.ButlerMetadata{}
109 if err := proto.Unmarshal(headerBytes, &header); err != nil {
110 return nil, nil, fmt.Errorf("test: failed to unmarshal header: % s", err)
111 }
112
113 if header.Type != protocol.ButlerMetadata_ButlerLogBundle {
114 return nil, nil, fmt.Errorf("test: unknown frame data type: %v", header.Type)
115 }
116
117 // Validate data frame.
118 data, err := fr.ReadFrameAll()
119 if err != nil {
120 return nil, nil, fmt.Errorf("test: failed to read data frame: %s ", err)
121 }
122
123 switch header.Compression {
124 case protocol.ButlerMetadata_ZLIB:
125 r, err := zlib.NewReader(bytes.NewReader(data))
126 if err != nil {
127 return nil, nil, fmt.Errorf("test: failed to create zlib reader: %s", err)
128 }
129 defer r.Close()
130
131 data, err = ioutil.ReadAll(r)
132 if err != nil {
133 return nil, nil, fmt.Errorf("test: failed to read compre ssed data: %s", err)
134 }
135 }
136
137 dataBundle := protocol.ButlerLogBundle{}
138 if err := proto.Unmarshal(data, &dataBundle); err != nil {
139 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: % s", err)
140 }
141
142 return &header, &dataBundle, nil
143 }
144
145 func TestOutput(t *testing.T) {
146 Convey(`An Output using a test Pub/Sub instance`, t, func() {
147 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015 , 1, 1, 0, 0, 0, 0, time.UTC))
148 ps := &testPubSub{
149 topic: gcps.Topic("test-topic"),
150 msgC: make(chan *pubsub.Message, 1),
151 }
152 conf := Config{
153 PubSub: ps,
154 Topic: gcps.Topic("test-topic"),
155 }
156 o := New(ctx, conf).(*gcpsOutput)
157 So(o, ShouldNotBeNil)
158 defer o.Close()
159
160 bundle := &protocol.ButlerLogBundle{
161 Source: "GCPS Test",
162 Timestamp: google.NewTimestamp(clock.Now(ctx)),
163 Entries: []*protocol.ButlerLogBundle_Entry{
164 {},
165 },
166 }
167
168 Convey(`Can send/receive a bundle.`, func() {
169 So(o.SendBundle(bundle), ShouldBeNil)
170
171 h, b, err := deconstructMessage(<-ps.msgC)
172 So(err, ShouldBeNil)
173 So(h.Compression, ShouldEqual, protocol.ButlerMetadata_N ONE)
174 So(b, ShouldResemble, bundle)
175
176 Convey(`And records stats.`, func() {
177 st := o.Stats()
178 So(st.Errors(), ShouldEqual, 0)
179 So(st.SentBytes(), ShouldBeGreaterThan, 0)
180 So(st.SentMessages(), ShouldEqual, 1)
181 So(st.DiscardedMessages(), ShouldEqual, 0)
182 })
183 })
184
185 Convey(`Will return an error if Publish failed.`, func() {
186 ps.err = errors.New("test: error")
187 So(o.SendBundle(bundle), ShouldNotBeNil)
188 })
189 })
190 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/output/pubsub/pubsubOutput.go ('k') | client/internal/logdog/butler/output/retry.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698