Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Side by Side Diff: logdog/client/butler/bundler/bundler.go

Issue 2601583002: Butler: fix global tags applied to empty map. (Closed)
Patch Set: Added tests, testing method to Bundler. Created 3 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | logdog/client/butler/bundler/stream.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package bundler 5 package bundler
6 6
7 import ( 7 import (
8 "container/heap" 8 "container/heap"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
45 // bundled data. Other factors can cause the bundle to be sent before th is, 45 // bundled data. Other factors can cause the bundle to be sent before th is,
46 // but it is an upper bound. 46 // but it is an upper bound.
47 MaxBufferDelay time.Duration 47 MaxBufferDelay time.Duration
48 } 48 }
49 49
50 type bundlerStream interface { 50 type bundlerStream interface {
51 isDrained() bool 51 isDrained() bool
52 name() string 52 name() string
53 expireTime() (time.Time, bool) 53 expireTime() (time.Time, bool)
54 nextBundleEntry(*builder, bool) bool 54 nextBundleEntry(*builder, bool) bool
55 streamDesc() *logpb.LogStreamDescriptor
55 } 56 }
56 57
57 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for 58 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for
58 // stream registration and bundle consumption. 59 // stream registration and bundle consumption.
59 type Bundler struct { 60 type Bundler struct {
60 c *Config 61 c *Config
61 62
62 // finishedC is closed when makeBundles goroutine has terminated. 63 // finishedC is closed when makeBundles goroutine has terminated.
63 finishedC chan struct{} 64 finishedC chan struct{}
64 bundleC chan *logpb.ButlerLogBundle 65 bundleC chan *logpb.ButlerLogBundle
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
134 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name) 135 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name)
135 } 136 }
136 137
137 // Create a new stream. This will kick off its processing goroutine, whi ch 138 // Create a new stream. This will kick off its processing goroutine, whi ch
138 // will not stop until it is closed. 139 // will not stop until it is closed.
139 s := newStream(c) 140 s := newStream(c)
140 b.registerStreamLocked(s) 141 b.registerStreamLocked(s)
141 return s, nil 142 return s, nil
142 } 143 }
143 144
145 // GetStreamDescs returns the set of registered stream names mapped to their
146 // descriptors.
147 //
148 // This is intended for testing purposes. DO NOT modify the resulting
149 // descriptors.
150 func (b *Bundler) GetStreamDescs() map[string]*logpb.LogStreamDescriptor {
151 b.streamsLock.Lock()
152 defer b.streamsLock.Unlock()
153
154 if len(b.streams) == 0 {
155 return nil
156 }
157
158 streams := make(map[string]*logpb.LogStreamDescriptor, len(b.streams))
159 for k, s := range b.streams {
160 streams[k] = s.streamDesc()
161 }
162 return streams
163 }
164
144 // CloseAndFlush closes the Bundler, alerting it that no more streams will be 165 // CloseAndFlush closes the Bundler, alerting it that no more streams will be
145 // added and that existing data may be aggressively output. 166 // added and that existing data may be aggressively output.
146 // 167 //
147 // CloseAndFlush will block until all buffered data has been consumed. 168 // CloseAndFlush will block until all buffered data has been consumed.
148 func (b *Bundler) CloseAndFlush() { 169 func (b *Bundler) CloseAndFlush() {
149 // Mark that we're flushing. This will cause us to perform more aggressi ve 170 // Mark that we're flushing. This will cause us to perform more aggressi ve
150 // bundling in Next(). 171 // bundling in Next().
151 b.startFlushing() 172 b.startFlushing()
152 <-b.finishedC 173 <-b.finishedC
153 } 174 }
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after
493 514
494 func (s *streamState) Push(x interface{}) { 515 func (s *streamState) Push(x interface{}) {
495 s.streams = append(s.streams, x.(bundlerStream)) 516 s.streams = append(s.streams, x.(bundlerStream))
496 } 517 }
497 518
498 func (s *streamState) Pop() interface{} { 519 func (s *streamState) Pop() interface{} {
499 last := s.streams[len(s.streams)-1] 520 last := s.streams[len(s.streams)-1]
500 s.streams = s.streams[:len(s.streams)-1] 521 s.streams = s.streams[:len(s.streams)-1]
501 return last 522 return last
502 } 523 }
OLDNEW
« no previous file with comments | « no previous file | logdog/client/butler/bundler/stream.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698