Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(228)

Side by Side Diff: server/internal/logdog/collector/collector_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Post-splitting rebase. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "bytes"
9 "fmt"
10 "sort"
11 "strings"
12 "sync/atomic"
13 "testing"
14 "time"
15
16 "github.com/golang/protobuf/proto"
17 "github.com/luci/luci-go/common/clock"
18 "github.com/luci/luci-go/common/clock/testclock"
19 "github.com/luci/luci-go/common/errors"
20 "github.com/luci/luci-go/common/logdog/butlerproto"
21 "github.com/luci/luci-go/common/logdog/types"
22 "github.com/luci/luci-go/common/proto/google"
23 "github.com/luci/luci-go/common/proto/logdog/logpb"
24 "github.com/luci/luci-go/server/logdog/storage"
25 "github.com/luci/luci-go/server/logdog/storage/memory"
26 "golang.org/x/net/context"
27
28 . "github.com/smartystreets/goconvey/convey"
29 )
30
31 var testSecret = bytes.Repeat([]byte{0x55}, types.StreamSecretLength)
32
33 type bundleBuilder struct {
34 context.Context
35
36 base time.Time
37 entries []*logpb.ButlerLogBundle_Entry
38 }
39
40 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
41 if b.base.IsZero() {
42 b.base = clock.Now(b)
43 }
44
45 b.entries = append(b.entries, be)
46 }
47
48 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp b.ButlerLogBundle_Entry {
49 p, n := types.StreamPath(name).Split()
50 be := logpb.ButlerLogBundle_Entry{
51 Secret: testSecret,
52 Desc: &logpb.LogStreamDescriptor{
53 Prefix: string(p),
54 Name: string(n),
55 ContentType: "application/test-message",
56 StreamType: logpb.LogStreamDescriptor_TEXT,
57 Timestamp: google.NewTimestamp(clock.Now(b)),
58 },
59 }
60
61 if len(idxs) > 0 {
62 be.Logs = make([]*logpb.LogEntry, len(idxs))
63 for i, idx := range idxs {
64 be.Logs[i] = b.logEntry(idx)
65 }
66 if tidx >= 0 {
67 be.Terminal = true
68 be.TerminalIndex = uint64(tidx)
69 }
70 }
71
72 return &be
73 }
74
75 func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) {
76 b.addBundleEntry(b.genBundleEntry(name, term, idxs...))
77 }
78
79 func (b *bundleBuilder) addFullStream(name string, count int) {
80 idxs := make([]int, count)
81 for i := range idxs {
82 idxs[i] = i
83 }
84 b.addStreamEntries(name, count-1, idxs...)
85 }
86
87 func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry {
88 return &logpb.LogEntry{
89 StreamIndex: uint64(idx),
90 Sequence: uint64(idx),
91 Content: &logpb.LogEntry_Text{
92 Text: &logpb.Text{
93 Lines: []*logpb.Text_Line{
94 {
95 Value: fmt.Sprintf("Line #%d ", idx),
96 Delimiter: "\n",
97 },
98 },
99 },
100 },
101 }
102 }
103
104 func (b *bundleBuilder) bundle() []byte {
105 bytes := b.bundleWithEntries(b.entries...)
106 b.entries = nil
107 return bytes
108 }
109
110 func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []b yte {
111 bundle := logpb.ButlerLogBundle{
112 Source: "test stream",
113 Timestamp: google.NewTimestamp(clock.Now(b)),
114 Entries: e,
115 }
116
117 buf := bytes.Buffer{}
118 w := butlerproto.Writer{Compress: true}
119 if err := w.Write(&buf, &bundle); err != nil {
120 panic(err)
121 }
122 return buf.Bytes()
123 }
124
125 type indexRange struct {
126 start int
127 end int
128 }
129
130 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r .end) }
131
132 // shouldHaveRegisteredStream asserts that a testCoordinatorClient has
133 // registered a stream (string) and its terminal index (int).
134 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str ing {
135 tcc := actual.(*testCoordinatorClient)
136 name := expected[0].(string)
137 tidx := expected[1].(int)
138
139 cur, ok := tcc.stream(name)
140 if !ok {
141 return fmt.Sprintf("stream %q is not registered", name)
142 }
143 if tidx >= 0 && cur < 0 {
144 return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name)
145 }
146 if cur >= 0 && tidx < 0 {
147 return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name)
148 }
149 return ""
150 }
151
152 // shoudNotHaveRegisteredStream asserts that a testCoordinatorClient has not
153 // registered a stream (string).
154 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
155 tcc := actual.(*testCoordinatorClient)
156 name := expected[0].(string)
157
158 if _, ok := tcc.stream(name); ok {
159 return fmt.Sprintf("stream %q is registered, but it shoult NOT b e.", name)
160 }
161 return ""
162 }
163
164 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous
165 // stream records in it.
166 //
167 // actual is the storage.Storage instance. expected is a stream name (string)
168 // followed by a a series of records to assert. This can either be a specific
169 // integer index or an intexRange marking a closed range of indices.
170 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
171 st := actual.(storage.Storage)
172 name := expected[0].(string)
173
174 // Load all entries for this stream.
175 req := storage.GetRequest{
176 Path: types.StreamPath(name),
177 }
178
179 entries := make(map[int]*logpb.LogEntry)
180 var ierr error
181 err := st.Get(&req, func(idx types.MessageIndex, d []byte) bool {
182 le := logpb.LogEntry{}
183 if ierr = proto.Unmarshal(d, &le); ierr != nil {
184 return false
185 }
186 entries[int(idx)] = &le
187 return true
188 })
189 if ierr != nil {
190 err = ierr
191 }
192 if err != nil && err != storage.ErrDoesNotExist {
193 return fmt.Sprintf("error: %v", err)
194 }
195
196 assertLogEntry := func(i int) string {
197 le := entries[i]
198 if le == nil {
199 return fmt.Sprintf("%d", i)
200 }
201 delete(entries, i)
202
203 if le.StreamIndex != uint64(i) {
204 return fmt.Sprintf("*%d", i)
205 }
206 return ""
207 }
208
209 var failed []string
210 for _, exp := range expected[1:] {
211 switch e := exp.(type) {
212 case int:
213 if err := assertLogEntry(e); err != "" {
214 failed = append(failed, err)
215 }
216
217 case indexRange:
218 var errs []string
219 for i := e.start; i <= e.end; i++ {
220 if err := assertLogEntry(i); err != "" {
221 errs = append(errs, err)
222 }
223 }
224 if len(errs) > 0 {
225 failed = append(failed, fmt.Sprintf("%s{%s}", e. String(), strings.Join(errs, ",")))
226 }
227
228 default:
229 panic(fmt.Errorf("unknown expected type %T", e))
230 }
231 }
232
233 // Extras?
234 if len(entries) > 0 {
235 idxs := make([]int, 0, len(entries))
236 for i := range entries {
237 idxs = append(idxs, i)
238 }
239 sort.Ints(idxs)
240
241 extra := make([]string, len(idxs))
242 for i, idx := range idxs {
243 extra[i] = fmt.Sprintf("%d", idx)
244 }
245 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex tra, ",")))
246 }
247
248 if len(failed) > 0 {
249 return strings.Join(failed, ", ")
250 }
251 return ""
252 }
253
254 // TestCollector runs through a series of end-to-end Collector workflows and
255 // ensures that the Collector behaves appropriately.
256 func TestCollector(t *testing.T) {
257 t.Parallel()
258
259 Convey(`Using a test configuration`, t, func() {
260 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal)
261
262 tcc := &testCoordinatorClient{}
263 st := &testStorage{Storage: &memory.Storage{}}
264
265 coll := New(Options{
266 Storage: st,
267 Coordinator: tcc,
268 })
269
270 bb := bundleBuilder{
271 Context: c,
272 }
273
274 Convey(`Can process multiple single full streams from a Butler b undle.`, func() {
275 bb.addFullStream("foo/+/bar", 128)
276 bb.addFullStream("foo/+/baz", 256)
277
278 So(coll.Process(c, bb.bundle()), ShouldBeNil)
279
280 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127)
281 So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0 , 127})
282
283 So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255)
284 So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0 , 255})
285 })
286
287 Convey(`Will return an error if a transient error happened while registering.`, func() {
288 tcc.errC = make(chan error, 1)
289 tcc.errC <- errors.WrapTransient(errors.New("test error" ))
290
291 bb.addFullStream("foo/+/bar", 128)
292 err := coll.Process(c, bb.bundle())
293 So(err, ShouldNotBeNil)
294 })
295
296 Convey(`Will not return an error if a non-transient error happen ed while registering.`, func() {
297 tcc.errC = make(chan error, 1)
298 tcc.errC <- errors.New("test error")
299
300 bb.addFullStream("foo/+/bar", 128)
301 So(coll.Process(c, bb.bundle()), ShouldBeNil)
302 })
303
304 Convey(`Will return an error if a transient error happened while terminating.`, func() {
305 tcc.errC = make(chan error, 2)
306 tcc.errC <- nil // Register
307 tcc.errC <- errors.WrapTransient(errors.New("test error" )) // Terminate
308
309 bb.addFullStream("foo/+/bar", 128)
310 So(coll.Process(c, bb.bundle()), ShouldNotBeNil)
311 })
312
313 Convey(`Will not return an error if a non-transient error happen ed while terminating.`, func() {
314 tcc.errC = make(chan error, 2)
315 tcc.errC <- nil // Register
316 tcc.errC <- errors.New("test error") // Terminate
317
318 bb.addFullStream("foo/+/bar", 128)
319 So(coll.Process(c, bb.bundle()), ShouldBeNil)
320 })
321
322 Convey(`Will return an error if a transient error happened on st orage.`, func() {
323 // Single transient error.
324 count := int32(0)
325 st.err = func() error {
326 if atomic.AddInt32(&count, 1) == 1 {
327 return errors.WrapTransient(errors.New(" test error"))
328 }
329 return nil
330 }
331
332 bb.addFullStream("foo/+/bar", 128)
333 So(coll.Process(c, bb.bundle()), ShouldNotBeNil)
334 })
335
336 Convey(`Will not return an error if a non-transient error happen ed on storage.`, func() {
337 // Single non-transient error.
338 count := int32(0)
339 st.err = func() error {
340 if atomic.AddInt32(&count, 1) == 1 {
341 return errors.New("test error")
342 }
343 return nil
344 }
345
346 bb.addFullStream("foo/+/bar", 128)
347 So(coll.Process(c, bb.bundle()), ShouldBeNil)
348 })
349
350 Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
351 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8)
352 be.Desc.ContentType = "" // Missing ContentType => inval id.
353
354 bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5)
355 bb.addBundleEntry(be)
356 bb.addFullStream("foo/+/bar", 32)
357
358 So(coll.Process(c, bb.bundle()), ShouldBeNil)
359
360 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
361 So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0 , 31})
362
363 So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1)
364 So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3 , 5)
365 })
366
367 Convey(`Will drop streams with missing secrets.`, func() {
368 be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2)
369 be.Secret = nil
370 bb.addBundleEntry(be)
371
372 So(coll.Process(c, bb.bundle()), ShouldBeNil)
373 So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127)
374 })
375
376 Convey(`Will drop messages with mismatching secrets.`, func() {
377 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
378 So(coll.Process(c, bb.bundle()), ShouldBeNil)
379
380 // Push another bundle with a different secret.
381 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
382 be.Secret = bytes.Repeat([]byte{0xAA}, types.StreamSecre tLength)
383 be.TerminalIndex = 1337
384 bb.addBundleEntry(be)
385 bb.addFullStream("foo/+/baz", 3)
386 So(coll.Process(c, bb.bundle()), ShouldBeNil)
387
388 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
389 So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0 , 2})
390
391 So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2)
392 So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0 , 2})
393 })
394
395 Convey(`Will return no error if the data has a corrupt bundle he ader.`, func() {
396 So(coll.Process(c, []byte{0x00}), ShouldBeNil)
397 So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
398 })
399
400 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu nc() {
401 buf := bytes.Buffer{}
402 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
403 w.Write(&buf, &logpb.ButlerLogBundle{})
404
405 So(coll.Process(c, buf.Bytes()), ShouldBeNil)
406
407 So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
408 })
409
410 Convey(`Will drop records beyond a local terminal index.`, func( ) {
411 bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
412 So(coll.Process(c, bb.bundle()), ShouldBeNil)
413
414 bb.addStreamEntries("foo/+/bar", 4, 3, 5)
415 So(coll.Process(c, bb.bundle()), ShouldBeNil)
416
417 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
418 So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0 , 3})
419 })
420
421 Convey(`Will not ingest records beyond a remote terminal index.` , func() {
422 tcc.register(stateProxy{
423 path: "foo/+/bar",
424 secret: testSecret,
425 terminalIndex: 3,
426 })
427
428 bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2)
429 bb.addStreamEntries("foo/+/bar", 4, 3, 5)
430 So(coll.Process(c, bb.bundle()), ShouldBeNil)
431
432 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3)
433 So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0 , 3})
434 })
435
436 Convey(`Will not ingest records if the stream is archived.`, fun c() {
437 tcc.register(stateProxy{
438 path: "foo/+/bar",
439 secret: testSecret,
440 terminalIndex: -1,
441 archived: true,
442 })
443
444 bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
445 So(coll.Process(c, bb.bundle()), ShouldBeNil)
446
447 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
448 So(st, shouldHaveStoredStream, "foo/+/bar")
449 })
450
451 Convey(`Will not ingest records if the stream is purged.`, func( ) {
452 tcc.register(stateProxy{
453 path: "foo/+/bar",
454 secret: testSecret,
455 terminalIndex: -1,
456 purged: true,
457 })
458
459 So(coll.Process(c, bb.bundle()), ShouldBeNil)
460
461 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
462 So(st, shouldHaveStoredStream, "foo/+/bar")
463 })
464
465 Convey(`Will not ingest a bundle with no bundle entries.`, func( ) {
466 So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil)
467 })
468
469 Convey(`Will not ingest a bundle whose log entries don't match t heir descriptor.`, func() {
470 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
471
472 // Add a binary log entry. This does NOT match the text descriptor, and
473 // should fail validation.
474 be.Logs = append(be.Logs, &logpb.LogEntry{
475 StreamIndex: 2,
476 Sequence: 2,
477 Content: &logpb.LogEntry_Binary{
478 &logpb.Binary{
479 Data: []byte{0xd0, 0x6f, 0x00, 0 xd5},
480 },
481 },
482 })
483 bb.addBundleEntry(be)
484 So(coll.Process(c, bb.bundle()), ShouldBeNil)
485
486 So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
487 So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4)
488 })
489 })
490 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698