Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Unified Diff: common/eventlog/internal/logservice/batch_logger_test.go

Issue 2557593002: Add batch logging support to eventlog. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/eventlog/internal/logservice/batch_logger_test.go
diff --git a/common/eventlog/internal/logservice/batch_logger_test.go b/common/eventlog/internal/logservice/batch_logger_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..43a9e918b07ba21d37085560103d87619c075683
--- /dev/null
+++ b/common/eventlog/internal/logservice/batch_logger_test.go
@@ -0,0 +1,256 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package logservice
+
+import (
+ "context"
+ "errors"
+ "reflect"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ logpb "github.com/luci/luci-go/common/eventlog/proto"
+)
+
+type recordingSyncLogger struct {
+ records [][]*logpb.LogRequestLite_LogEventLite
+ called chan struct{}
+ err error
+}
+
+func (rsl *recordingSyncLogger) LogSync(_ context.Context, events ...*logpb.LogRequestLite_LogEventLite) error {
+ rsl.records = append(rsl.records, events)
+ rsl.called <- struct{}{}
+ return rsl.err
+}
+
+func TestBatchLogger(t *testing.T) {
+ ctx := context.Background()
+ rsl := &recordingSyncLogger{called: make(chan struct{}, 1)}
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, rsl, tickc)
+
+ // We haven't logged any events yet.
+ if rsl.records != nil {
+ t.Errorf("events: got: %v; want: nil", rsl.records)
+ }
+
+ event := &logpb.LogRequestLite_LogEventLite{}
+ bl.Log(event)
+ bl.Log(event)
+
+ // We have logged events, but upload hasn't been called
+ if rsl.records != nil {
+ t.Errorf("events: got: %v; want: nil", rsl.records)
+ }
+
+ tickc <- time.Time{}
+ select {
+ case <-rsl.called:
+ case <-time.After(50 * time.Millisecond):
+ t.Errorf("timed out waiting for upload")
+ }
+ // upload has been called. We should see our event.
+ if got, want := rsl.records, [][]*logpb.LogRequestLite_LogEventLite{{event, event}}; !reflect.DeepEqual(got, want) {
+ t.Errorf("events: got: %v; want: %v", got, want)
+ }
+
+ // log another event.
djd-OOO-Apr2017 2016/12/06 06:31:27 You should also try the case where there are pendi
mcgreevy 2016/12/06 23:57:36 Added a separate test.
+ bl.Log(event)
+ tickc <- time.Time{}
+ select {
+ case <-rsl.called:
+ case <-time.After(50 * time.Millisecond):
+ t.Errorf("timed out waiting for upload")
+ }
+
+ if got, want := rsl.records, [][]*logpb.LogRequestLite_LogEventLite{{event, event}, {event}}; !reflect.DeepEqual(got, want) {
+ t.Errorf("events: got: %v; want: %v", got, want)
+ }
+}
+
+var errBang = errors.New("bang")
+
+func TestRetries(t *testing.T) {
+ // Each batch of events has 4 chances at being uploaded: one initial attempt and up to 3 retries.
+
+ ctx := context.Background()
+ rsl := &recordingSyncLogger{called: make(chan struct{}, 1), err: retryError{errBang}}
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, rsl, tickc)
+
+ e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
+ bl.Log(e1)
+
+ triggerUpload := func() {
+ tickc <- time.Time{}
+ select {
+ case <-rsl.called:
+ case <-time.After(50 * time.Millisecond):
+ t.Errorf("timed out waiting for upload")
+ }
+ }
+
+ var want [][]*logpb.LogRequestLite_LogEventLite
+ triggerUpload()
+ // e1 attempt #1 failed.
+ want = append(want, []*logpb.LogRequestLite_LogEventLite{e1})
+
+ triggerUpload()
+
+ // e1 attempt #2 failed.
+ want = append(want, []*logpb.LogRequestLite_LogEventLite{e1})
+
+ // Now add e2.
+ e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
+ bl.Log(e2)
+
+ triggerUpload()
+
+ // e1 attempt #3 failed; e2 attempt #1 failed.
+ want = append(want, []*logpb.LogRequestLite_LogEventLite{e1, e2})
+
+ // final attempt for e1
+ triggerUpload()
+
+ // e1 attempt #4 (final) failed; e2 attempt #2 failed.
+ want = append(want, []*logpb.LogRequestLite_LogEventLite{e1, e2})
+
+ rsl.err = nil // start succeeding. We've already given up on e1 though.
+ triggerUpload()
+
+ // e2 attempt #3 succeeded.
+ want = append(want, []*logpb.LogRequestLite_LogEventLite{e2})
+
+ // e2 already succeeded, so there should be no attempt #4.
+ rsl.called = nil // any attempt to send on this should panic.
+ tickc <- time.Time{} // trigger upload.
+
+ // There is no change to want.
+
+ if got := rsl.records; !reflect.DeepEqual(got, want) {
djd-OOO-Apr2017 2016/12/06 06:31:27 It feels like you should be asserting as you go al
mcgreevy 2016/12/06 23:57:36 That would make the test a bit more precise, but a
+ t.Errorf("events: got: %v; want: %v", got, want)
+ }
+}
+
+func TestNonTransientFailureDoesntRetry(t *testing.T) {
+ // Each batch of events has 4 chances at being uploaded: one initial attempt and up to 3 retries.
+
+ ctx := context.Background()
+
+ // Note: err is a non-retry error.
+ rsl := &recordingSyncLogger{called: make(chan struct{}, 1), err: errBang}
+ tickc := make(chan time.Time)
+ bl := newBatchLogger(ctx, rsl, tickc)
+
+ e1 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(1)}
+ bl.Log(e1)
+
+ triggerUpload := func() {
+ tickc <- time.Time{}
+ select {
+ case <-rsl.called:
+ case <-time.After(50 * time.Millisecond):
+ t.Errorf("timed out waiting for upload")
+ }
+ }
+
+ triggerUpload()
+ // e1 attempt #1 failed, but it won't be retried.
+
+ tickc <- time.Time{} // trigger upload. We don't expect to receive anything from rsl.called.
+
+ // Now add e2.
+ e2 := &logpb.LogRequestLite_LogEventLite{EventTimeMs: proto.Int64(2)}
+ bl.Log(e2)
+
+ triggerUpload()
+
+ rsl.called = nil // any attempt to send on this should panic.
+
+ tickc <- time.Time{} // trigger upload. We don't expect to receive anything from rsl.called.
+
+ // There should only be a single attempt for each of e1 and e2.
+ want := [][]*logpb.LogRequestLite_LogEventLite{{e1}, {e2}}
+
+ if got := rsl.records; !reflect.DeepEqual(got, want) {
+ t.Errorf("events: got: %v; want: %v", got, want)
+ }
+}
+
+func TestRingBuffer(t *testing.T) {
+ // the ring buffer is effectively a sliding window over a list of items, where
+ // the window is moved as items are pushed into the ring buffer.
+ // In this test we check that appending all the slices in the window
+ // yields the same result as appending all of the slices in the ring
+ // buffer.
+ // The cases we test are (window over event slice is indicated with parens):
+ //
+ // events: [() 0, 1, 2, 3, 4, 5] ring buffer: [nil, nil, nil]
+ // events: [(0), 1, 2, 3, 4, 5] ring buffer: [0 , nil, nil]
+ // events: [(0, 1), 2, 3, 4, 5] ring buffer: [0 , 1 , nil]
+ // events: [(0, 1, 2), 3, 4, 5] ring buffer: [0 , 1 , 2 ]
+ // events: [0, (1, 2, 3), 4, 5] ring buffer: [3 , 1 , 2 ]
+ // events: [0, 1, (2, 3, 4), 5] ring buffer: [3 , 4 , 2 ]
+ // events: [0, 1, 2, (3, 4, 5)] ring buffer: [3 , 4 , 5 ]
+
+ events := [][]*logpb.LogRequestLite_LogEventLite{}
+ for i := 0; i < numRetries*2; i++ {
+ i64 := int64(i)
+ e := &logpb.LogRequestLite_LogEventLite{EventTimeMs: &i64}
+ events = append(events, []*logpb.LogRequestLite_LogEventLite{e})
+ }
+
+ rb := ringBuffer{}
+ if got, want := rb.AppendAll(emptyEventSlice()), emptyEventSlice(); !reflect.DeepEqual(got, want) {
+ t.Errorf("empty ring buffer AppendAll: got: %v; want: %v", got, want)
+ }
+
+ for j := 0; j < numRetries*2; j++ {
+ i := j - (numRetries - 1)
+ if i < 0 {
+ i = 0
+ }
+ gotDisplaced := rb.Push(events[j])
+ var wantDisplaced []*logpb.LogRequestLite_LogEventLite
+ if i > 0 {
+ wantDisplaced = events[i-1]
+ }
+
+ if !reflect.DeepEqual(gotDisplaced, wantDisplaced) {
+ t.Errorf("ring buffer displaced: got: %v; want: %v", gotDisplaced, wantDisplaced)
+ }
+
+ got := rb.AppendAll(emptyEventSlice())
+ want := appendAll(events[i : j+1])
+ sort.Sort(ByTime(got))
+ sort.Sort(ByTime(want))
+
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("ring buffer AppendAll (i=%v,j=%v): got: %v; want: %v", i, j, got, want)
+ }
+ }
+
+}
+
+func appendAll(events [][]*logpb.LogRequestLite_LogEventLite) []*logpb.LogRequestLite_LogEventLite {
+ var result []*logpb.LogRequestLite_LogEventLite
+ for _, es := range events {
+ result = append(result, es...)
+ }
+ return result
+}
+
+func emptyEventSlice() []*logpb.LogRequestLite_LogEventLite {
djd-OOO-Apr2017 2016/12/06 06:31:27 Can you just make this a var? (If it has cap 0, th
mcgreevy 2016/12/06 23:57:36 Good idea. Done.
+ return []*logpb.LogRequestLite_LogEventLite{}
+}
+
+type ByTime []*logpb.LogRequestLite_LogEventLite
+
+func (bp ByTime) Len() int { return len(bp) }
+func (bp ByTime) Swap(i, j int) { bp[i], bp[j] = bp[j], bp[i] }
+func (bp ByTime) Less(i, j int) bool { return *bp[i].EventTimeMs < *bp[j].EventTimeMs }

Powered by Google App Engine
This is Rietveld 408576698