| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package ackbuffer | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "sync" | |
| 10 "testing" | |
| 11 | |
| 12 "github.com/luci/luci-go/common/clock/testclock" | |
| 13 "github.com/luci/luci-go/common/errors" | |
| 14 "golang.org/x/net/context" | |
| 15 | |
| 16 . "github.com/smartystreets/goconvey/convey" | |
| 17 ) | |
| 18 | |
| 19 type testACK struct { | |
| 20 sync.Mutex | |
| 21 | |
| 22 err error | |
| 23 ackC chan []string | |
| 24 acks []string | |
| 25 batchSize int | |
| 26 } | |
| 27 | |
| 28 func (ps *testACK) Ack(c context.Context, acks ...string) error { | |
| 29 if ps.ackC != nil { | |
| 30 ps.ackC <- acks | |
| 31 } | |
| 32 | |
| 33 ps.Lock() | |
| 34 defer ps.Unlock() | |
| 35 | |
| 36 if ps.err != nil { | |
| 37 return ps.err | |
| 38 } | |
| 39 | |
| 40 for _, ack := range acks { | |
| 41 ps.acks = append(ps.acks, ack) | |
| 42 } | |
| 43 return nil | |
| 44 } | |
| 45 | |
| 46 func (ps *testACK) AckBatchSize() int { | |
| 47 size := ps.batchSize | |
| 48 if size <= 0 { | |
| 49 size = 4 | |
| 50 } | |
| 51 return size | |
| 52 } | |
| 53 | |
| 54 func (ps *testACK) ackIDs() []string { | |
| 55 ps.Lock() | |
| 56 defer ps.Unlock() | |
| 57 | |
| 58 v := make([]string, len(ps.acks)) | |
| 59 copy(v, ps.acks) | |
| 60 return v | |
| 61 } | |
| 62 | |
| 63 func TestAckBuffer(t *testing.T) { | |
| 64 t.Parallel() | |
| 65 | |
| 66 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { | |
| 67 c := context.Background() | |
| 68 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | |
| 69 ps := &testACK{} | |
| 70 | |
| 71 var discarded []string | |
| 72 var discardedMu sync.Mutex | |
| 73 | |
| 74 cfg := Config{ | |
| 75 Ack: ps, | |
| 76 DiscardCallback: func(acks []string) { | |
| 77 discardedMu.Lock() | |
| 78 defer discardedMu.Unlock() | |
| 79 | |
| 80 discarded = append(discarded, acks...) | |
| 81 }, | |
| 82 } | |
| 83 | |
| 84 Convey(`Can instantiate an AckBuffer`, func() { | |
| 85 ab := New(c, cfg) | |
| 86 So(ab, ShouldNotBeNil) | |
| 87 | |
| 88 // Our tests will close/flush the buffer to synchronize.
However, if they | |
| 89 // don't, make sure we do so we don't spawn a bunch of f
loating | |
| 90 // goroutines. | |
| 91 closed := false | |
| 92 closeOnce := func() { | |
| 93 if !closed { | |
| 94 closed = true | |
| 95 ab.CloseAndFlush() | |
| 96 } | |
| 97 } | |
| 98 defer closeOnce() | |
| 99 | |
| 100 Convey(`Will buffer ACKs until enough are sent.`, func()
{ | |
| 101 ps.ackC = make(chan []string) | |
| 102 acks := make([]string, ps.AckBatchSize()) | |
| 103 | |
| 104 // Fill up the entire batch, which will cause an
automatic dump. | |
| 105 for i := range acks { | |
| 106 acks[i] = fmt.Sprintf("%d", i) | |
| 107 ab.Ack(acks[i]) | |
| 108 } | |
| 109 <-ps.ackC | |
| 110 | |
| 111 So(ps.ackIDs(), ShouldResemble, acks) | |
| 112 So(discarded, ShouldBeNil) | |
| 113 }) | |
| 114 | |
| 115 Convey(`Will buffer ACKs and send if time has expired.`,
func() { | |
| 116 ps.ackC = make(chan []string) | |
| 117 ab.ackReceivedC = make(chan string) | |
| 118 | |
| 119 acks := []string{"foo", "bar", "baz"} | |
| 120 for _, v := range acks { | |
| 121 ab.Ack(v) | |
| 122 | |
| 123 // Acknoweldge that all ACKs have been r
eceived before we advance | |
| 124 // our timer. This will ensure that the
timer triggers AFTER the ACKs | |
| 125 // are buffered. | |
| 126 <-ab.ackReceivedC | |
| 127 } | |
| 128 tc.Add(DefaultMaxBufferTime) | |
| 129 | |
| 130 <-ps.ackC | |
| 131 So(ps.ackIDs(), ShouldResemble, acks) | |
| 132 So(discarded, ShouldBeNil) | |
| 133 }) | |
| 134 | |
| 135 Convey(`Will flush any remaining ACKs on close.`, func()
{ | |
| 136 acks := []string{"foo", "bar", "baz"} | |
| 137 for _, v := range acks { | |
| 138 ab.Ack(v) | |
| 139 } | |
| 140 closeOnce() | |
| 141 | |
| 142 So(ps.ackIDs(), ShouldResemble, acks) | |
| 143 So(discarded, ShouldBeNil) | |
| 144 }) | |
| 145 | |
| 146 Convey(`Will discard the ACK if it could not be sent`, f
unc() { | |
| 147 ps.err = errors.WrapTransient(errors.New("test e
rror")) | |
| 148 acks := []string{"foo", "bar", "baz"} | |
| 149 for _, v := range acks { | |
| 150 ab.Ack(v) | |
| 151 } | |
| 152 closeOnce() | |
| 153 | |
| 154 So(discarded, ShouldResemble, acks) | |
| 155 }) | |
| 156 }) | |
| 157 }) | |
| 158 } | |
| OLD | NEW |