Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: server/internal/logdog/collector/collector_test.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package collector 5 package collector
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync/atomic" 10 "sync/atomic"
11 "testing" 11 "testing"
12 12
13 "github.com/luci/luci-go/common/clock/testclock" 13 "github.com/luci/luci-go/common/clock/testclock"
14 "github.com/luci/luci-go/common/config"
14 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/logdog/butlerproto" 16 "github.com/luci/luci-go/common/logdog/butlerproto"
16 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
19 "github.com/luci/luci-go/server/logdog/storage/memory" 20 "github.com/luci/luci-go/server/logdog/storage/memory"
20 "golang.org/x/net/context" 21 "golang.org/x/net/context"
21 22
22 . "github.com/luci/luci-go/common/testing/assertions" 23 . "github.com/luci/luci-go/common/testing/assertions"
23 . "github.com/smartystreets/goconvey/convey" 24 . "github.com/smartystreets/goconvey/convey"
24 ) 25 )
25 26
26 // TestCollector runs through a series of end-to-end Collector workflows and 27 // TestCollector runs through a series of end-to-end Collector workflows and
27 // ensures that the Collector behaves appropriately. 28 // ensures that the Collector behaves appropriately.
28 func TestCollector(t *testing.T) { 29 func testCollectorImpl(t *testing.T, caching bool) {
29 » t.Parallel() 30 » Convey(fmt.Sprintf(`Using a test configuration with caching == %v`, cach ing), t, func() {
30
31 » Convey(`Using a test configuration`, t, func() {
32 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal) 31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal)
33 32
34 tcc := &testCoordinator{} 33 tcc := &testCoordinator{}
35 st := &testStorage{Storage: &memory.Storage{}} 34 st := &testStorage{Storage: &memory.Storage{}}
36 35
37 coll := &Collector{ 36 coll := &Collector{
38 Coordinator: tcc, 37 Coordinator: tcc,
39 Storage: st, 38 Storage: st,
40 } 39 }
41 defer coll.Close() 40 defer coll.Close()
42 41
43 bb := bundleBuilder{ 42 bb := bundleBuilder{
44 Context: c, 43 Context: c,
45 } 44 }
46 45
47 » » for _, phrase := range []string{"disabled", "enabled"} { 46 » » if caching {
48 » » » v := phrase == "enabled" 47 » » » coll.Coordinator = coordinator.NewCache(coll.Coordinator , 0, 0)
49 48 » » }
50 » » » Convey(fmt.Sprintf(`When caching is %s`, phrase), func() { 49
51 » » » » if v { 50 » » Convey(`Can process multiple single full streams from a Butler b undle.`, func() {
52 » » » » » coll.Coordinator = coordinator.NewCache( coll.Coordinator, 0, 0) 51 » » » bb.addFullStream("foo/+/bar", 128)
52 » » » bb.addFullStream("foo/+/baz", 256)
53
54 » » » So(coll.Process(c, bb.bundle()), ShouldBeNil)
55
56 » » » So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", 127)
57 » » » So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 127})
58
59 » » » So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/baz", 255)
60 » » » So(st, shouldHaveStoredStream, "test-project", "foo/+/ba z", indexRange{0, 255})
61 » » })
62
63 » » Convey(`Will return a transient error if a transient error happe ned while registering.`, func() {
64 » » » tcc.errC = make(chan error, 1)
65 » » » tcc.errC <- errors.WrapTransient(errors.New("test error" ))
66
67 » » » bb.addFullStream("foo/+/bar", 128)
68 » » » err := coll.Process(c, bb.bundle())
69 » » » So(err, ShouldNotBeNil)
70 » » » So(errors.IsTransient(err), ShouldBeTrue)
71 » » })
72
73 » » Convey(`Will return an error if a non-transient error happened w hile registering.`, func() {
74 » » » tcc.errC = make(chan error, 1)
75 » » » tcc.errC <- errors.New("test error")
76
77 » » » bb.addFullStream("foo/+/bar", 128)
78 » » » err := coll.Process(c, bb.bundle())
79 » » » So(err, ShouldNotBeNil)
80 » » » So(errors.IsTransient(err), ShouldBeFalse)
81 » » })
82
83 » » Convey(`Will return a transient error if a transient error happe ned while terminating.`, func() {
84 » » » tcc.errC = make(chan error, 2)
85 » » » tcc.errC <- nil // Register
86 » » » tcc.errC <- errors.WrapTransient(errors.New("test error" )) // Terminate
87
88 » » » bb.addFullStream("foo/+/bar", 128)
89 » » » err := coll.Process(c, bb.bundle())
90 » » » So(err, ShouldNotBeNil)
91 » » » So(errors.IsTransient(err), ShouldBeTrue)
92 » » })
93
94 » » Convey(`Will return an error if a non-transient error happened w hile terminating.`, func() {
95 » » » tcc.errC = make(chan error, 2)
96 » » » tcc.errC <- nil // Register
97 » » » tcc.errC <- errors.New("test error") // Terminate
98
99 » » » bb.addFullStream("foo/+/bar", 128)
100 » » » err := coll.Process(c, bb.bundle())
101 » » » So(err, ShouldNotBeNil)
102 » » » So(errors.IsTransient(err), ShouldBeFalse)
103 » » })
104
105 » » Convey(`Will return a transient error if a transient error happe ned on storage.`, func() {
106 » » » // Single transient error.
107 » » » count := int32(0)
108 » » » st.err = func() error {
109 » » » » if atomic.AddInt32(&count, 1) == 1 {
110 » » » » » return errors.WrapTransient(errors.New(" test error"))
53 } 111 }
54 112 return nil
55 Convey(`Can process multiple single full streams from a Butler bundle.`, func() { 113 }
56 bb.addFullStream("foo/+/bar", 128) 114
57 bb.addFullStream("foo/+/baz", 256) 115 bb.addFullStream("foo/+/bar", 128)
58 116 err := coll.Process(c, bb.bundle())
59 So(coll.Process(c, bb.bundle()), ShouldB eNil) 117 So(err, ShouldNotBeNil)
60 118 So(errors.IsTransient(err), ShouldBeTrue)
61 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 127) 119 })
62 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 127}) 120
63 121 Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
64 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 255) 122 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
65 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 255}) 123 bb.addBundleEntry(be)
66 }) 124
67 125 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
68 Convey(`Will return a transient error if a trans ient error happened while registering.`, func() { 126 bb.addFullStream("foo/+/bar", 32)
69 tcc.errC = make(chan error, 1) 127
70 tcc.errC <- errors.WrapTransient(errors. New("test error")) 128 err := coll.Process(c, bb.bundle())
71 129 So(err, ShouldNotBeNil)
72 bb.addFullStream("foo/+/bar", 128) 130 So(errors.IsTransient(err), ShouldBeFalse)
73 err := coll.Process(c, bb.bundle()) 131
74 So(err, ShouldNotBeNil) 132 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", 32)
75 So(errors.IsTransient(err), ShouldBeTrue ) 133 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 31})
76 }) 134
77 135 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/trash", 1337)
78 Convey(`Will return an error if a non-transient error happened while registering.`, func() { 136 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr ash", 4, 5, 6, 7, 8)
79 tcc.errC = make(chan error, 1) 137 })
80 tcc.errC <- errors.New("test error") 138
81 139 Convey(`Will drop streams with missing (invalid) secrets.`, func () {
82 bb.addFullStream("foo/+/bar", 128) 140 b := bb.genBase()
83 err := coll.Process(c, bb.bundle()) 141 b.Secret = nil
84 So(err, ShouldNotBeNil) 142
85 So(errors.IsTransient(err), ShouldBeFals e) 143 err := coll.Process(c, bb.bundle())
86 }) 144 So(err, ShouldErrLike, "invalid prefix secret")
87 145 So(errors.IsTransient(err), ShouldBeFalse)
88 Convey(`Will return a transient error if a trans ient error happened while terminating.`, func() { 146 })
89 tcc.errC = make(chan error, 2) 147
90 tcc.errC <- nil // Register 148 Convey(`Will drop messages with mismatching secrets.`, func() {
91 tcc.errC <- errors.WrapTransient(errors. New("test error")) // Terminate 149 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
92 150 So(coll.Process(c, bb.bundle()), ShouldBeNil)
93 bb.addFullStream("foo/+/bar", 128) 151
94 err := coll.Process(c, bb.bundle()) 152 // Push another bundle with a different secret.
95 So(err, ShouldNotBeNil) 153 b := bb.genBase()
96 So(errors.IsTransient(err), ShouldBeTrue ) 154 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret Length)
97 }) 155 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
98 156 be.TerminalIndex = 1337
99 Convey(`Will return an error if a non-transient error happened while terminating.`, func() { 157 bb.addBundleEntry(be)
100 tcc.errC = make(chan error, 2) 158 bb.addFullStream("foo/+/baz", 3)
101 tcc.errC <- nil // Register 159 So(coll.Process(c, bb.bundle()), ShouldBeNil)
102 tcc.errC <- errors.New("test error") // Terminate 160
103 161 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", -1)
104 bb.addFullStream("foo/+/bar", 128) 162 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 2})
105 err := coll.Process(c, bb.bundle()) 163
106 So(err, ShouldNotBeNil) 164 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/baz", 2)
107 So(errors.IsTransient(err), ShouldBeFals e) 165 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba z", indexRange{0, 2})
108 }) 166 })
109 167
110 Convey(`Will return a transient error if a trans ient error happened on storage.`, func() { 168 Convey(`With an empty project name`, func() {
111 // Single transient error. 169 b := bb.genBase()
112 count := int32(0) 170 b.Project = ""
113 st.err = func() error { 171 bb.addFullStream("foo/+/baz", 3)
114 if atomic.AddInt32(&count, 1) == 1 { 172
115 return errors.WrapTransi ent(errors.New("test error")) 173 // TODO(dnj): Enable this when project name is required.
116 } 174 SkipConvey(`Will drop the stream.`, func() {
117 return nil 175
118 } 176 err := coll.Process(c, bb.bundle())
119 177 So(err, ShouldErrLike, "invalid project name")
120 bb.addFullStream("foo/+/bar", 128) 178 So(errors.IsTransient(err), ShouldBeFalse)
121 err := coll.Process(c, bb.bundle()) 179 })
122 So(err, ShouldNotBeNil) 180
123 So(errors.IsTransient(err), ShouldBeTrue ) 181 Convey(`Will register the stream.`, func() {
124 }) 182 So(coll.Process(c, bb.bundle()), ShouldBeNil)
125 183
126 Convey(`Will drop invalid LogStreamDescriptor bu ndle entries and process the valid ones.`, func() { 184 So(tcc, shouldHaveRegisteredStream, "", "foo/+/b az", 2)
127 be := bb.genBundleEntry("foo/+/trash", 1 337, 4, 5, 6, 7, 8) 185 So(st, shouldHaveStoredStream, "", "foo/+/baz", indexRange{0, 2})
128 bb.addBundleEntry(be) 186 })
129 187 })
130 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous 188
131 bb.addFullStream("foo/+/bar", 32) 189 Convey(`Will drop streams with invalid project names.`, func() {
132 190 b := bb.genBase()
133 err := coll.Process(c, bb.bundle()) 191 b.Project = "!!!invalid name!!!"
134 So(err, ShouldNotBeNil) 192 So(config.ProjectName(b.Project).Validate(), ShouldNotBe Nil)
135 So(errors.IsTransient(err), ShouldBeFals e) 193
136 194 err := coll.Process(c, bb.bundle())
137 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 32) 195 So(err, ShouldErrLike, "invalid bundle project name")
138 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 31}) 196 So(errors.IsTransient(err), ShouldBeFalse)
139 197 })
140 So(tcc, shouldHaveRegisteredStream, "foo /+/trash", 1337) 198
141 So(st, shouldHaveStoredStream, "foo/+/tr ash", 4, 5, 6, 7, 8) 199 Convey(`Will drop streams with empty bundle prefixes.`, func() {
142 }) 200 b := bb.genBase()
143 201 b.Prefix = ""
144 Convey(`Will drop streams with missing secrets.` , func() { 202
145 be := bb.genBundleEntry("foo/+/trash", 2 , 0, 1, 2) 203 err := coll.Process(c, bb.bundle())
146 be.Secret = nil 204 So(err, ShouldErrLike, "invalid bundle prefix")
147 bb.addBundleEntry(be) 205 So(errors.IsTransient(err), ShouldBeFalse)
148 206 })
149 err := coll.Process(c, bb.bundle()) 207
150 So(err, ShouldErrLike, "missing stream s ecret") 208 Convey(`Will drop streams with invalid bundle prefixes.`, func() {
151 So(errors.IsTransient(err), ShouldBeFals e) 209 b := bb.genBase()
152 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar") 210 b.Prefix = "!!!invalid prefix!!!"
153 }) 211 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil )
154 212
155 Convey(`Will drop messages with mismatching secr ets.`, func() { 213 err := coll.Process(c, bb.bundle())
156 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) 214 So(err, ShouldErrLike, "invalid bundle prefix")
157 So(coll.Process(c, bb.bundle()), ShouldB eNil) 215 So(errors.IsTransient(err), ShouldBeFalse)
158 216 })
159 // Push another bundle with a different secret. 217
160 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) 218 Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
161 be.Secret = bytes.Repeat([]byte{0xAA}, t ypes.PrefixSecretLength) 219 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4)
162 be.TerminalIndex = 1337 220
163 bb.addBundleEntry(be) 221 err := coll.Process(c, bb.bundle())
164 bb.addFullStream("foo/+/baz", 3) 222 So(err, ShouldErrLike, "mismatched bundle and entry pref ixes")
165 So(coll.Process(c, bb.bundle()), ShouldB eNil) 223 So(errors.IsTransient(err), ShouldBeFalse)
166 224 })
167 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) 225
168 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 2}) 226 Convey(`Will return no error if the data has a corrupt bundle he ader.`, func() {
169 227 So(coll.Process(c, []byte{0x00}), ShouldBeNil)
170 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 2) 228 So(tcc, shouldNotHaveRegisteredStream, "test-project", " foo/+/bar")
171 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 2}) 229 })
172 }) 230
173 231 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu nc() {
174 Convey(`Will return no error if the data has a c orrupt bundle header.`, func() { 232 buf := bytes.Buffer{}
175 So(coll.Process(c, []byte{0x00}), Should BeNil) 233 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
176 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar") 234 w.Write(&buf, &logpb.ButlerLogBundle{})
177 }) 235
178 236 So(coll.Process(c, buf.Bytes()), ShouldBeNil)
179 Convey(`Will drop bundles with unknown ProtoVers ion string.`, func() { 237
180 buf := bytes.Buffer{} 238 So(tcc, shouldNotHaveRegisteredStream, "test-project", " foo/+/bar")
181 w := butlerproto.Writer{ProtoVersion: "! !!invalid!!!"} 239 })
182 w.Write(&buf, &logpb.ButlerLogBundle{}) 240
183 241 Convey(`Will not ingest records if the stream is archived.`, fun c() {
184 So(coll.Process(c, buf.Bytes()), ShouldB eNil) 242 tcc.register(coordinator.LogStreamState{
185 243 Project: "test-project",
186 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar") 244 Path: "foo/+/bar",
187 }) 245 Secret: testSecret,
188 246 TerminalIndex: -1,
189 Convey(`Will not ingest records if the stream is archived.`, func() { 247 Archived: true,
190 tcc.register(coordinator.LogStreamState{ 248 })
191 Path: "foo/+/bar", 249
192 Secret: testSecret, 250 bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
193 TerminalIndex: -1, 251 So(coll.Process(c, bb.bundle()), ShouldBeNil)
194 Archived: true, 252
195 }) 253 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", -1)
196 254 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r")
197 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 3, 4) 255 })
198 So(coll.Process(c, bb.bundle()), ShouldB eNil) 256
199 257 Convey(`Will not ingest records if the stream is purged.`, func( ) {
200 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) 258 tcc.register(coordinator.LogStreamState{
201 So(st, shouldHaveStoredStream, "foo/+/ba r") 259 Project: "test-project",
202 }) 260 Path: "foo/+/bar",
203 261 Secret: testSecret,
204 Convey(`Will not ingest records if the stream is purged.`, func() { 262 TerminalIndex: -1,
205 tcc.register(coordinator.LogStreamState{ 263 Purged: true,
206 Path: "foo/+/bar", 264 })
207 Secret: testSecret, 265
208 TerminalIndex: -1, 266 So(coll.Process(c, bb.bundle()), ShouldBeNil)
209 Purged: true, 267
210 }) 268 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", -1)
211 269 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r")
212 So(coll.Process(c, bb.bundle()), ShouldB eNil) 270 })
213 271
214 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) 272 Convey(`Will not ingest a bundle with no bundle entries.`, func( ) {
215 So(st, shouldHaveStoredStream, "foo/+/ba r") 273 So(coll.Process(c, bb.bundle()), ShouldBeNil)
216 }) 274 })
217 275
218 Convey(`Will not ingest a bundle with no bundle entries.`, func() { 276 Convey(`Will not ingest a bundle whose log entries don't match t heir descriptor.`, func() {
219 So(coll.Process(c, bb.bundleWithEntries( )), ShouldBeNil) 277 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
220 }) 278
221 279 // Add a binary log entry. This does NOT match the text descriptor, and
222 Convey(`Will not ingest a bundle whose log entri es don't match their descriptor.`, func() { 280 // should fail validation.
223 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4) 281 be.Logs = append(be.Logs, &logpb.LogEntry{
224 282 StreamIndex: 2,
225 // Add a binary log entry. This does NOT match the text descriptor, and 283 Sequence: 2,
226 // should fail validation. 284 Content: &logpb.LogEntry_Binary{
227 be.Logs = append(be.Logs, &logpb.LogEntr y{ 285 &logpb.Binary{
228 StreamIndex: 2, 286 Data: []byte{0xd0, 0x6f, 0x00, 0 xd5},
229 Sequence: 2, 287 },
230 Content: &logpb.LogEntry_Binary{ 288 },
231 &logpb.Binary{ 289 })
232 Data: []byte{0xd 0, 0x6f, 0x00, 0xd5}, 290 bb.addBundleEntry(be)
233 }, 291 So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
234 }, 292
235 }) 293 So(tcc, shouldNotHaveRegisteredStream, "test-project", " foo/+/bar")
236 bb.addBundleEntry(be) 294 })
237 So(coll.Process(c, bb.bundle()), ShouldE rrLike, "invalid log entry")
238
239 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar")
240 })
241 })
242 }
243 }) 295 })
244 } 296 }
297
298 // TestCollector runs through a series of end-to-end Collector workflows and
299 // ensures that the Collector behaves appropriately.
300 func TestCollector(t *testing.T) {
301 t.Parallel()
302
303 testCollectorImpl(t, false)
304 }
305
306 // TestCollectorWithCaching runs through a series of end-to-end Collector
307 // workflows and ensures that the Collector behaves appropriately.
308 func TestCollectorWithCaching(t *testing.T) {
309 t.Parallel()
310
311 testCollectorImpl(t, true)
312 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/coordinator/cache.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698