| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package ackbuffer | 5 package ackbuffer |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 » "sort" | 8 » "fmt" |
| 9 "sync" | 9 "sync" |
| 10 "testing" | 10 "testing" |
| 11 "time" | |
| 12 | 11 |
| 13 "github.com/luci/luci-go/common/clock" | |
| 14 "github.com/luci/luci-go/common/clock/testclock" | 12 "github.com/luci/luci-go/common/clock/testclock" |
| 15 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/stringset" | |
| 17 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 18 | 15 |
| 19 . "github.com/smartystreets/goconvey/convey" | 16 . "github.com/smartystreets/goconvey/convey" |
| 20 ) | 17 ) |
| 21 | 18 |
| 22 type testACK struct { | 19 type testACK struct { |
| 23 sync.Mutex | 20 sync.Mutex |
| 24 | 21 |
| 25 err error | 22 err error |
| 26 » acks stringset.Set | 23 » ackC chan []string |
| 24 » acks []string |
| 27 batchSize int | 25 batchSize int |
| 28 } | 26 } |
| 29 | 27 |
| 30 func (ps *testACK) Ack(c context.Context, acks ...string) error { | 28 func (ps *testACK) Ack(c context.Context, acks ...string) error { |
| 29 if ps.ackC != nil { |
| 30 ps.ackC <- acks |
| 31 } |
| 32 |
| 31 ps.Lock() | 33 ps.Lock() |
| 32 defer ps.Unlock() | 34 defer ps.Unlock() |
| 33 | 35 |
| 34 if ps.err != nil { | 36 if ps.err != nil { |
| 35 return ps.err | 37 return ps.err |
| 36 } | 38 } |
| 37 | 39 |
| 38 if ps.acks == nil { | |
| 39 ps.acks = stringset.New(0) | |
| 40 } | |
| 41 for _, ack := range acks { | 40 for _, ack := range acks { |
| 42 » » ps.acks.Add(ack) | 41 » » ps.acks = append(ps.acks, ack) |
| 43 } | 42 } |
| 44 return nil | 43 return nil |
| 45 } | 44 } |
| 46 | 45 |
| 47 func (ps *testACK) AckBatchSize() int { | 46 func (ps *testACK) AckBatchSize() int { |
| 48 size := ps.batchSize | 47 size := ps.batchSize |
| 49 if size <= 0 { | 48 if size <= 0 { |
| 50 size = 4 | 49 size = 4 |
| 51 } | 50 } |
| 52 return size | 51 return size |
| 53 } | 52 } |
| 54 | 53 |
| 55 func (ps *testACK) ackIDs() []string { | 54 func (ps *testACK) ackIDs() []string { |
| 56 ps.Lock() | 55 ps.Lock() |
| 57 defer ps.Unlock() | 56 defer ps.Unlock() |
| 58 | 57 |
| 59 » v := make([]string, 0, ps.acks.Len()) | 58 » v := make([]string, len(ps.acks)) |
| 60 » ps.acks.Iter(func(s string) bool { | 59 » copy(v, ps.acks) |
| 61 » » v = append(v, s) | |
| 62 » » return true | |
| 63 » }) | |
| 64 » sort.Strings(v) | |
| 65 return v | 60 return v |
| 66 } | 61 } |
| 67 | 62 |
| 68 func TestAckBuffer(t *testing.T) { | 63 func TestAckBuffer(t *testing.T) { |
| 69 t.Parallel() | 64 t.Parallel() |
| 70 | 65 |
| 71 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { | 66 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { |
| 72 c := context.Background() | 67 c := context.Background() |
| 73 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | 68 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) |
| 74 ps := &testACK{} | 69 ps := &testACK{} |
| (...skipping 20 matching lines...) Expand all Loading... |
| 95 // goroutines. | 90 // goroutines. |
| 96 closed := false | 91 closed := false |
| 97 closeOnce := func() { | 92 closeOnce := func() { |
| 98 if !closed { | 93 if !closed { |
| 99 closed = true | 94 closed = true |
| 100 ab.CloseAndFlush() | 95 ab.CloseAndFlush() |
| 101 } | 96 } |
| 102 } | 97 } |
| 103 defer closeOnce() | 98 defer closeOnce() |
| 104 | 99 |
| 105 » » » Convey(`Can send ACKs.`, func() { | 100 » » » Convey(`Will buffer ACKs until enough are sent.`, func()
{ |
| 101 » » » » ps.ackC = make(chan []string) |
| 102 » » » » acks := make([]string, ps.AckBatchSize()) |
| 103 |
| 104 » » » » // Fill up the entire batch, which will cause an
automatic dump. |
| 105 » » » » for i := range acks { |
| 106 » » » » » acks[i] = fmt.Sprintf("%d", i) |
| 107 » » » » » ab.Ack(acks[i]) |
| 108 » » » » } |
| 109 » » » » <-ps.ackC |
| 110 |
| 111 » » » » So(ps.ackIDs(), ShouldResemble, acks) |
| 112 » » » » So(discarded, ShouldBeNil) |
| 113 » » » }) |
| 114 |
| 115 » » » Convey(`Will buffer ACKs and send if time has expired.`,
func() { |
| 116 » » » » ps.ackC = make(chan []string) |
| 117 » » » » ab.ackReceivedC = make(chan string) |
| 118 |
| 119 » » » » acks := []string{"foo", "bar", "baz"} |
| 120 » » » » for _, v := range acks { |
| 121 » » » » » ab.Ack(v) |
| 122 |
| 123 » » » » » // Acknoweldge that all ACKs have been r
eceived before we advance |
| 124 » » » » » // our timer. This will ensure that the
timer triggers AFTER the ACKs |
| 125 » » » » » // are buffered. |
| 126 » » » » » <-ab.ackReceivedC |
| 127 » » » » } |
| 128 » » » » tc.Add(DefaultMaxBufferTime) |
| 129 |
| 130 » » » » <-ps.ackC |
| 131 » » » » So(ps.ackIDs(), ShouldResemble, acks) |
| 132 » » » » So(discarded, ShouldBeNil) |
| 133 » » » }) |
| 134 |
| 135 » » » Convey(`Will flush any remaining ACKs on close.`, func()
{ |
| 106 acks := []string{"foo", "bar", "baz"} | 136 acks := []string{"foo", "bar", "baz"} |
| 107 for _, v := range acks { | 137 for _, v := range acks { |
| 108 ab.Ack(v) | 138 ab.Ack(v) |
| 109 } | 139 } |
| 110 » » » » tc.Add(DefaultMaxBufferTime) | 140 » » » » closeOnce() |
| 111 | 141 |
| 112 closeOnce() | |
| 113 sort.Strings(acks) | |
| 114 So(ps.ackIDs(), ShouldResemble, acks) | 142 So(ps.ackIDs(), ShouldResemble, acks) |
| 115 So(discarded, ShouldBeNil) | 143 So(discarded, ShouldBeNil) |
| 116 }) | 144 }) |
| 117 | 145 |
| 118 » » » Convey(`Will retry on transient Pub/Sub error`, func() { | 146 » » » Convey(`Will discard the ACK if it could not be sent`, f
unc() { |
| 119 » » » » tc.SetTimerCallback(func(d time.Duration, t cloc
k.Timer) { | |
| 120 » » » » » tc.Add(d) | |
| 121 » » » » }) | |
| 122 | |
| 123 ps.err = errors.WrapTransient(errors.New("test e
rror")) | 147 ps.err = errors.WrapTransient(errors.New("test e
rror")) |
| 124 acks := []string{"foo", "bar", "baz"} | 148 acks := []string{"foo", "bar", "baz"} |
| 125 for _, v := range acks { | 149 for _, v := range acks { |
| 126 ab.Ack(v) | 150 ab.Ack(v) |
| 127 } | 151 } |
| 152 closeOnce() |
| 128 | 153 |
| 129 » » » » Convey(`And eventually discard the ACK.`, func()
{ | 154 » » » » So(discarded, ShouldResemble, acks) |
| 130 » » » » » closeOnce() | |
| 131 | |
| 132 » » » » » sort.Strings(acks) | |
| 133 » » » » » sort.Strings(discarded) | |
| 134 » » » » » So(discarded, ShouldResemble, acks) | |
| 135 » » » » }) | |
| 136 }) | 155 }) |
| 137 }) | 156 }) |
| 138 }) | 157 }) |
| 139 } | 158 } |
| OLD | NEW |