Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: client/internal/logdog/butler/output/pubsub/pubsubOutput_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package pubsub 5 package pubsub
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "compress/zlib" 9 "compress/zlib"
10 "errors" 10 "errors"
11 "fmt" 11 "fmt"
12 "io/ioutil" 12 "io/ioutil"
13 "sync" 13 "sync"
14 "testing" 14 "testing"
15 "time" 15 "time"
16 16
17 "github.com/golang/protobuf/proto" 17 "github.com/golang/protobuf/proto"
18 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/clock/testclock" 19 "github.com/luci/luci-go/common/clock/testclock"
20 "github.com/luci/luci-go/common/gcloud/gcps" 20 "github.com/luci/luci-go/common/gcloud/gcps"
21 "github.com/luci/luci-go/common/logdog/protocol" 21 "github.com/luci/luci-go/common/logdog/protocol"
22 "github.com/luci/luci-go/common/proto/google" 22 "github.com/luci/luci-go/common/proto/google"
23 "github.com/luci/luci-go/common/recordio" 23 "github.com/luci/luci-go/common/recordio"
24 . "github.com/smartystreets/goconvey/convey" 24 . "github.com/smartystreets/goconvey/convey"
25 "golang.org/x/net/context" 25 "golang.org/x/net/context"
26 "google.golang.org/cloud/pubsub" 26 "google.golang.org/cloud/pubsub"
27 ) 27 )
28 28
29 type testPubSub struct { 29 type testPublisher struct {
30 sync.Mutex 30 sync.Mutex
31 31
32 err error 32 err error
33 topic gcps.Topic 33 topic gcps.Topic
34 34
35 msgC chan *pubsub.Message 35 msgC chan *pubsub.Message
36 nextMessageID int 36 nextMessageID int
37 } 37 }
38 38
39 func (*testPubSub) TopicExists(gcps.Topic) (bool, error) { pan ic("not implemented") } 39 func (ps *testPublisher) Publish(c context.Context, t gcps.Topic, msgs ...*pubsu b.Message) ([]string, error) {
40 func (*testPubSub) SubExists(gcps.Subscription) (bool, error) { pan ic("not implemented") }
41 func (*testPubSub) Pull(gcps.Subscription, int) ([]*pubsub.Message, error) { pan ic("not implemented") }
42 func (*testPubSub) Ack(gcps.Subscription, ...string) error { pan ic("not implemented") }
43
44 func (ps *testPubSub) Publish(t gcps.Topic, msgs ...*pubsub.Message) ([]string, error) {
45 if ps.err != nil { 40 if ps.err != nil {
46 return nil, ps.err 41 return nil, ps.err
47 } 42 }
48 if t != ps.topic { 43 if t != ps.topic {
49 return nil, fmt.Errorf("test: published topic doesn't match conf igured (%s != %s)", t, ps.topic) 44 return nil, fmt.Errorf("test: published topic doesn't match conf igured (%s != %s)", t, ps.topic)
50 } 45 }
51 46
52 ids := make([]string, len(msgs)) 47 ids := make([]string, len(msgs))
53 for i, m := range msgs { 48 for i, m := range msgs {
54 ps.msgC <- m 49 ps.msgC <- m
55 ids[i] = ps.getNextMessageID() 50 ids[i] = ps.getNextMessageID()
56 } 51 }
57 return ids, nil 52 return ids, nil
58 } 53 }
59 54
60 func (ps *testPubSub) getNextMessageID() string { 55 func (ps *testPublisher) getNextMessageID() string {
61 ps.Lock() 56 ps.Lock()
62 defer ps.Unlock() 57 defer ps.Unlock()
63 58
64 id := ps.nextMessageID 59 id := ps.nextMessageID
65 ps.nextMessageID++ 60 ps.nextMessageID++
66 return fmt.Sprintf("%d", id) 61 return fmt.Sprintf("%d", id)
67 } 62 }
68 63
69 func TestConfig(t *testing.T) { 64 func TestConfig(t *testing.T) {
70 Convey(`A configuration instance`, t, func() { 65 Convey(`A configuration instance`, t, func() {
71 » » ps := &testPubSub{} 66 » » ps := &testPublisher{}
72 conf := Config{ 67 conf := Config{
73 » » » PubSub: ps, 68 » » » Publisher: ps,
74 » » » Topic: gcps.Topic("test-topic"), 69 » » » Topic: gcps.Topic("test-topic"),
75 } 70 }
76 71
77 Convey(`Will successfully validate.`, func() { 72 Convey(`Will successfully validate.`, func() {
78 So(conf.Validate(), ShouldBeNil) 73 So(conf.Validate(), ShouldBeNil)
79 }) 74 })
80 75
81 Convey(`Will not validate without a PubSub instance.`, func() { 76 Convey(`Will not validate without a PubSub instance.`, func() {
82 » » » conf.PubSub = nil 77 » » » conf.Publisher = nil
83 So(conf.Validate(), ShouldNotBeNil) 78 So(conf.Validate(), ShouldNotBeNil)
84 }) 79 })
85 80
86 Convey(`Will not validate with an empty Topic.`, func() { 81 Convey(`Will not validate with an empty Topic.`, func() {
87 conf.Topic = "" 82 conf.Topic = ""
88 So(conf.Validate(), ShouldNotBeNil) 83 So(conf.Validate(), ShouldNotBeNil)
89 }) 84 })
90 85
91 Convey(`Will not validate with an invalid Topic.`, func() { 86 Convey(`Will not validate with an invalid Topic.`, func() {
92 conf.Topic = gcps.Topic("a!") 87 conf.Topic = gcps.Topic("a!")
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 if err := proto.Unmarshal(data, &dataBundle); err != nil { 133 if err := proto.Unmarshal(data, &dataBundle); err != nil {
139 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: % s", err) 134 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: % s", err)
140 } 135 }
141 136
142 return &header, &dataBundle, nil 137 return &header, &dataBundle, nil
143 } 138 }
144 139
145 func TestOutput(t *testing.T) { 140 func TestOutput(t *testing.T) {
146 Convey(`An Output using a test Pub/Sub instance`, t, func() { 141 Convey(`An Output using a test Pub/Sub instance`, t, func() {
147 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015 , 1, 1, 0, 0, 0, 0, time.UTC)) 142 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015 , 1, 1, 0, 0, 0, 0, time.UTC))
148 » » ps := &testPubSub{ 143 » » ps := &testPublisher{
149 topic: gcps.Topic("test-topic"), 144 topic: gcps.Topic("test-topic"),
150 msgC: make(chan *pubsub.Message, 1), 145 msgC: make(chan *pubsub.Message, 1),
151 } 146 }
152 conf := Config{ 147 conf := Config{
153 » » » PubSub: ps, 148 » » » Publisher: ps,
154 » » » Topic: gcps.Topic("test-topic"), 149 » » » Topic: gcps.Topic("test-topic"),
155 } 150 }
156 o := New(ctx, conf).(*gcpsOutput) 151 o := New(ctx, conf).(*gcpsOutput)
157 So(o, ShouldNotBeNil) 152 So(o, ShouldNotBeNil)
158 defer o.Close() 153 defer o.Close()
159 154
160 bundle := &protocol.ButlerLogBundle{ 155 bundle := &protocol.ButlerLogBundle{
161 Source: "GCPS Test", 156 Source: "GCPS Test",
162 Timestamp: google.NewTimestamp(clock.Now(ctx)), 157 Timestamp: google.NewTimestamp(clock.Now(ctx)),
163 Entries: []*protocol.ButlerLogBundle_Entry{ 158 Entries: []*protocol.ButlerLogBundle_Entry{
164 {}, 159 {},
(...skipping 16 matching lines...) Expand all
181 So(st.DiscardedMessages(), ShouldEqual, 0) 176 So(st.DiscardedMessages(), ShouldEqual, 0)
182 }) 177 })
183 }) 178 })
184 179
185 Convey(`Will return an error if Publish failed.`, func() { 180 Convey(`Will return an error if Publish failed.`, func() {
186 ps.err = errors.New("test: error") 181 ps.err = errors.New("test: error")
187 So(o.SendBundle(bundle), ShouldNotBeNil) 182 So(o.SendBundle(bundle), ShouldNotBeNil)
188 }) 183 })
189 }) 184 })
190 } 185 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698