Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(35)

Unified Diff: common/cloudlogging/buffer_test.go

Issue 2937693003: Make luci-go compile again after deps.lock roll. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/cloudlogging/buffer.go ('k') | common/cloudlogging/client.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/cloudlogging/buffer_test.go
diff --git a/common/cloudlogging/buffer_test.go b/common/cloudlogging/buffer_test.go
deleted file mode 100644
index fca1102a0a4336ffb8200e5f8d8d892c90e96602..0000000000000000000000000000000000000000
--- a/common/cloudlogging/buffer_test.go
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2015 The LUCI Authors. All rights reserved.
-// Use of this source code is governed under the Apache License, Version 2.0
-// that can be found in the LICENSE file.
-
-package cloudlogging
-
-import (
- "errors"
- "fmt"
- "testing"
- "time"
-
- "github.com/luci/luci-go/common/retry"
- . "github.com/smartystreets/goconvey/convey"
- "golang.org/x/net/context"
-)
-
-type testClient struct {
- callback func([]*Entry) error
- pushes int
-}
-
-var _ Client = (*testClient)(nil)
-
-func (c *testClient) PushEntries(entries []*Entry) error {
- c.pushes++
- if c.callback != nil {
- return c.callback(entries)
- }
- return nil
-}
-
-type infiniteRetryIterator struct{}
-
-func (infiniteRetryIterator) Next(context.Context, error) time.Duration {
- return 0
-}
-
-func TestBuffer(t *testing.T) {
- t.Parallel()
-
- Convey(`A Buffer instance`, t, func() {
- ctx := context.Background()
-
- entriesC := make(chan []*Entry, 1)
- client := &testClient{
- callback: func(entries []*Entry) error {
- entriesC <- entries
- return nil
- },
- }
-
- options := BufferOptions{
- Retry: func() retry.Iterator {
- return &retry.Limited{
- Retries: 5,
- }
- },
- }
-
- b := NewBuffer(ctx, options, client).(*bufferImpl)
- defer b.StopAndFlush()
-
- // Allow synchronization when a log entry is ingested. Set "ackC" to nil to
- // disable synchronization.
- var bufferC chan []*Entry
- var ackC chan *Entry
- b.testCB = &testCallbacks{
- bufferRound: func(e []*Entry) {
- if bufferC != nil {
- bufferC <- e
- }
- },
- receivedLogEntry: func(e *Entry) {
- if ackC != nil {
- ackC <- e
- }
- },
- }
-
- So(b.BatchSize, ShouldEqual, DefaultBatchSize)
-
- Convey(`Will push a single entry.`, func() {
- err := b.PushEntries([]*Entry{
- {
- InsertID: "a",
- },
- })
- So(err, ShouldBeNil)
-
- entries := <-entriesC
- So(len(entries), ShouldEqual, 1)
- So(entries[0], ShouldResemble, &Entry{
- InsertID: "a",
- })
- So(client.pushes, ShouldEqual, 1)
- })
-
- Convey(`Will batch logging data.`, func() {
- bufferC = make(chan []*Entry)
- ackC = make(chan *Entry)
-
- // The first message will be read immediately.
- err := b.PushEntries([]*Entry{
- {
- InsertID: "a",
- },
- })
- So(err, ShouldBeNil)
- <-ackC
- <-bufferC
-
- // The next set of messages will be batched, since we haven't allowed our
- // client stub to finish its PushEntries.
- entries := make([]*Entry, b.BatchSize)
- for i := range entries {
- entries[i] = &Entry{
- InsertID: fmt.Sprintf("%d", i),
- }
- }
- err = b.PushEntries(entries)
- So(err, ShouldBeNil)
-
- // Read the first bundle.
- bundle := <-entriesC
- So(len(bundle), ShouldEqual, 1)
- So(bundle[0].InsertID, ShouldEqual, "a")
-
- // Read the second bundle.
- for range entries {
- <-ackC
- }
- <-bufferC
- bundle = <-entriesC
-
- So(len(bundle), ShouldEqual, b.BatchSize)
- for i := range bundle {
- So(bundle[i].InsertID, ShouldEqual, fmt.Sprintf("%d", i))
- }
- So(client.pushes, ShouldEqual, 2)
- })
-
- Convey(`Will retry on failure.`, func() {
- failures := 3
- client.callback = func(entries []*Entry) error {
- if failures > 0 {
- failures--
- return errors.New("test: induced failure")
- }
- entriesC <- entries
- return nil
- }
-
- err := b.PushEntries([]*Entry{
- {
- InsertID: "a",
- },
- })
- So(err, ShouldBeNil)
-
- entries := <-entriesC
- So(len(entries), ShouldEqual, 1)
- So(entries[0], ShouldResemble, &Entry{
- InsertID: "a",
- })
- So(client.pushes, ShouldEqual, 4)
- })
- })
-
- Convey(`A Buffer instance configured to retry forever will stop if aborted.`, t, func() {
- entriesC := make(chan []*Entry)
- defer close(entriesC)
-
- client := &testClient{
- callback: func(entries []*Entry) error {
- entriesC <- entries
- return errors.New("test: failure")
- },
- }
-
- options := BufferOptions{
- Retry: func() retry.Iterator {
- return infiniteRetryIterator{}
- },
- }
-
- b := NewBuffer(context.Background(), options, client)
- err := b.PushEntries([]*Entry{
- {
- InsertID: "a",
- },
- })
- So(err, ShouldBeNil)
-
- // Wait for the buffer to finish.
- finishedC := make(chan struct{})
- go func() {
- defer close(finishedC)
- b.StopAndFlush()
- }()
-
- // Wait for the first attempt. This ensures that we've entered the retry
- // loop.
- <-entriesC
- go func() {
- // Consume any other attempts.
- for range entriesC {
- }
- }()
-
- // Abort the buffer.
- b.Abort()
-
- // Assert that it will stop eventually. Rather than deadlock/panic, we wait
- // one real second and fail if it didn't terminate. Since there is no
- // underlying latency, one second (in the failure case) is acceptable.
- closed := false
- select {
- case <-finishedC:
- closed = true
-
- case <-time.After(1 * time.Second):
- break
- }
- So(closed, ShouldBeTrue)
- })
-}
« no previous file with comments | « common/cloudlogging/buffer.go ('k') | common/cloudlogging/client.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698