Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(280)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cmd/logdog_butler/subcommand_run.go ('k') | client/internal/logdog/butler/butler.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bundler 5 package bundler
6 6
7 import ( 7 import (
8 "container/heap" 8 "container/heap"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "time" 11 "time"
12 12
13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" 13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
14 "github.com/luci/luci-go/common/cancelcond" 14 "github.com/luci/luci-go/common/cancelcond"
15 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/config"
16 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/google" 18 "github.com/luci/luci-go/common/proto/google"
18 "github.com/luci/luci-go/common/proto/logdog/logpb" 19 "github.com/luci/luci-go/common/proto/logdog/logpb"
19 "golang.org/x/net/context" 20 "golang.org/x/net/context"
20 ) 21 )
21 22
22 // Config is the Bundler configuration. 23 // Config is the Bundler configuration.
23 type Config struct { 24 type Config struct {
24 // Clock is the clock instance that will be used for Bundler and stream 25 // Clock is the clock instance that will be used for Bundler and stream
25 // timing. 26 // timing.
26 Clock clock.Clock 27 Clock clock.Clock
27 28
28 // Source is the bundle source string to use. This can be empty if there is no 29 // Source is the bundle source string to use. This can be empty if there is no
29 // source information to include. 30 // source information to include.
30 Source string 31 Source string
31 32
33 // Project is the project to use.
34 Project config.ProjectName
35 // Prefix is the common prefix for this set of streams.
36 Prefix types.StreamName
37 // Secret is the prefix secret for this set of streams.
38 Secret []byte
39
32 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p er 40 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p er
33 // stream. 41 // stream.
34 MaxBufferedBytes int64 42 MaxBufferedBytes int64
35 43
36 // MaxBundleSize is the maximum bundle size in bytes that may be generat ed. 44 // MaxBundleSize is the maximum bundle size in bytes that may be generat ed.
37 // 45 //
38 // If this value is zero, no size constraint will be applied to generate d 46 // If this value is zero, no size constraint will be applied to generate d
39 // bundles. 47 // bundles.
40 MaxBundleSize int 48 MaxBundleSize int
41 49
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 } 98 }
91 99
92 // Register adds a new stream to the Bundler, returning a reference to the 100 // Register adds a new stream to the Bundler, returning a reference to the
93 // registered stream. 101 // registered stream.
94 func (b *Bundler) Register(p streamproto.Properties) (Stream, error) { 102 func (b *Bundler) Register(p streamproto.Properties) (Stream, error) {
95 // Our Properties must validate. 103 // Our Properties must validate.
96 if err := p.Validate(); err != nil { 104 if err := p.Validate(); err != nil {
97 return nil, err 105 return nil, err
98 } 106 }
99 107
108 // Enforce that the log stream descriptor's Prefix is empty.
109 p.Prefix = ""
110
100 // Construct a parser for this stream. 111 // Construct a parser for this stream.
101 c := streamConfig{ 112 c := streamConfig{
102 name: p.Name, 113 name: p.Name,
103 template: logpb.ButlerLogBundle_Entry{ 114 template: logpb.ButlerLogBundle_Entry{
104 Desc: &p.LogStreamDescriptor, 115 Desc: &p.LogStreamDescriptor,
105 }, 116 },
106 maximumBufferDuration: b.c.MaxBufferDelay, 117 maximumBufferDuration: b.c.MaxBufferDelay,
107 maximumBufferedBytes: b.c.MaxBufferedBytes, 118 maximumBufferedBytes: b.c.MaxBufferedBytes,
108 onAppend: func(appended bool) { 119 onAppend: func(appended bool) {
109 if appended { 120 if appended {
110 b.signalStreamUpdate() 121 b.signalStreamUpdate()
111 } 122 }
112 }, 123 },
113 } 124 }
114 125
115 err := error(nil) 126 err := error(nil)
116 c.parser, err = newParser(&p, &b.prefixCounter) 127 c.parser, err = newParser(&p, &b.prefixCounter)
117 if err != nil { 128 if err != nil {
118 return nil, fmt.Errorf("failed to create stream parser: %s", err ) 129 return nil, fmt.Errorf("failed to create stream parser: %s", err )
119 } 130 }
120 131
121 // Generate a secret for this Stream instance.
122 c.template.Secret, err = types.NewPrefixSecret()
123 if err != nil {
124 return nil, fmt.Errorf("failed to generate stream secret: %s", e rr)
125 }
126
127 b.streamsLock.Lock() 132 b.streamsLock.Lock()
128 defer b.streamsLock.Unlock() 133 defer b.streamsLock.Unlock()
129 134
130 // Ensure that this is not a duplicate stream name. 135 // Ensure that this is not a duplicate stream name.
131 if s := b.streams[p.Name]; s != nil { 136 if s := b.streams[p.Name]; s != nil {
132 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name) 137 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name)
133 } 138 }
134 139
135 // Create a new stream. This will kick off its processing goroutine, whi ch 140 // Create a new stream. This will kick off its processing goroutine, whi ch
136 // will not stop until it is closed. 141 // will not stop until it is closed.
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
183 b.bundleC <- bb.bundle() 188 b.bundleC <- bb.bundle()
184 } 189 }
185 }() 190 }()
186 191
187 for { 192 for {
188 bb = &builder{ 193 bb = &builder{
189 size: b.c.MaxBundleSize, 194 size: b.c.MaxBundleSize,
190 template: logpb.ButlerLogBundle{ 195 template: logpb.ButlerLogBundle{
191 Source: b.c.Source, 196 Source: b.c.Source,
192 Timestamp: google.NewTimestamp(b.getClock().Now( )), 197 Timestamp: google.NewTimestamp(b.getClock().Now( )),
198 Project: string(b.c.Project),
199 Prefix: string(b.c.Prefix),
200 Secret: b.c.Secret,
193 }, 201 },
194 } 202 }
195 var oldestContentTime time.Time 203 var oldestContentTime time.Time
196 204
197 for { 205 for {
198 state := b.getStreamStateLocked() 206 state := b.getStreamStateLocked()
199 207
200 // Attempt to create more bundles. 208 // Attempt to create more bundles.
201 sendNow := b.bundleRoundLocked(bb, state) 209 sendNow := b.bundleRoundLocked(bb, state)
202 210
(...skipping 287 matching lines...) Expand 10 before | Expand all | Expand 10 after
490 498
491 func (s *streamState) Push(x interface{}) { 499 func (s *streamState) Push(x interface{}) {
492 s.streams = append(s.streams, x.(bundlerStream)) 500 s.streams = append(s.streams, x.(bundlerStream))
493 } 501 }
494 502
495 func (s *streamState) Pop() interface{} { 503 func (s *streamState) Pop() interface{} {
496 last := s.streams[len(s.streams)-1] 504 last := s.streams[len(s.streams)-1]
497 s.streams = s.streams[:len(s.streams)-1] 505 s.streams = s.streams[:len(s.streams)-1]
498 return last 506 return last
499 } 507 }
OLDNEW
« no previous file with comments | « client/cmd/logdog_butler/subcommand_run.go ('k') | client/internal/logdog/butler/butler.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698