Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1783)

Unified Diff: common/eventlog/internal/logservice/batch_logger_test.go

Issue 2557593002: Add batch logging support to eventlog. (Closed)
Patch Set: Fix race in test. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/eventlog/internal/logservice/batch_logger_test.go
diff --git a/common/eventlog/internal/logservice/batch_logger_test.go b/common/eventlog/internal/logservice/batch_logger_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..b7efb949039b4e60ebd8f12b6b9dfc41e3db166a
--- /dev/null
+++ b/common/eventlog/internal/logservice/batch_logger_test.go
@@ -0,0 +1,262 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package logservice
+
+import (
+ "context"
+ "errors"
+ "reflect"
+ "sort"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ logpb "github.com/luci/luci-go/common/eventlog/proto"
+)
+
+type testLogger struct {
+ requests chan []*logpb.LogRequestLite_LogEventLite
+
+ mu sync.Mutex
+ err error
+}
+
+func newTestLogger(err error) *testLogger {
+ return &testLogger{
+ requests: make(chan []*logpb.LogRequestLite_LogEventLite, 1),
+ err: err,
+ }
+}
+func (tl *testLogger) LogSync(_ context.Context, events ...*logpb.LogRequestLite_LogEventLite) error {
+ tl.requests <- events
+
+ tl.mu.Lock()
+ defer tl.mu.Unlock()
+ return tl.err
+}
+
+func expectLogCalled(t *testing.T, tl *testLogger, want []*logpb.LogRequestLite_LogEventLite) {
+ select {
+ case got := <-tl.requests:
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("events: got: %v; want: %v", got, want)
+ }
+ case <-time.After(50 * time.Millisecond):
+ t.Errorf("timed out waiting for upload")
+ }
+}
+
+func expectNoMoreLogs(t *testing.T, tl *testLogger) {
+ close(tl.requests) // Any future sends on this channel will panic.
+ if len(tl.requests) != 0 {
+ t.Errorf("unexpected send on tl.records: %v", <-tl.requests)
+ }
+}
+
+func TestBatchLogger(t *testing.T) {
+ ctx := context.Background()
+ tl := newTestLogger(nil)
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, tl, tickc)
+
+ // We haven't logged any events yet.
+ if len(tl.requests) != 0 {
+ t.Errorf("events: got: %v; want: nil", tl.requests)
+ }
+
+ event := &logpb.LogRequestLite_LogEventLite{}
+ bl.Log(event)
+ bl.Log(event)
+
+ // We have logged events, but upload hasn't been called
+ if len(tl.requests) != 0 {
+ t.Errorf("events: got: %v; want: nil", <-tl.requests)
+ }
+
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{event, event})
+
+ // log another event.
+ bl.Log(event)
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{event})
+ expectNoMoreLogs(t, tl)
+}
+
+var errBang = errors.New("bang")
+
+func TestRetries(t *testing.T) {
+ // Each batch of events has 4 chances at being uploaded: one initial attempt and up to 3 retries.
+
+ ctx := context.Background()
+ tl := newTestLogger(retryError{errBang})
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, tl, tickc)
+
+ e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
+ bl.Log(e1)
+
+ // e1 attempt #1 fails.
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
+
+ // e1 attempt #2 fails.
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
+
+ // Now add e2.
+ e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
+ bl.Log(e2)
+
+ // e1 attempt #3 fails; e2 attempt #1 fails.
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2})
+
+ // final attempt for e1
+ // e1 attempt #4 (final) fails; e2 attempt #2 fails.
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2})
+
+ tl.mu.Lock()
+ tl.err = nil // start succeeding. We've already given up on e1 though.
+ tl.mu.Unlock()
+
+ // e2 attempt #3 succeeds.
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e2})
+
+ // e2 already succeeded, so there should be no attempt #4.
+ expectNoMoreLogs(t, tl)
+ tickc <- time.Time{} // trigger upload.
+}
+
+func TestAttemptsUploadOnClose(t *testing.T) {
+ ctx := context.Background()
+ tl := newTestLogger(retryError{errBang})
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, tl, tickc)
+
+ e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
+ bl.Log(e1)
+
+ // Trigger an upload attempt, which will fail.
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
+
+ // Now add e2.
+ e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
+ bl.Log(e2)
+
+ // We have not triggered any more uploads via tickc, but bl.Close should trigger an upload attempt.
+ bl.Close()
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1, e2})
+ expectNoMoreLogs(t, tl)
+}
+
+func TestNonTransientFailureDoesntRetry(t *testing.T) {
+ // Each batch of events has 4 chances at being uploaded: one initial attempt and up to 3 retries.
+
+ ctx := context.Background()
+
+ // Note: err is a non-retry error.
+ tl := newTestLogger(errBang)
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, tl, tickc)
+
+ e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
+ bl.Log(e1)
+
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e1})
+ // e1 attempt #1 failed, but it won't be retried.
+
+ tickc <- time.Time{} // trigger upload. We don't expect to receive anything from tl.requests.
+
+ if len(tl.requests) != 0 {
+ t.Errorf("events: got: %v; want: nil", tl.requests)
+ }
+
+ // Now add e2.
+ e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
+ bl.Log(e2)
+
+ tickc <- time.Time{}
+ expectLogCalled(t, tl, []*logpb.LogRequestLite_LogEventLite{e2})
+
+ expectNoMoreLogs(t, tl)
+ tickc <- time.Time{} // trigger upload. We don't expect to receive anything from tl.requests.
+}
+
+func TestRingBuffer(t *testing.T) {
+ // the ring buffer is effectively a sliding window over a list of items, where
+ // the window is moved as items are pushed into the ring buffer.
+ // In this test we check that appending all the slices in the window
+ // yields the same result as appending all of the slices in the ring
+ // buffer.
+ // The cases we test are (window over event slice is indicated with parens):
+ //
+ // events: [() 0, 1, 2, 3, 4, 5] ring buffer: [nil, nil, nil]
+ // events: [(0), 1, 2, 3, 4, 5] ring buffer: [0 , nil, nil]
+ // events: [(0, 1), 2, 3, 4, 5] ring buffer: [0 , 1 , nil]
+ // events: [(0, 1, 2), 3, 4, 5] ring buffer: [0 , 1 , 2 ]
+ // events: [0, (1, 2, 3), 4, 5] ring buffer: [3 , 1 , 2 ]
+ // events: [0, 1, (2, 3, 4), 5] ring buffer: [3 , 4 , 2 ]
+ // events: [0, 1, 2, (3, 4, 5)] ring buffer: [3 , 4 , 5 ]
+
+ events := [][]*logpb.LogRequestLite_LogEventLite{}
+ for i := 0; i < numRetries*2; i++ {
+ i64 := int64(i)
+ e := &logpb.LogRequestLite_LogEventLite{EventTimeMs: &i64}
+ events = append(events, []*logpb.LogRequestLite_LogEventLite{e})
+ }
+
+ rb := ringBuffer{}
+ emptyEventSlice := make([]*logpb.LogRequestLite_LogEventLite, 0, 0)
+
+ if got, want := rb.AppendAll(emptyEventSlice), emptyEventSlice; !reflect.DeepEqual(got, want) {
+ t.Errorf("empty ring buffer AppendAll: got: %v; want: %v", got, want)
+ }
+
+ for j := 0; j < numRetries*2; j++ {
+ i := j - (numRetries - 1)
+ if i < 0 {
+ i = 0
+ }
+ gotDisplaced := rb.Push(events[j])
+ var wantDisplaced []*logpb.LogRequestLite_LogEventLite
+ if i > 0 {
+ wantDisplaced = events[i-1]
+ }
+
+ if !reflect.DeepEqual(gotDisplaced, wantDisplaced) {
+ t.Errorf("ring buffer displaced: got: %v; want: %v", gotDisplaced, wantDisplaced)
+ }
+
+ got := rb.AppendAll(emptyEventSlice)
+ want := appendAll(events[i : j+1])
+ sort.Sort(ByTime(got))
+ sort.Sort(ByTime(want))
+
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("ring buffer AppendAll (i=%v,j=%v): got: %v; want: %v", i, j, got, want)
+ }
+ }
+
+}
+
+func appendAll(events [][]*logpb.LogRequestLite_LogEventLite) []*logpb.LogRequestLite_LogEventLite {
+ var result []*logpb.LogRequestLite_LogEventLite
+ for _, es := range events {
+ result = append(result, es...)
+ }
+ return result
+}
+
+type ByTime []*logpb.LogRequestLite_LogEventLite
+
+func (bp ByTime) Len() int { return len(bp) }
+func (bp ByTime) Swap(i, j int) { bp[i], bp[j] = bp[j], bp[i] }
+func (bp ByTime) Less(i, j int) bool { return *bp[i].EventTimeMs < *bp[j].EventTimeMs }
« no previous file with comments | « common/eventlog/internal/logservice/batch_logger.go ('k') | common/eventlog/internal/logservice/logservice.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698