OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package collector | 5 package collector |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync/atomic" | 10 "sync/atomic" |
11 "testing" | 11 "testing" |
12 | 12 |
13 "github.com/luci/luci-go/common/clock/testclock" | 13 "github.com/luci/luci-go/common/clock/testclock" |
14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/retry/transient" |
15 "github.com/luci/luci-go/logdog/api/logpb" | 16 "github.com/luci/luci-go/logdog/api/logpb" |
16 "github.com/luci/luci-go/logdog/client/butlerproto" | 17 "github.com/luci/luci-go/logdog/client/butlerproto" |
17 "github.com/luci/luci-go/logdog/common/storage/memory" | 18 "github.com/luci/luci-go/logdog/common/storage/memory" |
18 "github.com/luci/luci-go/logdog/common/types" | 19 "github.com/luci/luci-go/logdog/common/types" |
19 cc "github.com/luci/luci-go/logdog/server/collector/coordinator" | 20 cc "github.com/luci/luci-go/logdog/server/collector/coordinator" |
20 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 21 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
21 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
22 | 23 |
23 . "github.com/luci/luci-go/common/testing/assertions" | 24 . "github.com/luci/luci-go/common/testing/assertions" |
24 . "github.com/smartystreets/goconvey/convey" | 25 . "github.com/smartystreets/goconvey/convey" |
(...skipping 29 matching lines...) Expand all Loading... |
54 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 55 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
55 | 56 |
56 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 127) | 57 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 127) |
57 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 127}) | 58 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 127}) |
58 | 59 |
59 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 255) | 60 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 255) |
60 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 255}) | 61 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 255}) |
61 }) | 62 }) |
62 | 63 |
63 Convey(`Will return a transient error if a transient error happe
ned while registering.`, func() { | 64 Convey(`Will return a transient error if a transient error happe
ned while registering.`, func() { |
64 » » » tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.WrapTransient(errors.New("test error")) } | 65 » » » tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.New("test error", transient.Tag) } |
65 | 66 |
66 bb.addFullStream("foo/+/bar", 128) | 67 bb.addFullStream("foo/+/bar", 128) |
67 err := coll.Process(c, bb.bundle()) | 68 err := coll.Process(c, bb.bundle()) |
68 So(err, ShouldNotBeNil) | 69 So(err, ShouldNotBeNil) |
69 » » » So(errors.IsTransient(err), ShouldBeTrue) | 70 » » » So(transient.Tag.In(err), ShouldBeTrue) |
70 }) | 71 }) |
71 | 72 |
72 Convey(`Will return an error if a non-transient error happened w
hile registering.`, func() { | 73 Convey(`Will return an error if a non-transient error happened w
hile registering.`, func() { |
73 tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.New("test error") } | 74 tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.New("test error") } |
74 | 75 |
75 bb.addFullStream("foo/+/bar", 128) | 76 bb.addFullStream("foo/+/bar", 128) |
76 err := coll.Process(c, bb.bundle()) | 77 err := coll.Process(c, bb.bundle()) |
77 So(err, ShouldNotBeNil) | 78 So(err, ShouldNotBeNil) |
78 » » » So(errors.IsTransient(err), ShouldBeFalse) | 79 » » » So(transient.Tag.In(err), ShouldBeFalse) |
79 }) | 80 }) |
80 | 81 |
81 // This will happen when one registration request registers non-
terminal, | 82 // This will happen when one registration request registers non-
terminal, |
82 // and a follow-on registration request registers with a termina
l index. The | 83 // and a follow-on registration request registers with a termina
l index. The |
83 // latter registration request will idempotently succeed, but no
t accept the | 84 // latter registration request will idempotently succeed, but no
t accept the |
84 // terminal index, so termination is still required. | 85 // terminal index, so termination is still required. |
85 Convey(`Will terminate a stream if a terminal registration retur
ns a non-terminal response.`, func() { | 86 Convey(`Will terminate a stream if a terminal registration retur
ns a non-terminal response.`, func() { |
86 terminateCalled := false | 87 terminateCalled := false |
87 tcc.terminateCallback = func(cc.TerminateRequest) error
{ | 88 tcc.terminateCallback = func(cc.TerminateRequest) error
{ |
88 terminateCalled = true | 89 terminateCalled = true |
89 return nil | 90 return nil |
90 } | 91 } |
91 | 92 |
92 bb.addStreamEntries("foo/+/bar", -1, 0, 1) | 93 bb.addStreamEntries("foo/+/bar", -1, 0, 1) |
93 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 94 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
94 | 95 |
95 bb.addStreamEntries("foo/+/bar", 3, 2, 3) | 96 bb.addStreamEntries("foo/+/bar", 3, 2, 3) |
96 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 97 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
97 So(terminateCalled, ShouldBeTrue) | 98 So(terminateCalled, ShouldBeTrue) |
98 }) | 99 }) |
99 | 100 |
100 Convey(`Will return a transient error if a transient error happe
ned while terminating.`, func() { | 101 Convey(`Will return a transient error if a transient error happe
ned while terminating.`, func() { |
101 » » » tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.WrapTransient(errors.New("test error")) } | 102 » » » tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.New("test error", transient.Tag) } |
102 | 103 |
103 // Register independently from terminate so we don't bun
dle RPC. | 104 // Register independently from terminate so we don't bun
dle RPC. |
104 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) | 105 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) |
105 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 106 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
106 | 107 |
107 // Add terminal index. | 108 // Add terminal index. |
108 bb.addStreamEntries("foo/+/bar", 5, 5) | 109 bb.addStreamEntries("foo/+/bar", 5, 5) |
109 err := coll.Process(c, bb.bundle()) | 110 err := coll.Process(c, bb.bundle()) |
110 So(err, ShouldNotBeNil) | 111 So(err, ShouldNotBeNil) |
111 » » » So(errors.IsTransient(err), ShouldBeTrue) | 112 » » » So(transient.Tag.In(err), ShouldBeTrue) |
112 }) | 113 }) |
113 | 114 |
114 Convey(`Will return an error if a non-transient error happened w
hile terminating.`, func() { | 115 Convey(`Will return an error if a non-transient error happened w
hile terminating.`, func() { |
115 tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.New("test error") } | 116 tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.New("test error") } |
116 | 117 |
117 // Register independently from terminate so we don't bun
dle RPC. | 118 // Register independently from terminate so we don't bun
dle RPC. |
118 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) | 119 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) |
119 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 120 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
120 | 121 |
121 // Add terminal index. | 122 // Add terminal index. |
122 bb.addStreamEntries("foo/+/bar", 5, 5) | 123 bb.addStreamEntries("foo/+/bar", 5, 5) |
123 err := coll.Process(c, bb.bundle()) | 124 err := coll.Process(c, bb.bundle()) |
124 So(err, ShouldNotBeNil) | 125 So(err, ShouldNotBeNil) |
125 » » » So(errors.IsTransient(err), ShouldBeFalse) | 126 » » » So(transient.Tag.In(err), ShouldBeFalse) |
126 }) | 127 }) |
127 | 128 |
128 Convey(`Will return a transient error if a transient error happe
ned on storage.`, func() { | 129 Convey(`Will return a transient error if a transient error happe
ned on storage.`, func() { |
129 // Single transient error. | 130 // Single transient error. |
130 count := int32(0) | 131 count := int32(0) |
131 st.err = func() error { | 132 st.err = func() error { |
132 if atomic.AddInt32(&count, 1) == 1 { | 133 if atomic.AddInt32(&count, 1) == 1 { |
133 » » » » » return errors.WrapTransient(errors.New("
test error")) | 134 » » » » » return errors.New("test error", transien
t.Tag) |
134 } | 135 } |
135 return nil | 136 return nil |
136 } | 137 } |
137 | 138 |
138 bb.addFullStream("foo/+/bar", 128) | 139 bb.addFullStream("foo/+/bar", 128) |
139 err := coll.Process(c, bb.bundle()) | 140 err := coll.Process(c, bb.bundle()) |
140 So(err, ShouldNotBeNil) | 141 So(err, ShouldNotBeNil) |
141 » » » So(errors.IsTransient(err), ShouldBeTrue) | 142 » » » So(transient.Tag.In(err), ShouldBeTrue) |
142 }) | 143 }) |
143 | 144 |
144 Convey(`Will drop invalid LogStreamDescriptor bundle entries and
process the valid ones.`, func() { | 145 Convey(`Will drop invalid LogStreamDescriptor bundle entries and
process the valid ones.`, func() { |
145 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7,
8) | 146 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7,
8) |
146 bb.addBundleEntry(be) | 147 bb.addBundleEntry(be) |
147 | 148 |
148 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid:
non-contiguous | 149 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid:
non-contiguous |
149 bb.addFullStream("foo/+/bar", 32) | 150 bb.addFullStream("foo/+/bar", 32) |
150 | 151 |
151 err := coll.Process(c, bb.bundle()) | 152 err := coll.Process(c, bb.bundle()) |
152 So(err, ShouldNotBeNil) | 153 So(err, ShouldNotBeNil) |
153 » » » So(errors.IsTransient(err), ShouldBeFalse) | 154 » » » So(transient.Tag.In(err), ShouldBeFalse) |
154 | 155 |
155 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 32) | 156 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 32) |
156 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 31}) | 157 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 31}) |
157 | 158 |
158 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/trash", 1337) | 159 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/trash", 1337) |
159 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr
ash", 4, 5, 6, 7, 8) | 160 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr
ash", 4, 5, 6, 7, 8) |
160 }) | 161 }) |
161 | 162 |
162 Convey(`Will drop streams with missing (invalid) secrets.`, func
() { | 163 Convey(`Will drop streams with missing (invalid) secrets.`, func
() { |
163 b := bb.genBase() | 164 b := bb.genBase() |
164 b.Secret = nil | 165 b.Secret = nil |
165 bb.addFullStream("foo/+/bar", 4) | 166 bb.addFullStream("foo/+/bar", 4) |
166 | 167 |
167 err := coll.Process(c, bb.bundle()) | 168 err := coll.Process(c, bb.bundle()) |
168 So(err, ShouldErrLike, "invalid prefix secret") | 169 So(err, ShouldErrLike, "invalid prefix secret") |
169 » » » So(errors.IsTransient(err), ShouldBeFalse) | 170 » » » So(transient.Tag.In(err), ShouldBeFalse) |
170 }) | 171 }) |
171 | 172 |
172 Convey(`Will drop messages with mismatching secrets.`, func() { | 173 Convey(`Will drop messages with mismatching secrets.`, func() { |
173 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) | 174 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) |
174 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 175 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
175 | 176 |
176 // Push another bundle with a different secret. | 177 // Push another bundle with a different secret. |
177 b := bb.genBase() | 178 b := bb.genBase() |
178 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret
Length) | 179 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret
Length) |
179 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) | 180 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) |
180 be.TerminalIndex = 1337 | 181 be.TerminalIndex = 1337 |
181 bb.addBundleEntry(be) | 182 bb.addBundleEntry(be) |
182 bb.addFullStream("foo/+/baz", 3) | 183 bb.addFullStream("foo/+/baz", 3) |
183 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 184 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
184 | 185 |
185 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) | 186 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) |
186 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 2}) | 187 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 2}) |
187 | 188 |
188 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 2) | 189 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 2) |
189 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 2}) | 190 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 2}) |
190 }) | 191 }) |
191 | 192 |
192 Convey(`With an empty project name, will drop the stream.`, func
() { | 193 Convey(`With an empty project name, will drop the stream.`, func
() { |
193 b := bb.genBase() | 194 b := bb.genBase() |
194 b.Project = "" | 195 b.Project = "" |
195 bb.addFullStream("foo/+/baz", 3) | 196 bb.addFullStream("foo/+/baz", 3) |
196 | 197 |
197 err := coll.Process(c, bb.bundle()) | 198 err := coll.Process(c, bb.bundle()) |
198 So(err, ShouldErrLike, "invalid bundle project name") | 199 So(err, ShouldErrLike, "invalid bundle project name") |
199 » » » So(errors.IsTransient(err), ShouldBeFalse) | 200 » » » So(transient.Tag.In(err), ShouldBeFalse) |
200 }) | 201 }) |
201 | 202 |
202 Convey(`Will drop streams with invalid project names.`, func() { | 203 Convey(`Will drop streams with invalid project names.`, func() { |
203 b := bb.genBase() | 204 b := bb.genBase() |
204 b.Project = "!!!invalid name!!!" | 205 b.Project = "!!!invalid name!!!" |
205 So(cfgtypes.ProjectName(b.Project).Validate(), ShouldNot
BeNil) | 206 So(cfgtypes.ProjectName(b.Project).Validate(), ShouldNot
BeNil) |
206 | 207 |
207 err := coll.Process(c, bb.bundle()) | 208 err := coll.Process(c, bb.bundle()) |
208 So(err, ShouldErrLike, "invalid bundle project name") | 209 So(err, ShouldErrLike, "invalid bundle project name") |
209 » » » So(errors.IsTransient(err), ShouldBeFalse) | 210 » » » So(transient.Tag.In(err), ShouldBeFalse) |
210 }) | 211 }) |
211 | 212 |
212 Convey(`Will drop streams with empty bundle prefixes.`, func() { | 213 Convey(`Will drop streams with empty bundle prefixes.`, func() { |
213 b := bb.genBase() | 214 b := bb.genBase() |
214 b.Prefix = "" | 215 b.Prefix = "" |
215 | 216 |
216 err := coll.Process(c, bb.bundle()) | 217 err := coll.Process(c, bb.bundle()) |
217 So(err, ShouldErrLike, "invalid bundle prefix") | 218 So(err, ShouldErrLike, "invalid bundle prefix") |
218 » » » So(errors.IsTransient(err), ShouldBeFalse) | 219 » » » So(transient.Tag.In(err), ShouldBeFalse) |
219 }) | 220 }) |
220 | 221 |
221 Convey(`Will drop streams with invalid bundle prefixes.`, func()
{ | 222 Convey(`Will drop streams with invalid bundle prefixes.`, func()
{ |
222 b := bb.genBase() | 223 b := bb.genBase() |
223 b.Prefix = "!!!invalid prefix!!!" | 224 b.Prefix = "!!!invalid prefix!!!" |
224 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil
) | 225 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil
) |
225 | 226 |
226 err := coll.Process(c, bb.bundle()) | 227 err := coll.Process(c, bb.bundle()) |
227 So(err, ShouldErrLike, "invalid bundle prefix") | 228 So(err, ShouldErrLike, "invalid bundle prefix") |
228 » » » So(errors.IsTransient(err), ShouldBeFalse) | 229 » » » So(transient.Tag.In(err), ShouldBeFalse) |
229 }) | 230 }) |
230 | 231 |
231 Convey(`Will drop streams whose descriptor prefix doesn't match
its bundle's prefix.`, func() { | 232 Convey(`Will drop streams whose descriptor prefix doesn't match
its bundle's prefix.`, func() { |
232 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4) | 233 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4) |
233 | 234 |
234 err := coll.Process(c, bb.bundle()) | 235 err := coll.Process(c, bb.bundle()) |
235 So(err, ShouldErrLike, "mismatched bundle and entry pref
ixes") | 236 So(err, ShouldErrLike, "mismatched bundle and entry pref
ixes") |
236 » » » So(errors.IsTransient(err), ShouldBeFalse) | 237 » » » So(transient.Tag.In(err), ShouldBeFalse) |
237 }) | 238 }) |
238 | 239 |
239 Convey(`Will return no error if the data has a corrupt bundle he
ader.`, func() { | 240 Convey(`Will return no error if the data has a corrupt bundle he
ader.`, func() { |
240 So(coll.Process(c, []byte{0x00}), ShouldBeNil) | 241 So(coll.Process(c, []byte{0x00}), ShouldBeNil) |
241 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") | 242 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") |
242 }) | 243 }) |
243 | 244 |
244 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu
nc() { | 245 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu
nc() { |
245 buf := bytes.Buffer{} | 246 buf := bytes.Buffer{} |
246 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} | 247 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
316 testCollectorImpl(t, false) | 317 testCollectorImpl(t, false) |
317 } | 318 } |
318 | 319 |
319 // TestCollectorWithCaching runs through a series of end-to-end Collector | 320 // TestCollectorWithCaching runs through a series of end-to-end Collector |
320 // workflows and ensures that the Collector behaves appropriately. | 321 // workflows and ensures that the Collector behaves appropriately. |
321 func TestCollectorWithCaching(t *testing.T) { | 322 func TestCollectorWithCaching(t *testing.T) { |
322 t.Parallel() | 323 t.Parallel() |
323 | 324 |
324 testCollectorImpl(t, true) | 325 testCollectorImpl(t, true) |
325 } | 326 } |
OLD | NEW |