| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package collector |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "sync/atomic" |
| 11 "testing" |
| 12 |
| 13 "github.com/luci/luci-go/common/clock/testclock" |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 19 "github.com/luci/luci-go/server/logdog/storage/memory" |
| 20 "golang.org/x/net/context" |
| 21 |
| 22 . "github.com/smartystreets/goconvey/convey" |
| 23 ) |
| 24 |
| 25 // TestCollector runs through a series of end-to-end Collector workflows and |
| 26 // ensures that the Collector behaves appropriately. |
| 27 func TestCollector(t *testing.T) { |
| 28 t.Parallel() |
| 29 |
| 30 Convey(`Using a test configuration`, t, func() { |
| 31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) |
| 32 |
| 33 tcc := &testCoordinator{} |
| 34 st := &testStorage{Storage: &memory.Storage{}} |
| 35 |
| 36 coll := &Collector{ |
| 37 Coordinator: tcc, |
| 38 Storage: st, |
| 39 } |
| 40 |
| 41 bb := bundleBuilder{ |
| 42 Context: c, |
| 43 } |
| 44 |
| 45 for _, phrase := range []string{"disabled", "enabled"} { |
| 46 v := phrase == "enabled" |
| 47 |
| 48 Convey(fmt.Sprintf(`When caching is %s`, phrase), func()
{ |
| 49 if v { |
| 50 coll.Coordinator = coordinator.NewCache(
coll.Coordinator, 0, 0) |
| 51 } |
| 52 |
| 53 Convey(`Can process multiple single full streams
from a Butler bundle.`, func() { |
| 54 bb.addFullStream("foo/+/bar", 128) |
| 55 bb.addFullStream("foo/+/baz", 256) |
| 56 |
| 57 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 58 |
| 59 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 127) |
| 60 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 127}) |
| 61 |
| 62 So(tcc, shouldHaveRegisteredStream, "foo
/+/baz", 255) |
| 63 So(st, shouldHaveStoredStream, "foo/+/ba
z", indexRange{0, 255}) |
| 64 }) |
| 65 |
| 66 Convey(`Will return a transient error if a trans
ient error happened while registering.`, func() { |
| 67 tcc.errC = make(chan error, 1) |
| 68 tcc.errC <- errors.WrapTransient(errors.
New("test error")) |
| 69 |
| 70 bb.addFullStream("foo/+/bar", 128) |
| 71 err := coll.Process(c, bb.bundle()) |
| 72 So(err, ShouldNotBeNil) |
| 73 So(errors.IsTransient(err), ShouldBeTrue
) |
| 74 }) |
| 75 |
| 76 Convey(`Will return an error if a non-transient
error happened while registering.`, func() { |
| 77 tcc.errC = make(chan error, 1) |
| 78 tcc.errC <- errors.New("test error") |
| 79 |
| 80 bb.addFullStream("foo/+/bar", 128) |
| 81 err := coll.Process(c, bb.bundle()) |
| 82 So(err, ShouldNotBeNil) |
| 83 So(errors.IsTransient(err), ShouldBeFals
e) |
| 84 }) |
| 85 |
| 86 Convey(`Will return a transient error if a trans
ient error happened while terminating.`, func() { |
| 87 tcc.errC = make(chan error, 2) |
| 88 tcc.errC <- nil
// Register |
| 89 tcc.errC <- errors.WrapTransient(errors.
New("test error")) // Terminate |
| 90 |
| 91 bb.addFullStream("foo/+/bar", 128) |
| 92 err := coll.Process(c, bb.bundle()) |
| 93 So(err, ShouldNotBeNil) |
| 94 So(errors.IsTransient(err), ShouldBeTrue
) |
| 95 }) |
| 96 |
| 97 Convey(`Will return an error if a non-transient
error happened while terminating.`, func() { |
| 98 tcc.errC = make(chan error, 2) |
| 99 tcc.errC <- nil //
Register |
| 100 tcc.errC <- errors.New("test error") //
Terminate |
| 101 |
| 102 bb.addFullStream("foo/+/bar", 128) |
| 103 err := coll.Process(c, bb.bundle()) |
| 104 So(err, ShouldNotBeNil) |
| 105 So(errors.IsTransient(err), ShouldBeFals
e) |
| 106 }) |
| 107 |
| 108 Convey(`Will return a transient error if an erro
r happened on storage.`, func() { |
| 109 // Single transient error. |
| 110 count := int32(0) |
| 111 st.err = func() error { |
| 112 if atomic.AddInt32(&count, 1) ==
1 { |
| 113 return errors.New("test
error") |
| 114 } |
| 115 return nil |
| 116 } |
| 117 |
| 118 bb.addFullStream("foo/+/bar", 128) |
| 119 err := coll.Process(c, bb.bundle()) |
| 120 So(err, ShouldNotBeNil) |
| 121 So(errors.IsTransient(err), ShouldBeTrue
) |
| 122 }) |
| 123 |
| 124 Convey(`Will drop invalid LogStreamDescriptor bu
ndle entries and process the valid ones.`, func() { |
| 125 be := bb.genBundleEntry("foo/+/trash", 1
337, 4, 6, 7, 8) |
| 126 be.Desc.ContentType = "" // Missing Cont
entType => invalid. |
| 127 |
| 128 bb.addStreamEntries("foo/+/trash", -1, 0
, 1, 2, 3, 5) |
| 129 bb.addBundleEntry(be) |
| 130 bb.addFullStream("foo/+/bar", 32) |
| 131 |
| 132 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 133 |
| 134 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 32) |
| 135 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 31}) |
| 136 |
| 137 So(tcc, shouldHaveRegisteredStream, "foo
/+/trash", -1) |
| 138 So(st, shouldHaveStoredStream, "foo/+/tr
ash", 0, 1, 2, 3, 5) |
| 139 }) |
| 140 |
| 141 Convey(`Will drop streams with missing secrets.`
, func() { |
| 142 be := bb.genBundleEntry("foo/+/trash", 2
, 0, 1, 2) |
| 143 be.Secret = nil |
| 144 bb.addBundleEntry(be) |
| 145 |
| 146 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 147 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar", 127) |
| 148 }) |
| 149 |
| 150 Convey(`Will drop messages with mismatching secr
ets.`, func() { |
| 151 bb.addStreamEntries("foo/+/bar", -1, 0,
1, 2) |
| 152 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 153 |
| 154 // Push another bundle with a different
secret. |
| 155 be := bb.genBundleEntry("foo/+/bar", 4,
3, 4) |
| 156 be.Secret = bytes.Repeat([]byte{0xAA}, t
ypes.StreamSecretLength) |
| 157 be.TerminalIndex = 1337 |
| 158 bb.addBundleEntry(be) |
| 159 bb.addFullStream("foo/+/baz", 3) |
| 160 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 161 |
| 162 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) |
| 163 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 2}) |
| 164 |
| 165 So(tcc, shouldHaveRegisteredStream, "foo
/+/baz", 2) |
| 166 So(st, shouldHaveStoredStream, "foo/+/ba
z", indexRange{0, 2}) |
| 167 }) |
| 168 |
| 169 Convey(`Will return no error if the data has a c
orrupt bundle header.`, func() { |
| 170 So(coll.Process(c, []byte{0x00}), Should
BeNil) |
| 171 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") |
| 172 }) |
| 173 |
| 174 Convey(`Will drop bundles with unknown ProtoVers
ion string.`, func() { |
| 175 buf := bytes.Buffer{} |
| 176 w := butlerproto.Writer{ProtoVersion: "!
!!invalid!!!"} |
| 177 w.Write(&buf, &logpb.ButlerLogBundle{}) |
| 178 |
| 179 So(coll.Process(c, buf.Bytes()), ShouldB
eNil) |
| 180 |
| 181 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") |
| 182 }) |
| 183 |
| 184 Convey(`Will drop records beyond a local termina
l index.`, func() { |
| 185 bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2, 4) |
| 186 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 187 |
| 188 bb.addStreamEntries("foo/+/bar", 3, 3, 5
) |
| 189 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 190 |
| 191 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 4) |
| 192 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 3}) |
| 193 }) |
| 194 |
| 195 Convey(`Will not ingest records beyond a remote
terminal index.`, func() { |
| 196 tcc.register(coordinator.LogStreamState{ |
| 197 Path: "foo/+/bar", |
| 198 Secret: testSecret, |
| 199 TerminalIndex: 3, |
| 200 }) |
| 201 |
| 202 bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2) |
| 203 bb.addStreamEntries("foo/+/bar", 3, 3, 5
) |
| 204 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 205 |
| 206 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 3) |
| 207 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 3}) |
| 208 }) |
| 209 |
| 210 Convey(`Will not ingest records if the stream is
archived.`, func() { |
| 211 tcc.register(coordinator.LogStreamState{ |
| 212 Path: "foo/+/bar", |
| 213 Secret: testSecret, |
| 214 TerminalIndex: -1, |
| 215 Archived: true, |
| 216 }) |
| 217 |
| 218 bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2, 4) |
| 219 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 220 |
| 221 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) |
| 222 So(st, shouldHaveStoredStream, "foo/+/ba
r") |
| 223 }) |
| 224 |
| 225 Convey(`Will not ingest records if the stream is
purged.`, func() { |
| 226 tcc.register(coordinator.LogStreamState{ |
| 227 Path: "foo/+/bar", |
| 228 Secret: testSecret, |
| 229 TerminalIndex: -1, |
| 230 Purged: true, |
| 231 }) |
| 232 |
| 233 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 234 |
| 235 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) |
| 236 So(st, shouldHaveStoredStream, "foo/+/ba
r") |
| 237 }) |
| 238 |
| 239 Convey(`Will not ingest a bundle with no bundle
entries.`, func() { |
| 240 So(coll.Process(c, bb.bundleWithEntries(
)), ShouldBeNil) |
| 241 }) |
| 242 |
| 243 Convey(`Will not ingest a bundle whose log entri
es don't match their descriptor.`, func() { |
| 244 be := bb.genBundleEntry("foo/+/bar", 4,
0, 1, 3, 4) |
| 245 |
| 246 // Add a binary log entry. This does NOT
match the text descriptor, and |
| 247 // should fail validation. |
| 248 be.Logs = append(be.Logs, &logpb.LogEntr
y{ |
| 249 StreamIndex: 2, |
| 250 Sequence: 2, |
| 251 Content: &logpb.LogEntry_Binary{ |
| 252 &logpb.Binary{ |
| 253 Data: []byte{0xd
0, 0x6f, 0x00, 0xd5}, |
| 254 }, |
| 255 }, |
| 256 }) |
| 257 bb.addBundleEntry(be) |
| 258 So(coll.Process(c, bb.bundle()), ShouldB
eNil) |
| 259 |
| 260 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 4) |
| 261 So(st, shouldHaveStoredStream, "foo/+/ba
r", 0, 1, 3, 4) |
| 262 }) |
| 263 }) |
| 264 } |
| 265 }) |
| 266 } |
| OLD | NEW |