Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package pubsub 5 package pubsub
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "compress/zlib" 9 "compress/zlib"
10 "errors" 10 "errors"
11 "fmt" 11 "fmt"
12 "io/ioutil" 12 "io/ioutil"
13 "sync" 13 "sync"
14 "testing" 14 "testing"
15 "time" 15 "time"
16 16
17 "github.com/golang/protobuf/proto" 17 "github.com/golang/protobuf/proto"
18 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/clock/testclock" 19 "github.com/luci/luci-go/common/clock/testclock"
20 » "github.com/luci/luci-go/common/gcloud/pubsub" 20 » gcps "github.com/luci/luci-go/common/gcloud/pubsub"
21 "github.com/luci/luci-go/common/proto/google" 21 "github.com/luci/luci-go/common/proto/google"
22 "github.com/luci/luci-go/common/proto/logdog/logpb" 22 "github.com/luci/luci-go/common/proto/logdog/logpb"
23 "github.com/luci/luci-go/common/recordio" 23 "github.com/luci/luci-go/common/recordio"
24 . "github.com/smartystreets/goconvey/convey" 24 . "github.com/smartystreets/goconvey/convey"
25 "golang.org/x/net/context" 25 "golang.org/x/net/context"
26 "google.golang.org/cloud/pubsub"
26 ) 27 )
27 28
28 type testPublisher struct { 29 type testTopic struct {
29 sync.Mutex 30 sync.Mutex
30 31
31 » err error 32 » err error
32 » topic pubsub.Topic
33 33
34 msgC chan *pubsub.Message 34 msgC chan *pubsub.Message
35 nextMessageID int 35 nextMessageID int
36 } 36 }
37 37
38 func (ps *testPublisher) Publish(c context.Context, t pubsub.Topic, msgs ...*pub sub.Message) ([]string, error) { 38 func (t *testTopic) Name() string { return "test" }
39 » if ps.err != nil { 39
40 » » return nil, ps.err 40 func (t *testTopic) Publish(c context.Context, msgs ...*pubsub.Message) ([]strin g, error) {
41 » } 41 » if t.err != nil {
42 » if t != ps.topic { 42 » » return nil, t.err
43 » » return nil, fmt.Errorf("test: published topic doesn't match conf igured (%s != %s)", t, ps.topic)
44 } 43 }
45 44
46 ids := make([]string, len(msgs)) 45 ids := make([]string, len(msgs))
47 for i, m := range msgs { 46 for i, m := range msgs {
48 » » ps.msgC <- m 47 » » t.msgC <- m
49 » » ids[i] = ps.getNextMessageID() 48 » » ids[i] = t.getNextMessageID()
50 } 49 }
51 return ids, nil 50 return ids, nil
52 } 51 }
53 52
54 func (ps *testPublisher) getNextMessageID() string { 53 func (t *testTopic) getNextMessageID() string {
55 » ps.Lock() 54 » t.Lock()
56 » defer ps.Unlock() 55 » defer t.Unlock()
57 56
58 » id := ps.nextMessageID 57 » id := t.nextMessageID
59 » ps.nextMessageID++ 58 » t.nextMessageID++
60 return fmt.Sprintf("%d", id) 59 return fmt.Sprintf("%d", id)
61 } 60 }
62 61
63 func TestConfig(t *testing.T) {
64 Convey(`A configuration instance`, t, func() {
65 ps := &testPublisher{}
66 conf := Config{
67 Publisher: ps,
68 Topic: pubsub.NewTopic("test-project", "test-topic") ,
69 }
70
71 Convey(`Will successfully validate.`, func() {
72 So(conf.Validate(), ShouldBeNil)
73 })
74
75 Convey(`Will not validate without a PubSub instance.`, func() {
76 conf.Publisher = nil
77 So(conf.Validate(), ShouldNotBeNil)
78 })
79
80 Convey(`Will not validate with an empty Topic.`, func() {
81 conf.Topic = ""
82 So(conf.Validate(), ShouldNotBeNil)
83 })
84
85 Convey(`Will not validate with an invalid Topic.`, func() {
86 conf.Topic = pubsub.NewTopic("test-project", "a!")
87 So(conf.Topic.Validate(), ShouldNotBeNil)
88 So(conf.Validate(), ShouldNotBeNil)
89 })
90 })
91 }
92
93 func deconstructMessage(msg *pubsub.Message) (*logpb.ButlerMetadata, *logpb.Butl erLogBundle, error) { 62 func deconstructMessage(msg *pubsub.Message) (*logpb.ButlerMetadata, *logpb.Butl erLogBundle, error) {
94 » fr := recordio.NewReader(bytes.NewBuffer(msg.Data), pubsub.MaxPublishSiz e) 63 » fr := recordio.NewReader(bytes.NewBuffer(msg.Data), gcps.MaxPublishSize)
95 64
96 // Validate header frame. 65 // Validate header frame.
97 headerBytes, err := fr.ReadFrameAll() 66 headerBytes, err := fr.ReadFrameAll()
98 if err != nil { 67 if err != nil {
99 return nil, nil, fmt.Errorf("test: failed to read header frame: %s", err) 68 return nil, nil, fmt.Errorf("test: failed to read header frame: %s", err)
100 } 69 }
101 70
102 header := logpb.ButlerMetadata{} 71 header := logpb.ButlerMetadata{}
103 if err := proto.Unmarshal(headerBytes, &header); err != nil { 72 if err := proto.Unmarshal(headerBytes, &header); err != nil {
104 return nil, nil, fmt.Errorf("test: failed to unmarshal header: % s", err) 73 return nil, nil, fmt.Errorf("test: failed to unmarshal header: % s", err)
(...skipping 27 matching lines...) Expand all
132 if err := proto.Unmarshal(data, &dataBundle); err != nil { 101 if err := proto.Unmarshal(data, &dataBundle); err != nil {
133 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: % s", err) 102 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: % s", err)
134 } 103 }
135 104
136 return &header, &dataBundle, nil 105 return &header, &dataBundle, nil
137 } 106 }
138 107
139 func TestOutput(t *testing.T) { 108 func TestOutput(t *testing.T) {
140 Convey(`An Output using a test Pub/Sub instance`, t, func() { 109 Convey(`An Output using a test Pub/Sub instance`, t, func() {
141 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015 , 1, 1, 0, 0, 0, 0, time.UTC)) 110 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015 , 1, 1, 0, 0, 0, 0, time.UTC))
142 » » ps := &testPublisher{ 111 » » tt := &testTopic{
143 » » » topic: pubsub.NewTopic("test-project", "test-topic"), 112 » » » msgC: make(chan *pubsub.Message, 1),
144 » » » msgC: make(chan *pubsub.Message, 1),
145 } 113 }
146 conf := Config{ 114 conf := Config{
147 » » » Publisher: ps, 115 » » » Topic: tt,
148 » » » Topic: pubsub.NewTopic("test-project", "test-topic") ,
149 } 116 }
150 o := New(ctx, conf).(*pubSubOutput) 117 o := New(ctx, conf).(*pubSubOutput)
151 So(o, ShouldNotBeNil) 118 So(o, ShouldNotBeNil)
152 defer o.Close() 119 defer o.Close()
153 120
154 bundle := &logpb.ButlerLogBundle{ 121 bundle := &logpb.ButlerLogBundle{
155 Source: "Pub/Sub Test", 122 Source: "Pub/Sub Test",
156 Timestamp: google.NewTimestamp(clock.Now(ctx)), 123 Timestamp: google.NewTimestamp(clock.Now(ctx)),
157 Entries: []*logpb.ButlerLogBundle_Entry{ 124 Entries: []*logpb.ButlerLogBundle_Entry{
158 {}, 125 {},
159 }, 126 },
160 } 127 }
161 128
162 Convey(`Can send/receive a bundle.`, func() { 129 Convey(`Can send/receive a bundle.`, func() {
163 So(o.SendBundle(bundle), ShouldBeNil) 130 So(o.SendBundle(bundle), ShouldBeNil)
164 131
165 » » » h, b, err := deconstructMessage(<-ps.msgC) 132 » » » h, b, err := deconstructMessage(<-tt.msgC)
166 So(err, ShouldBeNil) 133 So(err, ShouldBeNil)
167 So(h.Compression, ShouldEqual, logpb.ButlerMetadata_NONE ) 134 So(h.Compression, ShouldEqual, logpb.ButlerMetadata_NONE )
168 So(b, ShouldResemble, bundle) 135 So(b, ShouldResemble, bundle)
169 136
170 Convey(`And records stats.`, func() { 137 Convey(`And records stats.`, func() {
171 st := o.Stats() 138 st := o.Stats()
172 So(st.Errors(), ShouldEqual, 0) 139 So(st.Errors(), ShouldEqual, 0)
173 So(st.SentBytes(), ShouldBeGreaterThan, 0) 140 So(st.SentBytes(), ShouldBeGreaterThan, 0)
174 So(st.SentMessages(), ShouldEqual, 1) 141 So(st.SentMessages(), ShouldEqual, 1)
175 So(st.DiscardedMessages(), ShouldEqual, 0) 142 So(st.DiscardedMessages(), ShouldEqual, 0)
176 }) 143 })
177 }) 144 })
178 145
179 Convey(`Will return an error if Publish failed.`, func() { 146 Convey(`Will return an error if Publish failed.`, func() {
180 » » » ps.err = errors.New("test: error") 147 » » » tt.err = errors.New("test: error")
181 So(o.SendBundle(bundle), ShouldNotBeNil) 148 So(o.SendBundle(bundle), ShouldNotBeNil)
182 }) 149 })
183 }) 150 })
184 } 151 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/output/pubsub/pubsubOutput.go ('k') | common/gcloud/pubsub/ackbuffer/ack.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698