| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "sort" | 11 "sort" |
| 12 "strings" | 12 "strings" |
| 13 "sync" | 13 "sync" |
| 14 "time" | |
| 15 | 14 |
| 16 "github.com/golang/protobuf/proto" | 15 "github.com/golang/protobuf/proto" |
| 17 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/config" |
| 18 "github.com/luci/luci-go/common/logdog/butlerproto" | 18 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator
" | 22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator
" |
| 23 "github.com/luci/luci-go/server/logdog/storage" | 23 "github.com/luci/luci-go/server/logdog/storage" |
| 24 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 25 ) | 25 ) |
| 26 | 26 |
| 27 var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength) | 27 var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength) |
| 28 | 28 |
| 29 type streamKey struct { |
| 30 project string |
| 31 name string |
| 32 } |
| 33 |
| 34 func mkStreamKey(project, name string) streamKey { |
| 35 return streamKey{project, name} |
| 36 } |
| 37 |
| 29 // testCoordinator is an implementation of Coordinator that can be used for | 38 // testCoordinator is an implementation of Coordinator that can be used for |
| 30 // testing. | 39 // testing. |
| 31 type testCoordinator struct { | 40 type testCoordinator struct { |
| 32 sync.Mutex | 41 sync.Mutex |
| 33 | 42 |
| 34 // errC is a channel that error status will be read from if not nil. | 43 // errC is a channel that error status will be read from if not nil. |
| 35 errC chan error | 44 errC chan error |
| 36 | 45 |
| 37 // state is the latest tracked stream state. | 46 // state is the latest tracked stream state. |
| 38 » state map[string]*cc.LogStreamState | 47 » state map[streamKey]*cc.LogStreamState |
| 39 } | 48 } |
| 40 | 49 |
| 41 var _ cc.Coordinator = (*testCoordinator)(nil) | 50 var _ cc.Coordinator = (*testCoordinator)(nil) |
| 42 | 51 |
| 43 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { | 52 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { |
| 44 c.Lock() | 53 c.Lock() |
| 45 defer c.Unlock() | 54 defer c.Unlock() |
| 46 | 55 |
| 47 // Update our state. | 56 // Update our state. |
| 48 if c.state == nil { | 57 if c.state == nil { |
| 49 » » c.state = make(map[string]*cc.LogStreamState) | 58 » » c.state = make(map[streamKey]*cc.LogStreamState) |
| 50 } | 59 } |
| 51 » if sp := c.state[string(s.Path)]; sp != nil { | 60 |
| 61 » key := mkStreamKey(string(s.Project), string(s.Path)) |
| 62 » if sp := c.state[key]; sp != nil { |
| 52 return *sp | 63 return *sp |
| 53 } | 64 } |
| 54 » c.state[string(s.Path)] = &s | 65 » c.state[key] = &s |
| 55 return s | 66 return s |
| 56 } | 67 } |
| 57 | 68 |
| 58 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta
te, d *logpb.LogStreamDescriptor) ( | 69 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta
te, d *logpb.LogStreamDescriptor) ( |
| 59 *cc.LogStreamState, error) { | 70 *cc.LogStreamState, error) { |
| 60 if err := c.enter(); err != nil { | 71 if err := c.enter(); err != nil { |
| 61 return nil, err | 72 return nil, err |
| 62 } | 73 } |
| 63 | 74 |
| 64 // Enforce that the submitted stream is not terminal. | 75 // Enforce that the submitted stream is not terminal. |
| (...skipping 11 matching lines...) Expand all Loading... |
| 76 } | 87 } |
| 77 | 88 |
| 78 if st.TerminalIndex < 0 { | 89 if st.TerminalIndex < 0 { |
| 79 return errors.New("submitted stream is not terminal") | 90 return errors.New("submitted stream is not terminal") |
| 80 } | 91 } |
| 81 | 92 |
| 82 c.Lock() | 93 c.Lock() |
| 83 defer c.Unlock() | 94 defer c.Unlock() |
| 84 | 95 |
| 85 // Update our state. | 96 // Update our state. |
| 86 » cachedState, ok := c.state[string(st.Path)] | 97 » cachedState, ok := c.state[mkStreamKey(string(st.Project), string(st.Pat
h))] |
| 87 if !ok { | 98 if !ok { |
| 88 return fmt.Errorf("no such stream: %s", st.Path) | 99 return fmt.Errorf("no such stream: %s", st.Path) |
| 89 } | 100 } |
| 90 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter
minalIndex { | 101 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter
minalIndex { |
| 91 return fmt.Errorf("incompatible terminal indexes: %d != %d", st.
TerminalIndex, cachedState.TerminalIndex) | 102 return fmt.Errorf("incompatible terminal indexes: %d != %d", st.
TerminalIndex, cachedState.TerminalIndex) |
| 92 } | 103 } |
| 93 | 104 |
| 94 cachedState.TerminalIndex = st.TerminalIndex | 105 cachedState.TerminalIndex = st.TerminalIndex |
| 95 return nil | 106 return nil |
| 96 } | 107 } |
| 97 | 108 |
| 98 // enter is an entry point for client goroutines. It offers the opportunity | 109 // enter is an entry point for client goroutines. It offers the opportunity |
| 99 // to trap executing goroutines within client calls. | 110 // to trap executing goroutines within client calls. |
| 100 // | 111 // |
| 101 // This must NOT be called while the lock is held, else it could lead to | 112 // This must NOT be called while the lock is held, else it could lead to |
| 102 // deadlock if multiple goroutines are trapped. | 113 // deadlock if multiple goroutines are trapped. |
| 103 func (c *testCoordinator) enter() error { | 114 func (c *testCoordinator) enter() error { |
| 104 if c.errC != nil { | 115 if c.errC != nil { |
| 105 return <-c.errC | 116 return <-c.errC |
| 106 } | 117 } |
| 107 return nil | 118 return nil |
| 108 } | 119 } |
| 109 | 120 |
| 110 func (c *testCoordinator) stream(name string) (int, bool) { | 121 func (c *testCoordinator) stream(project, name string) (int, bool) { |
| 111 c.Lock() | 122 c.Lock() |
| 112 defer c.Unlock() | 123 defer c.Unlock() |
| 113 | 124 |
| 114 » sp, ok := c.state[name] | 125 » sp, ok := c.state[mkStreamKey(project, name)] |
| 115 if !ok { | 126 if !ok { |
| 116 return 0, false | 127 return 0, false |
| 117 } | 128 } |
| 118 return int(sp.TerminalIndex), true | 129 return int(sp.TerminalIndex), true |
| 119 } | 130 } |
| 120 | 131 |
| 121 // testStorage is a testing storage instance that returns errors. | 132 // testStorage is a testing storage instance that returns errors. |
| 122 type testStorage struct { | 133 type testStorage struct { |
| 123 storage.Storage | 134 storage.Storage |
| 124 err func() error | 135 err func() error |
| 125 } | 136 } |
| 126 | 137 |
| 127 func (s *testStorage) Put(r storage.PutRequest) error { | 138 func (s *testStorage) Put(r storage.PutRequest) error { |
| 128 if s.err != nil { | 139 if s.err != nil { |
| 129 if err := s.err(); err != nil { | 140 if err := s.err(); err != nil { |
| 130 return err | 141 return err |
| 131 } | 142 } |
| 132 } | 143 } |
| 133 return s.Storage.Put(r) | 144 return s.Storage.Put(r) |
| 134 } | 145 } |
| 135 | 146 |
| 136 // bundleBuilder is a set of utility functions to help test cases construct | 147 // bundleBuilder is a set of utility functions to help test cases construct |
| 137 // specific logpb.ButlerLogBundle layouts. | 148 // specific logpb.ButlerLogBundle layouts. |
| 138 type bundleBuilder struct { | 149 type bundleBuilder struct { |
| 139 context.Context | 150 context.Context |
| 140 | 151 |
| 141 » base time.Time | 152 » base *logpb.ButlerLogBundle |
| 142 » entries []*logpb.ButlerLogBundle_Entry | 153 } |
| 154 |
| 155 func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle { |
| 156 » if b.base == nil { |
| 157 » » b.base = &logpb.ButlerLogBundle{ |
| 158 » » » Source: "test stream", |
| 159 » » » Timestamp: google.NewTimestamp(clock.Now(b)), |
| 160 » » » Project: "test-project", |
| 161 » » » Prefix: "foo", |
| 162 » » » Secret: testSecret, |
| 163 » » } |
| 164 » } |
| 165 » return b.base |
| 143 } | 166 } |
| 144 | 167 |
| 145 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { | 168 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
| 146 » if b.base.IsZero() { | 169 » base := b.genBase() |
| 147 » » b.base = clock.Now(b) | 170 » base.Entries = append(base.Entries, be) |
| 148 » } | |
| 149 | |
| 150 » b.entries = append(b.entries, be) | |
| 151 } | 171 } |
| 152 | 172 |
| 153 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp
b.ButlerLogBundle_Entry { | 173 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp
b.ButlerLogBundle_Entry { |
| 154 p, n := types.StreamPath(name).Split() | 174 p, n := types.StreamPath(name).Split() |
| 155 be := logpb.ButlerLogBundle_Entry{ | 175 be := logpb.ButlerLogBundle_Entry{ |
| 156 Secret: testSecret, | |
| 157 Desc: &logpb.LogStreamDescriptor{ | 176 Desc: &logpb.LogStreamDescriptor{ |
| 158 Prefix: string(p), | 177 Prefix: string(p), |
| 159 Name: string(n), | 178 Name: string(n), |
| 160 ContentType: "application/test-message", | 179 ContentType: "application/test-message", |
| 161 StreamType: logpb.StreamType_TEXT, | 180 StreamType: logpb.StreamType_TEXT, |
| 162 Timestamp: google.NewTimestamp(clock.Now(b)), | 181 Timestamp: google.NewTimestamp(clock.Now(b)), |
| 163 }, | 182 }, |
| 164 } | 183 } |
| 165 | 184 |
| 166 if len(idxs) > 0 { | 185 if len(idxs) > 0 { |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 200 Value: fmt.Sprintf("Line #%d
", idx), | 219 Value: fmt.Sprintf("Line #%d
", idx), |
| 201 Delimiter: "\n", | 220 Delimiter: "\n", |
| 202 }, | 221 }, |
| 203 }, | 222 }, |
| 204 }, | 223 }, |
| 205 }, | 224 }, |
| 206 } | 225 } |
| 207 } | 226 } |
| 208 | 227 |
| 209 func (b *bundleBuilder) bundle() []byte { | 228 func (b *bundleBuilder) bundle() []byte { |
| 210 » bytes := b.bundleWithEntries(b.entries...) | 229 » buf := bytes.Buffer{} |
| 211 » b.entries = nil | 230 » w := butlerproto.Writer{Compress: true} |
| 212 | 231 » if err := w.Write(&buf, b.genBase()); err != nil { |
| 213 » return bytes | 232 » » panic(err) |
| 214 } | |
| 215 | |
| 216 func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []b
yte { | |
| 217 » bundle := logpb.ButlerLogBundle{ | |
| 218 » » Source: "test stream", | |
| 219 » » Timestamp: google.NewTimestamp(clock.Now(b)), | |
| 220 » » Entries: e, | |
| 221 } | 233 } |
| 222 | 234 |
| 223 » buf := bytes.Buffer{} | 235 » b.base = nil |
| 224 » w := butlerproto.Writer{Compress: true} | |
| 225 » if err := w.Write(&buf, &bundle); err != nil { | |
| 226 » » panic(err) | |
| 227 » } | |
| 228 return buf.Bytes() | 236 return buf.Bytes() |
| 229 } | 237 } |
| 230 | 238 |
| 231 type indexRange struct { | 239 type indexRange struct { |
| 232 start int | 240 start int |
| 233 end int | 241 end int |
| 234 } | 242 } |
| 235 | 243 |
| 236 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
.end) } | 244 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
.end) } |
| 237 | 245 |
| 238 // shouldHaveRegisteredStream asserts that a testCoordinator has | 246 // shouldHaveRegisteredStream asserts that a testCoordinator has |
| 239 // registered a stream (string) and its terminal index (int). | 247 // registered a stream (string) and its terminal index (int). |
| 240 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
ing { | 248 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
ing { |
| 241 tcc := actual.(*testCoordinator) | 249 tcc := actual.(*testCoordinator) |
| 242 name := expected[0].(string) | |
| 243 tidx := expected[1].(int) | |
| 244 | 250 |
| 245 » cur, ok := tcc.stream(name) | 251 » if len(expected) != 3 { |
| 252 » » return "invalid number of expected arguments (should be 3)." |
| 253 » } |
| 254 » project := expected[0].(string) |
| 255 » name := expected[1].(string) |
| 256 » tidx := expected[2].(int) |
| 257 |
| 258 » cur, ok := tcc.stream(project, name) |
| 246 if !ok { | 259 if !ok { |
| 247 return fmt.Sprintf("stream %q is not registered", name) | 260 return fmt.Sprintf("stream %q is not registered", name) |
| 248 } | 261 } |
| 249 if tidx >= 0 && cur < 0 { | 262 if tidx >= 0 && cur < 0 { |
| 250 return fmt.Sprintf("stream %q is expected to be terminated, but
isn't.", name) | 263 return fmt.Sprintf("stream %q is expected to be terminated, but
isn't.", name) |
| 251 } | 264 } |
| 252 if cur >= 0 && tidx < 0 { | 265 if cur >= 0 && tidx < 0 { |
| 253 return fmt.Sprintf("stream %q is NOT expected to be terminated,
but it is.", name) | 266 return fmt.Sprintf("stream %q is NOT expected to be terminated,
but it is.", name) |
| 254 } | 267 } |
| 255 return "" | 268 return "" |
| 256 } | 269 } |
| 257 | 270 |
| 258 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not | 271 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not |
| 259 // registered a stream (string). | 272 // registered a stream (string). |
| 260 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
string { | 273 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
string { |
| 261 tcc := actual.(*testCoordinator) | 274 tcc := actual.(*testCoordinator) |
| 262 » name := expected[0].(string) | 275 » if len(expected) != 2 { |
| 276 » » return "invalid number of expected arguments (should be 2)." |
| 277 » } |
| 278 » project := expected[0].(string) |
| 279 » name := expected[1].(string) |
| 263 | 280 |
| 264 » if _, ok := tcc.stream(name); ok { | 281 » if _, ok := tcc.stream(project, name); ok { |
| 265 return fmt.Sprintf("stream %q is registered, but it should NOT b
e.", name) | 282 return fmt.Sprintf("stream %q is registered, but it should NOT b
e.", name) |
| 266 } | 283 } |
| 267 return "" | 284 return "" |
| 268 } | 285 } |
| 269 | 286 |
| 270 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous | 287 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous |
| 271 // stream records in it. | 288 // stream records in it. |
| 272 // | 289 // |
| 273 // actual is the storage.Storage instance. expected is a stream name (string) | 290 // actual is the storage.Storage instance. expected is a stream name (string) |
| 274 // followed by a a series of records to assert. This can either be a specific | 291 // followed by a a series of records to assert. This can either be a specific |
| 275 // integer index or an intexRange marking a closed range of indices. | 292 // integer index or an intexRange marking a closed range of indices. |
| 276 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
{ | 293 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
{ |
| 277 st := actual.(storage.Storage) | 294 st := actual.(storage.Storage) |
| 278 » name := expected[0].(string) | 295 » project := expected[0].(string) |
| 296 » name := expected[1].(string) |
| 297 » expected = expected[2:] |
| 279 | 298 |
| 280 // Load all entries for this stream. | 299 // Load all entries for this stream. |
| 281 req := storage.GetRequest{ | 300 req := storage.GetRequest{ |
| 282 » » Path: types.StreamPath(name), | 301 » » Project: config.ProjectName(project), |
| 302 » » Path: types.StreamPath(name), |
| 283 } | 303 } |
| 284 | 304 |
| 285 entries := make(map[int]*logpb.LogEntry) | 305 entries := make(map[int]*logpb.LogEntry) |
| 286 var ierr error | 306 var ierr error |
| 287 err := st.Get(req, func(idx types.MessageIndex, d []byte) bool { | 307 err := st.Get(req, func(idx types.MessageIndex, d []byte) bool { |
| 288 le := logpb.LogEntry{} | 308 le := logpb.LogEntry{} |
| 289 if ierr = proto.Unmarshal(d, &le); ierr != nil { | 309 if ierr = proto.Unmarshal(d, &le); ierr != nil { |
| 290 return false | 310 return false |
| 291 } | 311 } |
| 292 entries[int(idx)] = &le | 312 entries[int(idx)] = &le |
| (...skipping 13 matching lines...) Expand all Loading... |
| 306 } | 326 } |
| 307 delete(entries, i) | 327 delete(entries, i) |
| 308 | 328 |
| 309 if le.StreamIndex != uint64(i) { | 329 if le.StreamIndex != uint64(i) { |
| 310 return fmt.Sprintf("*%d", i) | 330 return fmt.Sprintf("*%d", i) |
| 311 } | 331 } |
| 312 return "" | 332 return "" |
| 313 } | 333 } |
| 314 | 334 |
| 315 var failed []string | 335 var failed []string |
| 316 » for _, exp := range expected[1:] { | 336 » for _, exp := range expected { |
| 317 switch e := exp.(type) { | 337 switch e := exp.(type) { |
| 318 case int: | 338 case int: |
| 319 if err := assertLogEntry(e); err != "" { | 339 if err := assertLogEntry(e); err != "" { |
| 320 failed = append(failed, fmt.Sprintf("missing{%s}
", err)) | 340 failed = append(failed, fmt.Sprintf("missing{%s}
", err)) |
| 321 } | 341 } |
| 322 | 342 |
| 323 case indexRange: | 343 case indexRange: |
| 324 var errs []string | 344 var errs []string |
| 325 for i := e.start; i <= e.end; i++ { | 345 for i := e.start; i <= e.end; i++ { |
| 326 if err := assertLogEntry(i); err != "" { | 346 if err := assertLogEntry(i); err != "" { |
| (...skipping 22 matching lines...) Expand all Loading... |
| 349 extra[i] = fmt.Sprintf("%d", idx) | 369 extra[i] = fmt.Sprintf("%d", idx) |
| 350 } | 370 } |
| 351 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex
tra, ","))) | 371 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex
tra, ","))) |
| 352 } | 372 } |
| 353 | 373 |
| 354 if len(failed) > 0 { | 374 if len(failed) > 0 { |
| 355 return strings.Join(failed, ", ") | 375 return strings.Join(failed, ", ") |
| 356 } | 376 } |
| 357 return "" | 377 return "" |
| 358 } | 378 } |
| OLD | NEW |