OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package collector | 5 package collector |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync/atomic" | 10 "sync/atomic" |
11 "testing" | 11 "testing" |
12 | 12 |
13 "github.com/luci/luci-go/common/clock/testclock" | 13 "github.com/luci/luci-go/common/clock/testclock" |
| 14 "github.com/luci/luci-go/common/config" |
14 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
15 "github.com/luci/luci-go/common/logdog/butlerproto" | 16 "github.com/luci/luci-go/common/logdog/butlerproto" |
16 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
19 "github.com/luci/luci-go/server/logdog/storage/memory" | 20 "github.com/luci/luci-go/server/logdog/storage/memory" |
20 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
21 | 22 |
22 . "github.com/luci/luci-go/common/testing/assertions" | 23 . "github.com/luci/luci-go/common/testing/assertions" |
23 . "github.com/smartystreets/goconvey/convey" | 24 . "github.com/smartystreets/goconvey/convey" |
24 ) | 25 ) |
25 | 26 |
26 // TestCollector runs through a series of end-to-end Collector workflows and | 27 // TestCollector runs through a series of end-to-end Collector workflows and |
27 // ensures that the Collector behaves appropriately. | 28 // ensures that the Collector behaves appropriately. |
28 func TestCollector(t *testing.T) { | 29 func testCollectorImpl(t *testing.T, caching bool) { |
29 » t.Parallel() | 30 » Convey(fmt.Sprintf(`Using a test configuration with caching == %v`, cach
ing), t, func() { |
30 | |
31 » Convey(`Using a test configuration`, t, func() { | |
32 c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) | 31 c, _ := testclock.UseTime(context.Background(), testclock.TestTi
meLocal) |
33 | 32 |
34 tcc := &testCoordinator{} | 33 tcc := &testCoordinator{} |
35 st := &testStorage{Storage: &memory.Storage{}} | 34 st := &testStorage{Storage: &memory.Storage{}} |
36 | 35 |
37 coll := &Collector{ | 36 coll := &Collector{ |
38 Coordinator: tcc, | 37 Coordinator: tcc, |
39 Storage: st, | 38 Storage: st, |
40 } | 39 } |
41 defer coll.Close() | 40 defer coll.Close() |
42 | 41 |
43 bb := bundleBuilder{ | 42 bb := bundleBuilder{ |
44 Context: c, | 43 Context: c, |
45 } | 44 } |
46 | 45 |
47 » » for _, phrase := range []string{"disabled", "enabled"} { | 46 » » if caching { |
48 » » » v := phrase == "enabled" | 47 » » » coll.Coordinator = coordinator.NewCache(coll.Coordinator
, 0, 0) |
49 | 48 » » } |
50 » » » Convey(fmt.Sprintf(`When caching is %s`, phrase), func()
{ | 49 |
51 » » » » if v { | 50 » » Convey(`Can process multiple single full streams from a Butler b
undle.`, func() { |
52 » » » » » coll.Coordinator = coordinator.NewCache(
coll.Coordinator, 0, 0) | 51 » » » bb.addFullStream("foo/+/bar", 128) |
| 52 » » » bb.addFullStream("foo/+/baz", 256) |
| 53 |
| 54 » » » So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| 55 |
| 56 » » » So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 127) |
| 57 » » » So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 127}) |
| 58 |
| 59 » » » So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 255) |
| 60 » » » So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 255}) |
| 61 » » }) |
| 62 |
| 63 » » Convey(`Will return a transient error if a transient error happe
ned while registering.`, func() { |
| 64 » » » tcc.errC = make(chan error, 1) |
| 65 » » » tcc.errC <- errors.WrapTransient(errors.New("test error"
)) |
| 66 |
| 67 » » » bb.addFullStream("foo/+/bar", 128) |
| 68 » » » err := coll.Process(c, bb.bundle()) |
| 69 » » » So(err, ShouldNotBeNil) |
| 70 » » » So(errors.IsTransient(err), ShouldBeTrue) |
| 71 » » }) |
| 72 |
| 73 » » Convey(`Will return an error if a non-transient error happened w
hile registering.`, func() { |
| 74 » » » tcc.errC = make(chan error, 1) |
| 75 » » » tcc.errC <- errors.New("test error") |
| 76 |
| 77 » » » bb.addFullStream("foo/+/bar", 128) |
| 78 » » » err := coll.Process(c, bb.bundle()) |
| 79 » » » So(err, ShouldNotBeNil) |
| 80 » » » So(errors.IsTransient(err), ShouldBeFalse) |
| 81 » » }) |
| 82 |
| 83 » » Convey(`Will return a transient error if a transient error happe
ned while terminating.`, func() { |
| 84 » » » tcc.errC = make(chan error, 2) |
| 85 » » » tcc.errC <- nil
// Register |
| 86 » » » tcc.errC <- errors.WrapTransient(errors.New("test error"
)) // Terminate |
| 87 |
| 88 » » » bb.addFullStream("foo/+/bar", 128) |
| 89 » » » err := coll.Process(c, bb.bundle()) |
| 90 » » » So(err, ShouldNotBeNil) |
| 91 » » » So(errors.IsTransient(err), ShouldBeTrue) |
| 92 » » }) |
| 93 |
| 94 » » Convey(`Will return an error if a non-transient error happened w
hile terminating.`, func() { |
| 95 » » » tcc.errC = make(chan error, 2) |
| 96 » » » tcc.errC <- nil // Register |
| 97 » » » tcc.errC <- errors.New("test error") // Terminate |
| 98 |
| 99 » » » bb.addFullStream("foo/+/bar", 128) |
| 100 » » » err := coll.Process(c, bb.bundle()) |
| 101 » » » So(err, ShouldNotBeNil) |
| 102 » » » So(errors.IsTransient(err), ShouldBeFalse) |
| 103 » » }) |
| 104 |
| 105 » » Convey(`Will return a transient error if a transient error happe
ned on storage.`, func() { |
| 106 » » » // Single transient error. |
| 107 » » » count := int32(0) |
| 108 » » » st.err = func() error { |
| 109 » » » » if atomic.AddInt32(&count, 1) == 1 { |
| 110 » » » » » return errors.WrapTransient(errors.New("
test error")) |
53 } | 111 } |
54 | 112 return nil |
55 Convey(`Can process multiple single full streams
from a Butler bundle.`, func() { | 113 } |
56 bb.addFullStream("foo/+/bar", 128) | 114 |
57 bb.addFullStream("foo/+/baz", 256) | 115 bb.addFullStream("foo/+/bar", 128) |
58 | 116 err := coll.Process(c, bb.bundle()) |
59 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 117 So(err, ShouldNotBeNil) |
60 | 118 So(errors.IsTransient(err), ShouldBeTrue) |
61 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 127) | 119 }) |
62 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 127}) | 120 |
63 | 121 Convey(`Will drop invalid LogStreamDescriptor bundle entries and
process the valid ones.`, func() { |
64 So(tcc, shouldHaveRegisteredStream, "foo
/+/baz", 255) | 122 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7,
8) |
65 So(st, shouldHaveStoredStream, "foo/+/ba
z", indexRange{0, 255}) | 123 bb.addBundleEntry(be) |
66 }) | 124 |
67 | 125 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid:
non-contiguous |
68 Convey(`Will return a transient error if a trans
ient error happened while registering.`, func() { | 126 bb.addFullStream("foo/+/bar", 32) |
69 tcc.errC = make(chan error, 1) | 127 |
70 tcc.errC <- errors.WrapTransient(errors.
New("test error")) | 128 err := coll.Process(c, bb.bundle()) |
71 | 129 So(err, ShouldNotBeNil) |
72 bb.addFullStream("foo/+/bar", 128) | 130 So(errors.IsTransient(err), ShouldBeFalse) |
73 err := coll.Process(c, bb.bundle()) | 131 |
74 So(err, ShouldNotBeNil) | 132 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", 32) |
75 So(errors.IsTransient(err), ShouldBeTrue
) | 133 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 31}) |
76 }) | 134 |
77 | 135 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/trash", 1337) |
78 Convey(`Will return an error if a non-transient
error happened while registering.`, func() { | 136 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr
ash", 4, 5, 6, 7, 8) |
79 tcc.errC = make(chan error, 1) | 137 }) |
80 tcc.errC <- errors.New("test error") | 138 |
81 | 139 Convey(`Will drop streams with missing (invalid) secrets.`, func
() { |
82 bb.addFullStream("foo/+/bar", 128) | 140 b := bb.genBase() |
83 err := coll.Process(c, bb.bundle()) | 141 b.Secret = nil |
84 So(err, ShouldNotBeNil) | 142 |
85 So(errors.IsTransient(err), ShouldBeFals
e) | 143 err := coll.Process(c, bb.bundle()) |
86 }) | 144 So(err, ShouldErrLike, "invalid prefix secret") |
87 | 145 So(errors.IsTransient(err), ShouldBeFalse) |
88 Convey(`Will return a transient error if a trans
ient error happened while terminating.`, func() { | 146 }) |
89 tcc.errC = make(chan error, 2) | 147 |
90 tcc.errC <- nil
// Register | 148 Convey(`Will drop messages with mismatching secrets.`, func() { |
91 tcc.errC <- errors.WrapTransient(errors.
New("test error")) // Terminate | 149 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) |
92 | 150 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
93 bb.addFullStream("foo/+/bar", 128) | 151 |
94 err := coll.Process(c, bb.bundle()) | 152 // Push another bundle with a different secret. |
95 So(err, ShouldNotBeNil) | 153 b := bb.genBase() |
96 So(errors.IsTransient(err), ShouldBeTrue
) | 154 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret
Length) |
97 }) | 155 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) |
98 | 156 be.TerminalIndex = 1337 |
99 Convey(`Will return an error if a non-transient
error happened while terminating.`, func() { | 157 bb.addBundleEntry(be) |
100 tcc.errC = make(chan error, 2) | 158 bb.addFullStream("foo/+/baz", 3) |
101 tcc.errC <- nil //
Register | 159 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
102 tcc.errC <- errors.New("test error") //
Terminate | 160 |
103 | 161 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) |
104 bb.addFullStream("foo/+/bar", 128) | 162 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r", indexRange{0, 2}) |
105 err := coll.Process(c, bb.bundle()) | 163 |
106 So(err, ShouldNotBeNil) | 164 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/baz", 2) |
107 So(errors.IsTransient(err), ShouldBeFals
e) | 165 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
z", indexRange{0, 2}) |
108 }) | 166 }) |
109 | 167 |
110 Convey(`Will return a transient error if a trans
ient error happened on storage.`, func() { | 168 Convey(`With an empty project name`, func() { |
111 // Single transient error. | 169 b := bb.genBase() |
112 count := int32(0) | 170 b.Project = "" |
113 st.err = func() error { | 171 bb.addFullStream("foo/+/baz", 3) |
114 if atomic.AddInt32(&count, 1) ==
1 { | 172 |
115 return errors.WrapTransi
ent(errors.New("test error")) | 173 // TODO(dnj): Enable this when project name is required. |
116 } | 174 SkipConvey(`Will drop the stream.`, func() { |
117 return nil | 175 |
118 } | 176 err := coll.Process(c, bb.bundle()) |
119 | 177 So(err, ShouldErrLike, "invalid project name") |
120 bb.addFullStream("foo/+/bar", 128) | 178 So(errors.IsTransient(err), ShouldBeFalse) |
121 err := coll.Process(c, bb.bundle()) | 179 }) |
122 So(err, ShouldNotBeNil) | 180 |
123 So(errors.IsTransient(err), ShouldBeTrue
) | 181 Convey(`Will register the stream.`, func() { |
124 }) | 182 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
125 | 183 |
126 Convey(`Will drop invalid LogStreamDescriptor bu
ndle entries and process the valid ones.`, func() { | 184 So(tcc, shouldHaveRegisteredStream, "", "foo/+/b
az", 2) |
127 be := bb.genBundleEntry("foo/+/trash", 1
337, 4, 5, 6, 7, 8) | 185 So(st, shouldHaveStoredStream, "", "foo/+/baz",
indexRange{0, 2}) |
128 bb.addBundleEntry(be) | 186 }) |
129 | 187 }) |
130 bb.addStreamEntries("foo/+/trash", 0, 1,
3) // Invalid: non-contiguous | 188 |
131 bb.addFullStream("foo/+/bar", 32) | 189 Convey(`Will drop streams with invalid project names.`, func() { |
132 | 190 b := bb.genBase() |
133 err := coll.Process(c, bb.bundle()) | 191 b.Project = "!!!invalid name!!!" |
134 So(err, ShouldNotBeNil) | 192 So(config.ProjectName(b.Project).Validate(), ShouldNotBe
Nil) |
135 So(errors.IsTransient(err), ShouldBeFals
e) | 193 |
136 | 194 err := coll.Process(c, bb.bundle()) |
137 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", 32) | 195 So(err, ShouldErrLike, "invalid bundle project name") |
138 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 31}) | 196 So(errors.IsTransient(err), ShouldBeFalse) |
139 | 197 }) |
140 So(tcc, shouldHaveRegisteredStream, "foo
/+/trash", 1337) | 198 |
141 So(st, shouldHaveStoredStream, "foo/+/tr
ash", 4, 5, 6, 7, 8) | 199 Convey(`Will drop streams with empty bundle prefixes.`, func() { |
142 }) | 200 b := bb.genBase() |
143 | 201 b.Prefix = "" |
144 Convey(`Will drop streams with missing secrets.`
, func() { | 202 |
145 be := bb.genBundleEntry("foo/+/trash", 2
, 0, 1, 2) | 203 err := coll.Process(c, bb.bundle()) |
146 be.Secret = nil | 204 So(err, ShouldErrLike, "invalid bundle prefix") |
147 bb.addBundleEntry(be) | 205 So(errors.IsTransient(err), ShouldBeFalse) |
148 | 206 }) |
149 err := coll.Process(c, bb.bundle()) | 207 |
150 So(err, ShouldErrLike, "missing stream s
ecret") | 208 Convey(`Will drop streams with invalid bundle prefixes.`, func()
{ |
151 So(errors.IsTransient(err), ShouldBeFals
e) | 209 b := bb.genBase() |
152 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") | 210 b.Prefix = "!!!invalid prefix!!!" |
153 }) | 211 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil
) |
154 | 212 |
155 Convey(`Will drop messages with mismatching secr
ets.`, func() { | 213 err := coll.Process(c, bb.bundle()) |
156 bb.addStreamEntries("foo/+/bar", -1, 0,
1, 2) | 214 So(err, ShouldErrLike, "invalid bundle prefix") |
157 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 215 So(errors.IsTransient(err), ShouldBeFalse) |
158 | 216 }) |
159 // Push another bundle with a different
secret. | 217 |
160 be := bb.genBundleEntry("foo/+/bar", 4,
3, 4) | 218 Convey(`Will drop streams whose descriptor prefix doesn't match
its bundle's prefix.`, func() { |
161 be.Secret = bytes.Repeat([]byte{0xAA}, t
ypes.PrefixSecretLength) | 219 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4) |
162 be.TerminalIndex = 1337 | 220 |
163 bb.addBundleEntry(be) | 221 err := coll.Process(c, bb.bundle()) |
164 bb.addFullStream("foo/+/baz", 3) | 222 So(err, ShouldErrLike, "mismatched bundle and entry pref
ixes") |
165 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 223 So(errors.IsTransient(err), ShouldBeFalse) |
166 | 224 }) |
167 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) | 225 |
168 So(st, shouldHaveStoredStream, "foo/+/ba
r", indexRange{0, 2}) | 226 Convey(`Will return no error if the data has a corrupt bundle he
ader.`, func() { |
169 | 227 So(coll.Process(c, []byte{0x00}), ShouldBeNil) |
170 So(tcc, shouldHaveRegisteredStream, "foo
/+/baz", 2) | 228 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") |
171 So(st, shouldHaveStoredStream, "foo/+/ba
z", indexRange{0, 2}) | 229 }) |
172 }) | 230 |
173 | 231 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu
nc() { |
174 Convey(`Will return no error if the data has a c
orrupt bundle header.`, func() { | 232 buf := bytes.Buffer{} |
175 So(coll.Process(c, []byte{0x00}), Should
BeNil) | 233 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} |
176 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") | 234 w.Write(&buf, &logpb.ButlerLogBundle{}) |
177 }) | 235 |
178 | 236 So(coll.Process(c, buf.Bytes()), ShouldBeNil) |
179 Convey(`Will drop bundles with unknown ProtoVers
ion string.`, func() { | 237 |
180 buf := bytes.Buffer{} | 238 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") |
181 w := butlerproto.Writer{ProtoVersion: "!
!!invalid!!!"} | 239 }) |
182 w.Write(&buf, &logpb.ButlerLogBundle{}) | 240 |
183 | 241 Convey(`Will not ingest records if the stream is archived.`, fun
c() { |
184 So(coll.Process(c, buf.Bytes()), ShouldB
eNil) | 242 tcc.register(coordinator.LogStreamState{ |
185 | 243 Project: "test-project", |
186 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") | 244 Path: "foo/+/bar", |
187 }) | 245 Secret: testSecret, |
188 | 246 TerminalIndex: -1, |
189 Convey(`Will not ingest records if the stream is
archived.`, func() { | 247 Archived: true, |
190 tcc.register(coordinator.LogStreamState{ | 248 }) |
191 Path: "foo/+/bar", | 249 |
192 Secret: testSecret, | 250 bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4) |
193 TerminalIndex: -1, | 251 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
194 Archived: true, | 252 |
195 }) | 253 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) |
196 | 254 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r") |
197 bb.addStreamEntries("foo/+/bar", 3, 0, 1
, 2, 3, 4) | 255 }) |
198 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 256 |
199 | 257 Convey(`Will not ingest records if the stream is purged.`, func(
) { |
200 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) | 258 tcc.register(coordinator.LogStreamState{ |
201 So(st, shouldHaveStoredStream, "foo/+/ba
r") | 259 Project: "test-project", |
202 }) | 260 Path: "foo/+/bar", |
203 | 261 Secret: testSecret, |
204 Convey(`Will not ingest records if the stream is
purged.`, func() { | 262 TerminalIndex: -1, |
205 tcc.register(coordinator.LogStreamState{ | 263 Purged: true, |
206 Path: "foo/+/bar", | 264 }) |
207 Secret: testSecret, | 265 |
208 TerminalIndex: -1, | 266 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
209 Purged: true, | 267 |
210 }) | 268 So(tcc, shouldHaveRegisteredStream, "test-project", "foo
/+/bar", -1) |
211 | 269 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba
r") |
212 So(coll.Process(c, bb.bundle()), ShouldB
eNil) | 270 }) |
213 | 271 |
214 So(tcc, shouldHaveRegisteredStream, "foo
/+/bar", -1) | 272 Convey(`Will not ingest a bundle with no bundle entries.`, func(
) { |
215 So(st, shouldHaveStoredStream, "foo/+/ba
r") | 273 So(coll.Process(c, bb.bundle()), ShouldBeNil) |
216 }) | 274 }) |
217 | 275 |
218 Convey(`Will not ingest a bundle with no bundle
entries.`, func() { | 276 Convey(`Will not ingest a bundle whose log entries don't match t
heir descriptor.`, func() { |
219 So(coll.Process(c, bb.bundleWithEntries(
)), ShouldBeNil) | 277 be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4) |
220 }) | 278 |
221 | 279 // Add a binary log entry. This does NOT match the text
descriptor, and |
222 Convey(`Will not ingest a bundle whose log entri
es don't match their descriptor.`, func() { | 280 // should fail validation. |
223 be := bb.genBundleEntry("foo/+/bar", 4,
0, 1, 2, 3, 4) | 281 be.Logs = append(be.Logs, &logpb.LogEntry{ |
224 | 282 StreamIndex: 2, |
225 // Add a binary log entry. This does NOT
match the text descriptor, and | 283 Sequence: 2, |
226 // should fail validation. | 284 Content: &logpb.LogEntry_Binary{ |
227 be.Logs = append(be.Logs, &logpb.LogEntr
y{ | 285 &logpb.Binary{ |
228 StreamIndex: 2, | 286 Data: []byte{0xd0, 0x6f, 0x00, 0
xd5}, |
229 Sequence: 2, | 287 }, |
230 Content: &logpb.LogEntry_Binary{ | 288 }, |
231 &logpb.Binary{ | 289 }) |
232 Data: []byte{0xd
0, 0x6f, 0x00, 0xd5}, | 290 bb.addBundleEntry(be) |
233 }, | 291 So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid
log entry") |
234 }, | 292 |
235 }) | 293 So(tcc, shouldNotHaveRegisteredStream, "test-project", "
foo/+/bar") |
236 bb.addBundleEntry(be) | 294 }) |
237 So(coll.Process(c, bb.bundle()), ShouldE
rrLike, "invalid log entry") | |
238 | |
239 So(tcc, shouldNotHaveRegisteredStream, "
foo/+/bar") | |
240 }) | |
241 }) | |
242 } | |
243 }) | 295 }) |
244 } | 296 } |
| 297 |
| 298 // TestCollector runs through a series of end-to-end Collector workflows and |
| 299 // ensures that the Collector behaves appropriately. |
| 300 func TestCollector(t *testing.T) { |
| 301 t.Parallel() |
| 302 |
| 303 testCollectorImpl(t, false) |
| 304 } |
| 305 |
| 306 // TestCollectorWithCaching runs through a series of end-to-end Collector |
| 307 // workflows and ensures that the Collector behaves appropriately. |
| 308 func TestCollectorWithCaching(t *testing.T) { |
| 309 t.Parallel() |
| 310 |
| 311 testCollectorImpl(t, true) |
| 312 } |
OLD | NEW |