| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 "testing" | 11 "testing" |
| 12 | 12 |
| 13 "github.com/luci/luci-go/common/clock/testclock" | 13 "github.com/luci/luci-go/common/clock/testclock" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/retry" |
| 15 "github.com/luci/luci-go/logdog/api/logpb" | 16 "github.com/luci/luci-go/logdog/api/logpb" |
| 16 "github.com/luci/luci-go/logdog/client/butlerproto" | 17 "github.com/luci/luci-go/logdog/client/butlerproto" |
| 17 "github.com/luci/luci-go/logdog/common/storage/memory" | 18 "github.com/luci/luci-go/logdog/common/storage/memory" |
| 18 "github.com/luci/luci-go/logdog/common/types" | 19 "github.com/luci/luci-go/logdog/common/types" |
| 19 cc "github.com/luci/luci-go/logdog/server/collector/coordinator" | 20 cc "github.com/luci/luci-go/logdog/server/collector/coordinator" |
| 20 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 21 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 21 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
| 22 | 23 |
| 23 . "github.com/luci/luci-go/common/testing/assertions" | 24 . "github.com/luci/luci-go/common/testing/assertions" |
| 24 . "github.com/smartystreets/goconvey/convey" | 25 . "github.com/smartystreets/goconvey/convey" |
| (...skipping 29 matching lines...) Expand all Loading... |
| 54 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 55 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 55 | 56 |
| 56 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 127) | 57 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 127) |
| 57 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 127}) | 58 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 127}) |
| 58 | 59 |
| 59 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 255) | 60 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 255) |
| 60 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 255}) | 61 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 255}) |
| 61 }) | 62 }) |
| 62 | 63 |
| 63 Convey(`Will return a transient error if a transient error happe
ned while registering.`, func() { | 64 Convey(`Will return a transient error if a transient error happe
ned while registering.`, func() { |
| 64 » » » tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.WrapTransient(errors.New("test error")) } | 65 » » » tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.New("test error", retry.Tag) } |
| 65 | 66 |
| 66 bb.addFullStream("foo/+/bar", 128) | 67 bb.addFullStream("foo/+/bar", 128) |
| 67 err := coll.Process(c, bb.bundle()) | 68 err := coll.Process(c, bb.bundle()) |
| 68 So(err, ShouldNotBeNil) | 69 So(err, ShouldNotBeNil) |
| 69 » » » So(errors.IsTransient(err), ShouldBeTrue) | 70 » » » So(retry.Tag.In(err), ShouldBeTrue) |
| 70 }) | 71 }) |
| 71 | 72 |
| 72 Convey(`Will return an error if a non-transient error happened w
hile registering.`, func() { | 73 Convey(`Will return an error if a non-transient error happened w
hile registering.`, func() { |
| 73 tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.New("test error") } | 74 tcc.registerCallback = func(cc.LogStreamState) error { r
eturn errors.New("test error") } |
| 74 | 75 |
| 75 bb.addFullStream("foo/+/bar", 128) | 76 bb.addFullStream("foo/+/bar", 128) |
| 76 err := coll.Process(c, bb.bundle()) | 77 err := coll.Process(c, bb.bundle()) |
| 77 So(err, ShouldNotBeNil) | 78 So(err, ShouldNotBeNil) |
| 78 » » » So(errors.IsTransient(err), ShouldBeFalse) | 79 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 79 }) | 80 }) |
| 80 | 81 |
| 81 // This will happen when one registration request registers non-
terminal, | 82 // This will happen when one registration request registers non-
terminal, |
| 82 // and a follow-on registration request registers with a termina
l index. The | 83 // and a follow-on registration request registers with a termina
l index. The |
| 83 // latter registration request will idempotently succeed, but no
t accept the | 84 // latter registration request will idempotently succeed, but no
t accept the |
| 84 // terminal index, so termination is still required. | 85 // terminal index, so termination is still required. |
| 85 Convey(`Will terminate a stream if a terminal registration retur
ns a non-terminal response.`, func() { | 86 Convey(`Will terminate a stream if a terminal registration retur
ns a non-terminal response.`, func() { |
| 86 terminateCalled := false | 87 terminateCalled := false |
| 87 tcc.terminateCallback = func(cc.TerminateRequest) error
{ | 88 tcc.terminateCallback = func(cc.TerminateRequest) error
{ |
| 88 terminateCalled = true | 89 terminateCalled = true |
| 89 return nil | 90 return nil |
| 90 } | 91 } |
| 91 | 92 |
| 92 bb.addStreamEntries("foo/+/bar", -1, 0, 1) | 93 bb.addStreamEntries("foo/+/bar", -1, 0, 1) |
| 93 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 94 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 94 | 95 |
| 95 bb.addStreamEntries("foo/+/bar", 3, 2, 3) | 96 bb.addStreamEntries("foo/+/bar", 3, 2, 3) |
| 96 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 97 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 97 So(terminateCalled, ShouldBeTrue) | 98 So(terminateCalled, ShouldBeTrue) |
| 98 }) | 99 }) |
| 99 | 100 |
| 100 Convey(`Will return a transient error if a transient error happe
ned while terminating.`, func() { | 101 Convey(`Will return a transient error if a transient error happe
ned while terminating.`, func() { |
| 101 » » » tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.WrapTransient(errors.New("test error")) } | 102 » » » tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.New("test error", retry.Tag) } |
| 102 | 103 |
| 103 // Register independently from terminate so we don't bun
dle RPC. | 104 // Register independently from terminate so we don't bun
dle RPC. |
| 104 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) | 105 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) |
| 105 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 106 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 106 | 107 |
| 107 // Add terminal index. | 108 // Add terminal index. |
| 108 bb.addStreamEntries("foo/+/bar", 5, 5) | 109 bb.addStreamEntries("foo/+/bar", 5, 5) |
| 109 err := coll.Process(c, bb.bundle()) | 110 err := coll.Process(c, bb.bundle()) |
| 110 So(err, ShouldNotBeNil) | 111 So(err, ShouldNotBeNil) |
| 111 » » » So(errors.IsTransient(err), ShouldBeTrue) | 112 » » » So(retry.Tag.In(err), ShouldBeTrue) |
| 112 }) | 113 }) |
| 113 | 114 |
| 114 Convey(`Will return an error if a non-transient error happened w
hile terminating.`, func() { | 115 Convey(`Will return an error if a non-transient error happened w
hile terminating.`, func() { |
| 115 tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.New("test error") } | 116 tcc.terminateCallback = func(cc.TerminateRequest) error
{ return errors.New("test error") } |
| 116 | 117 |
| 117 // Register independently from terminate so we don't bun
dle RPC. | 118 // Register independently from terminate so we don't bun
dle RPC. |
| 118 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) | 119 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) |
| 119 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 120 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 120 | 121 |
| 121 // Add terminal index. | 122 // Add terminal index. |
| 122 bb.addStreamEntries("foo/+/bar", 5, 5) | 123 bb.addStreamEntries("foo/+/bar", 5, 5) |
| 123 err := coll.Process(c, bb.bundle()) | 124 err := coll.Process(c, bb.bundle()) |
| 124 So(err, ShouldNotBeNil) | 125 So(err, ShouldNotBeNil) |
| 125 » » » So(errors.IsTransient(err), ShouldBeFalse) | 126 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 126 }) | 127 }) |
| 127 | 128 |
| 128 Convey(`Will return a transient error if a transient error happe
ned on storage.`, func() { | 129 Convey(`Will return a transient error if a transient error happe
ned on storage.`, func() { |
| 129 // Single transient error. | 130 // Single transient error. |
| 130 count := int32(0) | 131 count := int32(0) |
| 131 st.err = func() error { | 132 st.err = func() error { |
| 132 if atomic.AddInt32(&count, 1) == 1 { | 133 if atomic.AddInt32(&count, 1) == 1 { |
| 133 » » » » » return errors.WrapTransient(errors.New("
test error")) | 134 » » » » » return errors.New("test error", retry.Ta
g) |
| 134 } | 135 } |
| 135 return nil | 136 return nil |
| 136 } | 137 } |
| 137 | 138 |
| 138 bb.addFullStream("foo/+/bar", 128) | 139 bb.addFullStream("foo/+/bar", 128) |
| 139 err := coll.Process(c, bb.bundle()) | 140 err := coll.Process(c, bb.bundle()) |
| 140 So(err, ShouldNotBeNil) | 141 So(err, ShouldNotBeNil) |
| 141 » » » So(errors.IsTransient(err), ShouldBeTrue) | 142 » » » So(retry.Tag.In(err), ShouldBeTrue) |
| 142 }) | 143 }) |
| 143 | 144 |
| 144 Convey(`Will drop invalid LogStreamDescriptor bundle entries and
process the valid ones.`, func() { | 145 Convey(`Will drop invalid LogStreamDescriptor bundle entries and
process the valid ones.`, func() { |
| 145 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7,
8) | 146 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7,
8) |
| 146 bb.addBundleEntry(be) | 147 bb.addBundleEntry(be) |
| 147 | 148 |
| 148 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid:
non-contiguous | 149 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid:
non-contiguous |
| 149 bb.addFullStream("foo/+/bar", 32) | 150 bb.addFullStream("foo/+/bar", 32) |
| 150 | 151 |
| 151 err := coll.Process(c, bb.bundle()) | 152 err := coll.Process(c, bb.bundle()) |
| 152 So(err, ShouldNotBeNil) | 153 So(err, ShouldNotBeNil) |
| 153 » » » So(errors.IsTransient(err), ShouldBeFalse) | 154 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 154 | 155 |
| 155 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 32) | 156 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 32) |
| 156 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 31}) | 157 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 31}) |
| 157 | 158 |
| 158 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/trash", 1337) | 159 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/trash", 1337) |
| 159 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr
ash", 4, 5, 6, 7, 8) | 160 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr
ash", 4, 5, 6, 7, 8) |
| 160 }) | 161 }) |
| 161 | 162 |
| 162 Convey(`Will drop streams with missing (invalid) secrets.`, func
() { | 163 Convey(`Will drop streams with missing (invalid) secrets.`, func
() { |
| 163 b := bb.genBase() | 164 b := bb.genBase() |
| 164 b.Secret = nil | 165 b.Secret = nil |
| 165 bb.addFullStream("foo/+/bar", 4) | 166 bb.addFullStream("foo/+/bar", 4) |
| 166 | 167 |
| 167 err := coll.Process(c, bb.bundle()) | 168 err := coll.Process(c, bb.bundle()) |
| 168 So(err, ShouldErrLike, "invalid prefix secret") | 169 So(err, ShouldErrLike, "invalid prefix secret") |
| 169 » » » So(errors.IsTransient(err), ShouldBeFalse) | 170 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 170 }) | 171 }) |
| 171 | 172 |
| 172 Convey(`Will drop messages with mismatching secrets.`, func() { | 173 Convey(`Will drop messages with mismatching secrets.`, func() { |
| 173 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) | 174 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) |
| 174 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 175 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 175 | 176 |
| 176 // Push another bundle with a different secret. | 177 // Push another bundle with a different secret. |
| 177 b := bb.genBase() | 178 b := bb.genBase() |
| 178 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret
Length) | 179 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret
Length) |
| 179 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) | 180 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) |
| 180 be.TerminalIndex = 1337 | 181 be.TerminalIndex = 1337 |
| 181 bb.addBundleEntry(be) | 182 bb.addBundleEntry(be) |
| 182 bb.addFullStream("foo/+/baz", 3) | 183 bb.addFullStream("foo/+/baz", 3) |
| 183 So(coll.Process(c, bb.bundle()), ShouldBeNil) | 184 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 184 | 185 |
| 185 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) | 186 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) |
| 186 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 2}) | 187 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 2}) |
| 187 | 188 |
| 188 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 2) | 189 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 2) |
| 189 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 2}) | 190 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 2}) |
| 190 }) | 191 }) |
| 191 | 192 |
| 192 Convey(`With an empty project name, will drop the stream.`, func
() { | 193 Convey(`With an empty project name, will drop the stream.`, func
() { |
| 193 b := bb.genBase() | 194 b := bb.genBase() |
| 194 b.Project = "" | 195 b.Project = "" |
| 195 bb.addFullStream("foo/+/baz", 3) | 196 bb.addFullStream("foo/+/baz", 3) |
| 196 | 197 |
| 197 err := coll.Process(c, bb.bundle()) | 198 err := coll.Process(c, bb.bundle()) |
| 198 So(err, ShouldErrLike, "invalid bundle project name") | 199 So(err, ShouldErrLike, "invalid bundle project name") |
| 199 » » » So(errors.IsTransient(err), ShouldBeFalse) | 200 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 200 }) | 201 }) |
| 201 | 202 |
| 202 Convey(`Will drop streams with invalid project names.`, func() { | 203 Convey(`Will drop streams with invalid project names.`, func() { |
| 203 b := bb.genBase() | 204 b := bb.genBase() |
| 204 b.Project = "!!!invalid name!!!" | 205 b.Project = "!!!invalid name!!!" |
| 205 So(cfgtypes.ProjectName(b.Project).Validate(), ShouldNot
BeNil) | 206 So(cfgtypes.ProjectName(b.Project).Validate(), ShouldNot
BeNil) |
| 206 | 207 |
| 207 err := coll.Process(c, bb.bundle()) | 208 err := coll.Process(c, bb.bundle()) |
| 208 So(err, ShouldErrLike, "invalid bundle project name") | 209 So(err, ShouldErrLike, "invalid bundle project name") |
| 209 » » » So(errors.IsTransient(err), ShouldBeFalse) | 210 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 210 }) | 211 }) |
| 211 | 212 |
| 212 Convey(`Will drop streams with empty bundle prefixes.`, func() { | 213 Convey(`Will drop streams with empty bundle prefixes.`, func() { |
| 213 b := bb.genBase() | 214 b := bb.genBase() |
| 214 b.Prefix = "" | 215 b.Prefix = "" |
| 215 | 216 |
| 216 err := coll.Process(c, bb.bundle()) | 217 err := coll.Process(c, bb.bundle()) |
| 217 So(err, ShouldErrLike, "invalid bundle prefix") | 218 So(err, ShouldErrLike, "invalid bundle prefix") |
| 218 » » » So(errors.IsTransient(err), ShouldBeFalse) | 219 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 219 }) | 220 }) |
| 220 | 221 |
| 221 Convey(`Will drop streams with invalid bundle prefixes.`, func()
{ | 222 Convey(`Will drop streams with invalid bundle prefixes.`, func()
{ |
| 222 b := bb.genBase() | 223 b := bb.genBase() |
| 223 b.Prefix = "!!!invalid prefix!!!" | 224 b.Prefix = "!!!invalid prefix!!!" |
| 224 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil
) | 225 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil
) |
| 225 | 226 |
| 226 err := coll.Process(c, bb.bundle()) | 227 err := coll.Process(c, bb.bundle()) |
| 227 So(err, ShouldErrLike, "invalid bundle prefix") | 228 So(err, ShouldErrLike, "invalid bundle prefix") |
| 228 » » » So(errors.IsTransient(err), ShouldBeFalse) | 229 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 229 }) | 230 }) |
| 230 | 231 |
| 231 Convey(`Will drop streams whose descriptor prefix doesn't match
its bundle's prefix.`, func() { | 232 Convey(`Will drop streams whose descriptor prefix doesn't match
its bundle's prefix.`, func() { |
| 232 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4) | 233 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4) |
| 233 | 234 |
| 234 err := coll.Process(c, bb.bundle()) | 235 err := coll.Process(c, bb.bundle()) |
| 235 So(err, ShouldErrLike, "mismatched bundle and entry pref
ixes") | 236 So(err, ShouldErrLike, "mismatched bundle and entry pref
ixes") |
| 236 » » » So(errors.IsTransient(err), ShouldBeFalse) | 237 » » » So(retry.Tag.In(err), ShouldBeFalse) |
| 237 }) | 238 }) |
| 238 | 239 |
| 239 Convey(`Will return no error if the data has a corrupt bundle he
ader.`, func() { | 240 Convey(`Will return no error if the data has a corrupt bundle he
ader.`, func() { |
| 240 So(coll.Process(c, []byte{0x00}), ShouldBeNil) | 241 So(coll.Process(c, []byte{0x00}), ShouldBeNil) |
| 241 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") | 242 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") |
| 242 }) | 243 }) |
| 243 | 244 |
| 244 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu
nc() { | 245 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu
nc() { |
| 245 buf := bytes.Buffer{} | 246 buf := bytes.Buffer{} |
| 246 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} | 247 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 316 testCollectorImpl(t, false) | 317 testCollectorImpl(t, false) |
| 317 } | 318 } |
| 318 | 319 |
| 319 // TestCollectorWithCaching runs through a series of end-to-end Collector | 320 // TestCollectorWithCaching runs through a series of end-to-end Collector |
| 320 // workflows and ensures that the Collector behaves appropriately. | 321 // workflows and ensures that the Collector behaves appropriately. |
| 321 func TestCollectorWithCaching(t *testing.T) { | 322 func TestCollectorWithCaching(t *testing.T) { |
| 322 t.Parallel() | 323 t.Parallel() |
| 323 | 324 |
| 324 testCollectorImpl(t, true) | 325 testCollectorImpl(t, true) |
| 325 } | 326 } |
| OLD | NEW |