| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package logservice |
| 6 |
| 7 import ( |
| 8 "context" |
| 9 "errors" |
| 10 "reflect" |
| 11 "sort" |
| 12 "sync" |
| 13 "testing" |
| 14 "time" |
| 15 |
| 16 "github.com/golang/protobuf/proto" |
| 17 logpb "github.com/luci/luci-go/common/eventlog/proto" |
| 18 ) |
| 19 |
| 20 type testLogger struct { |
| 21 requests chan []*logpb.LogRequestLite_LogEventLite |
| 22 |
| 23 mu sync.Mutex |
| 24 err error |
| 25 } |
| 26 |
| 27 func newTestLogger(err error) *testLogger { |
| 28 return &testLogger{ |
| 29 requests: make(chan []*logpb.LogRequestLite_LogEventLite, 1), |
| 30 err: err, |
| 31 } |
| 32 } |
| 33 func (tl *testLogger) LogSync(_ context.Context, events ...*logpb.LogRequestLite
_LogEventLite) error { |
| 34 tl.requests <- events |
| 35 |
| 36 tl.mu.Lock() |
| 37 defer tl.mu.Unlock() |
| 38 return tl.err |
| 39 } |
| 40 |
| 41 func expectLogCalled(t *testing.T, tl *testLogger, want []*logpb.LogRequestLite_
LogEventLite) { |
| 42 select { |
| 43 case got := <-tl.requests: |
| 44 if !reflect.DeepEqual(got, want) { |
| 45 t.Errorf("events: got: %v; want: %v", got, want) |
| 46 } |
| 47 case <-time.After(50 * time.Millisecond): |
| 48 t.Errorf("timed out waiting for upload") |
| 49 } |
| 50 } |
| 51 |
| 52 func expectNoMoreLogs(t *testing.T, tl *testLogger) { |
| 53 close(tl.requests) // Any future sends on this channel will panic. |
| 54 if len(tl.requests) != 0 { |
| 55 t.Errorf("unexpected send on tl.records: %v", <-tl.requests) |
| 56 } |
| 57 } |
| 58 |
| 59 func TestBatchLogger(t *testing.T) { |
| 60 ctx := context.Background() |
| 61 tl := newTestLogger(nil) |
| 62 tickc := make(chan time.Time) |
| 63 bl := newBatchLogger(ctx, tl, tickc) |
| 64 |
| 65 // We haven't logged any events yet. |
| 66 if len(tl.requests) != 0 { |
| 67 t.Errorf("events: got: %v; want: nil", tl.requests) |
| 68 } |
| 69 |
| 70 event := &logpb.LogRequestLite_LogEventLite{} |
| 71 bl.Log(event) |
| 72 bl.Log(event) |
| 73 |
| 74 // We have logged events, but upload hasn't been called |
| 75 if len(tl.requests) != 0 { |
| 76 t.Errorf("events: got: %v; want: nil", <-tl.requests) |
| 77 } |
| 78 |
| 79 tickc <- time.Time{} |
| 80 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{event, event
}) |
| 81 |
| 82 // log another event. |
| 83 bl.Log(event) |
| 84 tickc <- time.Time{} |
| 85 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{event}) |
| 86 expectNoMoreLogs(t, tl) |
| 87 } |
| 88 |
| 89 var errBang = errors.New("bang") |
| 90 |
| 91 func TestRetries(t *testing.T) { |
| 92 // Each batch of events has 4 chances at being uploaded: one initial att
empt and up to 3 retries. |
| 93 |
| 94 ctx := context.Background() |
| 95 tl := newTestLogger(retryError{errBang}) |
| 96 tickc := make(chan time.Time) |
| 97 bl := newBatchLogger(ctx, tl, tickc) |
| 98 |
| 99 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)} |
| 100 bl.Log(e1) |
| 101 |
| 102 // e1 attempt #1 fails. |
| 103 tickc <- time.Time{} |
| 104 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1}) |
| 105 |
| 106 // e1 attempt #2 fails. |
| 107 tickc <- time.Time{} |
| 108 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1}) |
| 109 |
| 110 // Now add e2. |
| 111 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)} |
| 112 bl.Log(e2) |
| 113 |
| 114 // e1 attempt #3 fails; e2 attempt #1 fails. |
| 115 tickc <- time.Time{} |
| 116 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2}) |
| 117 |
| 118 // final attempt for e1 |
| 119 // e1 attempt #4 (final) fails; e2 attempt #2 fails. |
| 120 tickc <- time.Time{} |
| 121 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2}) |
| 122 |
| 123 tl.mu.Lock() |
| 124 tl.err = nil // start succeeding. We've already given up on e1 though. |
| 125 tl.mu.Unlock() |
| 126 |
| 127 // e2 attempt #3 succeeds. |
| 128 tickc <- time.Time{} |
| 129 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e2}) |
| 130 |
| 131 // e2 already succeeded, so there should be no attempt #4. |
| 132 expectNoMoreLogs(t, tl) |
| 133 tickc <- time.Time{} // trigger upload. |
| 134 } |
| 135 |
| 136 func TestAttemptsUploadOnClose(t *testing.T) { |
| 137 ctx := context.Background() |
| 138 tl := newTestLogger(retryError{errBang}) |
| 139 tickc := make(chan time.Time) |
| 140 bl := newBatchLogger(ctx, tl, tickc) |
| 141 |
| 142 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)} |
| 143 bl.Log(e1) |
| 144 |
| 145 // Trigger an upload attempt, which will fail. |
| 146 tickc <- time.Time{} |
| 147 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1}) |
| 148 |
| 149 // Now add e2. |
| 150 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)} |
| 151 bl.Log(e2) |
| 152 |
| 153 // We have not triggered any more uploads via tickc, but bl.Close should
trigger an upload attempt. |
| 154 bl.Close() |
| 155 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2}) |
| 156 expectNoMoreLogs(t, tl) |
| 157 } |
| 158 |
| 159 func TestNonTransientFailureDoesntRetry(t *testing.T) { |
| 160 // Each batch of events has 4 chances at being uploaded: one initial att
empt and up to 3 retries. |
| 161 |
| 162 ctx := context.Background() |
| 163 |
| 164 // Note: err is a non-retry error. |
| 165 tl := newTestLogger(errBang) |
| 166 tickc := make(chan time.Time) |
| 167 bl := newBatchLogger(ctx, tl, tickc) |
| 168 |
| 169 e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)} |
| 170 bl.Log(e1) |
| 171 |
| 172 tickc <- time.Time{} |
| 173 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1}) |
| 174 // e1 attempt #1 failed, but it won't be retried. |
| 175 |
| 176 tickc <- time.Time{} // trigger upload. We don't expect to receive anyth
ing from tl.requests. |
| 177 |
| 178 if len(tl.requests) != 0 { |
| 179 t.Errorf("events: got: %v; want: nil", tl.requests) |
| 180 } |
| 181 |
| 182 // Now add e2. |
| 183 e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)} |
| 184 bl.Log(e2) |
| 185 |
| 186 tickc <- time.Time{} |
| 187 expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e2}) |
| 188 |
| 189 expectNoMoreLogs(t, tl) |
| 190 tickc <- time.Time{} // trigger upload. We don't expect to receive anyth
ing from tl.requests. |
| 191 } |
| 192 |
| 193 func TestRingBuffer(t *testing.T) { |
| 194 // the ring buffer is effectively a sliding window over a list of items,
where |
| 195 // the window is moved as items are pushed into the ring buffer. |
| 196 // In this test we check that appending all the slices in the window |
| 197 // yields the same result as appending all of the slices in the ring |
| 198 // buffer. |
| 199 // The cases we test are (window over event slice is indicated with pare
ns): |
| 200 // |
| 201 // events: [() 0, 1, 2, 3, 4, 5] ring buffer: [nil, nil, nil] |
| 202 // events: [(0), 1, 2, 3, 4, 5] ring buffer: [0 , nil, nil] |
| 203 // events: [(0, 1), 2, 3, 4, 5] ring buffer: [0 , 1 , nil] |
| 204 // events: [(0, 1, 2), 3, 4, 5] ring buffer: [0 , 1 , 2 ] |
| 205 // events: [0, (1, 2, 3), 4, 5] ring buffer: [3 , 1 , 2 ] |
| 206 // events: [0, 1, (2, 3, 4), 5] ring buffer: [3 , 4 , 2 ] |
| 207 // events: [0, 1, 2, (3, 4, 5)] ring buffer: [3 , 4 , 5 ] |
| 208 |
| 209 events := [][]*logpb.LogRequestLite_LogEventLite{} |
| 210 for i := 0; i < numRetries*2; i++ { |
| 211 i64 := int64(i) |
| 212 e := &logpb.LogRequestLite_LogEventLite{EventTimeMs: &i64} |
| 213 events = append(events, []*logpb.LogRequestLite_LogEventLite{e}) |
| 214 } |
| 215 |
| 216 rb := ringBuffer{} |
| 217 emptyEventSlice := make([]*logpb.LogRequestLite_LogEventLite, 0, 0) |
| 218 |
| 219 if got, want := rb.AppendAll(emptyEventSlice), emptyEventSlice; !reflect
.DeepEqual(got, want) { |
| 220 t.Errorf("empty ring buffer AppendAll: got: %v; want: %v", got,
want) |
| 221 } |
| 222 |
| 223 for j := 0; j < numRetries*2; j++ { |
| 224 i := j - (numRetries - 1) |
| 225 if i < 0 { |
| 226 i = 0 |
| 227 } |
| 228 gotDisplaced := rb.Push(events[j]) |
| 229 var wantDisplaced []*logpb.LogRequestLite_LogEventLite |
| 230 if i > 0 { |
| 231 wantDisplaced = events[i-1] |
| 232 } |
| 233 |
| 234 if !reflect.DeepEqual(gotDisplaced, wantDisplaced) { |
| 235 t.Errorf("ring buffer displaced: got: %v; want: %v", got
Displaced, wantDisplaced) |
| 236 } |
| 237 |
| 238 got := rb.AppendAll(emptyEventSlice) |
| 239 want := appendAll(events[i : j+1]) |
| 240 sort.Sort(ByTime(got)) |
| 241 sort.Sort(ByTime(want)) |
| 242 |
| 243 if !reflect.DeepEqual(got, want) { |
| 244 t.Errorf("ring buffer AppendAll (i=%v,j=%v): got: %v; wa
nt: %v", i, j, got, want) |
| 245 } |
| 246 } |
| 247 |
| 248 } |
| 249 |
| 250 func appendAll(events [][]*logpb.LogRequestLite_LogEventLite) []*logpb.LogReques
tLite_LogEventLite { |
| 251 var result []*logpb.LogRequestLite_LogEventLite |
| 252 for _, es := range events { |
| 253 result = append(result, es...) |
| 254 } |
| 255 return result |
| 256 } |
| 257 |
| 258 type ByTime []*logpb.LogRequestLite_LogEventLite |
| 259 |
| 260 func (bp ByTime) Len() int { return len(bp) } |
| 261 func (bp ByTime) Swap(i, j int) { bp[i], bp[j] = bp[j], bp[i] } |
| 262 func (bp ByTime) Less(i, j int) bool { return *bp[i].EventTimeMs < *bp[j].EventT
imeMs } |
| OLD | NEW |