| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bundler | 5 package bundler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "container/heap" | 8 "container/heap" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 45 // bundled data. Other factors can cause the bundle to be sent before th
is, | 45 // bundled data. Other factors can cause the bundle to be sent before th
is, |
| 46 // but it is an upper bound. | 46 // but it is an upper bound. |
| 47 MaxBufferDelay time.Duration | 47 MaxBufferDelay time.Duration |
| 48 } | 48 } |
| 49 | 49 |
| 50 type bundlerStream interface { | 50 type bundlerStream interface { |
| 51 isDrained() bool | 51 isDrained() bool |
| 52 name() string | 52 name() string |
| 53 expireTime() (time.Time, bool) | 53 expireTime() (time.Time, bool) |
| 54 nextBundleEntry(*builder, bool) bool | 54 nextBundleEntry(*builder, bool) bool |
| 55 streamDesc() *logpb.LogStreamDescriptor |
| 55 } | 56 } |
| 56 | 57 |
| 57 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for | 58 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for |
| 58 // stream registration and bundle consumption. | 59 // stream registration and bundle consumption. |
| 59 type Bundler struct { | 60 type Bundler struct { |
| 60 c *Config | 61 c *Config |
| 61 | 62 |
| 62 // finishedC is closed when makeBundles goroutine has terminated. | 63 // finishedC is closed when makeBundles goroutine has terminated. |
| 63 finishedC chan struct{} | 64 finishedC chan struct{} |
| 64 bundleC chan *logpb.ButlerLogBundle | 65 bundleC chan *logpb.ButlerLogBundle |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 134 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) | 135 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) |
| 135 } | 136 } |
| 136 | 137 |
| 137 // Create a new stream. This will kick off its processing goroutine, whi
ch | 138 // Create a new stream. This will kick off its processing goroutine, whi
ch |
| 138 // will not stop until it is closed. | 139 // will not stop until it is closed. |
| 139 s := newStream(c) | 140 s := newStream(c) |
| 140 b.registerStreamLocked(s) | 141 b.registerStreamLocked(s) |
| 141 return s, nil | 142 return s, nil |
| 142 } | 143 } |
| 143 | 144 |
| 145 // GetStreamDescs returns the set of registered stream names mapped to their |
| 146 // descriptors. |
| 147 // |
| 148 // This is intended for testing purposes. DO NOT modify the resulting |
| 149 // descriptors. |
| 150 func (b *Bundler) GetStreamDescs() map[string]*logpb.LogStreamDescriptor { |
| 151 b.streamsLock.Lock() |
| 152 defer b.streamsLock.Unlock() |
| 153 |
| 154 if len(b.streams) == 0 { |
| 155 return nil |
| 156 } |
| 157 |
| 158 streams := make(map[string]*logpb.LogStreamDescriptor, len(b.streams)) |
| 159 for k, s := range b.streams { |
| 160 streams[k] = s.streamDesc() |
| 161 } |
| 162 return streams |
| 163 } |
| 164 |
| 144 // CloseAndFlush closes the Bundler, alerting it that no more streams will be | 165 // CloseAndFlush closes the Bundler, alerting it that no more streams will be |
| 145 // added and that existing data may be aggressively output. | 166 // added and that existing data may be aggressively output. |
| 146 // | 167 // |
| 147 // CloseAndFlush will block until all buffered data has been consumed. | 168 // CloseAndFlush will block until all buffered data has been consumed. |
| 148 func (b *Bundler) CloseAndFlush() { | 169 func (b *Bundler) CloseAndFlush() { |
| 149 // Mark that we're flushing. This will cause us to perform more aggressi
ve | 170 // Mark that we're flushing. This will cause us to perform more aggressi
ve |
| 150 // bundling in Next(). | 171 // bundling in Next(). |
| 151 b.startFlushing() | 172 b.startFlushing() |
| 152 <-b.finishedC | 173 <-b.finishedC |
| 153 } | 174 } |
| (...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 493 | 514 |
| 494 func (s *streamState) Push(x interface{}) { | 515 func (s *streamState) Push(x interface{}) { |
| 495 s.streams = append(s.streams, x.(bundlerStream)) | 516 s.streams = append(s.streams, x.(bundlerStream)) |
| 496 } | 517 } |
| 497 | 518 |
| 498 func (s *streamState) Pop() interface{} { | 519 func (s *streamState) Pop() interface{} { |
| 499 last := s.streams[len(s.streams)-1] | 520 last := s.streams[len(s.streams)-1] |
| 500 s.streams = s.streams[:len(s.streams)-1] | 521 s.streams = s.streams[:len(s.streams)-1] |
| 501 return last | 522 return last |
| 502 } | 523 } |
| OLD | NEW |