Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: common/eventlog/internal/logservice/batch_logger_test.go

Issue 2557593002: Add batch logging support to eventlog. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package logservice
6
7 import (
8 "context"
9 "errors"
10 "reflect"
11 "sort"
12 "testing"
13 "time"
14
15 "github.com/golang/protobuf/proto"
16 logpb "github.com/luci/luci-go/common/eventlog/proto"
17 )
18
19 type recordingSyncLogger struct {
20 records [][]*logpb.LogRequestLite_LogEventLite
21 called chan struct{}
22 err error
23 }
24
25 func (rsl *recordingSyncLogger) LogSync(_ context.Context, events ...*logpb.LogR equestLite_LogEventLite) error {
26 rsl.records = append(rsl.records, events)
27 rsl.called <- struct{}{}
28 return rsl.err
29 }
30
31 func TestBatchLogger(t *testing.T) {
32 ctx := context.Background()
33 rsl := &recordingSyncLogger{called: make(chan struct{}, 1)}
34 tickc := make(chan time.Time)
35 bl := newBatchLogger(ctx, rsl, tickc)
36
37 // We haven't logged any events yet.
38 if rsl.records != nil {
39 t.Errorf("events: got: %v; want: nil", rsl.records)
40 }
41
42 event := &logpb.LogRequestLite_LogEventLite{}
43 bl.Log(event)
44 bl.Log(event)
45
46 // We have logged events, but upload hasn't been called
47 if rsl.records != nil {
48 t.Errorf("events: got: %v; want: nil", rsl.records)
49 }
50
51 tickc <- time.Time{}
52 select {
53 case <-rsl.called:
54 case <-time.After(50 * time.Millisecond):
55 t.Errorf("timed out waiting for upload")
56 }
57 // upload has been called. We should see our event.
58 if got, want := rsl.records, [][]*logpb.LogRequestLite_LogEventLite{{eve nt, event}}; !reflect.DeepEqual(got, want) {
59 t.Errorf("events: got: %v; want: %v", got, want)
60 }
61
62 // log another event.
djd-OOO-Apr2017 2016/12/06 06:31:27 You should also try the case where there are pendi
mcgreevy 2016/12/06 23:57:36 Added a separate test.
63 bl.Log(event)
64 tickc <- time.Time{}
65 select {
66 case <-rsl.called:
67 case <-time.After(50 * time.Millisecond):
68 t.Errorf("timed out waiting for upload")
69 }
70
71 if got, want := rsl.records, [][]*logpb.LogRequestLite_LogEventLite{{eve nt, event}, {event}}; !reflect.DeepEqual(got, want) {
72 t.Errorf("events: got: %v; want: %v", got, want)
73 }
74 }
75
76 var errBang = errors.New("bang")
77
78 func TestRetries(t *testing.T) {
79 // Each batch of events has 4 chances at being uploaded: one initial att empt and up to 3 retries.
80
81 ctx := context.Background()
82 rsl := &recordingSyncLogger{called: make(chan struct{}, 1), err: retryEr ror{errBang}}
83 tickc := make(chan time.Time)
84 bl := newBatchLogger(ctx, rsl, tickc)
85
86 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
87 bl.Log(e1)
88
89 triggerUpload := func() {
90 tickc <- time.Time{}
91 select {
92 case <-rsl.called:
93 case <-time.After(50 * time.Millisecond):
94 t.Errorf("timed out waiting for upload")
95 }
96 }
97
98 var want [][]*logpb.LogRequestLite_LogEventLite
99 triggerUpload()
100 // e1 attempt #1 failed.
101 want = append(want, []*logpb.LogRequestLite_LogEventLite{e1})
102
103 triggerUpload()
104
105 // e1 attempt #2 failed.
106 want = append(want, []*logpb.LogRequestLite_LogEventLite{e1})
107
108 // Now add e2.
109 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
110 bl.Log(e2)
111
112 triggerUpload()
113
114 // e1 attempt #3 failed; e2 attempt #1 failed.
115 want = append(want, []*logpb.LogRequestLite_LogEventLite{e1, e2})
116
117 // final attempt for e1
118 triggerUpload()
119
120 // e1 attempt #4 (final) failed; e2 attempt #2 failed.
121 want = append(want, []*logpb.LogRequestLite_LogEventLite{e1, e2})
122
123 rsl.err = nil // start succeeding. We've already given up on e1 though.
124 triggerUpload()
125
126 // e2 attempt #3 succeeded.
127 want = append(want, []*logpb.LogRequestLite_LogEventLite{e2})
128
129 // e2 already succeeded, so there should be no attempt #4.
130 rsl.called = nil // any attempt to send on this should panic.
131 tickc <- time.Time{} // trigger upload.
132
133 // There is no change to want.
134
135 if got := rsl.records; !reflect.DeepEqual(got, want) {
djd-OOO-Apr2017 2016/12/06 06:31:27 It feels like you should be asserting as you go al
mcgreevy 2016/12/06 23:57:36 That would make the test a bit more precise, but a
136 t.Errorf("events: got: %v; want: %v", got, want)
137 }
138 }
139
140 func TestNonTransientFailureDoesntRetry(t *testing.T) {
141 // Each batch of events has 4 chances at being uploaded: one initial att empt and up to 3 retries.
142
143 ctx := context.Background()
144
145 // Note: err is a non-retry error.
146 rsl := &recordingSyncLogger{called: make(chan struct{}, 1), err: errBang }
147 tickc := make(chan time.Time)
148 bl := newBatchLogger(ctx, rsl, tickc)
149
150 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
151 bl.Log(e1)
152
153 triggerUpload := func() {
154 tickc <- time.Time{}
155 select {
156 case <-rsl.called:
157 case <-time.After(50 * time.Millisecond):
158 t.Errorf("timed out waiting for upload")
159 }
160 }
161
162 triggerUpload()
163 // e1 attempt #1 failed, but it won't be retried.
164
165 tickc <- time.Time{} // trigger upload. We don't expect to receive anyth ing from rsl.called.
166
167 // Now add e2.
168 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
169 bl.Log(e2)
170
171 triggerUpload()
172
173 rsl.called = nil // any attempt to send on this should panic.
174
175 tickc <- time.Time{} // trigger upload. We don't expect to receive anyth ing from rsl.called.
176
177 // There should only be a single attempt for each of e1 and e2.
178 want := [][]*logpb.LogRequestLite_LogEventLite{{e1}, {e2}}
179
180 if got := rsl.records; !reflect.DeepEqual(got, want) {
181 t.Errorf("events: got: %v; want: %v", got, want)
182 }
183 }
184
185 func TestRingBuffer(t *testing.T) {
186 // the ring buffer is effectively a sliding window over a list of items, where
187 // the window is moved as items are pushed into the ring buffer.
188 // In this test we check that appending all the slices in the window
189 // yields the same result as appending all of the slices in the ring
190 // buffer.
191 // The cases we test are (window over event slice is indicated with pare ns):
192 //
193 // events: [() 0, 1, 2, 3, 4, 5] ring buffer: [nil, nil, nil]
194 // events: [(0), 1, 2, 3, 4, 5] ring buffer: [0 , nil, nil]
195 // events: [(0, 1), 2, 3, 4, 5] ring buffer: [0 , 1 , nil]
196 // events: [(0, 1, 2), 3, 4, 5] ring buffer: [0 , 1 , 2 ]
197 // events: [0, (1, 2, 3), 4, 5] ring buffer: [3 , 1 , 2 ]
198 // events: [0, 1, (2, 3, 4), 5] ring buffer: [3 , 4 , 2 ]
199 // events: [0, 1, 2, (3, 4, 5)] ring buffer: [3 , 4 , 5 ]
200
201 events := [][]*logpb.LogRequestLite_LogEventLite{}
202 for i := 0; i < numRetries*2; i++ {
203 i64 := int64(i)
204 e := &logpb.LogRequestLite_LogEventLite{EventTimeMs: &i64}
205 events = append(events, []*logpb.LogRequestLite_LogEventLite{e})
206 }
207
208 rb := ringBuffer{}
209 if got, want := rb.AppendAll(emptyEventSlice()), emptyEventSlice(); !ref lect.DeepEqual(got, want) {
210 t.Errorf("empty ring buffer AppendAll: got: %v; want: %v", got, want)
211 }
212
213 for j := 0; j < numRetries*2; j++ {
214 i := j - (numRetries - 1)
215 if i < 0 {
216 i = 0
217 }
218 gotDisplaced := rb.Push(events[j])
219 var wantDisplaced []*logpb.LogRequestLite_LogEventLite
220 if i > 0 {
221 wantDisplaced = events[i-1]
222 }
223
224 if !reflect.DeepEqual(gotDisplaced, wantDisplaced) {
225 t.Errorf("ring buffer displaced: got: %v; want: %v", got Displaced, wantDisplaced)
226 }
227
228 got := rb.AppendAll(emptyEventSlice())
229 want := appendAll(events[i : j+1])
230 sort.Sort(ByTime(got))
231 sort.Sort(ByTime(want))
232
233 if !reflect.DeepEqual(got, want) {
234 t.Errorf("ring buffer AppendAll (i=%v,j=%v): got: %v; wa nt: %v", i, j, got, want)
235 }
236 }
237
238 }
239
240 func appendAll(events [][]*logpb.LogRequestLite_LogEventLite) []*logpb.LogReques tLite_LogEventLite {
241 var result []*logpb.LogRequestLite_LogEventLite
242 for _, es := range events {
243 result = append(result, es...)
244 }
245 return result
246 }
247
248 func emptyEventSlice() []*logpb.LogRequestLite_LogEventLite {
djd-OOO-Apr2017 2016/12/06 06:31:27 Can you just make this a var? (If it has cap 0, th
mcgreevy 2016/12/06 23:57:36 Good idea. Done.
249 return []*logpb.LogRequestLite_LogEventLite{}
250 }
251
252 type ByTime []*logpb.LogRequestLite_LogEventLite
253
254 func (bp ByTime) Len() int { return len(bp) }
255 func (bp ByTime) Swap(i, j int) { bp[i], bp[j] = bp[j], bp[i] }
256 func (bp ByTime) Less(i, j int) bool { return *bp[i].EventTimeMs < *bp[j].EventT imeMs }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698