| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package cloudlogging | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "fmt" | |
| 10 "testing" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/retry" | |
| 14 . "github.com/smartystreets/goconvey/convey" | |
| 15 "golang.org/x/net/context" | |
| 16 ) | |
| 17 | |
| 18 type testClient struct { | |
| 19 callback func([]*Entry) error | |
| 20 pushes int | |
| 21 } | |
| 22 | |
| 23 var _ Client = (*testClient)(nil) | |
| 24 | |
| 25 func (c *testClient) PushEntries(entries []*Entry) error { | |
| 26 c.pushes++ | |
| 27 if c.callback != nil { | |
| 28 return c.callback(entries) | |
| 29 } | |
| 30 return nil | |
| 31 } | |
| 32 | |
| 33 type infiniteRetryIterator struct{} | |
| 34 | |
| 35 func (infiniteRetryIterator) Next(context.Context, error) time.Duration { | |
| 36 return 0 | |
| 37 } | |
| 38 | |
| 39 func TestBuffer(t *testing.T) { | |
| 40 t.Parallel() | |
| 41 | |
| 42 Convey(`A Buffer instance`, t, func() { | |
| 43 ctx := context.Background() | |
| 44 | |
| 45 entriesC := make(chan []*Entry, 1) | |
| 46 client := &testClient{ | |
| 47 callback: func(entries []*Entry) error { | |
| 48 entriesC <- entries | |
| 49 return nil | |
| 50 }, | |
| 51 } | |
| 52 | |
| 53 options := BufferOptions{ | |
| 54 Retry: func() retry.Iterator { | |
| 55 return &retry.Limited{ | |
| 56 Retries: 5, | |
| 57 } | |
| 58 }, | |
| 59 } | |
| 60 | |
| 61 b := NewBuffer(ctx, options, client).(*bufferImpl) | |
| 62 defer b.StopAndFlush() | |
| 63 | |
| 64 // Allow synchronization when a log entry is ingested. Set "ackC
" to nil to | |
| 65 // disable synchronization. | |
| 66 var bufferC chan []*Entry | |
| 67 var ackC chan *Entry | |
| 68 b.testCB = &testCallbacks{ | |
| 69 bufferRound: func(e []*Entry) { | |
| 70 if bufferC != nil { | |
| 71 bufferC <- e | |
| 72 } | |
| 73 }, | |
| 74 receivedLogEntry: func(e *Entry) { | |
| 75 if ackC != nil { | |
| 76 ackC <- e | |
| 77 } | |
| 78 }, | |
| 79 } | |
| 80 | |
| 81 So(b.BatchSize, ShouldEqual, DefaultBatchSize) | |
| 82 | |
| 83 Convey(`Will push a single entry.`, func() { | |
| 84 err := b.PushEntries([]*Entry{ | |
| 85 { | |
| 86 InsertID: "a", | |
| 87 }, | |
| 88 }) | |
| 89 So(err, ShouldBeNil) | |
| 90 | |
| 91 entries := <-entriesC | |
| 92 So(len(entries), ShouldEqual, 1) | |
| 93 So(entries[0], ShouldResemble, &Entry{ | |
| 94 InsertID: "a", | |
| 95 }) | |
| 96 So(client.pushes, ShouldEqual, 1) | |
| 97 }) | |
| 98 | |
| 99 Convey(`Will batch logging data.`, func() { | |
| 100 bufferC = make(chan []*Entry) | |
| 101 ackC = make(chan *Entry) | |
| 102 | |
| 103 // The first message will be read immediately. | |
| 104 err := b.PushEntries([]*Entry{ | |
| 105 { | |
| 106 InsertID: "a", | |
| 107 }, | |
| 108 }) | |
| 109 So(err, ShouldBeNil) | |
| 110 <-ackC | |
| 111 <-bufferC | |
| 112 | |
| 113 // The next set of messages will be batched, since we ha
ven't allowed our | |
| 114 // client stub to finish its PushEntries. | |
| 115 entries := make([]*Entry, b.BatchSize) | |
| 116 for i := range entries { | |
| 117 entries[i] = &Entry{ | |
| 118 InsertID: fmt.Sprintf("%d", i), | |
| 119 } | |
| 120 } | |
| 121 err = b.PushEntries(entries) | |
| 122 So(err, ShouldBeNil) | |
| 123 | |
| 124 // Read the first bundle. | |
| 125 bundle := <-entriesC | |
| 126 So(len(bundle), ShouldEqual, 1) | |
| 127 So(bundle[0].InsertID, ShouldEqual, "a") | |
| 128 | |
| 129 // Read the second bundle. | |
| 130 for range entries { | |
| 131 <-ackC | |
| 132 } | |
| 133 <-bufferC | |
| 134 bundle = <-entriesC | |
| 135 | |
| 136 So(len(bundle), ShouldEqual, b.BatchSize) | |
| 137 for i := range bundle { | |
| 138 So(bundle[i].InsertID, ShouldEqual, fmt.Sprintf(
"%d", i)) | |
| 139 } | |
| 140 So(client.pushes, ShouldEqual, 2) | |
| 141 }) | |
| 142 | |
| 143 Convey(`Will retry on failure.`, func() { | |
| 144 failures := 3 | |
| 145 client.callback = func(entries []*Entry) error { | |
| 146 if failures > 0 { | |
| 147 failures-- | |
| 148 return errors.New("test: induced failure
") | |
| 149 } | |
| 150 entriesC <- entries | |
| 151 return nil | |
| 152 } | |
| 153 | |
| 154 err := b.PushEntries([]*Entry{ | |
| 155 { | |
| 156 InsertID: "a", | |
| 157 }, | |
| 158 }) | |
| 159 So(err, ShouldBeNil) | |
| 160 | |
| 161 entries := <-entriesC | |
| 162 So(len(entries), ShouldEqual, 1) | |
| 163 So(entries[0], ShouldResemble, &Entry{ | |
| 164 InsertID: "a", | |
| 165 }) | |
| 166 So(client.pushes, ShouldEqual, 4) | |
| 167 }) | |
| 168 }) | |
| 169 | |
| 170 Convey(`A Buffer instance configured to retry forever will stop if abort
ed.`, t, func() { | |
| 171 entriesC := make(chan []*Entry) | |
| 172 defer close(entriesC) | |
| 173 | |
| 174 client := &testClient{ | |
| 175 callback: func(entries []*Entry) error { | |
| 176 entriesC <- entries | |
| 177 return errors.New("test: failure") | |
| 178 }, | |
| 179 } | |
| 180 | |
| 181 options := BufferOptions{ | |
| 182 Retry: func() retry.Iterator { | |
| 183 return infiniteRetryIterator{} | |
| 184 }, | |
| 185 } | |
| 186 | |
| 187 b := NewBuffer(context.Background(), options, client) | |
| 188 err := b.PushEntries([]*Entry{ | |
| 189 { | |
| 190 InsertID: "a", | |
| 191 }, | |
| 192 }) | |
| 193 So(err, ShouldBeNil) | |
| 194 | |
| 195 // Wait for the buffer to finish. | |
| 196 finishedC := make(chan struct{}) | |
| 197 go func() { | |
| 198 defer close(finishedC) | |
| 199 b.StopAndFlush() | |
| 200 }() | |
| 201 | |
| 202 // Wait for the first attempt. This ensures that we've entered t
he retry | |
| 203 // loop. | |
| 204 <-entriesC | |
| 205 go func() { | |
| 206 // Consume any other attempts. | |
| 207 for range entriesC { | |
| 208 } | |
| 209 }() | |
| 210 | |
| 211 // Abort the buffer. | |
| 212 b.Abort() | |
| 213 | |
| 214 // Assert that it will stop eventually. Rather than deadlock/pan
ic, we wait | |
| 215 // one real second and fail if it didn't terminate. Since there
is no | |
| 216 // underlying latency, one second (in the failure case) is accep
table. | |
| 217 closed := false | |
| 218 select { | |
| 219 case <-finishedC: | |
| 220 closed = true | |
| 221 | |
| 222 case <-time.After(1 * time.Second): | |
| 223 break | |
| 224 } | |
| 225 So(closed, ShouldBeTrue) | |
| 226 }) | |
| 227 } | |
| OLD | NEW |