Chromium Code Reviews| Index: common/eventlog/internal/logservice/batch_logger_test.go |
| diff --git a/common/eventlog/internal/logservice/batch_logger_test.go b/common/eventlog/internal/logservice/batch_logger_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..43a9e918b07ba21d37085560103d87619c075683 |
| --- /dev/null |
| +++ b/common/eventlog/internal/logservice/batch_logger_test.go |
| @@ -0,0 +1,256 @@ |
| +// Copyright 2016 The LUCI Authors. All rights reserved. |
| +// Use of this source code is governed under the Apache License, Version 2.0 |
| +// that can be found in the LICENSE file. |
| + |
| +package logservice |
| + |
| +import ( |
| + "context" |
| + "errors" |
| + "reflect" |
| + "sort" |
| + "testing" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + logpb "github.com/luci/luci-go/common/eventlog/proto" |
| +) |
| + |
| +type recordingSyncLogger struct { |
| + records [][]*logpb.LogRequestLite_LogEventLite |
| + called chan struct{} |
| + err error |
| +} |
| + |
| +func (rsl *recordingSyncLogger) LogSync(_ context.Context, events ...*logpb.LogRequestLite_LogEventLite) error { |
| + rsl.records = append(rsl.records, events) |
| + rsl.called <- struct{}{} |
| + return rsl.err |
| +} |
| + |
| +func TestBatchLogger(t *testing.T) { |
| + ctx := context.Background() |
| + rsl := &recordingSyncLogger{called: make(chan struct{}, 1)} |
| + tickc := make(chan time.Time) |
| + bl := newBatchLogger(ctx, rsl, tickc) |
| + |
| + // We haven't logged any events yet. |
| + if rsl.records != nil { |
| + t.Errorf("events: got: %v; want: nil", rsl.records) |
| + } |
| + |
| + event := &logpb.LogRequestLite_LogEventLite{} |
| + bl.Log(event) |
| + bl.Log(event) |
| + |
| + // We have logged events, but upload hasn't been called |
| + if rsl.records != nil { |
| + t.Errorf("events: got: %v; want: nil", rsl.records) |
| + } |
| + |
| + tickc <- time.Time{} |
| + select { |
| + case <-rsl.called: |
| + case <-time.After(50 * time.Millisecond): |
| + t.Errorf("timed out waiting for upload") |
| + } |
| + // upload has been called. We should see our event. |
| + if got, want := rsl.records, [][]*logpb.LogRequestLite_LogEventLite{{event, event}}; !reflect.DeepEqual(got, want) { |
| + t.Errorf("events: got: %v; want: %v", got, want) |
| + } |
| + |
| + // log another event. |
|
djd-OOO-Apr2017
2016/12/06 06:31:27
You should also try the case where there are pendi
mcgreevy
2016/12/06 23:57:36
Added a separate test.
|
| + bl.Log(event) |
| + tickc <- time.Time{} |
| + select { |
| + case <-rsl.called: |
| + case <-time.After(50 * time.Millisecond): |
| + t.Errorf("timed out waiting for upload") |
| + } |
| + |
| + if got, want := rsl.records, [][]*logpb.LogRequestLite_LogEventLite{{event, event}, {event}}; !reflect.DeepEqual(got, want) { |
| + t.Errorf("events: got: %v; want: %v", got, want) |
| + } |
| +} |
| + |
| +var errBang = errors.New("bang") |
| + |
| +func TestRetries(t *testing.T) { |
| + // Each batch of events has 4 chances at being uploaded: one initial attempt and up to 3 retries. |
| + |
| + ctx := context.Background() |
| + rsl := &recordingSyncLogger{called: make(chan struct{}, 1), err: retryError{errBang}} |
| + tickc := make(chan time.Time) |
| + bl := newBatchLogger(ctx, rsl, tickc) |
| + |
| + e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)} |
| + bl.Log(e1) |
| + |
| + triggerUpload := func() { |
| + tickc <- time.Time{} |
| + select { |
| + case <-rsl.called: |
| + case <-time.After(50 * time.Millisecond): |
| + t.Errorf("timed out waiting for upload") |
| + } |
| + } |
| + |
| + var want [][]*logpb.LogRequestLite_LogEventLite |
| + triggerUpload() |
| + // e1 attempt #1 failed. |
| + want = append(want, []*logpb.LogRequestLite_LogEventLite{e1}) |
| + |
| + triggerUpload() |
| + |
| + // e1 attempt #2 failed. |
| + want = append(want, []*logpb.LogRequestLite_LogEventLite{e1}) |
| + |
| + // Now add e2. |
| + e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)} |
| + bl.Log(e2) |
| + |
| + triggerUpload() |
| + |
| + // e1 attempt #3 failed; e2 attempt #1 failed. |
| + want = append(want, []*logpb.LogRequestLite_LogEventLite{e1, e2}) |
| + |
| + // final attempt for e1 |
| + triggerUpload() |
| + |
| + // e1 attempt #4 (final) failed; e2 attempt #2 failed. |
| + want = append(want, []*logpb.LogRequestLite_LogEventLite{e1, e2}) |
| + |
| + rsl.err = nil // start succeeding. We've already given up on e1 though. |
| + triggerUpload() |
| + |
| + // e2 attempt #3 succeeded. |
| + want = append(want, []*logpb.LogRequestLite_LogEventLite{e2}) |
| + |
| + // e2 already succeeded, so there should be no attempt #4. |
| + rsl.called = nil // any attempt to send on this should panic. |
| + tickc <- time.Time{} // trigger upload. |
| + |
| + // There is no change to want. |
| + |
| + if got := rsl.records; !reflect.DeepEqual(got, want) { |
|
djd-OOO-Apr2017
2016/12/06 06:31:27
It feels like you should be asserting as you go al
mcgreevy
2016/12/06 23:57:36
That would make the test a bit more precise, but a
|
| + t.Errorf("events: got: %v; want: %v", got, want) |
| + } |
| +} |
| + |
| +func TestNonTransientFailureDoesntRetry(t *testing.T) { |
| + // Each batch of events has 4 chances at being uploaded: one initial attempt and up to 3 retries. |
| + |
| + ctx := context.Background() |
| + |
| + // Note: err is a non-retry error. |
| + rsl := &recordingSyncLogger{called: make(chan struct{}, 1), err: errBang} |
| + tickc := make(chan time.Time) |
| + bl := newBatchLogger(ctx, rsl, tickc) |
| + |
| + e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)} |
| + bl.Log(e1) |
| + |
| + triggerUpload := func() { |
| + tickc <- time.Time{} |
| + select { |
| + case <-rsl.called: |
| + case <-time.After(50 * time.Millisecond): |
| + t.Errorf("timed out waiting for upload") |
| + } |
| + } |
| + |
| + triggerUpload() |
| + // e1 attempt #1 failed, but it won't be retried. |
| + |
| + tickc <- time.Time{} // trigger upload. We don't expect to receive anything from rsl.called. |
| + |
| + // Now add e2. |
| + e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)} |
| + bl.Log(e2) |
| + |
| + triggerUpload() |
| + |
| + rsl.called = nil // any attempt to send on this should panic. |
| + |
| + tickc <- time.Time{} // trigger upload. We don't expect to receive anything from rsl.called. |
| + |
| + // There should only be a single attempt for each of e1 and e2. |
| + want := [][]*logpb.LogRequestLite_LogEventLite{{e1}, {e2}} |
| + |
| + if got := rsl.records; !reflect.DeepEqual(got, want) { |
| + t.Errorf("events: got: %v; want: %v", got, want) |
| + } |
| +} |
| + |
| +func TestRingBuffer(t *testing.T) { |
| + // the ring buffer is effectively a sliding window over a list of items, where |
| + // the window is moved as items are pushed into the ring buffer. |
| + // In this test we check that appending all the slices in the window |
| + // yields the same result as appending all of the slices in the ring |
| + // buffer. |
| + // The cases we test are (window over event slice is indicated with parens): |
| + // |
| + // events: [() 0, 1, 2, 3, 4, 5] ring buffer: [nil, nil, nil] |
| + // events: [(0), 1, 2, 3, 4, 5] ring buffer: [0 , nil, nil] |
| + // events: [(0, 1), 2, 3, 4, 5] ring buffer: [0 , 1 , nil] |
| + // events: [(0, 1, 2), 3, 4, 5] ring buffer: [0 , 1 , 2 ] |
| + // events: [0, (1, 2, 3), 4, 5] ring buffer: [3 , 1 , 2 ] |
| + // events: [0, 1, (2, 3, 4), 5] ring buffer: [3 , 4 , 2 ] |
| + // events: [0, 1, 2, (3, 4, 5)] ring buffer: [3 , 4 , 5 ] |
| + |
| + events := [][]*logpb.LogRequestLite_LogEventLite{} |
| + for i := 0; i < numRetries*2; i++ { |
| + i64 := int64(i) |
| + e := &logpb.LogRequestLite_LogEventLite{EventTimeMs: &i64} |
| + events = append(events, []*logpb.LogRequestLite_LogEventLite{e}) |
| + } |
| + |
| + rb := ringBuffer{} |
| + if got, want := rb.AppendAll(emptyEventSlice()), emptyEventSlice(); !reflect.DeepEqual(got, want) { |
| + t.Errorf("empty ring buffer AppendAll: got: %v; want: %v", got, want) |
| + } |
| + |
| + for j := 0; j < numRetries*2; j++ { |
| + i := j - (numRetries - 1) |
| + if i < 0 { |
| + i = 0 |
| + } |
| + gotDisplaced := rb.Push(events[j]) |
| + var wantDisplaced []*logpb.LogRequestLite_LogEventLite |
| + if i > 0 { |
| + wantDisplaced = events[i-1] |
| + } |
| + |
| + if !reflect.DeepEqual(gotDisplaced, wantDisplaced) { |
| + t.Errorf("ring buffer displaced: got: %v; want: %v", gotDisplaced, wantDisplaced) |
| + } |
| + |
| + got := rb.AppendAll(emptyEventSlice()) |
| + want := appendAll(events[i : j+1]) |
| + sort.Sort(ByTime(got)) |
| + sort.Sort(ByTime(want)) |
| + |
| + if !reflect.DeepEqual(got, want) { |
| + t.Errorf("ring buffer AppendAll (i=%v,j=%v): got: %v; want: %v", i, j, got, want) |
| + } |
| + } |
| + |
| +} |
| + |
| +func appendAll(events [][]*logpb.LogRequestLite_LogEventLite) []*logpb.LogRequestLite_LogEventLite { |
| + var result []*logpb.LogRequestLite_LogEventLite |
| + for _, es := range events { |
| + result = append(result, es...) |
| + } |
| + return result |
| +} |
| + |
| +func emptyEventSlice() []*logpb.LogRequestLite_LogEventLite { |
|
djd-OOO-Apr2017
2016/12/06 06:31:27
Can you just make this a var? (If it has cap 0, th
mcgreevy
2016/12/06 23:57:36
Good idea. Done.
|
| + return []*logpb.LogRequestLite_LogEventLite{} |
| +} |
| + |
| +type ByTime []*logpb.LogRequestLite_LogEventLite |
| + |
| +func (bp ByTime) Len() int { return len(bp) } |
| +func (bp ByTime) Swap(i, j int) { bp[i], bp[j] = bp[j], bp[i] } |
| +func (bp ByTime) Less(i, j int) bool { return *bp[i].EventTimeMs < *bp[j].EventTimeMs } |