Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(279)

Side by Side Diff: common/cloudlogging/buffer_test.go

Issue 2937693003: Make luci-go compile again after deps.lock roll. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/cloudlogging/buffer.go ('k') | common/cloudlogging/client.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package cloudlogging
6
7 import (
8 "errors"
9 "fmt"
10 "testing"
11 "time"
12
13 "github.com/luci/luci-go/common/retry"
14 . "github.com/smartystreets/goconvey/convey"
15 "golang.org/x/net/context"
16 )
17
18 type testClient struct {
19 callback func([]*Entry) error
20 pushes int
21 }
22
23 var _ Client = (*testClient)(nil)
24
25 func (c *testClient) PushEntries(entries []*Entry) error {
26 c.pushes++
27 if c.callback != nil {
28 return c.callback(entries)
29 }
30 return nil
31 }
32
33 type infiniteRetryIterator struct{}
34
35 func (infiniteRetryIterator) Next(context.Context, error) time.Duration {
36 return 0
37 }
38
39 func TestBuffer(t *testing.T) {
40 t.Parallel()
41
42 Convey(`A Buffer instance`, t, func() {
43 ctx := context.Background()
44
45 entriesC := make(chan []*Entry, 1)
46 client := &testClient{
47 callback: func(entries []*Entry) error {
48 entriesC <- entries
49 return nil
50 },
51 }
52
53 options := BufferOptions{
54 Retry: func() retry.Iterator {
55 return &retry.Limited{
56 Retries: 5,
57 }
58 },
59 }
60
61 b := NewBuffer(ctx, options, client).(*bufferImpl)
62 defer b.StopAndFlush()
63
64 // Allow synchronization when a log entry is ingested. Set "ackC " to nil to
65 // disable synchronization.
66 var bufferC chan []*Entry
67 var ackC chan *Entry
68 b.testCB = &testCallbacks{
69 bufferRound: func(e []*Entry) {
70 if bufferC != nil {
71 bufferC <- e
72 }
73 },
74 receivedLogEntry: func(e *Entry) {
75 if ackC != nil {
76 ackC <- e
77 }
78 },
79 }
80
81 So(b.BatchSize, ShouldEqual, DefaultBatchSize)
82
83 Convey(`Will push a single entry.`, func() {
84 err := b.PushEntries([]*Entry{
85 {
86 InsertID: "a",
87 },
88 })
89 So(err, ShouldBeNil)
90
91 entries := <-entriesC
92 So(len(entries), ShouldEqual, 1)
93 So(entries[0], ShouldResemble, &Entry{
94 InsertID: "a",
95 })
96 So(client.pushes, ShouldEqual, 1)
97 })
98
99 Convey(`Will batch logging data.`, func() {
100 bufferC = make(chan []*Entry)
101 ackC = make(chan *Entry)
102
103 // The first message will be read immediately.
104 err := b.PushEntries([]*Entry{
105 {
106 InsertID: "a",
107 },
108 })
109 So(err, ShouldBeNil)
110 <-ackC
111 <-bufferC
112
113 // The next set of messages will be batched, since we ha ven't allowed our
114 // client stub to finish its PushEntries.
115 entries := make([]*Entry, b.BatchSize)
116 for i := range entries {
117 entries[i] = &Entry{
118 InsertID: fmt.Sprintf("%d", i),
119 }
120 }
121 err = b.PushEntries(entries)
122 So(err, ShouldBeNil)
123
124 // Read the first bundle.
125 bundle := <-entriesC
126 So(len(bundle), ShouldEqual, 1)
127 So(bundle[0].InsertID, ShouldEqual, "a")
128
129 // Read the second bundle.
130 for range entries {
131 <-ackC
132 }
133 <-bufferC
134 bundle = <-entriesC
135
136 So(len(bundle), ShouldEqual, b.BatchSize)
137 for i := range bundle {
138 So(bundle[i].InsertID, ShouldEqual, fmt.Sprintf( "%d", i))
139 }
140 So(client.pushes, ShouldEqual, 2)
141 })
142
143 Convey(`Will retry on failure.`, func() {
144 failures := 3
145 client.callback = func(entries []*Entry) error {
146 if failures > 0 {
147 failures--
148 return errors.New("test: induced failure ")
149 }
150 entriesC <- entries
151 return nil
152 }
153
154 err := b.PushEntries([]*Entry{
155 {
156 InsertID: "a",
157 },
158 })
159 So(err, ShouldBeNil)
160
161 entries := <-entriesC
162 So(len(entries), ShouldEqual, 1)
163 So(entries[0], ShouldResemble, &Entry{
164 InsertID: "a",
165 })
166 So(client.pushes, ShouldEqual, 4)
167 })
168 })
169
170 Convey(`A Buffer instance configured to retry forever will stop if abort ed.`, t, func() {
171 entriesC := make(chan []*Entry)
172 defer close(entriesC)
173
174 client := &testClient{
175 callback: func(entries []*Entry) error {
176 entriesC <- entries
177 return errors.New("test: failure")
178 },
179 }
180
181 options := BufferOptions{
182 Retry: func() retry.Iterator {
183 return infiniteRetryIterator{}
184 },
185 }
186
187 b := NewBuffer(context.Background(), options, client)
188 err := b.PushEntries([]*Entry{
189 {
190 InsertID: "a",
191 },
192 })
193 So(err, ShouldBeNil)
194
195 // Wait for the buffer to finish.
196 finishedC := make(chan struct{})
197 go func() {
198 defer close(finishedC)
199 b.StopAndFlush()
200 }()
201
202 // Wait for the first attempt. This ensures that we've entered t he retry
203 // loop.
204 <-entriesC
205 go func() {
206 // Consume any other attempts.
207 for range entriesC {
208 }
209 }()
210
211 // Abort the buffer.
212 b.Abort()
213
214 // Assert that it will stop eventually. Rather than deadlock/pan ic, we wait
215 // one real second and fail if it didn't terminate. Since there is no
216 // underlying latency, one second (in the failure case) is accep table.
217 closed := false
218 select {
219 case <-finishedC:
220 closed = true
221
222 case <-time.After(1 * time.Second):
223 break
224 }
225 So(closed, ShouldBeTrue)
226 })
227 }
OLDNEW
« no previous file with comments | « common/cloudlogging/buffer.go ('k') | common/cloudlogging/client.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698