| OLD | NEW | 
|---|
| (Empty) |  | 
|  | 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
|  | 2 // Use of this source code is governed by a BSD-style license that can be | 
|  | 3 // found in the LICENSE file. | 
|  | 4 | 
|  | 5 package gcps | 
|  | 6 | 
|  | 7 import ( | 
|  | 8         "bytes" | 
|  | 9         "compress/zlib" | 
|  | 10         "errors" | 
|  | 11         "fmt" | 
|  | 12         "io/ioutil" | 
|  | 13         "sync" | 
|  | 14         "testing" | 
|  | 15         "time" | 
|  | 16 | 
|  | 17         "github.com/golang/protobuf/proto" | 
|  | 18         "github.com/luci/luci-go/common/clock" | 
|  | 19         "github.com/luci/luci-go/common/clock/testclock" | 
|  | 20         "github.com/luci/luci-go/common/gcloud/gcps" | 
|  | 21         "github.com/luci/luci-go/common/logdog/protocol" | 
|  | 22         "github.com/luci/luci-go/common/proto/google" | 
|  | 23         "github.com/luci/luci-go/common/recordio" | 
|  | 24         . "github.com/smartystreets/goconvey/convey" | 
|  | 25         "golang.org/x/net/context" | 
|  | 26         "google.golang.org/cloud/pubsub" | 
|  | 27 ) | 
|  | 28 | 
|  | 29 type testPubSub struct { | 
|  | 30         sync.Mutex | 
|  | 31 | 
|  | 32         err   error | 
|  | 33         topic gcps.Topic | 
|  | 34 | 
|  | 35         msgC          chan *pubsub.Message | 
|  | 36         nextMessageID int | 
|  | 37 } | 
|  | 38 | 
|  | 39 func (*testPubSub) TopicExists(gcps.Topic) (bool, error)                   { pan
     ic("not implemented") } | 
|  | 40 func (*testPubSub) SubExists(gcps.Subscription) (bool, error)              { pan
     ic("not implemented") } | 
|  | 41 func (*testPubSub) Pull(gcps.Subscription, int) ([]*pubsub.Message, error) { pan
     ic("not implemented") } | 
|  | 42 func (*testPubSub) Ack(gcps.Subscription, ...string) error                 { pan
     ic("not implemented") } | 
|  | 43 | 
|  | 44 func (ps *testPubSub) Publish(t gcps.Topic, msgs ...*pubsub.Message) ([]string, 
     error) { | 
|  | 45         if ps.err != nil { | 
|  | 46                 return nil, ps.err | 
|  | 47         } | 
|  | 48         if t != ps.topic { | 
|  | 49                 return nil, fmt.Errorf("test: published topic doesn't match conf
     igured (%s != %s)", t, ps.topic) | 
|  | 50         } | 
|  | 51 | 
|  | 52         ids := make([]string, len(msgs)) | 
|  | 53         for i, m := range msgs { | 
|  | 54                 ps.msgC <- m | 
|  | 55                 ids[i] = ps.getNextMessageID() | 
|  | 56         } | 
|  | 57         return ids, nil | 
|  | 58 } | 
|  | 59 | 
|  | 60 func (ps *testPubSub) getNextMessageID() string { | 
|  | 61         ps.Lock() | 
|  | 62         defer ps.Unlock() | 
|  | 63 | 
|  | 64         id := ps.nextMessageID | 
|  | 65         ps.nextMessageID++ | 
|  | 66         return fmt.Sprintf("%d", id) | 
|  | 67 } | 
|  | 68 | 
|  | 69 func TestConfig(t *testing.T) { | 
|  | 70         Convey(`A configuration instance`, t, func() { | 
|  | 71                 ps := &testPubSub{} | 
|  | 72                 conf := Config{ | 
|  | 73                         PubSub: ps, | 
|  | 74                         Topic:  gcps.Topic("test-topic"), | 
|  | 75                 } | 
|  | 76 | 
|  | 77                 Convey(`Will successfully validate.`, func() { | 
|  | 78                         So(conf.Validate(), ShouldBeNil) | 
|  | 79                 }) | 
|  | 80 | 
|  | 81                 Convey(`Will not validate without a PubSub instance.`, func() { | 
|  | 82                         conf.PubSub = nil | 
|  | 83                         So(conf.Validate(), ShouldNotBeNil) | 
|  | 84                 }) | 
|  | 85 | 
|  | 86                 Convey(`Will not validate with an empty Topic.`, func() { | 
|  | 87                         conf.Topic = "" | 
|  | 88                         So(conf.Validate(), ShouldNotBeNil) | 
|  | 89                 }) | 
|  | 90 | 
|  | 91                 Convey(`Will not validate with an invalid Topic.`, func() { | 
|  | 92                         conf.Topic = gcps.Topic("a!") | 
|  | 93                         So(conf.Topic.Validate(), ShouldNotBeNil) | 
|  | 94                         So(conf.Validate(), ShouldNotBeNil) | 
|  | 95                 }) | 
|  | 96         }) | 
|  | 97 } | 
|  | 98 | 
|  | 99 func deconstructMessage(msg *pubsub.Message) (*protocol.ButlerMetadata, *protoco
     l.ButlerLogBundle, error) { | 
|  | 100         fr := recordio.NewReader(bytes.NewBuffer(msg.Data), gcps.MaxPublishSize) | 
|  | 101 | 
|  | 102         // Validate header frame. | 
|  | 103         headerBytes, err := fr.ReadFrameAll() | 
|  | 104         if err != nil { | 
|  | 105                 return nil, nil, fmt.Errorf("test: failed to read header frame: 
     %s", err) | 
|  | 106         } | 
|  | 107 | 
|  | 108         header := protocol.ButlerMetadata{} | 
|  | 109         if err := proto.Unmarshal(headerBytes, &header); err != nil { | 
|  | 110                 return nil, nil, fmt.Errorf("test: failed to unmarshal header: %
     s", err) | 
|  | 111         } | 
|  | 112 | 
|  | 113         if header.Type != protocol.ButlerMetadata_ButlerLogBundle { | 
|  | 114                 return nil, nil, fmt.Errorf("test: unknown frame data type: %v",
      header.Type) | 
|  | 115         } | 
|  | 116 | 
|  | 117         // Validate data frame. | 
|  | 118         data, err := fr.ReadFrameAll() | 
|  | 119         if err != nil { | 
|  | 120                 return nil, nil, fmt.Errorf("test: failed to read data frame: %s
     ", err) | 
|  | 121         } | 
|  | 122 | 
|  | 123         switch header.Compression { | 
|  | 124         case protocol.ButlerMetadata_ZLIB: | 
|  | 125                 r, err := zlib.NewReader(bytes.NewReader(data)) | 
|  | 126                 if err != nil { | 
|  | 127                         return nil, nil, fmt.Errorf("test: failed to create zlib
      reader: %s", err) | 
|  | 128                 } | 
|  | 129                 defer r.Close() | 
|  | 130 | 
|  | 131                 data, err = ioutil.ReadAll(r) | 
|  | 132                 if err != nil { | 
|  | 133                         return nil, nil, fmt.Errorf("test: failed to read compre
     ssed data: %s", err) | 
|  | 134                 } | 
|  | 135         } | 
|  | 136 | 
|  | 137         dataBundle := protocol.ButlerLogBundle{} | 
|  | 138         if err := proto.Unmarshal(data, &dataBundle); err != nil { | 
|  | 139                 return nil, nil, fmt.Errorf("test: failed to unmarshal bundle: %
     s", err) | 
|  | 140         } | 
|  | 141 | 
|  | 142         return &header, &dataBundle, nil | 
|  | 143 } | 
|  | 144 | 
|  | 145 func TestOutput(t *testing.T) { | 
|  | 146         Convey(`An Output using a test Pub/Sub instance`, t, func() { | 
|  | 147                 ctx, _ := testclock.UseTime(context.Background(), time.Date(2015
     , 1, 1, 0, 0, 0, 0, time.UTC)) | 
|  | 148                 ps := &testPubSub{ | 
|  | 149                         topic: gcps.Topic("test-topic"), | 
|  | 150                         msgC:  make(chan *pubsub.Message, 1), | 
|  | 151                 } | 
|  | 152                 conf := Config{ | 
|  | 153                         PubSub: ps, | 
|  | 154                         Topic:  gcps.Topic("test-topic"), | 
|  | 155                 } | 
|  | 156                 o := New(ctx, conf).(*gcpsOutput) | 
|  | 157                 So(o, ShouldNotBeNil) | 
|  | 158                 defer o.Close() | 
|  | 159 | 
|  | 160                 bundle := &protocol.ButlerLogBundle{ | 
|  | 161                         Source:    "GCPS Test", | 
|  | 162                         Timestamp: google.NewTimestamp(clock.Now(ctx)), | 
|  | 163                         Entries: []*protocol.ButlerLogBundle_Entry{ | 
|  | 164                                 {}, | 
|  | 165                         }, | 
|  | 166                 } | 
|  | 167 | 
|  | 168                 Convey(`Can send/receive a bundle.`, func() { | 
|  | 169                         So(o.SendBundle(bundle), ShouldBeNil) | 
|  | 170 | 
|  | 171                         h, b, err := deconstructMessage(<-ps.msgC) | 
|  | 172                         So(err, ShouldBeNil) | 
|  | 173                         So(h.Compression, ShouldEqual, protocol.ButlerMetadata_N
     ONE) | 
|  | 174                         So(b, ShouldResemble, bundle) | 
|  | 175 | 
|  | 176                         Convey(`And records stats.`, func() { | 
|  | 177                                 st := o.Stats() | 
|  | 178                                 So(st.Errors(), ShouldEqual, 0) | 
|  | 179                                 So(st.SentBytes(), ShouldBeGreaterThan, 0) | 
|  | 180                                 So(st.SentMessages(), ShouldEqual, 1) | 
|  | 181                                 So(st.DiscardedMessages(), ShouldEqual, 0) | 
|  | 182                         }) | 
|  | 183                 }) | 
|  | 184 | 
|  | 185                 Convey(`Will return an error if Publish failed.`, func() { | 
|  | 186                         ps.err = errors.New("test: error") | 
|  | 187                         So(o.SendBundle(bundle), ShouldNotBeNil) | 
|  | 188                 }) | 
|  | 189         }) | 
|  | 190 } | 
| OLD | NEW | 
|---|