OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "container/heap" |
| 9 "fmt" |
| 10 "sync" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 14 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/logdog/protocol" |
| 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/google" |
| 18 ) |
| 19 |
| 20 // Config is the Bundler configuration. |
| 21 type Config struct { |
| 22 // Clock is the clock instance that will be used for Bundler and stream |
| 23 // timing. |
| 24 Clock clock.Clock |
| 25 |
| 26 // Source is the bundle source string to use. This can be empty if there
is no |
| 27 // source information to include. |
| 28 Source string |
| 29 |
| 30 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p
er |
| 31 // stream. |
| 32 MaxBufferedBytes int64 |
| 33 |
| 34 // MaxBundleSize is the maximum bundle size in bytes that may be generat
ed. |
| 35 // |
| 36 // If this value is zero, no size constraint will be applied to generate
d |
| 37 // bundles. |
| 38 MaxBundleSize int |
| 39 |
| 40 // MaxBufferDelay is the maximum amount of time we're willing to buffer |
| 41 // bundled data. Other factors can cause the bundle to be sent before th
is, |
| 42 // but it is an upper bound. |
| 43 MaxBufferDelay time.Duration |
| 44 } |
| 45 |
| 46 type bundlerStream interface { |
| 47 isDrained() bool |
| 48 name() string |
| 49 expireTime() (time.Time, bool) |
| 50 nextBundleEntry(*builder, bool) bool |
| 51 } |
| 52 |
| 53 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for |
| 54 // stream registration and bundle consumption. |
| 55 type Bundler interface { |
| 56 Register(streamproto.Properties) (Stream, error) |
| 57 |
| 58 // Next causes the Bundler to pack a new ButlerLogBundle message for |
| 59 // transmission. |
| 60 // |
| 61 // The maximum marshalled byte size of the bundle's protobuf is supplied
; the |
| 62 // generated bundle will not exceed this size. |
| 63 Next() *protocol.ButlerLogBundle |
| 64 |
| 65 // CloseAndFlush flushes the Bundler, blocking until all registered Stre
ams |
| 66 // have closed and all buffered data has been output. |
| 67 CloseAndFlush() |
| 68 } |
| 69 |
| 70 type bundlerImpl struct { |
| 71 *Config |
| 72 |
| 73 // finishedC is closed when makeBundles goroutine has terminated. |
| 74 finishedC chan struct{} |
| 75 bundleC chan *protocol.ButlerLogBundle |
| 76 |
| 77 // streamsLock is a lock around the `streams` map and its contents. |
| 78 streamsLock sync.Mutex |
| 79 // streamsCond is a Cond bound to streamsLock, used to signal Next() whe
n new |
| 80 // data is available. |
| 81 streamsCond *timeoutCond |
| 82 // streams is the set of currently-registered Streams. |
| 83 streams map[string]bundlerStream |
| 84 // flushing is true if we're blocking on CloseAndFlush(). |
| 85 flushing bool |
| 86 |
| 87 // prefixCounter is a global counter for Prefix-wide streams. |
| 88 prefixCounter counter |
| 89 } |
| 90 |
| 91 // New instantiates a new Bundler instance. |
| 92 func New(c Config) Bundler { |
| 93 b := bundlerImpl{ |
| 94 Config: &c, |
| 95 finishedC: make(chan struct{}), |
| 96 bundleC: make(chan *protocol.ButlerLogBundle), |
| 97 streams: map[string]bundlerStream{}, |
| 98 } |
| 99 b.streamsCond = newTimeoutCond(b.getClock(), &b.streamsLock) |
| 100 |
| 101 go b.makeBundles() |
| 102 return &b |
| 103 } |
| 104 |
| 105 func (b *bundlerImpl) Register(p streamproto.Properties) (Stream, error) { |
| 106 if s := b.streams[p.Name]; s != nil { |
| 107 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) |
| 108 } |
| 109 |
| 110 // Our Properties must validate. |
| 111 if err := p.Validate(); err != nil { |
| 112 return nil, err |
| 113 } |
| 114 |
| 115 // Construct a parser for this stream. |
| 116 c := streamConfig{ |
| 117 name: p.Name, |
| 118 template: protocol.ButlerLogBundle_Entry{ |
| 119 Desc: &p.LogStreamDescriptor, |
| 120 }, |
| 121 maximumBufferDuration: b.MaxBufferDelay, |
| 122 maximumBufferedBytes: b.MaxBufferedBytes, |
| 123 onAppend: func(appended bool) { |
| 124 if appended { |
| 125 b.signalStreamUpdate() |
| 126 } |
| 127 }, |
| 128 } |
| 129 |
| 130 err := error(nil) |
| 131 c.parser, err = newParser(&p, &b.prefixCounter) |
| 132 if err != nil { |
| 133 return nil, fmt.Errorf("failed to create stream parser: %s", err
) |
| 134 } |
| 135 |
| 136 // Generate a secret for this Stream instance. |
| 137 c.template.Secret, err = types.NewStreamSecret() |
| 138 if err != nil { |
| 139 return nil, fmt.Errorf("failed to generate stream secret: %s", e
rr) |
| 140 } |
| 141 |
| 142 // Create a new stream. This will kick off its processing goroutine, whi
ch |
| 143 // will not stop until it is closed. |
| 144 s := newStream(c) |
| 145 |
| 146 // Register the stream. |
| 147 b.streamsLock.Lock() |
| 148 defer b.streamsLock.Unlock() |
| 149 |
| 150 b.registerStreamLocked(s) |
| 151 return s, nil |
| 152 } |
| 153 |
| 154 func (b *bundlerImpl) CloseAndFlush() { |
| 155 // Mark that we're flushing. This will cause us to perform more aggressi
ve |
| 156 // bundling in Next(). |
| 157 b.startFlushing() |
| 158 <-b.finishedC |
| 159 } |
| 160 |
| 161 func (b *bundlerImpl) Next() *protocol.ButlerLogBundle { |
| 162 return <-b.bundleC |
| 163 } |
| 164 |
| 165 func (b *bundlerImpl) startFlushing() { |
| 166 b.streamsLock.Lock() |
| 167 defer b.streamsLock.Unlock() |
| 168 |
| 169 if !b.flushing { |
| 170 b.flushing = true |
| 171 b.signalStreamUpdateLocked() |
| 172 } |
| 173 } |
| 174 |
| 175 // makeBundles is run in its own goroutine. It runs continuously, responding |
| 176 // to Stream constraints and availability and sending ButlerLogBundles through |
| 177 // bundleC when available. |
| 178 // |
| 179 // makeBundles will terminate when closeC is closed and all streams are drained. |
| 180 func (b *bundlerImpl) makeBundles() { |
| 181 defer close(b.finishedC) |
| 182 defer close(b.bundleC) |
| 183 |
| 184 b.streamsLock.Lock() |
| 185 defer b.streamsLock.Unlock() |
| 186 |
| 187 bb := (*builder)(nil) |
| 188 defer func() { |
| 189 if bb != nil && bb.hasContent() { |
| 190 b.bundleC <- bb.bundle() |
| 191 } |
| 192 }() |
| 193 |
| 194 for { |
| 195 bb = &builder{ |
| 196 size: b.MaxBundleSize, |
| 197 template: protocol.ButlerLogBundle{ |
| 198 Source: b.Source, |
| 199 Timestamp: google.NewTimestamp(b.getClock().Now(
)), |
| 200 }, |
| 201 } |
| 202 oldestContentTime := time.Time{} |
| 203 |
| 204 for { |
| 205 state := b.getStreamStateLocked() |
| 206 |
| 207 // Attempt to create more bundles. |
| 208 sendNow := b.bundleRoundLocked(bb, state) |
| 209 |
| 210 // Prune any drained streams. |
| 211 state.forEachStream(func(s bundlerStream) bool { |
| 212 if s.isDrained() { |
| 213 state.removeStream(s.name()) |
| 214 b.unregisterStreamLocked(s) |
| 215 } |
| 216 |
| 217 return true |
| 218 }) |
| 219 |
| 220 if b.flushing && len(state.streams) == 0 { |
| 221 // We're flushing, and there are no more streams
, so we're completely |
| 222 // finished. |
| 223 // |
| 224 // If we have any content in our builder, it wil
l be exported via defer. |
| 225 return |
| 226 } |
| 227 |
| 228 // If we have content, consider emitting this bundle. |
| 229 if bb.hasContent() { |
| 230 if b.MaxBufferDelay == 0 || sendNow || bb.ready(
) { |
| 231 break |
| 232 } |
| 233 } |
| 234 |
| 235 // Mark the first time this round where we actually saw
data. |
| 236 if oldestContentTime.IsZero() && bb.hasContent() { |
| 237 oldestContentTime = state.now |
| 238 } |
| 239 |
| 240 // We will yield our stream lock and sleep, waiting for
either: |
| 241 // 1) The earliest expiration time. |
| 242 // 2) A streams channel signal. |
| 243 // |
| 244 // We use a Cond here because we want Streams to be able
to be added |
| 245 // while we're waiting for stream data. |
| 246 nextExpire, has := state.nextExpire() |
| 247 |
| 248 // If we have an oldest content time, that also means th
at we have |
| 249 // content. Factor this constraint in. |
| 250 if !oldestContentTime.IsZero() { |
| 251 roundExpire := oldestContentTime.Add(b.MaxBuffer
Delay) |
| 252 if !roundExpire.After(state.now) { |
| 253 break |
| 254 } |
| 255 |
| 256 if !has || roundExpire.Before(nextExpire) { |
| 257 nextExpire = roundExpire |
| 258 has = true |
| 259 } |
| 260 } |
| 261 |
| 262 if has { |
| 263 b.streamsCond.waitTimeout(nextExpire.Sub(state.n
ow)) |
| 264 } else { |
| 265 // No data, no expire constraints. Wait indefini
tely for something |
| 266 // to change. |
| 267 b.streamsCond.Wait() |
| 268 } |
| 269 } |
| 270 |
| 271 // If our bundler has contents, send them. |
| 272 if bb.hasContent() { |
| 273 b.bundleC <- bb.bundle() |
| 274 bb = nil |
| 275 } |
| 276 } |
| 277 } |
| 278 |
| 279 // Implements a single bundle building round. This incrementally adds data from |
| 280 // the stream state to the supplied builder. |
| 281 // |
| 282 // This method will block until a suitable bundle is available. Availability |
| 283 // is subject both to time and data constraints: |
| 284 // - If buffered data, which is timestampped at ingest, has exceeded its |
| 285 // buffer duration threshold, a Bundle will be cut immediately. |
| 286 // - If no data is set to expire, the Bundler may wait for more data to |
| 287 // produce a more optimally-packed bundle. |
| 288 // |
| 289 // At a high level, Next operates as follows: |
| 290 // 1) Freeze all stream state. |
| 291 // 2) Scan streams for data that has exceeded its threshold; if data is found: |
| 292 // - Aggressively pack expired data into a Bundle. |
| 293 // - Optimally pack the remainder of the Bundle with any available data. |
| 294 // - Return the Bundle. |
| 295 // |
| 296 // 3) Examine the remaining data sizes, waiting for either: |
| 297 // - Enough stream data to fill our Bundle. |
| 298 // - Our timeout, if the Bundler is not closed. |
| 299 // 4) Pack a Bundle with the remaining data optimally, emphasizing streams |
| 300 // with older data. |
| 301 // |
| 302 // Returns true if bundle some data was added that should be sent immediately. |
| 303 func (b *bundlerImpl) bundleRoundLocked(bb *builder, state *streamState) bool { |
| 304 sendNow := false |
| 305 |
| 306 // First pass: non-blocking data that has exceeded its storage threshold
. |
| 307 for bb.remaining() > 0 { |
| 308 s := state.next() |
| 309 if s == nil { |
| 310 break |
| 311 } |
| 312 |
| 313 if et, has := s.expireTime(); !has || et.After(state.now) { |
| 314 // This stream (and all other streams, since we're sorte
d) expires in |
| 315 // the future, so we're done with the first pass. |
| 316 break |
| 317 } |
| 318 |
| 319 // Pull bundles from this stream. |
| 320 if modified := s.nextBundleEntry(bb, true); modified { |
| 321 state.streamUpdated(s.name()) |
| 322 |
| 323 // We have at least one time-sensitive bundle, so send t
his round. |
| 324 sendNow = true |
| 325 } |
| 326 } |
| 327 |
| 328 // Second pass: bundle any available data. |
| 329 state.forEachStream(func(s bundlerStream) bool { |
| 330 if bb.remaining() == 0 { |
| 331 return false |
| 332 } |
| 333 |
| 334 if modified := s.nextBundleEntry(bb, b.flushing); modified { |
| 335 state.streamUpdated(s.name()) |
| 336 } |
| 337 return true |
| 338 }) |
| 339 |
| 340 return sendNow |
| 341 } |
| 342 |
| 343 func (b *bundlerImpl) getStreamStateLocked() *streamState { |
| 344 // Lock and collect each stream. |
| 345 state := &streamState{ |
| 346 streams: make([]bundlerStream, 0, len(b.streams)), |
| 347 now: b.getClock().Now(), |
| 348 } |
| 349 |
| 350 for _, s := range b.streams { |
| 351 state.streams = append(state.streams, s) |
| 352 } |
| 353 heap.Init(state) |
| 354 |
| 355 return state |
| 356 } |
| 357 |
| 358 func (b *bundlerImpl) registerStreamLocked(s bundlerStream) { |
| 359 b.streams[s.name()] = s |
| 360 b.signalStreamUpdateLocked() |
| 361 } |
| 362 |
| 363 func (b *bundlerImpl) unregisterStreamLocked(s bundlerStream) { |
| 364 delete(b.streams, s.name()) |
| 365 } |
| 366 |
| 367 func (b *bundlerImpl) signalStreamUpdate() { |
| 368 b.streamsLock.Lock() |
| 369 defer b.streamsLock.Unlock() |
| 370 |
| 371 b.signalStreamUpdateLocked() |
| 372 } |
| 373 |
| 374 func (b *bundlerImpl) signalStreamUpdateLocked() { |
| 375 b.streamsCond.Broadcast() |
| 376 } |
| 377 |
| 378 // nextPrefixIndex is a goroutine-safe method that returns the next prefix index |
| 379 // for the given Bundler. |
| 380 func (b *bundlerImpl) nextPrefixIndex() uint64 { |
| 381 return uint64(b.prefixCounter.next()) |
| 382 } |
| 383 |
| 384 func (b *bundlerImpl) getClock() clock.Clock { |
| 385 c := b.Clock |
| 386 if c != nil { |
| 387 return c |
| 388 } |
| 389 return clock.GetSystemClock() |
| 390 } |
| 391 |
| 392 // streamState is a snapshot of the current stream registration. All operations |
| 393 // performed on the state require streamLock to be held. |
| 394 // |
| 395 // streamState implements heap.Interface for its streams array. Streams without |
| 396 // data times (nil) are considered to be greater than those with times. |
| 397 type streamState struct { |
| 398 streams []bundlerStream |
| 399 now time.Time |
| 400 } |
| 401 |
| 402 var _ heap.Interface = (*streamState)(nil) |
| 403 |
| 404 func (s *streamState) next() bundlerStream { |
| 405 if len(s.streams) == 0 { |
| 406 return nil |
| 407 } |
| 408 return s.streams[0] |
| 409 } |
| 410 |
| 411 func (s *streamState) nextExpire() (time.Time, bool) { |
| 412 if next := s.next(); next != nil { |
| 413 if ts, ok := next.expireTime(); ok { |
| 414 return ts, true |
| 415 } |
| 416 } |
| 417 return time.Time{}, false |
| 418 } |
| 419 |
| 420 func (s *streamState) streamUpdated(name string) { |
| 421 if si, idx := s.streamIndex(name); si != nil { |
| 422 heap.Fix(s, idx) |
| 423 } |
| 424 } |
| 425 |
| 426 func (s *streamState) forEachStream(f func(bundlerStream) bool) { |
| 427 // Clone our streams, since the callback may mutate their order. |
| 428 streams := make([]bundlerStream, len(s.streams)) |
| 429 for i, s := range s.streams { |
| 430 streams[i] = s |
| 431 } |
| 432 |
| 433 for _, s := range streams { |
| 434 if !f(s) { |
| 435 break |
| 436 } |
| 437 } |
| 438 } |
| 439 |
| 440 // removeStream removes a stream from the stream state. |
| 441 func (s *streamState) removeStream(name string) bundlerStream { |
| 442 if si, idx := s.streamIndex(name); si != nil { |
| 443 heap.Remove(s, idx) |
| 444 return si |
| 445 } |
| 446 return nil |
| 447 } |
| 448 |
| 449 func (s *streamState) streamIndex(name string) (bundlerStream, int) { |
| 450 for i, si := range s.streams { |
| 451 if si.name() == name { |
| 452 return si, i |
| 453 } |
| 454 } |
| 455 return nil, -1 |
| 456 } |
| 457 |
| 458 func (s *streamState) Len() int { |
| 459 return len(s.streams) |
| 460 } |
| 461 |
| 462 func (s *streamState) Less(i, j int) bool { |
| 463 si, sj := s.streams[i], s.streams[j] |
| 464 |
| 465 if it, ok := si.expireTime(); ok { |
| 466 if jt, ok := sj.expireTime(); ok { |
| 467 return it.Before(jt) |
| 468 } |
| 469 |
| 470 // i has data, but j does not, so i < j. |
| 471 return true |
| 472 } |
| 473 |
| 474 // i has no data, so i us greater than all other streams. |
| 475 return false |
| 476 } |
| 477 |
| 478 func (s *streamState) Swap(i, j int) { |
| 479 s.streams[i], s.streams[j] = s.streams[j], s.streams[i] |
| 480 } |
| 481 |
| 482 func (s *streamState) Push(x interface{}) { |
| 483 s.streams = append(s.streams, x.(bundlerStream)) |
| 484 } |
| 485 |
| 486 func (s *streamState) Pop() interface{} { |
| 487 last := s.streams[len(s.streams)-1] |
| 488 s.streams = s.streams[:len(s.streams)-1] |
| 489 return last |
| 490 } |
OLD | NEW |