Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(288)

Side by Side Diff: logdog/server/collector/collector_test.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package collector 5 package collector
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync/atomic" 10 "sync/atomic"
11 "testing" 11 "testing"
12 12
13 "github.com/luci/luci-go/common/clock/testclock" 13 "github.com/luci/luci-go/common/clock/testclock"
14 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/retry/transient"
15 "github.com/luci/luci-go/logdog/api/logpb" 16 "github.com/luci/luci-go/logdog/api/logpb"
16 "github.com/luci/luci-go/logdog/client/butlerproto" 17 "github.com/luci/luci-go/logdog/client/butlerproto"
17 "github.com/luci/luci-go/logdog/common/storage/memory" 18 "github.com/luci/luci-go/logdog/common/storage/memory"
18 "github.com/luci/luci-go/logdog/common/types" 19 "github.com/luci/luci-go/logdog/common/types"
19 cc "github.com/luci/luci-go/logdog/server/collector/coordinator" 20 cc "github.com/luci/luci-go/logdog/server/collector/coordinator"
20 "github.com/luci/luci-go/luci_config/common/cfgtypes" 21 "github.com/luci/luci-go/luci_config/common/cfgtypes"
21 "golang.org/x/net/context" 22 "golang.org/x/net/context"
22 23
23 . "github.com/luci/luci-go/common/testing/assertions" 24 . "github.com/luci/luci-go/common/testing/assertions"
24 . "github.com/smartystreets/goconvey/convey" 25 . "github.com/smartystreets/goconvey/convey"
(...skipping 29 matching lines...) Expand all
54 So(coll.Process(c, bb.bundle()), ShouldBeNil) 55 So(coll.Process(c, bb.bundle()), ShouldBeNil)
55 56
56 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", 127) 57 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", 127)
57 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 127}) 58 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 127})
58 59
59 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/baz", 255) 60 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/baz", 255)
60 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba z", indexRange{0, 255}) 61 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba z", indexRange{0, 255})
61 }) 62 })
62 63
63 Convey(`Will return a transient error if a transient error happe ned while registering.`, func() { 64 Convey(`Will return a transient error if a transient error happe ned while registering.`, func() {
64 » » » tcc.registerCallback = func(cc.LogStreamState) error { r eturn errors.WrapTransient(errors.New("test error")) } 65 » » » tcc.registerCallback = func(cc.LogStreamState) error { r eturn errors.New("test error", transient.Tag) }
65 66
66 bb.addFullStream("foo/+/bar", 128) 67 bb.addFullStream("foo/+/bar", 128)
67 err := coll.Process(c, bb.bundle()) 68 err := coll.Process(c, bb.bundle())
68 So(err, ShouldNotBeNil) 69 So(err, ShouldNotBeNil)
69 » » » So(errors.IsTransient(err), ShouldBeTrue) 70 » » » So(transient.Tag.In(err), ShouldBeTrue)
70 }) 71 })
71 72
72 Convey(`Will return an error if a non-transient error happened w hile registering.`, func() { 73 Convey(`Will return an error if a non-transient error happened w hile registering.`, func() {
73 tcc.registerCallback = func(cc.LogStreamState) error { r eturn errors.New("test error") } 74 tcc.registerCallback = func(cc.LogStreamState) error { r eturn errors.New("test error") }
74 75
75 bb.addFullStream("foo/+/bar", 128) 76 bb.addFullStream("foo/+/bar", 128)
76 err := coll.Process(c, bb.bundle()) 77 err := coll.Process(c, bb.bundle())
77 So(err, ShouldNotBeNil) 78 So(err, ShouldNotBeNil)
78 » » » So(errors.IsTransient(err), ShouldBeFalse) 79 » » » So(transient.Tag.In(err), ShouldBeFalse)
79 }) 80 })
80 81
81 // This will happen when one registration request registers non- terminal, 82 // This will happen when one registration request registers non- terminal,
82 // and a follow-on registration request registers with a termina l index. The 83 // and a follow-on registration request registers with a termina l index. The
83 // latter registration request will idempotently succeed, but no t accept the 84 // latter registration request will idempotently succeed, but no t accept the
84 // terminal index, so termination is still required. 85 // terminal index, so termination is still required.
85 Convey(`Will terminate a stream if a terminal registration retur ns a non-terminal response.`, func() { 86 Convey(`Will terminate a stream if a terminal registration retur ns a non-terminal response.`, func() {
86 terminateCalled := false 87 terminateCalled := false
87 tcc.terminateCallback = func(cc.TerminateRequest) error { 88 tcc.terminateCallback = func(cc.TerminateRequest) error {
88 terminateCalled = true 89 terminateCalled = true
89 return nil 90 return nil
90 } 91 }
91 92
92 bb.addStreamEntries("foo/+/bar", -1, 0, 1) 93 bb.addStreamEntries("foo/+/bar", -1, 0, 1)
93 So(coll.Process(c, bb.bundle()), ShouldBeNil) 94 So(coll.Process(c, bb.bundle()), ShouldBeNil)
94 95
95 bb.addStreamEntries("foo/+/bar", 3, 2, 3) 96 bb.addStreamEntries("foo/+/bar", 3, 2, 3)
96 So(coll.Process(c, bb.bundle()), ShouldBeNil) 97 So(coll.Process(c, bb.bundle()), ShouldBeNil)
97 So(terminateCalled, ShouldBeTrue) 98 So(terminateCalled, ShouldBeTrue)
98 }) 99 })
99 100
100 Convey(`Will return a transient error if a transient error happe ned while terminating.`, func() { 101 Convey(`Will return a transient error if a transient error happe ned while terminating.`, func() {
101 » » » tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.WrapTransient(errors.New("test error")) } 102 » » » tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error", transient.Tag) }
102 103
103 // Register independently from terminate so we don't bun dle RPC. 104 // Register independently from terminate so we don't bun dle RPC.
104 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) 105 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
105 So(coll.Process(c, bb.bundle()), ShouldBeNil) 106 So(coll.Process(c, bb.bundle()), ShouldBeNil)
106 107
107 // Add terminal index. 108 // Add terminal index.
108 bb.addStreamEntries("foo/+/bar", 5, 5) 109 bb.addStreamEntries("foo/+/bar", 5, 5)
109 err := coll.Process(c, bb.bundle()) 110 err := coll.Process(c, bb.bundle())
110 So(err, ShouldNotBeNil) 111 So(err, ShouldNotBeNil)
111 » » » So(errors.IsTransient(err), ShouldBeTrue) 112 » » » So(transient.Tag.In(err), ShouldBeTrue)
112 }) 113 })
113 114
114 Convey(`Will return an error if a non-transient error happened w hile terminating.`, func() { 115 Convey(`Will return an error if a non-transient error happened w hile terminating.`, func() {
115 tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error") } 116 tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error") }
116 117
117 // Register independently from terminate so we don't bun dle RPC. 118 // Register independently from terminate so we don't bun dle RPC.
118 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4) 119 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
119 So(coll.Process(c, bb.bundle()), ShouldBeNil) 120 So(coll.Process(c, bb.bundle()), ShouldBeNil)
120 121
121 // Add terminal index. 122 // Add terminal index.
122 bb.addStreamEntries("foo/+/bar", 5, 5) 123 bb.addStreamEntries("foo/+/bar", 5, 5)
123 err := coll.Process(c, bb.bundle()) 124 err := coll.Process(c, bb.bundle())
124 So(err, ShouldNotBeNil) 125 So(err, ShouldNotBeNil)
125 » » » So(errors.IsTransient(err), ShouldBeFalse) 126 » » » So(transient.Tag.In(err), ShouldBeFalse)
126 }) 127 })
127 128
128 Convey(`Will return a transient error if a transient error happe ned on storage.`, func() { 129 Convey(`Will return a transient error if a transient error happe ned on storage.`, func() {
129 // Single transient error. 130 // Single transient error.
130 count := int32(0) 131 count := int32(0)
131 st.err = func() error { 132 st.err = func() error {
132 if atomic.AddInt32(&count, 1) == 1 { 133 if atomic.AddInt32(&count, 1) == 1 {
133 » » » » » return errors.WrapTransient(errors.New(" test error")) 134 » » » » » return errors.New("test error", transien t.Tag)
134 } 135 }
135 return nil 136 return nil
136 } 137 }
137 138
138 bb.addFullStream("foo/+/bar", 128) 139 bb.addFullStream("foo/+/bar", 128)
139 err := coll.Process(c, bb.bundle()) 140 err := coll.Process(c, bb.bundle())
140 So(err, ShouldNotBeNil) 141 So(err, ShouldNotBeNil)
141 » » » So(errors.IsTransient(err), ShouldBeTrue) 142 » » » So(transient.Tag.In(err), ShouldBeTrue)
142 }) 143 })
143 144
144 Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() { 145 Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
145 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8) 146 be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
146 bb.addBundleEntry(be) 147 bb.addBundleEntry(be)
147 148
148 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous 149 bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
149 bb.addFullStream("foo/+/bar", 32) 150 bb.addFullStream("foo/+/bar", 32)
150 151
151 err := coll.Process(c, bb.bundle()) 152 err := coll.Process(c, bb.bundle())
152 So(err, ShouldNotBeNil) 153 So(err, ShouldNotBeNil)
153 » » » So(errors.IsTransient(err), ShouldBeFalse) 154 » » » So(transient.Tag.In(err), ShouldBeFalse)
154 155
155 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", 32) 156 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", 32)
156 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 31}) 157 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 31})
157 158
158 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/trash", 1337) 159 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/trash", 1337)
159 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr ash", 4, 5, 6, 7, 8) 160 So(st, shouldHaveStoredStream, "test-project", "foo/+/tr ash", 4, 5, 6, 7, 8)
160 }) 161 })
161 162
162 Convey(`Will drop streams with missing (invalid) secrets.`, func () { 163 Convey(`Will drop streams with missing (invalid) secrets.`, func () {
163 b := bb.genBase() 164 b := bb.genBase()
164 b.Secret = nil 165 b.Secret = nil
165 bb.addFullStream("foo/+/bar", 4) 166 bb.addFullStream("foo/+/bar", 4)
166 167
167 err := coll.Process(c, bb.bundle()) 168 err := coll.Process(c, bb.bundle())
168 So(err, ShouldErrLike, "invalid prefix secret") 169 So(err, ShouldErrLike, "invalid prefix secret")
169 » » » So(errors.IsTransient(err), ShouldBeFalse) 170 » » » So(transient.Tag.In(err), ShouldBeFalse)
170 }) 171 })
171 172
172 Convey(`Will drop messages with mismatching secrets.`, func() { 173 Convey(`Will drop messages with mismatching secrets.`, func() {
173 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) 174 bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
174 So(coll.Process(c, bb.bundle()), ShouldBeNil) 175 So(coll.Process(c, bb.bundle()), ShouldBeNil)
175 176
176 // Push another bundle with a different secret. 177 // Push another bundle with a different secret.
177 b := bb.genBase() 178 b := bb.genBase()
178 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret Length) 179 b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecret Length)
179 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) 180 be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
180 be.TerminalIndex = 1337 181 be.TerminalIndex = 1337
181 bb.addBundleEntry(be) 182 bb.addBundleEntry(be)
182 bb.addFullStream("foo/+/baz", 3) 183 bb.addFullStream("foo/+/baz", 3)
183 So(coll.Process(c, bb.bundle()), ShouldBeNil) 184 So(coll.Process(c, bb.bundle()), ShouldBeNil)
184 185
185 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", -1) 186 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/bar", -1)
186 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 2}) 187 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba r", indexRange{0, 2})
187 188
188 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/baz", 2) 189 So(tcc, shouldHaveRegisteredStream, "test-project", "foo /+/baz", 2)
189 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba z", indexRange{0, 2}) 190 So(st, shouldHaveStoredStream, "test-project", "foo/+/ba z", indexRange{0, 2})
190 }) 191 })
191 192
192 Convey(`With an empty project name, will drop the stream.`, func () { 193 Convey(`With an empty project name, will drop the stream.`, func () {
193 b := bb.genBase() 194 b := bb.genBase()
194 b.Project = "" 195 b.Project = ""
195 bb.addFullStream("foo/+/baz", 3) 196 bb.addFullStream("foo/+/baz", 3)
196 197
197 err := coll.Process(c, bb.bundle()) 198 err := coll.Process(c, bb.bundle())
198 So(err, ShouldErrLike, "invalid bundle project name") 199 So(err, ShouldErrLike, "invalid bundle project name")
199 » » » So(errors.IsTransient(err), ShouldBeFalse) 200 » » » So(transient.Tag.In(err), ShouldBeFalse)
200 }) 201 })
201 202
202 Convey(`Will drop streams with invalid project names.`, func() { 203 Convey(`Will drop streams with invalid project names.`, func() {
203 b := bb.genBase() 204 b := bb.genBase()
204 b.Project = "!!!invalid name!!!" 205 b.Project = "!!!invalid name!!!"
205 So(cfgtypes.ProjectName(b.Project).Validate(), ShouldNot BeNil) 206 So(cfgtypes.ProjectName(b.Project).Validate(), ShouldNot BeNil)
206 207
207 err := coll.Process(c, bb.bundle()) 208 err := coll.Process(c, bb.bundle())
208 So(err, ShouldErrLike, "invalid bundle project name") 209 So(err, ShouldErrLike, "invalid bundle project name")
209 » » » So(errors.IsTransient(err), ShouldBeFalse) 210 » » » So(transient.Tag.In(err), ShouldBeFalse)
210 }) 211 })
211 212
212 Convey(`Will drop streams with empty bundle prefixes.`, func() { 213 Convey(`Will drop streams with empty bundle prefixes.`, func() {
213 b := bb.genBase() 214 b := bb.genBase()
214 b.Prefix = "" 215 b.Prefix = ""
215 216
216 err := coll.Process(c, bb.bundle()) 217 err := coll.Process(c, bb.bundle())
217 So(err, ShouldErrLike, "invalid bundle prefix") 218 So(err, ShouldErrLike, "invalid bundle prefix")
218 » » » So(errors.IsTransient(err), ShouldBeFalse) 219 » » » So(transient.Tag.In(err), ShouldBeFalse)
219 }) 220 })
220 221
221 Convey(`Will drop streams with invalid bundle prefixes.`, func() { 222 Convey(`Will drop streams with invalid bundle prefixes.`, func() {
222 b := bb.genBase() 223 b := bb.genBase()
223 b.Prefix = "!!!invalid prefix!!!" 224 b.Prefix = "!!!invalid prefix!!!"
224 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil ) 225 So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil )
225 226
226 err := coll.Process(c, bb.bundle()) 227 err := coll.Process(c, bb.bundle())
227 So(err, ShouldErrLike, "invalid bundle prefix") 228 So(err, ShouldErrLike, "invalid bundle prefix")
228 » » » So(errors.IsTransient(err), ShouldBeFalse) 229 » » » So(transient.Tag.In(err), ShouldBeFalse)
229 }) 230 })
230 231
231 Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() { 232 Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
232 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4) 233 bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4)
233 234
234 err := coll.Process(c, bb.bundle()) 235 err := coll.Process(c, bb.bundle())
235 So(err, ShouldErrLike, "mismatched bundle and entry pref ixes") 236 So(err, ShouldErrLike, "mismatched bundle and entry pref ixes")
236 » » » So(errors.IsTransient(err), ShouldBeFalse) 237 » » » So(transient.Tag.In(err), ShouldBeFalse)
237 }) 238 })
238 239
239 Convey(`Will return no error if the data has a corrupt bundle he ader.`, func() { 240 Convey(`Will return no error if the data has a corrupt bundle he ader.`, func() {
240 So(coll.Process(c, []byte{0x00}), ShouldBeNil) 241 So(coll.Process(c, []byte{0x00}), ShouldBeNil)
241 So(tcc, shouldNotHaveRegisteredStream, "test-project", " foo/+/bar") 242 So(tcc, shouldNotHaveRegisteredStream, "test-project", " foo/+/bar")
242 }) 243 })
243 244
244 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu nc() { 245 Convey(`Will drop bundles with unknown ProtoVersion string.`, fu nc() {
245 buf := bytes.Buffer{} 246 buf := bytes.Buffer{}
246 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} 247 w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
316 testCollectorImpl(t, false) 317 testCollectorImpl(t, false)
317 } 318 }
318 319
319 // TestCollectorWithCaching runs through a series of end-to-end Collector 320 // TestCollectorWithCaching runs through a series of end-to-end Collector
320 // workflows and ensures that the Collector behaves appropriately. 321 // workflows and ensures that the Collector behaves appropriately.
321 func TestCollectorWithCaching(t *testing.T) { 322 func TestCollectorWithCaching(t *testing.T) {
322 t.Parallel() 323 t.Parallel()
323 324
324 testCollectorImpl(t, true) 325 testCollectorImpl(t, true)
325 } 326 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698