Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(207)

Side by Side Diff: server/internal/logdog/collector/collector_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "bytes"
9 "fmt"
10 "sync/atomic"
11 "testing"
12
13 "github.com/luci/luci-go/common/clock/testclock"
14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/logdog/butlerproto"
16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
19 "github.com/luci/luci-go/server/logdog/storage/memory"
20 "golang.org/x/net/context"
21
22 . "github.com/smartystreets/goconvey/convey"
23 )
24
25 // TestCollector runs through a series of end-to-end Collector workflows and
26 // ensures that the Collector behaves appropriately.
27 func TestCollector(t *testing.T) {
28 t.Parallel()
29
30 Convey(`Using a test configuration`, t, func() {
31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal)
32
33 tcc := &testCoordinator{}
34 st := &testStorage{Storage: &memory.Storage{}}
35
36 coll := &Collector{
37 Coordinator: tcc,
38 Storage: st,
39 }
40
41 bb := bundleBuilder{
42 Context: c,
43 }
44
45 for _, phrase := range []string{"disabled", "enabled"} {
46 v := phrase == "enabled"
47
48 Convey(fmt.Sprintf(`When caching is %s`, phrase), func() {
49 if v {
50 coll.Coordinator = coordinator.NewCache( coll.Coordinator, 0, 0)
51 }
52
53 Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
54 bb.addFullStream("foo/+/bar", 128)
55 bb.addFullStream("foo/+/baz", 256)
56
57 So(coll.Process(c, bb.bundle()), ShouldB eNil)
58
59 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 127)
60 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 127})
61
62 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 255)
63 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 255})
64 })
65
66 Convey(`Will return a transient error if a trans ient error happened while registering.`, func() {
67 tcc.errC = make(chan error, 1)
68 tcc.errC <- errors.WrapTransient(errors. New("test error"))
69
70 bb.addFullStream("foo/+/bar", 128)
71 err := coll.Process(c, bb.bundle())
72 So(err, ShouldNotBeNil)
73 So(errors.IsTransient(err), ShouldBeTrue )
74 })
75
76 Convey(`Will return an error if a non-transient error happened while registering.`, func() {
77 tcc.errC = make(chan error, 1)
78 tcc.errC <- errors.New("test error")
79
80 bb.addFullStream("foo/+/bar", 128)
81 err := coll.Process(c, bb.bundle())
82 So(err, ShouldNotBeNil)
83 So(errors.IsTransient(err), ShouldBeFals e)
84 })
85
86 Convey(`Will return a transient error if a trans ient error happened while terminating.`, func() {
87 tcc.errC = make(chan error, 2)
88 tcc.errC <- nil // Register
89 tcc.errC <- errors.WrapTransient(errors. New("test error")) // Terminate
90
91 bb.addFullStream("foo/+/bar", 128)
92 err := coll.Process(c, bb.bundle())
93 So(err, ShouldNotBeNil)
94 So(errors.IsTransient(err), ShouldBeTrue )
95 })
96
97 Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
98 tcc.errC = make(chan error, 2)
99 tcc.errC <- nil // Register
100 tcc.errC <- errors.New("test error") // Terminate
101
102 bb.addFullStream("foo/+/bar", 128)
103 err := coll.Process(c, bb.bundle())
104 So(err, ShouldNotBeNil)
105 So(errors.IsTransient(err), ShouldBeFals e)
106 })
107
108 Convey(`Will return a transient error if an erro r happened on storage.`, func() {
109 // Single transient error.
110 count := int32(0)
111 st.err = func() error {
112 if atomic.AddInt32(&count, 1) == 1 {
113 return errors.New("test error")
114 }
115 return nil
116 }
117
118 bb.addFullStream("foo/+/bar", 128)
119 err := coll.Process(c, bb.bundle())
120 So(err, ShouldNotBeNil)
121 So(errors.IsTransient(err), ShouldBeTrue )
122 })
123
124 Convey(`Will drop invalid LogStreamDescriptor bu ndle entries and process the valid ones.`, func() {
125 be := bb.genBundleEntry("foo/+/trash", 1 337, 4, 6, 7, 8)
126 be.Desc.ContentType = "" // Missing Cont entType => invalid.
127
128 bb.addStreamEntries("foo/+/trash", -1, 0 , 1, 2, 3, 5)
129 bb.addBundleEntry(be)
130 bb.addFullStream("foo/+/bar", 32)
131
132 So(coll.Process(c, bb.bundle()), ShouldB eNil)
133
134 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 32)
135 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 31})
136
137 So(tcc, shouldHaveRegisteredStream, "foo /+/trash", -1)
138 So(st, shouldHaveStoredStream, "foo/+/tr ash", 0, 1, 2, 3, 5)
139 })
140
141 Convey(`Will drop streams with missing secrets.` , func() {
142 be := bb.genBundleEntry("foo/+/trash", 2 , 0, 1, 2)
143 be.Secret = nil
144 bb.addBundleEntry(be)
145
146 So(coll.Process(c, bb.bundle()), ShouldB eNil)
147 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar", 127)
148 })
149
150 Convey(`Will drop messages with mismatching secr ets.`, func() {
151 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
152 So(coll.Process(c, bb.bundle()), ShouldB eNil)
153
154 // Push another bundle with a different secret.
155 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
156 be.Secret = bytes.Repeat([]byte{0xAA}, t ypes.StreamSecretLength)
157 be.TerminalIndex = 1337
158 bb.addBundleEntry(be)
159 bb.addFullStream("foo/+/baz", 3)
160 So(coll.Process(c, bb.bundle()), ShouldB eNil)
161
162 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
163 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 2})
164
165 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 2)
166 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 2})
167 })
168
169 Convey(`Will return no error if the data has a c orrupt bundle header.`, func() {
170 So(coll.Process(c, []byte{0x00}), Should BeNil)
171 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
172 })
173
174 Convey(`Will drop bundles with unknown ProtoVers ion string.`, func() {
175 buf := bytes.Buffer{}
176 w := butlerproto.Writer{ProtoVersion: "! !!invalid!!!"}
177 w.Write(&buf, &logpb.ButlerLogBundle{})
178
179 So(coll.Process(c, buf.Bytes()), ShouldB eNil)
180
181 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
182 })
183
184 Convey(`Will drop records beyond a local termina l index.`, func() {
185 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4)
186 So(coll.Process(c, bb.bundle()), ShouldB eNil)
187
188 bb.addStreamEntries("foo/+/bar", 3, 3, 5 )
189 So(coll.Process(c, bb.bundle()), ShouldB eNil)
190
191 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4)
192 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3})
193 })
194
195 Convey(`Will not ingest records beyond a remote terminal index.`, func() {
196 tcc.register(coordinator.LogStreamState{
197 Path: "foo/+/bar",
198 Secret: testSecret,
199 TerminalIndex: 3,
200 })
201
202 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2)
203 bb.addStreamEntries("foo/+/bar", 3, 3, 5 )
204 So(coll.Process(c, bb.bundle()), ShouldB eNil)
205
206 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 3)
207 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3})
208 })
209
210 Convey(`Will not ingest records if the stream is archived.`, func() {
211 tcc.register(coordinator.LogStreamState{
212 Path: "foo/+/bar",
213 Secret: testSecret,
214 TerminalIndex: -1,
215 Archived: true,
216 })
217
218 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4)
219 So(coll.Process(c, bb.bundle()), ShouldB eNil)
220
221 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
222 So(st, shouldHaveStoredStream, "foo/+/ba r")
223 })
224
225 Convey(`Will not ingest records if the stream is purged.`, func() {
226 tcc.register(coordinator.LogStreamState{
227 Path: "foo/+/bar",
228 Secret: testSecret,
229 TerminalIndex: -1,
230 Purged: true,
231 })
232
233 So(coll.Process(c, bb.bundle()), ShouldB eNil)
234
235 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
236 So(st, shouldHaveStoredStream, "foo/+/ba r")
237 })
238
239 Convey(`Will not ingest a bundle with no bundle entries.`, func() {
240 So(coll.Process(c, bb.bundleWithEntries( )), ShouldBeNil)
241 })
242
243 Convey(`Will not ingest a bundle whose log entri es don't match their descriptor.`, func() {
244 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
245
246 // Add a binary log entry. This does NOT match the text descriptor, and
247 // should fail validation.
248 be.Logs = append(be.Logs, &logpb.LogEntr y{
249 StreamIndex: 2,
250 Sequence: 2,
251 Content: &logpb.LogEntry_Binary{
252 &logpb.Binary{
253 Data: []byte{0xd 0, 0x6f, 0x00, 0xd5},
254 },
255 },
256 })
257 bb.addBundleEntry(be)
258 So(coll.Process(c, bb.bundle()), ShouldB eNil)
259
260 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4)
261 So(st, shouldHaveStoredStream, "foo/+/ba r", 0, 1, 3, 4)
262 })
263 })
264 }
265 })
266 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/coordinator/cache.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698