Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(927)

Side by Side Diff: common/eventlog/internal/logservice/batch_logger_test.go

Issue 2557593002: Add batch logging support to eventlog. (Closed)
Patch Set: Fix race in test. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package logservice
6
7 import (
8 "context"
9 "errors"
10 "reflect"
11 "sort"
12 "sync"
13 "testing"
14 "time"
15
16 "github.com/golang/protobuf/proto"
17 logpb "github.com/luci/luci-go/common/eventlog/proto"
18 )
19
20 type testLogger struct {
21 requests chan []*logpb.LogRequestLite_LogEventLite
22
23 mu sync.Mutex
24 err error
25 }
26
27 func newTestLogger(err error) *testLogger {
28 return &testLogger{
29 requests: make(chan []*logpb.LogRequestLite_LogEventLite, 1),
30 err: err,
31 }
32 }
33 func (tl *testLogger) LogSync(_ context.Context, events ...*logpb.LogRequestLite _LogEventLite) error {
34 tl.requests <- events
35
36 tl.mu.Lock()
37 defer tl.mu.Unlock()
38 return tl.err
39 }
40
41 func expectLogCalled(t *testing.T, tl *testLogger, want []*logpb.LogRequestLite_ LogEventLite) {
42 select {
43 case got := <-tl.requests:
44 if !reflect.DeepEqual(got, want) {
45 t.Errorf("events: got: %v; want: %v", got, want)
46 }
47 case <-time.After(50 * time.Millisecond):
48 t.Errorf("timed out waiting for upload")
49 }
50 }
51
52 func expectNoMoreLogs(t *testing.T, tl *testLogger) {
53 close(tl.requests) // Any future sends on this channel will panic.
54 if len(tl.requests) != 0 {
55 t.Errorf("unexpected send on tl.records: %v", <-tl.requests)
56 }
57 }
58
59 func TestBatchLogger(t *testing.T) {
60 ctx := context.Background()
61 tl := newTestLogger(nil)
62 tickc := make(chan time.Time)
63 bl := newBatchLogger(ctx, tl, tickc)
64
65 // We haven't logged any events yet.
66 if len(tl.requests) != 0 {
67 t.Errorf("events: got: %v; want: nil", tl.requests)
68 }
69
70 event := &logpb.LogRequestLite_LogEventLite{}
71 bl.Log(event)
72 bl.Log(event)
73
74 // We have logged events, but upload hasn't been called
75 if len(tl.requests) != 0 {
76 t.Errorf("events: got: %v; want: nil", <-tl.requests)
77 }
78
79 tickc <- time.Time{}
80 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{event, event })
81
82 // log another event.
83 bl.Log(event)
84 tickc <- time.Time{}
85 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{event})
86 expectNoMoreLogs(t, tl)
87 }
88
89 var errBang = errors.New("bang")
90
91 func TestRetries(t *testing.T) {
92 // Each batch of events has 4 chances at being uploaded: one initial att empt and up to 3 retries.
93
94 ctx := context.Background()
95 tl := newTestLogger(retryError{errBang})
96 tickc := make(chan time.Time)
97 bl := newBatchLogger(ctx, tl, tickc)
98
99 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
100 bl.Log(e1)
101
102 // e1 attempt #1 fails.
103 tickc <- time.Time{}
104 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
105
106 // e1 attempt #2 fails.
107 tickc <- time.Time{}
108 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
109
110 // Now add e2.
111 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
112 bl.Log(e2)
113
114 // e1 attempt #3 fails; e2 attempt #1 fails.
115 tickc <- time.Time{}
116 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2})
117
118 // final attempt for e1
119 // e1 attempt #4 (final) fails; e2 attempt #2 fails.
120 tickc <- time.Time{}
121 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2})
122
123 tl.mu.Lock()
124 tl.err = nil // start succeeding. We've already given up on e1 though.
125 tl.mu.Unlock()
126
127 // e2 attempt #3 succeeds.
128 tickc <- time.Time{}
129 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e2})
130
131 // e2 already succeeded, so there should be no attempt #4.
132 expectNoMoreLogs(t, tl)
133 tickc <- time.Time{} // trigger upload.
134 }
135
136 func TestAttemptsUploadOnClose(t *testing.T) {
137 ctx := context.Background()
138 tl := newTestLogger(retryError{errBang})
139 tickc := make(chan time.Time)
140 bl := newBatchLogger(ctx, tl, tickc)
141
142 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
143 bl.Log(e1)
144
145 // Trigger an upload attempt, which will fail.
146 tickc <- time.Time{}
147 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
148
149 // Now add e2.
150 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
151 bl.Log(e2)
152
153 // We have not triggered any more uploads via tickc, but bl.Close should trigger an upload attempt.
154 bl.Close()
155 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2})
156 expectNoMoreLogs(t, tl)
157 }
158
159 func TestNonTransientFailureDoesntRetry(t *testing.T) {
160 // Each batch of events has 4 chances at being uploaded: one initial att empt and up to 3 retries.
161
162 ctx := context.Background()
163
164 // Note: err is a non-retry error.
165 tl := newTestLogger(errBang)
166 tickc := make(chan time.Time)
167 bl := newBatchLogger(ctx, tl, tickc)
168
169 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
170 bl.Log(e1)
171
172 tickc <- time.Time{}
173 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
174 // e1 attempt #1 failed, but it won't be retried.
175
176 tickc <- time.Time{} // trigger upload. We don't expect to receive anyth ing from tl.requests.
177
178 if len(tl.requests) != 0 {
179 t.Errorf("events: got: %v; want: nil", tl.requests)
180 }
181
182 // Now add e2.
183 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
184 bl.Log(e2)
185
186 tickc <- time.Time{}
187 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e2})
188
189 expectNoMoreLogs(t, tl)
190 tickc <- time.Time{} // trigger upload. We don't expect to receive anyth ing from tl.requests.
191 }
192
193 func TestRingBuffer(t *testing.T) {
194 // the ring buffer is effectively a sliding window over a list of items, where
195 // the window is moved as items are pushed into the ring buffer.
196 // In this test we check that appending all the slices in the window
197 // yields the same result as appending all of the slices in the ring
198 // buffer.
199 // The cases we test are (window over event slice is indicated with pare ns):
200 //
201 // events: [() 0, 1, 2, 3, 4, 5] ring buffer: [nil, nil, nil]
202 // events: [(0), 1, 2, 3, 4, 5] ring buffer: [0 , nil, nil]
203 // events: [(0, 1), 2, 3, 4, 5] ring buffer: [0 , 1 , nil]
204 // events: [(0, 1, 2), 3, 4, 5] ring buffer: [0 , 1 , 2 ]
205 // events: [0, (1, 2, 3), 4, 5] ring buffer: [3 , 1 , 2 ]
206 // events: [0, 1, (2, 3, 4), 5] ring buffer: [3 , 4 , 2 ]
207 // events: [0, 1, 2, (3, 4, 5)] ring buffer: [3 , 4 , 5 ]
208
209 events := [][]*logpb.LogRequestLite_LogEventLite{}
210 for i := 0; i < numRetries*2; i++ {
211 i64 := int64(i)
212 e := &logpb.LogRequestLite_LogEventLite{EventTimeMs: &i64}
213 events = append(events, []*logpb.LogRequestLite_LogEventLite{e})
214 }
215
216 rb := ringBuffer{}
217 emptyEventSlice := make([]*logpb.LogRequestLite_LogEventLite, 0, 0)
218
219 if got, want := rb.AppendAll(emptyEventSlice), emptyEventSlice; !reflect .DeepEqual(got, want) {
220 t.Errorf("empty ring buffer AppendAll: got: %v; want: %v", got, want)
221 }
222
223 for j := 0; j < numRetries*2; j++ {
224 i := j - (numRetries - 1)
225 if i < 0 {
226 i = 0
227 }
228 gotDisplaced := rb.Push(events[j])
229 var wantDisplaced []*logpb.LogRequestLite_LogEventLite
230 if i > 0 {
231 wantDisplaced = events[i-1]
232 }
233
234 if !reflect.DeepEqual(gotDisplaced, wantDisplaced) {
235 t.Errorf("ring buffer displaced: got: %v; want: %v", got Displaced, wantDisplaced)
236 }
237
238 got := rb.AppendAll(emptyEventSlice)
239 want := appendAll(events[i : j+1])
240 sort.Sort(ByTime(got))
241 sort.Sort(ByTime(want))
242
243 if !reflect.DeepEqual(got, want) {
244 t.Errorf("ring buffer AppendAll (i=%v,j=%v): got: %v; wa nt: %v", i, j, got, want)
245 }
246 }
247
248 }
249
250 func appendAll(events [][]*logpb.LogRequestLite_LogEventLite) []*logpb.LogReques tLite_LogEventLite {
251 var result []*logpb.LogRequestLite_LogEventLite
252 for _, es := range events {
253 result = append(result, es...)
254 }
255 return result
256 }
257
258 type ByTime []*logpb.LogRequestLite_LogEventLite
259
260 func (bp ByTime) Len() int { return len(bp) }
261 func (bp ByTime) Swap(i, j int) { bp[i], bp[j] = bp[j], bp[i] }
262 func (bp ByTime) Less(i, j int) bool { return *bp[i].EventTimeMs < *bp[j].EventT imeMs }
OLDNEW
« no previous file with comments | « common/eventlog/internal/logservice/batch_logger.go ('k') | common/eventlog/internal/logservice/logservice.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698