| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "container/heap" |
| 9 "fmt" |
| 10 "sync" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 14 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/logdog/protocol" |
| 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/google" |
| 18 ) |
| 19 |
// Config is the Bundler configuration.
type Config struct {
	// Clock is the clock instance that will be used for Bundler and stream
	// timing. If nil, the system clock is used.
	Clock clock.Clock

	// Source is the bundle source string to use. This can be empty if there is
	// no source information to include.
	Source string

	// MaxBufferedBytes is the maximum number of bytes to buffer in memory per
	// stream.
	MaxBufferedBytes int64

	// MaxBundleSize is the maximum bundle size in bytes that may be generated.
	//
	// If this value is zero, no size constraint will be applied to generated
	// bundles.
	MaxBundleSize int

	// MaxBufferDelay is the maximum amount of time we're willing to buffer
	// bundled data. Other factors can cause the bundle to be sent before this,
	// but it is an upper bound.
	//
	// If this value is zero, a bundle is sent as soon as it has any content.
	MaxBufferDelay time.Duration
}
| 45 |
// bundlerStream is the Bundler's internal view of a registered stream. It
// exposes only what the bundling loop needs to order, drain, and prune
// streams.
type bundlerStream interface {
	// isDrained reports whether the stream is finished; the bundling loop
	// unregisters streams for which this returns true.
	isDrained() bool
	// name returns the stream's name, which is the key it is registered under.
	name() string
	// expireTime returns the stream's current expiration time. The boolean is
	// false if the stream has no expiration time.
	expireTime() (time.Time, bool)
	// nextBundleEntry adds data from the stream to the supplied builder and
	// returns true if it modified anything.
	//
	// NOTE(review): the boolean argument is passed as true for expired data and
	// as the Bundler's flushing flag otherwise; presumably it requests more
	// aggressive bundling. Confirm against the stream implementation.
	nextBundleEntry(*builder, bool) bool
}
| 52 |
// Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for
// stream registration and bundle consumption.
type Bundler interface {
	// Register creates and registers a new Stream described by the supplied
	// Properties. It returns an error if the Stream could not be registered.
	Register(streamproto.Properties) (Stream, error)

	// Next blocks until the Bundler has packed a new ButlerLogBundle message
	// for transmission, then returns it.
	//
	// Next takes no size argument; the size of generated bundles is governed by
	// Config.MaxBundleSize.
	//
	// Next returns nil once the Bundler has finished flushing and no further
	// bundles will be produced.
	Next() *protocol.ButlerLogBundle

	// CloseAndFlush flushes the Bundler, blocking until all registered Streams
	// have closed and all buffered data has been output.
	CloseAndFlush()
}
| 69 |
// bundlerImpl is the Bundler implementation returned by New.
type bundlerImpl struct {
	*Config

	// finishedC is closed when makeBundles goroutine has terminated.
	finishedC chan struct{}
	// bundleC carries completed bundles from makeBundles to Next. It is closed
	// when makeBundles terminates.
	bundleC chan *protocol.ButlerLogBundle

	// streamsLock is a lock around the `streams` map and its contents.
	streamsLock sync.Mutex
	// streamsCond is a Cond bound to streamsLock, used to wake makeBundles when
	// a stream is registered, stream data is appended, or flushing begins.
	streamsCond *timeoutCond
	// streams is the set of currently-registered Streams, keyed by stream name.
	streams map[string]bundlerStream
	// flushing is true if we're blocking on CloseAndFlush().
	flushing bool

	// prefixCounter is a global counter for Prefix-wide streams. A pointer to
	// it is handed to each registered stream's parser.
	prefixCounter counter
}
| 90 |
| 91 // New instantiates a new Bundler instance. |
| 92 func New(c Config) Bundler { |
| 93 b := bundlerImpl{ |
| 94 Config: &c, |
| 95 finishedC: make(chan struct{}), |
| 96 bundleC: make(chan *protocol.ButlerLogBundle), |
| 97 streams: map[string]bundlerStream{}, |
| 98 } |
| 99 b.streamsCond = newTimeoutCond(b.getClock(), &b.streamsLock) |
| 100 |
| 101 go b.makeBundles() |
| 102 return &b |
| 103 } |
| 104 |
| 105 func (b *bundlerImpl) Register(p streamproto.Properties) (Stream, error) { |
| 106 b.streamsLock.Lock() |
| 107 defer b.streamsLock.Unlock() |
| 108 |
| 109 if s := b.streams[p.Name]; s != nil { |
| 110 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) |
| 111 } |
| 112 |
| 113 // Our Properties must validate. |
| 114 if err := p.Validate(); err != nil { |
| 115 return nil, err |
| 116 } |
| 117 |
| 118 // Construct a parser for this stream. |
| 119 c := streamConfig{ |
| 120 name: p.Name, |
| 121 template: protocol.ButlerLogBundle_Entry{ |
| 122 Desc: &p.LogStreamDescriptor, |
| 123 }, |
| 124 maximumBufferDuration: b.MaxBufferDelay, |
| 125 maximumBufferedBytes: b.MaxBufferedBytes, |
| 126 onAppend: func(appended bool) { |
| 127 if appended { |
| 128 b.signalStreamUpdate() |
| 129 } |
| 130 }, |
| 131 } |
| 132 |
| 133 err := error(nil) |
| 134 c.parser, err = newParser(&p, &b.prefixCounter) |
| 135 if err != nil { |
| 136 return nil, fmt.Errorf("failed to create stream parser: %s", err
) |
| 137 } |
| 138 |
| 139 // Generate a secret for this Stream instance. |
| 140 c.template.Secret, err = types.NewStreamSecret() |
| 141 if err != nil { |
| 142 return nil, fmt.Errorf("failed to generate stream secret: %s", e
rr) |
| 143 } |
| 144 |
| 145 // Create a new stream. This will kick off its processing goroutine, whi
ch |
| 146 // will not stop until it is closed. |
| 147 s := newStream(c) |
| 148 b.registerStreamLocked(s) |
| 149 return s, nil |
| 150 } |
| 151 |
| 152 func (b *bundlerImpl) CloseAndFlush() { |
| 153 // Mark that we're flushing. This will cause us to perform more aggressi
ve |
| 154 // bundling in Next(). |
| 155 b.startFlushing() |
| 156 <-b.finishedC |
| 157 } |
| 158 |
| 159 func (b *bundlerImpl) Next() *protocol.ButlerLogBundle { |
| 160 return <-b.bundleC |
| 161 } |
| 162 |
| 163 func (b *bundlerImpl) startFlushing() { |
| 164 b.streamsLock.Lock() |
| 165 defer b.streamsLock.Unlock() |
| 166 |
| 167 if !b.flushing { |
| 168 b.flushing = true |
| 169 b.signalStreamUpdateLocked() |
| 170 } |
| 171 } |
| 172 |
// makeBundles is run in its own goroutine. It runs continuously, responding
// to Stream constraints and availability and sending ButlerLogBundles through
// bundleC when available.
//
// makeBundles will terminate once the Bundler is flushing (see startFlushing)
// and all streams are drained. On termination it closes bundleC and then
// finishedC.
//
// streamsLock is held for the duration of this method, except while waiting
// on streamsCond. Note that this includes the sends on bundleC.
func (b *bundlerImpl) makeBundles() {
	// Deferred calls run in reverse order: any final bundle is sent first, then
	// the lock is released, then bundleC and finishedC are closed.
	defer close(b.finishedC)
	defer close(b.bundleC)

	b.streamsLock.Lock()
	defer b.streamsLock.Unlock()

	// bb is the bundle currently being built. It is nil immediately after a
	// bundle has been sent.
	bb := (*builder)(nil)
	defer func() {
		if bb != nil && bb.hasContent() {
			b.bundleC <- bb.bundle()
		}
	}()

	// Each iteration of the outer loop builds and emits one bundle.
	for {
		bb = &builder{
			template: protocol.ButlerLogBundle{
				Source:    b.Source,
				Timestamp: google.NewTimestamp(b.getClock().Now()),
			},
			size: b.MaxBundleSize,
		}
		// oldestContentTime is the time of the first round in which this bundle
		// held content; it remains zero while the bundle is empty.
		oldestContentTime := time.Time{}

		// Each iteration of the inner loop is one bundling round against a fresh
		// snapshot of the registered streams.
		for {
			state := b.getStreamStateLocked()

			// Attempt to create more bundles.
			sendNow := b.bundleRoundLocked(bb, state)

			// Prune any drained streams, from both the snapshot and the Bundler.
			state.forEachStream(func(s bundlerStream) bool {
				if s.isDrained() {
					state.removeStream(s.name())
					b.unregisterStreamLocked(s)
				}

				return true
			})

			if b.flushing && len(state.streams) == 0 {
				// We're flushing, and there are no more streams, so we're completely
				// finished.
				//
				// If we have any content in our builder, it will be exported via defer.
				return
			}

			// If we have content, consider emitting this bundle. It is emitted
			// immediately if buffering is disabled, if the round added
			// time-sensitive data, or if the builder reports itself ready.
			if bb.hasContent() {
				if b.MaxBufferDelay == 0 || sendNow || bb.ready() {
					break
				}
			}

			// Mark the first time this round where we actually saw data.
			if oldestContentTime.IsZero() && bb.hasContent() {
				oldestContentTime = state.now
			}

			// We will yield our stream lock and sleep, waiting for either:
			// 1) The earliest expiration time.
			// 2) A streams channel signal.
			//
			// We use a Cond here because we want Streams to be able to be added
			// while we're waiting for stream data.
			nextExpire, has := state.nextExpire()

			// If we have an oldest content time, that also means that we have
			// content. Factor this constraint in.
			if !oldestContentTime.IsZero() {
				roundExpire := oldestContentTime.Add(b.MaxBufferDelay)
				if !roundExpire.After(state.now) {
					// The content has been buffered for at least MaxBufferDelay.
					break
				}

				// Wake at whichever deadline comes first.
				if !has || roundExpire.Before(nextExpire) {
					nextExpire = roundExpire
					has = true
				}
			}

			if has {
				b.streamsCond.waitTimeout(nextExpire.Sub(state.now))
			} else {
				// No data, no expire constraints. Wait indefinitely for something
				// to change.
				b.streamsCond.Wait()
			}
		}

		// If our bundler has contents, send them. Clearing bb keeps the deferred
		// function from sending the same bundle again.
		if bb.hasContent() {
			b.bundleC <- bb.bundle()
			bb = nil
		}
	}
}
| 276 |
| 277 // Performs a round of bundling with a stream state. polling stream state and ad
ding |
| 278 // |
| 279 // This method will block until a suitable bundle is available. Availability |
| 280 // is subject both to time and data constraints: |
| 281 // - If data has exceeded its buffer duration threshold, a Bundle will be cut |
| 282 // immediately. |
| 283 // - If no data is set to expire, the Bundler may wait for more data to |
| 284 // produce a more optimally-packed bundle. |
| 285 // |
| 286 // At a high level, Next operates as follows: |
| 287 // 1) Freeze all stream state. |
| 288 // 2) Scan streams for data that has exceeded its threshold; if data is found: |
| 289 // - Aggressively pack expired data into a Bundle. |
| 290 // - Optimally pack the remainder of the Bundle with any available data. |
| 291 // - Return the Bundle. |
| 292 // |
| 293 // 3) Examine the remaining data sizes, waiting for either: |
| 294 // - Enough stream data to fill our Bundle. |
| 295 // - Our timeout, if the Bundler is not closed. |
| 296 // 4) Pack a Bundle with the remaining data optimally, emphasizing streams |
| 297 // with older data. |
| 298 // |
| 299 // Returns true if bundle some data was added that should be sent immediately. |
| 300 func (b *bundlerImpl) bundleRoundLocked(bb *builder, state *streamState) bool { |
| 301 sendNow := false |
| 302 |
| 303 // First pass: non-blocking data that has exceeded its storage threshold
. |
| 304 for bb.remaining() > 0 { |
| 305 s := state.next() |
| 306 if s == nil { |
| 307 break |
| 308 } |
| 309 |
| 310 if et, has := s.expireTime(); !has || et.After(state.now) { |
| 311 // This stream (and all other streams, since we're sorte
d) expires in |
| 312 // the future, so we're done with the first pass. |
| 313 break |
| 314 } |
| 315 |
| 316 // Pull bundles from this stream. |
| 317 if modified := s.nextBundleEntry(bb, true); modified { |
| 318 state.streamUpdated(s.name()) |
| 319 |
| 320 // We have at least one time-sensitive bundle, so send t
his round. |
| 321 sendNow = true |
| 322 } |
| 323 } |
| 324 |
| 325 // Second pass: bundle any available data. |
| 326 state.forEachStream(func(s bundlerStream) bool { |
| 327 if bb.remaining() == 0 { |
| 328 return false |
| 329 } |
| 330 |
| 331 if modified := s.nextBundleEntry(bb, b.flushing); modified { |
| 332 state.streamUpdated(s.name()) |
| 333 } |
| 334 return true |
| 335 }) |
| 336 |
| 337 return sendNow |
| 338 } |
| 339 |
| 340 func (b *bundlerImpl) getStreamStateLocked() *streamState { |
| 341 // Lock and collect each stream. |
| 342 state := &streamState{ |
| 343 streams: make([]bundlerStream, 0, len(b.streams)), |
| 344 now: b.getClock().Now(), |
| 345 } |
| 346 |
| 347 for _, s := range b.streams { |
| 348 state.streams = append(state.streams, s) |
| 349 } |
| 350 heap.Init(state) |
| 351 |
| 352 return state |
| 353 } |
| 354 |
| 355 func (b *bundlerImpl) registerStreamLocked(s bundlerStream) { |
| 356 b.streams[s.name()] = s |
| 357 b.signalStreamUpdateLocked() |
| 358 } |
| 359 |
| 360 func (b *bundlerImpl) unregisterStreamLocked(s bundlerStream) { |
| 361 delete(b.streams, s.name()) |
| 362 } |
| 363 |
| 364 func (b *bundlerImpl) signalStreamUpdate() { |
| 365 b.streamsLock.Lock() |
| 366 defer b.streamsLock.Unlock() |
| 367 |
| 368 b.signalStreamUpdateLocked() |
| 369 } |
| 370 |
| 371 func (b *bundlerImpl) signalStreamUpdateLocked() { |
| 372 b.streamsCond.Broadcast() |
| 373 } |
| 374 |
| 375 // nextPrefixIndex is a goroutine-safe method that returns the next prefix index |
| 376 // for the given Bundler. |
| 377 func (b *bundlerImpl) nextPrefixIndex() uint64 { |
| 378 return uint64(b.prefixCounter.next()) |
| 379 } |
| 380 |
| 381 func (b *bundlerImpl) getClock() clock.Clock { |
| 382 c := b.Clock |
| 383 if c != nil { |
| 384 return c |
| 385 } |
| 386 return clock.GetSystemClock() |
| 387 } |
| 388 |
// streamState is a snapshot of the current stream registration. All operations
// performed on the state require the Bundler's streamsLock to be held.
//
// streamState implements heap.Interface for its streams array. Streams without
// an expire time are considered to be greater than those with one.
type streamState struct {
	// streams is the snapshot of registered streams, maintained as a heap
	// ordered by expire time.
	streams []bundlerStream
	// now is the time at which the snapshot was taken.
	now time.Time
}

// Compile-time check that *streamState implements heap.Interface.
var _ heap.Interface = (*streamState)(nil)
| 400 |
| 401 func (s *streamState) next() bundlerStream { |
| 402 if len(s.streams) == 0 { |
| 403 return nil |
| 404 } |
| 405 return s.streams[0] |
| 406 } |
| 407 |
| 408 func (s *streamState) nextExpire() (time.Time, bool) { |
| 409 if next := s.next(); next != nil { |
| 410 if ts, ok := next.expireTime(); ok { |
| 411 return ts, true |
| 412 } |
| 413 } |
| 414 return time.Time{}, false |
| 415 } |
| 416 |
| 417 func (s *streamState) streamUpdated(name string) { |
| 418 if si, idx := s.streamIndex(name); si != nil { |
| 419 heap.Fix(s, idx) |
| 420 } |
| 421 } |
| 422 |
| 423 func (s *streamState) forEachStream(f func(bundlerStream) bool) { |
| 424 // Clone our streams, since the callback may mutate their order. |
| 425 streams := make([]bundlerStream, len(s.streams)) |
| 426 for i, s := range s.streams { |
| 427 streams[i] = s |
| 428 } |
| 429 |
| 430 for _, s := range streams { |
| 431 if !f(s) { |
| 432 break |
| 433 } |
| 434 } |
| 435 } |
| 436 |
| 437 // removeStream removes a stream from the stream state. |
| 438 func (s *streamState) removeStream(name string) bundlerStream { |
| 439 if si, idx := s.streamIndex(name); si != nil { |
| 440 heap.Remove(s, idx) |
| 441 return si |
| 442 } |
| 443 return nil |
| 444 } |
| 445 |
| 446 func (s *streamState) streamIndex(name string) (bundlerStream, int) { |
| 447 for i, si := range s.streams { |
| 448 if si.name() == name { |
| 449 return si, i |
| 450 } |
| 451 } |
| 452 return nil, -1 |
| 453 } |
| 454 |
| 455 func (s *streamState) Len() int { |
| 456 return len(s.streams) |
| 457 } |
| 458 |
| 459 func (s *streamState) Less(i, j int) bool { |
| 460 si, sj := s.streams[i], s.streams[j] |
| 461 |
| 462 if it, ok := si.expireTime(); ok { |
| 463 if jt, ok := sj.expireTime(); ok { |
| 464 return it.Before(jt) |
| 465 } |
| 466 |
| 467 // i has data, but j does not, so i < j. |
| 468 return true |
| 469 } |
| 470 |
| 471 // i has no data, so i us greater than all other streams. |
| 472 return false |
| 473 } |
| 474 |
| 475 func (s *streamState) Swap(i, j int) { |
| 476 s.streams[i], s.streams[j] = s.streams[j], s.streams[i] |
| 477 } |
| 478 |
| 479 func (s *streamState) Push(x interface{}) { |
| 480 s.streams = append(s.streams, x.(bundlerStream)) |
| 481 } |
| 482 |
| 483 func (s *streamState) Pop() interface{} { |
| 484 last := s.streams[len(s.streams)-1] |
| 485 s.streams = s.streams[:len(s.streams)-1] |
| 486 return last |
| 487 } |
| OLD | NEW |