Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(213)

Side by Side Diff: common/gcloud/pubsub/ackbuffer/ackbuffer_test.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Actually upload the patch. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package ackbuffer 5 package ackbuffer
6 6
7 import ( 7 import (
8 » "sort" 8 » "fmt"
9 "sync" 9 "sync"
10 "testing" 10 "testing"
11 "time"
12 11
13 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/common/clock/testclock" 12 "github.com/luci/luci-go/common/clock/testclock"
15 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/stringset"
17 "golang.org/x/net/context" 14 "golang.org/x/net/context"
18 15
19 . "github.com/smartystreets/goconvey/convey" 16 . "github.com/smartystreets/goconvey/convey"
20 ) 17 )
21 18
22 type testACK struct { 19 type testACK struct {
23 sync.Mutex 20 sync.Mutex
24 21
25 err error 22 err error
26 » acks stringset.Set 23 » ackC chan []string
24 » acks []string
27 batchSize int 25 batchSize int
28 } 26 }
29 27
30 func (ps *testACK) Ack(c context.Context, acks ...string) error { 28 func (ps *testACK) Ack(c context.Context, acks ...string) error {
29 if ps.ackC != nil {
30 ps.ackC <- acks
31 }
32
31 ps.Lock() 33 ps.Lock()
32 defer ps.Unlock() 34 defer ps.Unlock()
33 35
34 if ps.err != nil { 36 if ps.err != nil {
35 return ps.err 37 return ps.err
36 } 38 }
37 39
38 if ps.acks == nil {
39 ps.acks = stringset.New(0)
40 }
41 for _, ack := range acks { 40 for _, ack := range acks {
42 » » ps.acks.Add(ack) 41 » » ps.acks = append(ps.acks, ack)
43 } 42 }
44 return nil 43 return nil
45 } 44 }
46 45
47 func (ps *testACK) AckBatchSize() int { 46 func (ps *testACK) AckBatchSize() int {
48 size := ps.batchSize 47 size := ps.batchSize
49 if size <= 0 { 48 if size <= 0 {
50 size = 4 49 size = 4
51 } 50 }
52 return size 51 return size
53 } 52 }
54 53
55 func (ps *testACK) ackIDs() []string { 54 func (ps *testACK) ackIDs() []string {
56 ps.Lock() 55 ps.Lock()
57 defer ps.Unlock() 56 defer ps.Unlock()
58 57
59 » v := make([]string, 0, ps.acks.Len()) 58 » v := make([]string, len(ps.acks))
60 » ps.acks.Iter(func(s string) bool { 59 » copy(v, ps.acks)
61 » » v = append(v, s)
62 » » return true
63 » })
64 » sort.Strings(v)
65 return v 60 return v
66 } 61 }
67 62
68 func TestAckBuffer(t *testing.T) { 63 func TestAckBuffer(t *testing.T) {
69 t.Parallel() 64 t.Parallel()
70 65
71 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { 66 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() {
72 c := context.Background() 67 c := context.Background()
73 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) 68 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
74 ps := &testACK{} 69 ps := &testACK{}
(...skipping 20 matching lines...) Expand all
95 // goroutines. 90 // goroutines.
96 closed := false 91 closed := false
97 closeOnce := func() { 92 closeOnce := func() {
98 if !closed { 93 if !closed {
99 closed = true 94 closed = true
100 ab.CloseAndFlush() 95 ab.CloseAndFlush()
101 } 96 }
102 } 97 }
103 defer closeOnce() 98 defer closeOnce()
104 99
105 » » » Convey(`Can send ACKs.`, func() { 100 » » » Convey(`Will buffer ACKs until enough are sent.`, func() {
101 » » » » ps.ackC = make(chan []string)
102 » » » » acks := make([]string, ps.AckBatchSize())
103
104 » » » » // Fill up the entire batch, which will cause an automatic dump.
105 » » » » for i := range acks {
106 » » » » » acks[i] = fmt.Sprintf("%d", i)
107 » » » » » ab.Ack(acks[i])
108 » » » » }
109 » » » » <-ps.ackC
110
111 » » » » So(ps.ackIDs(), ShouldResemble, acks)
112 » » » » So(discarded, ShouldBeNil)
113 » » » })
114
115 » » » Convey(`Will buffer ACKs and send if time has expired.`, func() {
116 » » » » ps.ackC = make(chan []string)
117 » » » » ab.ackReceivedC = make(chan string)
118
119 » » » » acks := []string{"foo", "bar", "baz"}
120 » » » » for _, v := range acks {
121 » » » » » ab.Ack(v)
122
123 » » » » » // Acknoweldge that all ACKs have been r eceived before we advance
124 » » » » » // our timer. This will ensure that the timer triggers AFTER the ACKs
125 » » » » » // are buffered.
126 » » » » » <-ab.ackReceivedC
127 » » » » }
128 » » » » tc.Add(DefaultMaxBufferTime)
129
130 » » » » <-ps.ackC
131 » » » » So(ps.ackIDs(), ShouldResemble, acks)
132 » » » » So(discarded, ShouldBeNil)
133 » » » })
134
135 » » » Convey(`Will flush any remaining ACKs on close.`, func() {
106 acks := []string{"foo", "bar", "baz"} 136 acks := []string{"foo", "bar", "baz"}
107 for _, v := range acks { 137 for _, v := range acks {
108 ab.Ack(v) 138 ab.Ack(v)
109 } 139 }
110 » » » » tc.Add(DefaultMaxBufferTime) 140 » » » » closeOnce()
111 141
112 closeOnce()
113 sort.Strings(acks)
114 So(ps.ackIDs(), ShouldResemble, acks) 142 So(ps.ackIDs(), ShouldResemble, acks)
115 So(discarded, ShouldBeNil) 143 So(discarded, ShouldBeNil)
116 }) 144 })
117 145
118 » » » Convey(`Will retry on transient Pub/Sub error`, func() { 146 » » » Convey(`Will discard the ACK if it could not be sent`, f unc() {
119 » » » » tc.SetTimerCallback(func(d time.Duration, t cloc k.Timer) {
120 » » » » » tc.Add(d)
121 » » » » })
122
123 ps.err = errors.WrapTransient(errors.New("test e rror")) 147 ps.err = errors.WrapTransient(errors.New("test e rror"))
124 acks := []string{"foo", "bar", "baz"} 148 acks := []string{"foo", "bar", "baz"}
125 for _, v := range acks { 149 for _, v := range acks {
126 ab.Ack(v) 150 ab.Ack(v)
127 } 151 }
152 closeOnce()
128 153
129 » » » » Convey(`And eventually discard the ACK.`, func() { 154 » » » » So(discarded, ShouldResemble, acks)
130 » » » » » closeOnce()
131
132 » » » » » sort.Strings(acks)
133 » » » » » sort.Strings(discarded)
134 » » » » » So(discarded, ShouldResemble, acks)
135 » » » » })
136 }) 155 })
137 }) 156 })
138 }) 157 })
139 } 158 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ackbuffer.go ('k') | common/gcloud/pubsub/subscriber/subscriber.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698