Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1263)

Side by Side Diff: server/internal/logdog/collector/collector_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Reorganized, cleaned up, comments, and updated for pRPC. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "bytes"
9 "fmt"
10 "sync/atomic"
11 "testing"
12
13 "github.com/luci/luci-go/common/clock/testclock"
14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/logdog/butlerproto"
16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
19 "github.com/luci/luci-go/server/logdog/storage/memory"
20 "golang.org/x/net/context"
21
22 . "github.com/smartystreets/goconvey/convey"
23 )
24
25 // TestCollector runs through a series of end-to-end Collector workflows and
26 // ensures that the Collector behaves appropriately.
27 func TestCollector(t *testing.T) {
28 t.Parallel()
29
30 Convey(`Using a test configuration`, t, func() {
31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal)
32
33 tcc := &testCoordinator{}
34 st := &testStorage{Storage: &memory.Storage{}}
35
36 coll := &Collector{
37 Coordinator: tcc,
38 Storage: st,
39 }
40
41 bb := bundleBuilder{
42 Context: c,
43 }
44
45 for _, v := range []bool{
iannucci 2016/02/05 23:41:01 for _, phrase := range []string{"disabled", "enabl
dnj (Google) 2016/02/06 04:10:36 Done.
46 false,
47 true,
48 } {
49 phrase := "disabled"
50 if v {
51 phrase = "enabled"
52 }
53 Convey(fmt.Sprintf(`When caching is %s`, phrase), func() {
54 if v {
55 coll.Coordinator = coordinator.NewCache( coll.Coordinator, 0, 0)
56 }
57
58 Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
59 bb.addFullStream("foo/+/bar", 128)
60 bb.addFullStream("foo/+/baz", 256)
61
62 So(coll.Process(c, bb.bundle()), ShouldB eNil)
63
64 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 127)
65 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 127})
66
67 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 255)
68 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 255})
69 })
70
71 Convey(`Will return a transient error if a trans ient error happened while registering.`, func() {
72 tcc.errC = make(chan error, 1)
73 tcc.errC <- errors.WrapTransient(errors. New("test error"))
74
75 bb.addFullStream("foo/+/bar", 128)
76 err := coll.Process(c, bb.bundle())
77 So(err, ShouldNotBeNil)
78 So(errors.IsTransient(err), ShouldBeTrue )
79 })
80
81 Convey(`Will return an error if a non-transient error happened while registering.`, func() {
82 tcc.errC = make(chan error, 1)
83 tcc.errC <- errors.New("test error")
84
85 bb.addFullStream("foo/+/bar", 128)
86 err := coll.Process(c, bb.bundle())
87 So(err, ShouldNotBeNil)
88 So(errors.IsTransient(err), ShouldBeFals e)
89 })
90
91 Convey(`Will return a transient error if a trans ient error happened while terminating.`, func() {
92 tcc.errC = make(chan error, 2)
93 tcc.errC <- nil // Register
94 tcc.errC <- errors.WrapTransient(errors. New("test error")) // Terminate
95
96 bb.addFullStream("foo/+/bar", 128)
97 err := coll.Process(c, bb.bundle())
98 So(err, ShouldNotBeNil)
99 So(errors.IsTransient(err), ShouldBeTrue )
100 })
101
102 Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
103 tcc.errC = make(chan error, 2)
104 tcc.errC <- nil // Register
105 tcc.errC <- errors.New("test error") // Terminate
106
107 bb.addFullStream("foo/+/bar", 128)
108 err := coll.Process(c, bb.bundle())
109 So(err, ShouldNotBeNil)
110 So(errors.IsTransient(err), ShouldBeFals e)
111 })
112
113 Convey(`Will return a transient error if an erro r happened on storage.`, func() {
114 // Single transient error.
115 count := int32(0)
116 st.err = func() error {
117 if atomic.AddInt32(&count, 1) == 1 {
118 return errors.New("test error")
119 }
120 return nil
121 }
122
123 bb.addFullStream("foo/+/bar", 128)
124 err := coll.Process(c, bb.bundle())
125 So(err, ShouldNotBeNil)
126 So(errors.IsTransient(err), ShouldBeTrue )
127 })
128
129 Convey(`Will drop invalid LogStreamDescriptor bu ndle entries and process the valid ones.`, func() {
130 be := bb.genBundleEntry("foo/+/trash", 1 337, 4, 6, 7, 8)
131 be.Desc.ContentType = "" // Missing Cont entType => invalid.
132
133 bb.addStreamEntries("foo/+/trash", -1, 0 , 1, 2, 3, 5)
134 bb.addBundleEntry(be)
135 bb.addFullStream("foo/+/bar", 32)
136
137 So(coll.Process(c, bb.bundle()), ShouldB eNil)
138
139 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 32)
140 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 31})
141
142 So(tcc, shouldHaveRegisteredStream, "foo /+/trash", -1)
143 So(st, shouldHaveStoredStream, "foo/+/tr ash", 0, 1, 2, 3, 5)
144 })
145
146 Convey(`Will drop streams with missing secrets.` , func() {
147 be := bb.genBundleEntry("foo/+/trash", 2 , 0, 1, 2)
148 be.Secret = nil
149 bb.addBundleEntry(be)
150
151 So(coll.Process(c, bb.bundle()), ShouldB eNil)
152 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar", 127)
153 })
154
155 Convey(`Will drop messages with mismatching secr ets.`, func() {
156 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
157 So(coll.Process(c, bb.bundle()), ShouldB eNil)
158
159 // Push another bundle with a different secret.
160 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
161 be.Secret = bytes.Repeat([]byte{0xAA}, t ypes.StreamSecretLength)
162 be.TerminalIndex = 1337
163 bb.addBundleEntry(be)
164 bb.addFullStream("foo/+/baz", 3)
165 So(coll.Process(c, bb.bundle()), ShouldB eNil)
166
167 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
168 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 2})
169
170 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 2)
171 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 2})
172 })
173
174 Convey(`Will return no error if the data has a c orrupt bundle header.`, func() {
175 So(coll.Process(c, []byte{0x00}), Should BeNil)
176 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
177 })
178
179 Convey(`Will drop bundles with unknown ProtoVers ion string.`, func() {
180 buf := bytes.Buffer{}
181 w := butlerproto.Writer{ProtoVersion: "! !!invalid!!!"}
182 w.Write(&buf, &logpb.ButlerLogBundle{})
183
184 So(coll.Process(c, buf.Bytes()), ShouldB eNil)
185
186 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
187 })
188
189 Convey(`Will drop records beyond a local termina l index.`, func() {
190 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4)
191 So(coll.Process(c, bb.bundle()), ShouldB eNil)
192
193 bb.addStreamEntries("foo/+/bar", 3, 3, 5 )
194 So(coll.Process(c, bb.bundle()), ShouldB eNil)
195
196 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4)
197 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3})
198 })
199
200 Convey(`Will not ingest records beyond a remote terminal index.`, func() {
201 tcc.register(coordinator.LogStreamState{
202 Path: "foo/+/bar",
203 Secret: testSecret,
204 TerminalIndex: 3,
205 })
206
207 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2)
208 bb.addStreamEntries("foo/+/bar", 3, 3, 5 )
209 So(coll.Process(c, bb.bundle()), ShouldB eNil)
210
211 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 3)
212 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3})
213 })
214
215 Convey(`Will not ingest records if the stream is archived.`, func() {
216 tcc.register(coordinator.LogStreamState{
217 Path: "foo/+/bar",
218 Secret: testSecret,
219 TerminalIndex: -1,
220 Archived: true,
221 })
222
223 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4)
224 So(coll.Process(c, bb.bundle()), ShouldB eNil)
225
226 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
227 So(st, shouldHaveStoredStream, "foo/+/ba r")
228 })
229
230 Convey(`Will not ingest records if the stream is purged.`, func() {
231 tcc.register(coordinator.LogStreamState{
232 Path: "foo/+/bar",
233 Secret: testSecret,
234 TerminalIndex: -1,
235 Purged: true,
236 })
237
238 So(coll.Process(c, bb.bundle()), ShouldB eNil)
239
240 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1)
241 So(st, shouldHaveStoredStream, "foo/+/ba r")
242 })
243
244 Convey(`Will not ingest a bundle with no bundle entries.`, func() {
245 So(coll.Process(c, bb.bundleWithEntries( )), ShouldBeNil)
246 })
247
248 Convey(`Will not ingest a bundle whose log entri es don't match their descriptor.`, func() {
249 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
250
251 // Add a binary log entry. This does NOT match the text descriptor, and
252 // should fail validation.
253 be.Logs = append(be.Logs, &logpb.LogEntr y{
254 StreamIndex: 2,
255 Sequence: 2,
256 Content: &logpb.LogEntry_Binary{
257 &logpb.Binary{
258 Data: []byte{0xd 0, 0x6f, 0x00, 0xd5},
259 },
260 },
261 })
262 bb.addBundleEntry(be)
263 So(coll.Process(c, bb.bundle()), ShouldB eNil)
264
265 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4)
266 So(st, shouldHaveStoredStream, "foo/+/ba r", 0, 1, 3, 4)
267 })
268 })
269 }
270 })
271 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698