Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(743)

Side by Side Diff: common/gcloud/pubsub/ackbuffer/ackbuffer_test.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ackbuffer.go ('k') | common/gcloud/pubsub/connection.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package ackbuffer
6
7 import (
8 "fmt"
9 "sync"
10 "testing"
11
12 "github.com/luci/luci-go/common/clock/testclock"
13 "github.com/luci/luci-go/common/errors"
14 "golang.org/x/net/context"
15
16 . "github.com/smartystreets/goconvey/convey"
17 )
18
19 type testACK struct {
20 sync.Mutex
21
22 err error
23 ackC chan []string
24 acks []string
25 batchSize int
26 }
27
28 func (ps *testACK) Ack(c context.Context, acks ...string) error {
29 if ps.ackC != nil {
30 ps.ackC <- acks
31 }
32
33 ps.Lock()
34 defer ps.Unlock()
35
36 if ps.err != nil {
37 return ps.err
38 }
39
40 for _, ack := range acks {
41 ps.acks = append(ps.acks, ack)
42 }
43 return nil
44 }
45
46 func (ps *testACK) AckBatchSize() int {
47 size := ps.batchSize
48 if size <= 0 {
49 size = 4
50 }
51 return size
52 }
53
54 func (ps *testACK) ackIDs() []string {
55 ps.Lock()
56 defer ps.Unlock()
57
58 v := make([]string, len(ps.acks))
59 copy(v, ps.acks)
60 return v
61 }
62
63 func TestAckBuffer(t *testing.T) {
64 t.Parallel()
65
66 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() {
67 c := context.Background()
68 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
69 ps := &testACK{}
70
71 var discarded []string
72 var discardedMu sync.Mutex
73
74 cfg := Config{
75 Ack: ps,
76 DiscardCallback: func(acks []string) {
77 discardedMu.Lock()
78 defer discardedMu.Unlock()
79
80 discarded = append(discarded, acks...)
81 },
82 }
83
84 Convey(`Can instantiate an AckBuffer`, func() {
85 ab := New(c, cfg)
86 So(ab, ShouldNotBeNil)
87
88 // Our tests will close/flush the buffer to synchronize. However, if they
89 // don't, make sure we do so we don't spawn a bunch of f loating
90 // goroutines.
91 closed := false
92 closeOnce := func() {
93 if !closed {
94 closed = true
95 ab.CloseAndFlush()
96 }
97 }
98 defer closeOnce()
99
100 Convey(`Will buffer ACKs until enough are sent.`, func() {
101 ps.ackC = make(chan []string)
102 acks := make([]string, ps.AckBatchSize())
103
104 // Fill up the entire batch, which will cause an automatic dump.
105 for i := range acks {
106 acks[i] = fmt.Sprintf("%d", i)
107 ab.Ack(acks[i])
108 }
109 <-ps.ackC
110
111 So(ps.ackIDs(), ShouldResemble, acks)
112 So(discarded, ShouldBeNil)
113 })
114
115 Convey(`Will buffer ACKs and send if time has expired.`, func() {
116 ps.ackC = make(chan []string)
117 ab.ackReceivedC = make(chan string)
118
119 acks := []string{"foo", "bar", "baz"}
120 for _, v := range acks {
121 ab.Ack(v)
122
123 // Acknoweldge that all ACKs have been r eceived before we advance
124 // our timer. This will ensure that the timer triggers AFTER the ACKs
125 // are buffered.
126 <-ab.ackReceivedC
127 }
128 tc.Add(DefaultMaxBufferTime)
129
130 <-ps.ackC
131 So(ps.ackIDs(), ShouldResemble, acks)
132 So(discarded, ShouldBeNil)
133 })
134
135 Convey(`Will flush any remaining ACKs on close.`, func() {
136 acks := []string{"foo", "bar", "baz"}
137 for _, v := range acks {
138 ab.Ack(v)
139 }
140 closeOnce()
141
142 So(ps.ackIDs(), ShouldResemble, acks)
143 So(discarded, ShouldBeNil)
144 })
145
146 Convey(`Will discard the ACK if it could not be sent`, f unc() {
147 ps.err = errors.WrapTransient(errors.New("test e rror"))
148 acks := []string{"foo", "bar", "baz"}
149 for _, v := range acks {
150 ab.Ack(v)
151 }
152 closeOnce()
153
154 So(discarded, ShouldResemble, acks)
155 })
156 })
157 })
158 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ackbuffer.go ('k') | common/gcloud/pubsub/connection.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698