OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package ackbuffer | 5 package ackbuffer |
6 | 6 |
7 import ( | 7 import ( |
8 » "sort" | 8 » "fmt" |
9 "sync" | 9 "sync" |
10 "testing" | 10 "testing" |
11 "time" | |
12 | 11 |
13 "github.com/luci/luci-go/common/clock" | |
14 "github.com/luci/luci-go/common/clock/testclock" | 12 "github.com/luci/luci-go/common/clock/testclock" |
15 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/stringset" | |
17 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
18 | 15 |
19 . "github.com/smartystreets/goconvey/convey" | 16 . "github.com/smartystreets/goconvey/convey" |
20 ) | 17 ) |
21 | 18 |
22 type testACK struct { | 19 type testACK struct { |
23 sync.Mutex | 20 sync.Mutex |
24 | 21 |
25 err error | 22 err error |
26 » acks stringset.Set | 23 » ackC chan []string |
| 24 » acks []string |
27 batchSize int | 25 batchSize int |
28 } | 26 } |
29 | 27 |
30 func (ps *testACK) Ack(c context.Context, acks ...string) error { | 28 func (ps *testACK) Ack(c context.Context, acks ...string) error { |
| 29 if ps.ackC != nil { |
| 30 ps.ackC <- acks |
| 31 } |
| 32 |
31 ps.Lock() | 33 ps.Lock() |
32 defer ps.Unlock() | 34 defer ps.Unlock() |
33 | 35 |
34 if ps.err != nil { | 36 if ps.err != nil { |
35 return ps.err | 37 return ps.err |
36 } | 38 } |
37 | 39 |
38 if ps.acks == nil { | |
39 ps.acks = stringset.New(0) | |
40 } | |
41 for _, ack := range acks { | 40 for _, ack := range acks { |
42 » » ps.acks.Add(ack) | 41 » » ps.acks = append(ps.acks, ack) |
43 } | 42 } |
44 return nil | 43 return nil |
45 } | 44 } |
46 | 45 |
47 func (ps *testACK) AckBatchSize() int { | 46 func (ps *testACK) AckBatchSize() int { |
48 size := ps.batchSize | 47 size := ps.batchSize |
49 if size <= 0 { | 48 if size <= 0 { |
50 size = 4 | 49 size = 4 |
51 } | 50 } |
52 return size | 51 return size |
53 } | 52 } |
54 | 53 |
55 func (ps *testACK) ackIDs() []string { | 54 func (ps *testACK) ackIDs() []string { |
56 ps.Lock() | 55 ps.Lock() |
57 defer ps.Unlock() | 56 defer ps.Unlock() |
58 | 57 |
59 » v := make([]string, 0, ps.acks.Len()) | 58 » v := make([]string, len(ps.acks)) |
60 » ps.acks.Iter(func(s string) bool { | 59 » copy(v, ps.acks) |
61 » » v = append(v, s) | |
62 » » return true | |
63 » }) | |
64 » sort.Strings(v) | |
65 return v | 60 return v |
66 } | 61 } |
67 | 62 |
68 func TestAckBuffer(t *testing.T) { | 63 func TestAckBuffer(t *testing.T) { |
69 t.Parallel() | 64 t.Parallel() |
70 | 65 |
71 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { | 66 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { |
72 c := context.Background() | 67 c := context.Background() |
73 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | 68 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) |
74 ps := &testACK{} | 69 ps := &testACK{} |
(...skipping 20 matching lines...) Expand all Loading... |
95 // goroutines. | 90 // goroutines. |
96 closed := false | 91 closed := false |
97 closeOnce := func() { | 92 closeOnce := func() { |
98 if !closed { | 93 if !closed { |
99 closed = true | 94 closed = true |
100 ab.CloseAndFlush() | 95 ab.CloseAndFlush() |
101 } | 96 } |
102 } | 97 } |
103 defer closeOnce() | 98 defer closeOnce() |
104 | 99 |
105 » » » Convey(`Can send ACKs.`, func() { | 100 » » » Convey(`Will buffer ACKs until enough are sent.`, func()
{ |
| 101 » » » » ps.ackC = make(chan []string) |
| 102 » » » » acks := make([]string, ps.AckBatchSize()) |
| 103 |
| 104 » » » » // Fill up the entire batch, which will cause an
automatic dump. |
| 105 » » » » for i := range acks { |
| 106 » » » » » acks[i] = fmt.Sprintf("%d", i) |
| 107 » » » » » ab.Ack(acks[i]) |
| 108 » » » » } |
| 109 » » » » <-ps.ackC |
| 110 |
| 111 » » » » So(ps.ackIDs(), ShouldResemble, acks) |
| 112 » » » » So(discarded, ShouldBeNil) |
| 113 » » » }) |
| 114 |
| 115 » » » Convey(`Will buffer ACKs and send if time has expired.`,
func() { |
| 116 » » » » ps.ackC = make(chan []string) |
| 117 » » » » ab.ackReceivedC = make(chan string) |
| 118 |
| 119 » » » » acks := []string{"foo", "bar", "baz"} |
| 120 » » » » for _, v := range acks { |
| 121 » » » » » ab.Ack(v) |
| 122 |
| 123 » » » » » // Acknoweldge that all ACKs have been r
eceived before we advance |
| 124 » » » » » // our timer. This will ensure that the
timer triggers AFTER the ACKs |
| 125 » » » » » // are buffered. |
| 126 » » » » » <-ab.ackReceivedC |
| 127 » » » » } |
| 128 » » » » tc.Add(DefaultMaxBufferTime) |
| 129 |
| 130 » » » » <-ps.ackC |
| 131 » » » » So(ps.ackIDs(), ShouldResemble, acks) |
| 132 » » » » So(discarded, ShouldBeNil) |
| 133 » » » }) |
| 134 |
| 135 » » » Convey(`Will flush any remaining ACKs on close.`, func()
{ |
106 acks := []string{"foo", "bar", "baz"} | 136 acks := []string{"foo", "bar", "baz"} |
107 for _, v := range acks { | 137 for _, v := range acks { |
108 ab.Ack(v) | 138 ab.Ack(v) |
109 } | 139 } |
110 » » » » tc.Add(DefaultMaxBufferTime) | 140 » » » » closeOnce() |
111 | 141 |
112 closeOnce() | |
113 sort.Strings(acks) | |
114 So(ps.ackIDs(), ShouldResemble, acks) | 142 So(ps.ackIDs(), ShouldResemble, acks) |
115 So(discarded, ShouldBeNil) | 143 So(discarded, ShouldBeNil) |
116 }) | 144 }) |
117 | 145 |
118 » » » Convey(`Will retry on transient Pub/Sub error`, func() { | 146 » » » Convey(`Will discard the ACK if it could not be sent`, f
unc() { |
119 » » » » tc.SetTimerCallback(func(d time.Duration, t cloc
k.Timer) { | |
120 » » » » » tc.Add(d) | |
121 » » » » }) | |
122 | |
123 ps.err = errors.WrapTransient(errors.New("test e
rror")) | 147 ps.err = errors.WrapTransient(errors.New("test e
rror")) |
124 acks := []string{"foo", "bar", "baz"} | 148 acks := []string{"foo", "bar", "baz"} |
125 for _, v := range acks { | 149 for _, v := range acks { |
126 ab.Ack(v) | 150 ab.Ack(v) |
127 } | 151 } |
| 152 closeOnce() |
128 | 153 |
129 » » » » Convey(`And eventually discard the ACK.`, func()
{ | 154 » » » » So(discarded, ShouldResemble, acks) |
130 » » » » » closeOnce() | |
131 | |
132 » » » » » sort.Strings(acks) | |
133 » » » » » sort.Strings(discarded) | |
134 » » » » » So(discarded, ShouldResemble, acks) | |
135 » » » » }) | |
136 }) | 155 }) |
137 }) | 156 }) |
138 }) | 157 }) |
139 } | 158 } |
OLD | NEW |