| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 "testing" | 11 "testing" |
| 12 | 12 |
| 13 "github.com/luci/luci-go/common/clock/testclock" | 13 "github.com/luci/luci-go/common/clock/testclock" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/logdog/butlerproto" | 15 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 19 "github.com/luci/luci-go/server/logdog/storage/memory" | 19 "github.com/luci/luci-go/server/logdog/storage/memory" |
| 20 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 21 | 21 |
| 22 . "github.com/luci/luci-go/common/testing/assertions" |
| 22 . "github.com/smartystreets/goconvey/convey" | 23 . "github.com/smartystreets/goconvey/convey" |
| 23 ) | 24 ) |
| 24 | 25 |
| 25 // TestCollector runs through a series of end-to-end Collector workflows and | 26 // TestCollector runs through a series of end-to-end Collector workflows and |
| 26 // ensures that the Collector behaves appropriately. | 27 // ensures that the Collector behaves appropriately. |
| 27 func TestCollector(t *testing.T) { | 28 func TestCollector(t *testing.T) { |
| 28 t.Parallel() | 29 t.Parallel() |
| 29 | 30 |
| 30 Convey(`Using a test configuration`, t, func() { | 31 Convey(`Using a test configuration`, t, func() { |
| 31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) | 32 c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 99 tcc.errC = make(chan error, 2) | 100 tcc.errC = make(chan error, 2) |
| 100 tcc.errC <- nil //
Register | 101 tcc.errC <- nil //
Register |
| 101 tcc.errC <- errors.New("test error") //
Terminate | 102 tcc.errC <- errors.New("test error") //
Terminate |
| 102 | 103 |
| 103 bb.addFullStream("foo/+/bar", 128) | 104 bb.addFullStream("foo/+/bar", 128) |
| 104 err := coll.Process(c, bb.bundle()) | 105 err := coll.Process(c, bb.bundle()) |
| 105 So(err, ShouldNotBeNil) | 106 So(err, ShouldNotBeNil) |
| 106 So(errors.IsTransient(err), ShouldBeFals
e) | 107 So(errors.IsTransient(err), ShouldBeFals
e) |
| 107 }) | 108 }) |
| 108 | 109 |
| 109 » » » » Convey(`Will return a transient error if an erro
r happened on storage.`, func() { | 110 » » » » Convey(`Will return a transient error if a trans
ient error happened on storage.`, func() { |
| 110 // Single transient error. | 111 // Single transient error. |
| 111 count := int32(0) | 112 count := int32(0) |
| 112 st.err = func() error { | 113 st.err = func() error { |
| 113 if atomic.AddInt32(&count, 1) ==
1 { | 114 if atomic.AddInt32(&count, 1) ==
1 { |
| 114 » » » » » » » return errors.New("test
error") | 115 » » » » » » » return errors.WrapTransi
ent(errors.New("test error")) |
| 115 } | 116 } |
| 116 return nil | 117 return nil |
| 117 } | 118 } |
| 118 | 119 |
| 119 bb.addFullStream("foo/+/bar", 128) | 120 bb.addFullStream("foo/+/bar", 128) |
| 120 err := coll.Process(c, bb.bundle()) | 121 err := coll.Process(c, bb.bundle()) |
| 121 So(err, ShouldNotBeNil) | 122 So(err, ShouldNotBeNil) |
| 122 So(errors.IsTransient(err), ShouldBeTrue
) | 123 So(errors.IsTransient(err), ShouldBeTrue
) |
| 123 }) | 124 }) |
| 124 | 125 |
| 125 Convey(`Will drop invalid LogStreamDescriptor bu
ndle entries and process the valid ones.`, func() { | 126 Convey(`Will drop invalid LogStreamDescriptor bu
ndle entries and process the valid ones.`, func() { |
| 126 » » » » » be := bb.genBundleEntry("foo/+/trash", 1
337, 4, 6, 7, 8) | 127 » » » » » be := bb.genBundleEntry("foo/+/trash", 1
337, 4, 5, 6, 7, 8) |
| 127 » » » » » be.Desc.ContentType = "" // Missing Cont
entType => invalid. | 128 » » » » » bb.addBundleEntry(be) |
| 128 | 129 |
| 129 » » » » » bb.addStreamEntries("foo/+/trash", -1, 0
, 1, 2, 3, 5) | 130 » » » » » bb.addStreamEntries("foo/+/trash", 0, 1,
3) // Invalid: non-contiguous |
| 130 » » » » » bb.addBundleEntry(be) | |
| 131 bb.addFullStream("foo/+/bar", 32) | 131 bb.addFullStream("foo/+/bar", 32) |
| 132 | 132 |
| 133 » » » » » So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 133 » » » » » err := coll.Process(c, bb.bundle()) |
| 134 » » » » » So(err, ShouldNotBeNil) |
| 135 » » » » » So(errors.IsTransient(err), ShouldBeFals
e) |
| 134 | 136 |
| 135 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 32) | 137 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 32) |
| 136 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 31}) | 138 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 31}) |
| 137 | 139 |
| 138 » » » » » So(tcc, shouldHaveRegisteredStream, "foo
/+/trash", -1) | 140 » » » » » So(tcc, shouldHaveRegisteredStream, "foo
/+/trash", 1337) |
| 139 » » » » » So(st, shouldHaveStoredStream, "foo/+/tr
ash", 0, 1, 2, 3, 5) | 141 » » » » » So(st, shouldHaveStoredStream, "foo/+/tr
ash", 4, 5, 6, 7, 8) |
| 140 }) | 142 }) |
| 141 | 143 |
| 142 Convey(`Will drop streams with missing secrets.`
, func() { | 144 Convey(`Will drop streams with missing secrets.`
, func() { |
| 143 be := bb.genBundleEntry("foo/+/trash", 2
, 0, 1, 2) | 145 be := bb.genBundleEntry("foo/+/trash", 2
, 0, 1, 2) |
| 144 be.Secret = nil | 146 be.Secret = nil |
| 145 bb.addBundleEntry(be) | 147 bb.addBundleEntry(be) |
| 146 | 148 |
| 147 » » » » » So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 149 » » » » » err := coll.Process(c, bb.bundle()) |
| 148 » » » » » So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar", 127) | 150 » » » » » So(err, ShouldErrLike, "missing stream s
ecret") |
| 151 » » » » » So(errors.IsTransient(err), ShouldBeFals
e) |
| 152 » » » » » So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") |
| 149 }) | 153 }) |
| 150 | 154 |
| 151 Convey(`Will drop messages with mismatching secr
ets.`, func() { | 155 Convey(`Will drop messages with mismatching secr
ets.`, func() { |
| 152 bb.addStreamEntries("foo/+/bar", -1, 0,
1, 2) | 156 bb.addStreamEntries("foo/+/bar", -1, 0,
1, 2) |
| 153 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 157 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 154 | 158 |
| 155 // Push another bundle with a different
secret. | 159 // Push another bundle with a different
secret. |
| 156 be := bb.genBundleEntry("foo/+/bar", 4,
3, 4) | 160 be := bb.genBundleEntry("foo/+/bar", 4,
3, 4) |
| 157 be.Secret = bytes.Repeat([]byte{0xAA}, t
ypes.StreamSecretLength) | 161 be.Secret = bytes.Repeat([]byte{0xAA}, t
ypes.StreamSecretLength) |
| 158 be.TerminalIndex = 1337 | 162 be.TerminalIndex = 1337 |
| (...skipping 16 matching lines...) Expand all Loading... |
| 175 Convey(`Will drop bundles with unknown ProtoVers
ion string.`, func() { | 179 Convey(`Will drop bundles with unknown ProtoVers
ion string.`, func() { |
| 176 buf := bytes.Buffer{} | 180 buf := bytes.Buffer{} |
| 177 w := butlerproto.Writer{ProtoVersion: "!
!!invalid!!!"} | 181 w := butlerproto.Writer{ProtoVersion: "!
!!invalid!!!"} |
| 178 w.Write(&buf, &logpb.ButlerLogBundle{}) | 182 w.Write(&buf, &logpb.ButlerLogBundle{}) |
| 179 | 183 |
| 180 So(coll.Process(c, buf.Bytes()), ShouldB
eNil) | 184 So(coll.Process(c, buf.Bytes()), ShouldB
eNil) |
| 181 | 185 |
| 182 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") | 186 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") |
| 183 }) | 187 }) |
| 184 | 188 |
| 185 Convey(`Will drop records beyond a local termina
l index.`, func() { | |
| 186 bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2, 4) | |
| 187 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | |
| 188 | |
| 189 bb.addStreamEntries("foo/+/bar", 3, 3, 5
) | |
| 190 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | |
| 191 | |
| 192 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 4) | |
| 193 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 3}) | |
| 194 }) | |
| 195 | |
| 196 Convey(`Will not ingest records beyond a remote
terminal index.`, func() { | |
| 197 tcc.register(coordinator.LogStreamState{ | |
| 198 Path: "foo/+/bar", | |
| 199 Secret: testSecret, | |
| 200 TerminalIndex: 3, | |
| 201 }) | |
| 202 | |
| 203 bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2) | |
| 204 bb.addStreamEntries("foo/+/bar", 3, 3, 5
) | |
| 205 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | |
| 206 | |
| 207 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 3) | |
| 208 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 3}) | |
| 209 }) | |
| 210 | |
| 211 Convey(`Will not ingest records if the stream is
archived.`, func() { | 189 Convey(`Will not ingest records if the stream is
archived.`, func() { |
| 212 tcc.register(coordinator.LogStreamState{ | 190 tcc.register(coordinator.LogStreamState{ |
| 213 Path: "foo/+/bar", | 191 Path: "foo/+/bar", |
| 214 Secret: testSecret, | 192 Secret: testSecret, |
| 215 TerminalIndex: -1, | 193 TerminalIndex: -1, |
| 216 Archived: true, | 194 Archived: true, |
| 217 }) | 195 }) |
| 218 | 196 |
| 219 » » » » » bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2, 4) | 197 » » » » » bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2, 3, 4) |
| 220 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 198 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 221 | 199 |
| 222 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) | 200 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) |
| 223 So(st, shouldHaveStoredStream, "foo/+/ba
r") | 201 So(st, shouldHaveStoredStream, "foo/+/ba
r") |
| 224 }) | 202 }) |
| 225 | 203 |
| 226 Convey(`Will not ingest records if the stream is
purged.`, func() { | 204 Convey(`Will not ingest records if the stream is
purged.`, func() { |
| 227 tcc.register(coordinator.LogStreamState{ | 205 tcc.register(coordinator.LogStreamState{ |
| 228 Path: "foo/+/bar", | 206 Path: "foo/+/bar", |
| 229 Secret: testSecret, | 207 Secret: testSecret, |
| 230 TerminalIndex: -1, | 208 TerminalIndex: -1, |
| 231 Purged: true, | 209 Purged: true, |
| 232 }) | 210 }) |
| 233 | 211 |
| 234 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 212 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 235 | 213 |
| 236 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) | 214 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) |
| 237 So(st, shouldHaveStoredStream, "foo/+/ba
r") | 215 So(st, shouldHaveStoredStream, "foo/+/ba
r") |
| 238 }) | 216 }) |
| 239 | 217 |
| 240 Convey(`Will not ingest a bundle with no bundle
entries.`, func() { | 218 Convey(`Will not ingest a bundle with no bundle
entries.`, func() { |
| 241 So(coll.Process(c, bb.bundleWithEntries(
)), ShouldBeNil) | 219 So(coll.Process(c, bb.bundleWithEntries(
)), ShouldBeNil) |
| 242 }) | 220 }) |
| 243 | 221 |
| 244 Convey(`Will not ingest a bundle whose log entri
es don't match their descriptor.`, func() { | 222 Convey(`Will not ingest a bundle whose log entri
es don't match their descriptor.`, func() { |
| 245 » » » » » be := bb.genBundleEntry("foo/+/bar", 4,
0, 1, 3, 4) | 223 » » » » » be := bb.genBundleEntry("foo/+/bar", 4,
0, 1, 2, 3, 4) |
| 246 | 224 |
| 247 // Add a binary log entry. This does NOT
match the text descriptor, and | 225 // Add a binary log entry. This does NOT
match the text descriptor, and |
| 248 // should fail validation. | 226 // should fail validation. |
| 249 be.Logs = append(be.Logs, &logpb.LogEntr
y{ | 227 be.Logs = append(be.Logs, &logpb.LogEntr
y{ |
| 250 StreamIndex: 2, | 228 StreamIndex: 2, |
| 251 Sequence: 2, | 229 Sequence: 2, |
| 252 Content: &logpb.LogEntry_Binary{ | 230 Content: &logpb.LogEntry_Binary{ |
| 253 &logpb.Binary{ | 231 &logpb.Binary{ |
| 254 Data: []byte{0xd
0, 0x6f, 0x00, 0xd5}, | 232 Data: []byte{0xd
0, 0x6f, 0x00, 0xd5}, |
| 255 }, | 233 }, |
| 256 }, | 234 }, |
| 257 }) | 235 }) |
| 258 bb.addBundleEntry(be) | 236 bb.addBundleEntry(be) |
| 259 » » » » » So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 237 » » » » » So(coll.Process(c, bb.bundle()), ShouldE
rrLike, "invalid log entry") |
| 260 | 238 |
| 261 » » » » » So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 4) | 239 » » » » » So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") |
| 262 » » » » » So(st, shouldHaveStoredStream, "foo/+/ba
r", 0, 1, 3, 4) | |
| 263 }) | 240 }) |
| 264 }) | 241 }) |
| 265 } | 242 } |
| 266 }) | 243 }) |
| 267 } | 244 } |
| OLD | NEW |