OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package collector | 5 package collector |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
11 "sort" | 11 "sort" |
12 "strings" | 12 "strings" |
13 "sync" | 13 "sync" |
14 "time" | |
15 | 14 |
16 "github.com/golang/protobuf/proto" | 15 "github.com/golang/protobuf/proto" |
17 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/config" |
18 "github.com/luci/luci-go/common/logdog/butlerproto" | 18 "github.com/luci/luci-go/common/logdog/butlerproto" |
19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator
" | 22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator
" |
23 "github.com/luci/luci-go/server/logdog/storage" | 23 "github.com/luci/luci-go/server/logdog/storage" |
24 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
25 ) | 25 ) |
26 | 26 |
27 var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength) | 27 var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength) |
28 | 28 |
| 29 type streamKey struct { |
| 30 project string |
| 31 name string |
| 32 } |
| 33 |
| 34 func mkStreamKey(project, name string) streamKey { |
| 35 return streamKey{project, name} |
| 36 } |
| 37 |
29 // testCoordinator is an implementation of Coordinator that can be used for | 38 // testCoordinator is an implementation of Coordinator that can be used for |
30 // testing. | 39 // testing. |
31 type testCoordinator struct { | 40 type testCoordinator struct { |
32 sync.Mutex | 41 sync.Mutex |
33 | 42 |
34 // errC is a channel that error status will be read from if not nil. | 43 // errC is a channel that error status will be read from if not nil. |
35 errC chan error | 44 errC chan error |
36 | 45 |
37 // state is the latest tracked stream state. | 46 // state is the latest tracked stream state. |
38 » state map[string]*cc.LogStreamState | 47 » state map[streamKey]*cc.LogStreamState |
39 } | 48 } |
40 | 49 |
41 var _ cc.Coordinator = (*testCoordinator)(nil) | 50 var _ cc.Coordinator = (*testCoordinator)(nil) |
42 | 51 |
43 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { | 52 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { |
44 c.Lock() | 53 c.Lock() |
45 defer c.Unlock() | 54 defer c.Unlock() |
46 | 55 |
47 // Update our state. | 56 // Update our state. |
48 if c.state == nil { | 57 if c.state == nil { |
49 » » c.state = make(map[string]*cc.LogStreamState) | 58 » » c.state = make(map[streamKey]*cc.LogStreamState) |
50 } | 59 } |
51 » if sp := c.state[string(s.Path)]; sp != nil { | 60 |
| 61 » key := mkStreamKey(string(s.Project), string(s.Path)) |
| 62 » if sp := c.state[key]; sp != nil { |
52 return *sp | 63 return *sp |
53 } | 64 } |
54 » c.state[string(s.Path)] = &s | 65 » c.state[key] = &s |
55 return s | 66 return s |
56 } | 67 } |
57 | 68 |
58 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta
te, d *logpb.LogStreamDescriptor) ( | 69 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta
te, d *logpb.LogStreamDescriptor) ( |
59 *cc.LogStreamState, error) { | 70 *cc.LogStreamState, error) { |
60 if err := c.enter(); err != nil { | 71 if err := c.enter(); err != nil { |
61 return nil, err | 72 return nil, err |
62 } | 73 } |
63 | 74 |
64 // Enforce that the submitted stream is not terminal. | 75 // Enforce that the submitted stream is not terminal. |
(...skipping 11 matching lines...) Expand all Loading... |
76 } | 87 } |
77 | 88 |
78 if st.TerminalIndex < 0 { | 89 if st.TerminalIndex < 0 { |
79 return errors.New("submitted stream is not terminal") | 90 return errors.New("submitted stream is not terminal") |
80 } | 91 } |
81 | 92 |
82 c.Lock() | 93 c.Lock() |
83 defer c.Unlock() | 94 defer c.Unlock() |
84 | 95 |
85 // Update our state. | 96 // Update our state. |
86 » cachedState, ok := c.state[string(st.Path)] | 97 » cachedState, ok := c.state[mkStreamKey(string(st.Project), string(st.Pat
h))] |
87 if !ok { | 98 if !ok { |
88 return fmt.Errorf("no such stream: %s", st.Path) | 99 return fmt.Errorf("no such stream: %s", st.Path) |
89 } | 100 } |
90 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter
minalIndex { | 101 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter
minalIndex { |
91 return fmt.Errorf("incompatible terminal indexes: %d != %d", st.
TerminalIndex, cachedState.TerminalIndex) | 102 return fmt.Errorf("incompatible terminal indexes: %d != %d", st.
TerminalIndex, cachedState.TerminalIndex) |
92 } | 103 } |
93 | 104 |
94 cachedState.TerminalIndex = st.TerminalIndex | 105 cachedState.TerminalIndex = st.TerminalIndex |
95 return nil | 106 return nil |
96 } | 107 } |
97 | 108 |
98 // enter is an entry point for client goroutines. It offers the opportunity | 109 // enter is an entry point for client goroutines. It offers the opportunity |
99 // to trap executing goroutines within client calls. | 110 // to trap executing goroutines within client calls. |
100 // | 111 // |
101 // This must NOT be called while the lock is held, else it could lead to | 112 // This must NOT be called while the lock is held, else it could lead to |
102 // deadlock if multiple goroutines are trapped. | 113 // deadlock if multiple goroutines are trapped. |
103 func (c *testCoordinator) enter() error { | 114 func (c *testCoordinator) enter() error { |
104 if c.errC != nil { | 115 if c.errC != nil { |
105 return <-c.errC | 116 return <-c.errC |
106 } | 117 } |
107 return nil | 118 return nil |
108 } | 119 } |
109 | 120 |
110 func (c *testCoordinator) stream(name string) (int, bool) { | 121 func (c *testCoordinator) stream(project, name string) (int, bool) { |
111 c.Lock() | 122 c.Lock() |
112 defer c.Unlock() | 123 defer c.Unlock() |
113 | 124 |
114 » sp, ok := c.state[name] | 125 » sp, ok := c.state[mkStreamKey(project, name)] |
115 if !ok { | 126 if !ok { |
116 return 0, false | 127 return 0, false |
117 } | 128 } |
118 return int(sp.TerminalIndex), true | 129 return int(sp.TerminalIndex), true |
119 } | 130 } |
120 | 131 |
121 // testStorage is a testing storage instance that returns errors. | 132 // testStorage is a testing storage instance that returns errors. |
122 type testStorage struct { | 133 type testStorage struct { |
123 storage.Storage | 134 storage.Storage |
124 err func() error | 135 err func() error |
125 } | 136 } |
126 | 137 |
127 func (s *testStorage) Put(r storage.PutRequest) error { | 138 func (s *testStorage) Put(r storage.PutRequest) error { |
128 if s.err != nil { | 139 if s.err != nil { |
129 if err := s.err(); err != nil { | 140 if err := s.err(); err != nil { |
130 return err | 141 return err |
131 } | 142 } |
132 } | 143 } |
133 return s.Storage.Put(r) | 144 return s.Storage.Put(r) |
134 } | 145 } |
135 | 146 |
136 // bundleBuilder is a set of utility functions to help test cases construct | 147 // bundleBuilder is a set of utility functions to help test cases construct |
137 // specific logpb.ButlerLogBundle layouts. | 148 // specific logpb.ButlerLogBundle layouts. |
138 type bundleBuilder struct { | 149 type bundleBuilder struct { |
139 context.Context | 150 context.Context |
140 | 151 |
141 » base time.Time | 152 » base *logpb.ButlerLogBundle |
142 » entries []*logpb.ButlerLogBundle_Entry | 153 } |
| 154 |
| 155 func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle { |
| 156 » if b.base == nil { |
| 157 » » b.base = &logpb.ButlerLogBundle{ |
| 158 » » » Source: "test stream", |
| 159 » » » Timestamp: google.NewTimestamp(clock.Now(b)), |
| 160 » » » Project: "test-project", |
| 161 » » » Prefix: "foo", |
| 162 » » » Secret: testSecret, |
| 163 » » } |
| 164 » } |
| 165 » return b.base |
143 } | 166 } |
144 | 167 |
145 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { | 168 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
146 » if b.base.IsZero() { | 169 » base := b.genBase() |
147 » » b.base = clock.Now(b) | 170 » base.Entries = append(base.Entries, be) |
148 » } | |
149 | |
150 » b.entries = append(b.entries, be) | |
151 } | 171 } |
152 | 172 |
153 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp
b.ButlerLogBundle_Entry { | 173 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp
b.ButlerLogBundle_Entry { |
154 p, n := types.StreamPath(name).Split() | 174 p, n := types.StreamPath(name).Split() |
155 be := logpb.ButlerLogBundle_Entry{ | 175 be := logpb.ButlerLogBundle_Entry{ |
156 Secret: testSecret, | |
157 Desc: &logpb.LogStreamDescriptor{ | 176 Desc: &logpb.LogStreamDescriptor{ |
158 Prefix: string(p), | 177 Prefix: string(p), |
159 Name: string(n), | 178 Name: string(n), |
160 ContentType: "application/test-message", | 179 ContentType: "application/test-message", |
161 StreamType: logpb.StreamType_TEXT, | 180 StreamType: logpb.StreamType_TEXT, |
162 Timestamp: google.NewTimestamp(clock.Now(b)), | 181 Timestamp: google.NewTimestamp(clock.Now(b)), |
163 }, | 182 }, |
164 } | 183 } |
165 | 184 |
166 if len(idxs) > 0 { | 185 if len(idxs) > 0 { |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
200 Value: fmt.Sprintf("Line #%d
", idx), | 219 Value: fmt.Sprintf("Line #%d
", idx), |
201 Delimiter: "\n", | 220 Delimiter: "\n", |
202 }, | 221 }, |
203 }, | 222 }, |
204 }, | 223 }, |
205 }, | 224 }, |
206 } | 225 } |
207 } | 226 } |
208 | 227 |
209 func (b *bundleBuilder) bundle() []byte { | 228 func (b *bundleBuilder) bundle() []byte { |
210 » bytes := b.bundleWithEntries(b.entries...) | 229 » buf := bytes.Buffer{} |
211 » b.entries = nil | 230 » w := butlerproto.Writer{Compress: true} |
212 | 231 » if err := w.Write(&buf, b.genBase()); err != nil { |
213 » return bytes | 232 » » panic(err) |
214 } | |
215 | |
216 func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []b
yte { | |
217 » bundle := logpb.ButlerLogBundle{ | |
218 » » Source: "test stream", | |
219 » » Timestamp: google.NewTimestamp(clock.Now(b)), | |
220 » » Entries: e, | |
221 } | 233 } |
222 | 234 |
223 » buf := bytes.Buffer{} | 235 » b.base = nil |
224 » w := butlerproto.Writer{Compress: true} | |
225 » if err := w.Write(&buf, &bundle); err != nil { | |
226 » » panic(err) | |
227 » } | |
228 return buf.Bytes() | 236 return buf.Bytes() |
229 } | 237 } |
230 | 238 |
231 type indexRange struct { | 239 type indexRange struct { |
232 start int | 240 start int |
233 end int | 241 end int |
234 } | 242 } |
235 | 243 |
236 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
.end) } | 244 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
.end) } |
237 | 245 |
238 // shouldHaveRegisteredStream asserts that a testCoordinator has | 246 // shouldHaveRegisteredStream asserts that a testCoordinator has |
239 // registered a stream (string) and its terminal index (int). | 247 // registered a stream (string) and its terminal index (int). |
240 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
ing { | 248 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
ing { |
241 tcc := actual.(*testCoordinator) | 249 tcc := actual.(*testCoordinator) |
242 name := expected[0].(string) | |
243 tidx := expected[1].(int) | |
244 | 250 |
245 » cur, ok := tcc.stream(name) | 251 » if len(expected) != 3 { |
| 252 » » return "invalid number of expected arguments (should be 3)." |
| 253 » } |
| 254 » project := expected[0].(string) |
| 255 » name := expected[1].(string) |
| 256 » tidx := expected[2].(int) |
| 257 |
| 258 » cur, ok := tcc.stream(project, name) |
246 if !ok { | 259 if !ok { |
247 return fmt.Sprintf("stream %q is not registered", name) | 260 return fmt.Sprintf("stream %q is not registered", name) |
248 } | 261 } |
249 if tidx >= 0 && cur < 0 { | 262 if tidx >= 0 && cur < 0 { |
250 return fmt.Sprintf("stream %q is expected to be terminated, but
isn't.", name) | 263 return fmt.Sprintf("stream %q is expected to be terminated, but
isn't.", name) |
251 } | 264 } |
252 if cur >= 0 && tidx < 0 { | 265 if cur >= 0 && tidx < 0 { |
253 return fmt.Sprintf("stream %q is NOT expected to be terminated,
but it is.", name) | 266 return fmt.Sprintf("stream %q is NOT expected to be terminated,
but it is.", name) |
254 } | 267 } |
255 return "" | 268 return "" |
256 } | 269 } |
257 | 270 |
258 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not | 271 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not |
259 // registered a stream (string). | 272 // registered a stream (string). |
260 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
string { | 273 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
string { |
261 tcc := actual.(*testCoordinator) | 274 tcc := actual.(*testCoordinator) |
262 » name := expected[0].(string) | 275 » if len(expected) != 2 { |
| 276 » » return "invalid number of expected arguments (should be 2)." |
| 277 » } |
| 278 » project := expected[0].(string) |
| 279 » name := expected[1].(string) |
263 | 280 |
264 » if _, ok := tcc.stream(name); ok { | 281 » if _, ok := tcc.stream(project, name); ok { |
265 return fmt.Sprintf("stream %q is registered, but it should NOT b
e.", name) | 282 return fmt.Sprintf("stream %q is registered, but it should NOT b
e.", name) |
266 } | 283 } |
267 return "" | 284 return "" |
268 } | 285 } |
269 | 286 |
270 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous | 287 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous |
271 // stream records in it. | 288 // stream records in it. |
272 // | 289 // |
273 // actual is the storage.Storage instance. expected is a stream name (string) | 290 // actual is the storage.Storage instance. expected is a stream name (string) |
274 // followed by a a series of records to assert. This can either be a specific | 291 // followed by a a series of records to assert. This can either be a specific |
275 // integer index or an intexRange marking a closed range of indices. | 292 // integer index or an intexRange marking a closed range of indices. |
276 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
{ | 293 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
{ |
277 st := actual.(storage.Storage) | 294 st := actual.(storage.Storage) |
278 » name := expected[0].(string) | 295 » project := expected[0].(string) |
| 296 » name := expected[1].(string) |
| 297 » expected = expected[2:] |
279 | 298 |
280 // Load all entries for this stream. | 299 // Load all entries for this stream. |
281 req := storage.GetRequest{ | 300 req := storage.GetRequest{ |
282 » » Path: types.StreamPath(name), | 301 » » Project: config.ProjectName(project), |
| 302 » » Path: types.StreamPath(name), |
283 } | 303 } |
284 | 304 |
285 entries := make(map[int]*logpb.LogEntry) | 305 entries := make(map[int]*logpb.LogEntry) |
286 var ierr error | 306 var ierr error |
287 err := st.Get(req, func(idx types.MessageIndex, d []byte) bool { | 307 err := st.Get(req, func(idx types.MessageIndex, d []byte) bool { |
288 le := logpb.LogEntry{} | 308 le := logpb.LogEntry{} |
289 if ierr = proto.Unmarshal(d, &le); ierr != nil { | 309 if ierr = proto.Unmarshal(d, &le); ierr != nil { |
290 return false | 310 return false |
291 } | 311 } |
292 entries[int(idx)] = &le | 312 entries[int(idx)] = &le |
(...skipping 13 matching lines...) Expand all Loading... |
306 } | 326 } |
307 delete(entries, i) | 327 delete(entries, i) |
308 | 328 |
309 if le.StreamIndex != uint64(i) { | 329 if le.StreamIndex != uint64(i) { |
310 return fmt.Sprintf("*%d", i) | 330 return fmt.Sprintf("*%d", i) |
311 } | 331 } |
312 return "" | 332 return "" |
313 } | 333 } |
314 | 334 |
315 var failed []string | 335 var failed []string |
316 » for _, exp := range expected[1:] { | 336 » for _, exp := range expected { |
317 switch e := exp.(type) { | 337 switch e := exp.(type) { |
318 case int: | 338 case int: |
319 if err := assertLogEntry(e); err != "" { | 339 if err := assertLogEntry(e); err != "" { |
320 failed = append(failed, fmt.Sprintf("missing{%s}
", err)) | 340 failed = append(failed, fmt.Sprintf("missing{%s}
", err)) |
321 } | 341 } |
322 | 342 |
323 case indexRange: | 343 case indexRange: |
324 var errs []string | 344 var errs []string |
325 for i := e.start; i <= e.end; i++ { | 345 for i := e.start; i <= e.end; i++ { |
326 if err := assertLogEntry(i); err != "" { | 346 if err := assertLogEntry(i); err != "" { |
(...skipping 22 matching lines...) Expand all Loading... |
349 extra[i] = fmt.Sprintf("%d", idx) | 369 extra[i] = fmt.Sprintf("%d", idx) |
350 } | 370 } |
351 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex
tra, ","))) | 371 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex
tra, ","))) |
352 } | 372 } |
353 | 373 |
354 if len(failed) > 0 { | 374 if len(failed) > 0 { |
355 return strings.Join(failed, ", ") | 375 return strings.Join(failed, ", ") |
356 } | 376 } |
357 return "" | 377 return "" |
358 } | 378 } |
OLD | NEW |