| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package pubsub | 5 package pubsub |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "compress/zlib" | 9 "compress/zlib" |
| 10 "errors" | 10 "errors" |
| 11 "fmt" | 11 "fmt" |
| 12 "io/ioutil" | 12 "io/ioutil" |
| 13 "sync" | 13 "sync" |
| 14 "testing" | 14 "testing" |
| 15 "time" | 15 "time" |
| 16 | 16 |
| 17 "github.com/golang/protobuf/proto" | 17 "github.com/golang/protobuf/proto" |
| 18 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/clock/testclock" | 19 "github.com/luci/luci-go/common/clock/testclock" |
| 20 » "github.com/luci/luci-go/common/gcloud/pubsub" | 20 » gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 21 "github.com/luci/luci-go/common/proto/google" | 21 "github.com/luci/luci-go/common/proto/google" |
| 22 "github.com/luci/luci-go/common/proto/logdog/logpb" | 22 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 23 "github.com/luci/luci-go/common/recordio" | 23 "github.com/luci/luci-go/common/recordio" |
| 24 . "github.com/smartystreets/goconvey/convey" | 24 . "github.com/smartystreets/goconvey/convey" |
| 25 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
| 26 "google.golang.org/cloud/pubsub" |
| 26 ) | 27 ) |
| 27 | 28 |
| 28 type testPublisher struct { | 29 type testTopic struct { |
| 29 sync.Mutex | 30 sync.Mutex |
| 30 | 31 |
| 31 » err error | 32 » err error |
| 32 » topic pubsub.Topic | |
| 33 | 33 |
| 34 msgC chan *pubsub.Message | 34 msgC chan *pubsub.Message |
| 35 nextMessageID int | 35 nextMessageID int |
| 36 } | 36 } |
| 37 | 37 |
| 38 func (ps *testPublisher) Publish(c context.Context, t pubsub.Topic, msgs ...*pub
sub.Message) ([]string, error) { | 38 func (t *testTopic) Name() string { return "test" } |
| 39 » if ps.err != nil { | 39 |
| 40 » » return nil, ps.err | 40 func (t *testTopic) Publish(c context.Context, msgs ...*pubsub.Message) ([]strin
g, error) { |
| 41 » } | 41 » if t.err != nil { |
| 42 » if t != ps.topic { | 42 » » return nil, t.err |
| 43 » » return nil, fmt.Errorf("test: published topic doesn't match conf
igured (%s != %s)", t, ps.topic) | |
| 44 } | 43 } |
| 45 | 44 |
| 46 ids := make([]string, len(msgs)) | 45 ids := make([]string, len(msgs)) |
| 47 for i, m := range msgs { | 46 for i, m := range msgs { |
| 48 » » ps.msgC <- m | 47 » » t.msgC <- m |
| 49 » » ids[i] = ps.getNextMessageID() | 48 » » ids[i] = t.getNextMessageID() |
| 50 } | 49 } |
| 51 return ids, nil | 50 return ids, nil |
| 52 } | 51 } |
| 53 | 52 |
| 54 func (ps *testPublisher) getNextMessageID() string { | 53 func (t *testTopic) getNextMessageID() string { |
| 55 » ps.Lock() | 54 » t.Lock() |
| 56 » defer ps.Unlock() | 55 » defer t.Unlock() |
| 57 | 56 |
| 58 » id := ps.nextMessageID | 57 » id := t.nextMessageID |
| 59 » ps.nextMessageID++ | 58 » t.nextMessageID++ |
| 60 return fmt.Sprintf("%d", id) | 59 return fmt.Sprintf("%d", id) |
| 61 } | 60 } |
| 62 | 61 |
| 63 func TestConfig(t *testing.T) { | |
| 64 Convey(`A configuration instance`, t, func() { | |
| 65 ps := &testPublisher{} | |
| 66 conf := Config{ | |
| 67 Publisher: ps, | |
| 68 Topic: pubsub.NewTopic("test-project", "test-topic")
, | |
| 69 } | |
| 70 | |
| 71 Convey(`Will successfully validate.`, func() { | |
| 72 So(conf.Validate(), ShouldBeNil) | |
| 73 }) | |
| 74 | |
| 75 Convey(`Will not validate without a PubSub instance.`, func() { | |
| 76 conf.Publisher = nil | |
| 77 So(conf.Validate(), ShouldNotBeNil) | |
| 78 }) | |
| 79 | |
| 80 Convey(`Will not validate with an empty Topic.`, func() { | |
| 81 conf.Topic = "" | |
| 82 So(conf.Validate(), ShouldNotBeNil) | |
| 83 }) | |
| 84 | |
| 85 Convey(`Will not validate with an invalid Topic.`, func() { | |
| 86 conf.Topic = pubsub.NewTopic("test-project", "a!") | |
| 87 So(conf.Topic.Validate(), ShouldNotBeNil) | |
| 88 So(conf.Validate(), ShouldNotBeNil) | |
| 89 }) | |
| 90 }) | |
| 91 } | |
| 92 | |
| 93 func deconstructMessage(msg *pubsub.Message) (*logpb.ButlerMetadata, *logpb.Butl
erLogBundle, error) { | 62 func deconstructMessage(msg *pubsub.Message) (*logpb.ButlerMetadata, *logpb.Butl
erLogBundle, error) { |
| 94 » fr := recordio.NewReader(bytes.NewBuffer(msg.Data), pubsub.MaxPublishSiz
e) | 63 » fr := recordio.NewReader(bytes.NewBuffer(msg.Data), gcps.MaxPublishSize) |
| 95 | 64 |
| 96 // Validate header frame. | 65 // Validate header frame. |
| 97 headerBytes, err := fr.ReadFrameAll() | 66 headerBytes, err := fr.ReadFrameAll() |
| 98 if err != nil { | 67 if err != nil { |
| 99 return nil, nil, fmt.Errorf("test: failed to read header frame:
%s", err) | 68 return nil, nil, fmt.Errorf("test: failed to read header frame:
%s", err) |
| 100 } | 69 } |
| 101 | 70 |
| 102 header := logpb.ButlerMetadata{} | 71 header := logpb.ButlerMetadata{} |
| 103 if err := proto.Unmarshal(headerBytes, &header); err != nil { | 72 if err := proto.Unmarshal(headerBytes, &header); err != nil { |
| 104 return nil, nil, fmt.Errorf("test: failed to unmarshal header: %
s", err) | 73 return nil, nil, fmt.Errorf("test: failed to unmarshal header: %
s", err) |
| (...skipping 27 matching lines...) Expand all Loading... |
| 132 if err := proto.Unmarshal(data, &dataBundle); err != nil { | 101 if err := proto.Unmarshal(data, &dataBundle); err != nil { |
| 133 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: %
s", err) | 102 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: %
s", err) |
| 134 } | 103 } |
| 135 | 104 |
| 136 return &header, &dataBundle, nil | 105 return &header, &dataBundle, nil |
| 137 } | 106 } |
| 138 | 107 |
| 139 func TestOutput(t *testing.T) { | 108 func TestOutput(t *testing.T) { |
| 140 Convey(`An Output using a test Pub/Sub instance`, t, func() { | 109 Convey(`An Output using a test Pub/Sub instance`, t, func() { |
| 141 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015
, 1, 1, 0, 0, 0, 0, time.UTC)) | 110 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015
, 1, 1, 0, 0, 0, 0, time.UTC)) |
| 142 » » ps := &testPublisher{ | 111 » » tt := &testTopic{ |
| 143 » » » topic: pubsub.NewTopic("test-project", "test-topic"), | 112 » » » msgC: make(chan *pubsub.Message, 1), |
| 144 » » » msgC: make(chan *pubsub.Message, 1), | |
| 145 } | 113 } |
| 146 conf := Config{ | 114 conf := Config{ |
| 147 » » » Publisher: ps, | 115 » » » Topic: tt, |
| 148 » » » Topic: pubsub.NewTopic("test-project", "test-topic")
, | |
| 149 } | 116 } |
| 150 o := New(ctx, conf).(*pubSubOutput) | 117 o := New(ctx, conf).(*pubSubOutput) |
| 151 So(o, ShouldNotBeNil) | 118 So(o, ShouldNotBeNil) |
| 152 defer o.Close() | 119 defer o.Close() |
| 153 | 120 |
| 154 bundle := &logpb.ButlerLogBundle{ | 121 bundle := &logpb.ButlerLogBundle{ |
| 155 Source: "Pub/Sub Test", | 122 Source: "Pub/Sub Test", |
| 156 Timestamp: google.NewTimestamp(clock.Now(ctx)), | 123 Timestamp: google.NewTimestamp(clock.Now(ctx)), |
| 157 Entries: []*logpb.ButlerLogBundle_Entry{ | 124 Entries: []*logpb.ButlerLogBundle_Entry{ |
| 158 {}, | 125 {}, |
| 159 }, | 126 }, |
| 160 } | 127 } |
| 161 | 128 |
| 162 Convey(`Can send/receive a bundle.`, func() { | 129 Convey(`Can send/receive a bundle.`, func() { |
| 163 So(o.SendBundle(bundle), ShouldBeNil) | 130 So(o.SendBundle(bundle), ShouldBeNil) |
| 164 | 131 |
| 165 » » » h, b, err := deconstructMessage(<-ps.msgC) | 132 » » » h, b, err := deconstructMessage(<-tt.msgC) |
| 166 So(err, ShouldBeNil) | 133 So(err, ShouldBeNil) |
| 167 So(h.Compression, ShouldEqual, logpb.ButlerMetadata_NONE
) | 134 So(h.Compression, ShouldEqual, logpb.ButlerMetadata_NONE
) |
| 168 So(b, ShouldResemble, bundle) | 135 So(b, ShouldResemble, bundle) |
| 169 | 136 |
| 170 Convey(`And records stats.`, func() { | 137 Convey(`And records stats.`, func() { |
| 171 st := o.Stats() | 138 st := o.Stats() |
| 172 So(st.Errors(), ShouldEqual, 0) | 139 So(st.Errors(), ShouldEqual, 0) |
| 173 So(st.SentBytes(), ShouldBeGreaterThan, 0) | 140 So(st.SentBytes(), ShouldBeGreaterThan, 0) |
| 174 So(st.SentMessages(), ShouldEqual, 1) | 141 So(st.SentMessages(), ShouldEqual, 1) |
| 175 So(st.DiscardedMessages(), ShouldEqual, 0) | 142 So(st.DiscardedMessages(), ShouldEqual, 0) |
| 176 }) | 143 }) |
| 177 }) | 144 }) |
| 178 | 145 |
| 179 Convey(`Will return an error if Publish failed.`, func() { | 146 Convey(`Will return an error if Publish failed.`, func() { |
| 180 » » » ps.err = errors.New("test: error") | 147 » » » tt.err = errors.New("test: error") |
| 181 So(o.SendBundle(bundle), ShouldNotBeNil) | 148 So(o.SendBundle(bundle), ShouldNotBeNil) |
| 182 }) | 149 }) |
| 183 }) | 150 }) |
| 184 } | 151 } |
| OLD | NEW |