Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: server/internal/logdog/collector/collector_test.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package collector 5 package collector
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync/atomic" 10 "sync/atomic"
11 "testing" 11 "testing"
12 12
13 "github.com/luci/luci-go/common/clock/testclock" 13 "github.com/luci/luci-go/common/clock/testclock"
14 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/logdog/butlerproto" 15 "github.com/luci/luci-go/common/logdog/butlerproto"
16 "github.com/luci/luci-go/common/logdog/types" 16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/logdog/logpb" 17 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
19 "github.com/luci/luci-go/server/logdog/storage/memory" 19 "github.com/luci/luci-go/server/logdog/storage/memory"
20 "golang.org/x/net/context" 20 "golang.org/x/net/context"
21 21
22 . "github.com/luci/luci-go/common/testing/assertions"
22 . "github.com/smartystreets/goconvey/convey" 23 . "github.com/smartystreets/goconvey/convey"
23 ) 24 )
24 25
25 // TestCollector runs through a series of end-to-end Collector workflows and 26 // TestCollector runs through a series of end-to-end Collector workflows and
26 // ensures that the Collector behaves appropriately. 27 // ensures that the Collector behaves appropriately.
27 func TestCollector(t *testing.T) { 28 func TestCollector(t *testing.T) {
28 t.Parallel() 29 t.Parallel()
29 30
30 Convey(`Using a test configuration`, t, func() { 31 Convey(`Using a test configuration`, t, func() {
31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal) 32 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal)
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 tcc.errC = make(chan error, 2) 100 tcc.errC = make(chan error, 2)
100 tcc.errC <- nil // Register 101 tcc.errC <- nil // Register
101 tcc.errC <- errors.New("test error") // Terminate 102 tcc.errC <- errors.New("test error") // Terminate
102 103
103 bb.addFullStream("foo/+/bar", 128) 104 bb.addFullStream("foo/+/bar", 128)
104 err := coll.Process(c, bb.bundle()) 105 err := coll.Process(c, bb.bundle())
105 So(err, ShouldNotBeNil) 106 So(err, ShouldNotBeNil)
106 So(errors.IsTransient(err), ShouldBeFals e) 107 So(errors.IsTransient(err), ShouldBeFals e)
107 }) 108 })
108 109
109 » » » » Convey(`Will return a transient error if an erro r happened on storage.`, func() { 110 » » » » Convey(`Will return a transient error if a trans ient error happened on storage.`, func() {
110 // Single transient error. 111 // Single transient error.
111 count := int32(0) 112 count := int32(0)
112 st.err = func() error { 113 st.err = func() error {
113 if atomic.AddInt32(&count, 1) == 1 { 114 if atomic.AddInt32(&count, 1) == 1 {
114 » » » » » » » return errors.New("test error") 115 » » » » » » » return errors.WrapTransi ent(errors.New("test error"))
115 } 116 }
116 return nil 117 return nil
117 } 118 }
118 119
119 bb.addFullStream("foo/+/bar", 128) 120 bb.addFullStream("foo/+/bar", 128)
120 err := coll.Process(c, bb.bundle()) 121 err := coll.Process(c, bb.bundle())
121 So(err, ShouldNotBeNil) 122 So(err, ShouldNotBeNil)
122 So(errors.IsTransient(err), ShouldBeTrue ) 123 So(errors.IsTransient(err), ShouldBeTrue )
123 }) 124 })
124 125
125 Convey(`Will drop invalid LogStreamDescriptor bu ndle entries and process the valid ones.`, func() { 126 Convey(`Will drop invalid LogStreamDescriptor bu ndle entries and process the valid ones.`, func() {
126 » » » » » be := bb.genBundleEntry("foo/+/trash", 1 337, 4, 6, 7, 8) 127 » » » » » be := bb.genBundleEntry("foo/+/trash", 1 337, 4, 5, 6, 7, 8)
127 » » » » » be.Desc.ContentType = "" // Missing Cont entType => invalid. 128 » » » » » bb.addBundleEntry(be)
128 129
129 » » » » » bb.addStreamEntries("foo/+/trash", -1, 0 , 1, 2, 3, 5) 130 » » » » » bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
130 » » » » » bb.addBundleEntry(be)
131 bb.addFullStream("foo/+/bar", 32) 131 bb.addFullStream("foo/+/bar", 32)
132 132
133 » » » » » So(coll.Process(c, bb.bundle()), ShouldB eNil) 133 » » » » » err := coll.Process(c, bb.bundle())
134 » » » » » So(err, ShouldNotBeNil)
135 » » » » » So(errors.IsTransient(err), ShouldBeFals e)
134 136
135 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 32) 137 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 32)
136 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 31}) 138 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 31})
137 139
138 » » » » » So(tcc, shouldHaveRegisteredStream, "foo /+/trash", -1) 140 » » » » » So(tcc, shouldHaveRegisteredStream, "foo /+/trash", 1337)
139 » » » » » So(st, shouldHaveStoredStream, "foo/+/tr ash", 0, 1, 2, 3, 5) 141 » » » » » So(st, shouldHaveStoredStream, "foo/+/tr ash", 4, 5, 6, 7, 8)
140 }) 142 })
141 143
142 Convey(`Will drop streams with missing secrets.` , func() { 144 Convey(`Will drop streams with missing secrets.` , func() {
143 be := bb.genBundleEntry("foo/+/trash", 2 , 0, 1, 2) 145 be := bb.genBundleEntry("foo/+/trash", 2 , 0, 1, 2)
144 be.Secret = nil 146 be.Secret = nil
145 bb.addBundleEntry(be) 147 bb.addBundleEntry(be)
146 148
147 » » » » » So(coll.Process(c, bb.bundle()), ShouldB eNil) 149 » » » » » err := coll.Process(c, bb.bundle())
148 » » » » » So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar", 127) 150 » » » » » So(err, ShouldErrLike, "missing stream s ecret")
151 » » » » » So(errors.IsTransient(err), ShouldBeFals e)
152 » » » » » So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
149 }) 153 })
150 154
151 Convey(`Will drop messages with mismatching secr ets.`, func() { 155 Convey(`Will drop messages with mismatching secr ets.`, func() {
152 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) 156 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
153 So(coll.Process(c, bb.bundle()), ShouldB eNil) 157 So(coll.Process(c, bb.bundle()), ShouldB eNil)
154 158
155 // Push another bundle with a different secret. 159 // Push another bundle with a different secret.
156 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) 160 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
157 be.Secret = bytes.Repeat([]byte{0xAA}, t ypes.StreamSecretLength) 161 be.Secret = bytes.Repeat([]byte{0xAA}, t ypes.StreamSecretLength)
158 be.TerminalIndex = 1337 162 be.TerminalIndex = 1337
(...skipping 16 matching lines...) Expand all
175 Convey(`Will drop bundles with unknown ProtoVers ion string.`, func() { 179 Convey(`Will drop bundles with unknown ProtoVers ion string.`, func() {
176 buf := bytes.Buffer{} 180 buf := bytes.Buffer{}
177 w := butlerproto.Writer{ProtoVersion: "! !!invalid!!!"} 181 w := butlerproto.Writer{ProtoVersion: "! !!invalid!!!"}
178 w.Write(&buf, &logpb.ButlerLogBundle{}) 182 w.Write(&buf, &logpb.ButlerLogBundle{})
179 183
180 So(coll.Process(c, buf.Bytes()), ShouldB eNil) 184 So(coll.Process(c, buf.Bytes()), ShouldB eNil)
181 185
182 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar") 186 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
183 }) 187 })
184 188
185 Convey(`Will drop records beyond a local termina l index.`, func() {
186 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4)
187 So(coll.Process(c, bb.bundle()), ShouldB eNil)
188
189 bb.addStreamEntries("foo/+/bar", 3, 3, 5 )
190 So(coll.Process(c, bb.bundle()), ShouldB eNil)
191
192 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4)
193 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3})
194 })
195
196 Convey(`Will not ingest records beyond a remote terminal index.`, func() {
197 tcc.register(coordinator.LogStreamState{
198 Path: "foo/+/bar",
199 Secret: testSecret,
200 TerminalIndex: 3,
201 })
202
203 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2)
204 bb.addStreamEntries("foo/+/bar", 3, 3, 5 )
205 So(coll.Process(c, bb.bundle()), ShouldB eNil)
206
207 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 3)
208 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3})
209 })
210
211 Convey(`Will not ingest records if the stream is archived.`, func() { 189 Convey(`Will not ingest records if the stream is archived.`, func() {
212 tcc.register(coordinator.LogStreamState{ 190 tcc.register(coordinator.LogStreamState{
213 Path: "foo/+/bar", 191 Path: "foo/+/bar",
214 Secret: testSecret, 192 Secret: testSecret,
215 TerminalIndex: -1, 193 TerminalIndex: -1,
216 Archived: true, 194 Archived: true,
217 }) 195 })
218 196
219 » » » » » bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4) 197 » » » » » bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 3, 4)
220 So(coll.Process(c, bb.bundle()), ShouldB eNil) 198 So(coll.Process(c, bb.bundle()), ShouldB eNil)
221 199
222 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) 200 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
223 So(st, shouldHaveStoredStream, "foo/+/ba r") 201 So(st, shouldHaveStoredStream, "foo/+/ba r")
224 }) 202 })
225 203
226 Convey(`Will not ingest records if the stream is purged.`, func() { 204 Convey(`Will not ingest records if the stream is purged.`, func() {
227 tcc.register(coordinator.LogStreamState{ 205 tcc.register(coordinator.LogStreamState{
228 Path: "foo/+/bar", 206 Path: "foo/+/bar",
229 Secret: testSecret, 207 Secret: testSecret,
230 TerminalIndex: -1, 208 TerminalIndex: -1,
231 Purged: true, 209 Purged: true,
232 }) 210 })
233 211
234 So(coll.Process(c, bb.bundle()), ShouldB eNil) 212 So(coll.Process(c, bb.bundle()), ShouldB eNil)
235 213
236 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) 214 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
237 So(st, shouldHaveStoredStream, "foo/+/ba r") 215 So(st, shouldHaveStoredStream, "foo/+/ba r")
238 }) 216 })
239 217
240 Convey(`Will not ingest a bundle with no bundle entries.`, func() { 218 Convey(`Will not ingest a bundle with no bundle entries.`, func() {
241 So(coll.Process(c, bb.bundleWithEntries( )), ShouldBeNil) 219 So(coll.Process(c, bb.bundleWithEntries( )), ShouldBeNil)
242 }) 220 })
243 221
244 Convey(`Will not ingest a bundle whose log entri es don't match their descriptor.`, func() { 222 Convey(`Will not ingest a bundle whose log entri es don't match their descriptor.`, func() {
245 » » » » » be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4) 223 » » » » » be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
246 224
247 // Add a binary log entry. This does NOT match the text descriptor, and 225 // Add a binary log entry. This does NOT match the text descriptor, and
248 // should fail validation. 226 // should fail validation.
249 be.Logs = append(be.Logs, &logpb.LogEntr y{ 227 be.Logs = append(be.Logs, &logpb.LogEntr y{
250 StreamIndex: 2, 228 StreamIndex: 2,
251 Sequence: 2, 229 Sequence: 2,
252 Content: &logpb.LogEntry_Binary{ 230 Content: &logpb.LogEntry_Binary{
253 &logpb.Binary{ 231 &logpb.Binary{
254 Data: []byte{0xd 0, 0x6f, 0x00, 0xd5}, 232 Data: []byte{0xd 0, 0x6f, 0x00, 0xd5},
255 }, 233 },
256 }, 234 },
257 }) 235 })
258 bb.addBundleEntry(be) 236 bb.addBundleEntry(be)
259 » » » » » So(coll.Process(c, bb.bundle()), ShouldB eNil) 237 » » » » » So(coll.Process(c, bb.bundle()), ShouldE rrLike, "invalid log entry")
260 238
261 » » » » » So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4) 239 » » » » » So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
262 » » » » » So(st, shouldHaveStoredStream, "foo/+/ba r", 0, 1, 3, 4)
263 }) 240 })
264 }) 241 })
265 } 242 }
266 }) 243 })
267 } 244 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/utils_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698