Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package collector | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "fmt" | |
| 10 "sync/atomic" | |
| 11 "testing" | |
| 12 | |
| 13 "github.com/luci/luci-go/common/clock/testclock" | |
| 14 "github.com/luci/luci-go/common/errors" | |
| 15 "github.com/luci/luci-go/common/logdog/butlerproto" | |
| 16 "github.com/luci/luci-go/common/logdog/types" | |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" | |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | |
| 19 "github.com/luci/luci-go/server/logdog/storage/memory" | |
| 20 "golang.org/x/net/context" | |
| 21 | |
| 22 . "github.com/smartystreets/goconvey/convey" | |
| 23 ) | |
| 24 | |
| 25 // TestCollector runs through a series of end-to-end Collector workflows and | |
| 26 // ensures that the Collector behaves appropriately. | |
| 27 func TestCollector(t *testing.T) { | |
| 28 t.Parallel() | |
| 29 | |
| 30 Convey(`Using a test configuration`, t, func() { | |
| 31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi meLocal) | |
| 32 | |
| 33 tcc := &testCoordinator{} | |
| 34 st := &testStorage{Storage: &memory.Storage{}} | |
| 35 | |
| 36 coll := &Collector{ | |
| 37 Coordinator: tcc, | |
| 38 Storage: st, | |
| 39 } | |
| 40 | |
| 41 bb := bundleBuilder{ | |
| 42 Context: c, | |
| 43 } | |
| 44 | |
| 45 for _, v := range []bool{ | |
|
iannucci
2016/02/05 23:41:01
for _, phrase := range []string{"disabled", "enabl
dnj (Google)
2016/02/06 04:10:36
Done.
| |
| 46 false, | |
| 47 true, | |
| 48 } { | |
| 49 phrase := "disabled" | |
| 50 if v { | |
| 51 phrase = "enabled" | |
| 52 } | |
| 53 Convey(fmt.Sprintf(`When caching is %s`, phrase), func() { | |
| 54 if v { | |
| 55 coll.Coordinator = coordinator.NewCache( coll.Coordinator, 0, 0) | |
| 56 } | |
| 57 | |
| 58 Convey(`Can process multiple single full streams from a Butler bundle.`, func() { | |
| 59 bb.addFullStream("foo/+/bar", 128) | |
| 60 bb.addFullStream("foo/+/baz", 256) | |
| 61 | |
| 62 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 63 | |
| 64 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 127) | |
| 65 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 127}) | |
| 66 | |
| 67 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 255) | |
| 68 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 255}) | |
| 69 }) | |
| 70 | |
| 71 Convey(`Will return a transient error if a trans ient error happened while registering.`, func() { | |
| 72 tcc.errC = make(chan error, 1) | |
| 73 tcc.errC <- errors.WrapTransient(errors. New("test error")) | |
| 74 | |
| 75 bb.addFullStream("foo/+/bar", 128) | |
| 76 err := coll.Process(c, bb.bundle()) | |
| 77 So(err, ShouldNotBeNil) | |
| 78 So(errors.IsTransient(err), ShouldBeTrue ) | |
| 79 }) | |
| 80 | |
| 81 Convey(`Will return an error if a non-transient error happened while registering.`, func() { | |
| 82 tcc.errC = make(chan error, 1) | |
| 83 tcc.errC <- errors.New("test error") | |
| 84 | |
| 85 bb.addFullStream("foo/+/bar", 128) | |
| 86 err := coll.Process(c, bb.bundle()) | |
| 87 So(err, ShouldNotBeNil) | |
| 88 So(errors.IsTransient(err), ShouldBeFals e) | |
| 89 }) | |
| 90 | |
| 91 Convey(`Will return a transient error if a trans ient error happened while terminating.`, func() { | |
| 92 tcc.errC = make(chan error, 2) | |
| 93 tcc.errC <- nil // Register | |
| 94 tcc.errC <- errors.WrapTransient(errors. New("test error")) // Terminate | |
| 95 | |
| 96 bb.addFullStream("foo/+/bar", 128) | |
| 97 err := coll.Process(c, bb.bundle()) | |
| 98 So(err, ShouldNotBeNil) | |
| 99 So(errors.IsTransient(err), ShouldBeTrue ) | |
| 100 }) | |
| 101 | |
| 102 Convey(`Will return an error if a non-transient error happened while terminating.`, func() { | |
| 103 tcc.errC = make(chan error, 2) | |
| 104 tcc.errC <- nil // Register | |
| 105 tcc.errC <- errors.New("test error") // Terminate | |
| 106 | |
| 107 bb.addFullStream("foo/+/bar", 128) | |
| 108 err := coll.Process(c, bb.bundle()) | |
| 109 So(err, ShouldNotBeNil) | |
| 110 So(errors.IsTransient(err), ShouldBeFals e) | |
| 111 }) | |
| 112 | |
| 113 Convey(`Will return a transient error if an erro r happened on storage.`, func() { | |
| 114 // Single transient error. | |
| 115 count := int32(0) | |
| 116 st.err = func() error { | |
| 117 if atomic.AddInt32(&count, 1) == 1 { | |
| 118 return errors.New("test error") | |
| 119 } | |
| 120 return nil | |
| 121 } | |
| 122 | |
| 123 bb.addFullStream("foo/+/bar", 128) | |
| 124 err := coll.Process(c, bb.bundle()) | |
| 125 So(err, ShouldNotBeNil) | |
| 126 So(errors.IsTransient(err), ShouldBeTrue ) | |
| 127 }) | |
| 128 | |
| 129 Convey(`Will drop invalid LogStreamDescriptor bu ndle entries and process the valid ones.`, func() { | |
| 130 be := bb.genBundleEntry("foo/+/trash", 1 337, 4, 6, 7, 8) | |
| 131 be.Desc.ContentType = "" // Missing Cont entType => invalid. | |
| 132 | |
| 133 bb.addStreamEntries("foo/+/trash", -1, 0 , 1, 2, 3, 5) | |
| 134 bb.addBundleEntry(be) | |
| 135 bb.addFullStream("foo/+/bar", 32) | |
| 136 | |
| 137 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 138 | |
| 139 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 32) | |
| 140 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 31}) | |
| 141 | |
| 142 So(tcc, shouldHaveRegisteredStream, "foo /+/trash", -1) | |
| 143 So(st, shouldHaveStoredStream, "foo/+/tr ash", 0, 1, 2, 3, 5) | |
| 144 }) | |
| 145 | |
| 146 Convey(`Will drop streams with missing secrets.` , func() { | |
| 147 be := bb.genBundleEntry("foo/+/trash", 2 , 0, 1, 2) | |
| 148 be.Secret = nil | |
| 149 bb.addBundleEntry(be) | |
| 150 | |
| 151 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 152 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar", 127) | |
| 153 }) | |
| 154 | |
| 155 Convey(`Will drop messages with mismatching secr ets.`, func() { | |
| 156 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) | |
| 157 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 158 | |
| 159 // Push another bundle with a different secret. | |
| 160 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) | |
| 161 be.Secret = bytes.Repeat([]byte{0xAA}, t ypes.StreamSecretLength) | |
| 162 be.TerminalIndex = 1337 | |
| 163 bb.addBundleEntry(be) | |
| 164 bb.addFullStream("foo/+/baz", 3) | |
| 165 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 166 | |
| 167 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) | |
| 168 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 2}) | |
| 169 | |
| 170 So(tcc, shouldHaveRegisteredStream, "foo /+/baz", 2) | |
| 171 So(st, shouldHaveStoredStream, "foo/+/ba z", indexRange{0, 2}) | |
| 172 }) | |
| 173 | |
| 174 Convey(`Will return no error if the data has a c orrupt bundle header.`, func() { | |
| 175 So(coll.Process(c, []byte{0x00}), Should BeNil) | |
| 176 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar") | |
| 177 }) | |
| 178 | |
| 179 Convey(`Will drop bundles with unknown ProtoVers ion string.`, func() { | |
| 180 buf := bytes.Buffer{} | |
| 181 w := butlerproto.Writer{ProtoVersion: "! !!invalid!!!"} | |
| 182 w.Write(&buf, &logpb.ButlerLogBundle{}) | |
| 183 | |
| 184 So(coll.Process(c, buf.Bytes()), ShouldB eNil) | |
| 185 | |
| 186 So(tcc, shouldNotHaveRegisteredStream, " foo/+/bar") | |
| 187 }) | |
| 188 | |
| 189 Convey(`Will drop records beyond a local termina l index.`, func() { | |
| 190 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4) | |
| 191 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 192 | |
| 193 bb.addStreamEntries("foo/+/bar", 3, 3, 5 ) | |
| 194 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 195 | |
| 196 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4) | |
| 197 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3}) | |
| 198 }) | |
| 199 | |
| 200 Convey(`Will not ingest records beyond a remote terminal index.`, func() { | |
| 201 tcc.register(coordinator.LogStreamState{ | |
| 202 Path: "foo/+/bar", | |
| 203 Secret: testSecret, | |
| 204 TerminalIndex: 3, | |
| 205 }) | |
| 206 | |
| 207 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2) | |
| 208 bb.addStreamEntries("foo/+/bar", 3, 3, 5 ) | |
| 209 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 210 | |
| 211 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 3) | |
| 212 So(st, shouldHaveStoredStream, "foo/+/ba r", indexRange{0, 3}) | |
| 213 }) | |
| 214 | |
| 215 Convey(`Will not ingest records if the stream is archived.`, func() { | |
| 216 tcc.register(coordinator.LogStreamState{ | |
| 217 Path: "foo/+/bar", | |
| 218 Secret: testSecret, | |
| 219 TerminalIndex: -1, | |
| 220 Archived: true, | |
| 221 }) | |
| 222 | |
| 223 bb.addStreamEntries("foo/+/bar", 3, 0, 1 , 2, 4) | |
| 224 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 225 | |
| 226 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) | |
| 227 So(st, shouldHaveStoredStream, "foo/+/ba r") | |
| 228 }) | |
| 229 | |
| 230 Convey(`Will not ingest records if the stream is purged.`, func() { | |
| 231 tcc.register(coordinator.LogStreamState{ | |
| 232 Path: "foo/+/bar", | |
| 233 Secret: testSecret, | |
| 234 TerminalIndex: -1, | |
| 235 Purged: true, | |
| 236 }) | |
| 237 | |
| 238 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 239 | |
| 240 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", -1) | |
| 241 So(st, shouldHaveStoredStream, "foo/+/ba r") | |
| 242 }) | |
| 243 | |
| 244 Convey(`Will not ingest a bundle with no bundle entries.`, func() { | |
| 245 So(coll.Process(c, bb.bundleWithEntries( )), ShouldBeNil) | |
| 246 }) | |
| 247 | |
| 248 Convey(`Will not ingest a bundle whose log entri es don't match their descriptor.`, func() { | |
| 249 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4) | |
| 250 | |
| 251 // Add a binary log entry. This does NOT match the text descriptor, and | |
| 252 // should fail validation. | |
| 253 be.Logs = append(be.Logs, &logpb.LogEntr y{ | |
| 254 StreamIndex: 2, | |
| 255 Sequence: 2, | |
| 256 Content: &logpb.LogEntry_Binary{ | |
| 257 &logpb.Binary{ | |
| 258 Data: []byte{0xd 0, 0x6f, 0x00, 0xd5}, | |
| 259 }, | |
| 260 }, | |
| 261 }) | |
| 262 bb.addBundleEntry(be) | |
| 263 So(coll.Process(c, bb.bundle()), ShouldB eNil) | |
| 264 | |
| 265 So(tcc, shouldHaveRegisteredStream, "foo /+/bar", 4) | |
| 266 So(st, shouldHaveStoredStream, "foo/+/ba r", 0, 1, 3, 4) | |
| 267 }) | |
| 268 }) | |
| 269 } | |
| 270 }) | |
| 271 } | |
| OLD | NEW |