| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bundler | 5 package bundler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "container/heap" | 8 "container/heap" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| 11 "time" | 11 "time" |
| 12 | 12 |
| 13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | 13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 14 "github.com/luci/luci-go/common/cancelcond" | 14 "github.com/luci/luci-go/common/cancelcond" |
| 15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/config" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/google" | 18 "github.com/luci/luci-go/common/proto/google" |
| 18 "github.com/luci/luci-go/common/proto/logdog/logpb" | 19 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 19 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 20 ) | 21 ) |
| 21 | 22 |
| 22 // Config is the Bundler configuration. | 23 // Config is the Bundler configuration. |
| 23 type Config struct { | 24 type Config struct { |
| 24 // Clock is the clock instance that will be used for Bundler and stream | 25 // Clock is the clock instance that will be used for Bundler and stream |
| 25 // timing. | 26 // timing. |
| 26 Clock clock.Clock | 27 Clock clock.Clock |
| 27 | 28 |
| 28 // Source is the bundle source string to use. This can be empty if there
is no | 29 // Source is the bundle source string to use. This can be empty if there
is no |
| 29 // source information to include. | 30 // source information to include. |
| 30 Source string | 31 Source string |
| 31 | 32 |
| 33 // Project is the project to use. |
| 34 Project config.ProjectName |
| 35 // Prefix is the common prefix for this set of streams. |
| 36 Prefix types.StreamName |
| 37 // Secret is the prefix secret for this set of streams. |
| 38 Secret []byte |
| 39 |
| 32 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p
er | 40 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p
er |
| 33 // stream. | 41 // stream. |
| 34 MaxBufferedBytes int64 | 42 MaxBufferedBytes int64 |
| 35 | 43 |
| 36 // MaxBundleSize is the maximum bundle size in bytes that may be generat
ed. | 44 // MaxBundleSize is the maximum bundle size in bytes that may be generat
ed. |
| 37 // | 45 // |
| 38 // If this value is zero, no size constraint will be applied to generate
d | 46 // If this value is zero, no size constraint will be applied to generate
d |
| 39 // bundles. | 47 // bundles. |
| 40 MaxBundleSize int | 48 MaxBundleSize int |
| 41 | 49 |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 90 } | 98 } |
| 91 | 99 |
| 92 // Register adds a new stream to the Bundler, returning a reference to the | 100 // Register adds a new stream to the Bundler, returning a reference to the |
| 93 // registered stream. | 101 // registered stream. |
| 94 func (b *Bundler) Register(p streamproto.Properties) (Stream, error) { | 102 func (b *Bundler) Register(p streamproto.Properties) (Stream, error) { |
| 95 // Our Properties must validate. | 103 // Our Properties must validate. |
| 96 if err := p.Validate(); err != nil { | 104 if err := p.Validate(); err != nil { |
| 97 return nil, err | 105 return nil, err |
| 98 } | 106 } |
| 99 | 107 |
| 108 // Enforce that the log stream descriptor's Prefix is empty. |
| 109 p.Prefix = "" |
| 110 |
| 100 // Construct a parser for this stream. | 111 // Construct a parser for this stream. |
| 101 c := streamConfig{ | 112 c := streamConfig{ |
| 102 name: p.Name, | 113 name: p.Name, |
| 103 template: logpb.ButlerLogBundle_Entry{ | 114 template: logpb.ButlerLogBundle_Entry{ |
| 104 Desc: &p.LogStreamDescriptor, | 115 Desc: &p.LogStreamDescriptor, |
| 105 }, | 116 }, |
| 106 maximumBufferDuration: b.c.MaxBufferDelay, | 117 maximumBufferDuration: b.c.MaxBufferDelay, |
| 107 maximumBufferedBytes: b.c.MaxBufferedBytes, | 118 maximumBufferedBytes: b.c.MaxBufferedBytes, |
| 108 onAppend: func(appended bool) { | 119 onAppend: func(appended bool) { |
| 109 if appended { | 120 if appended { |
| 110 b.signalStreamUpdate() | 121 b.signalStreamUpdate() |
| 111 } | 122 } |
| 112 }, | 123 }, |
| 113 } | 124 } |
| 114 | 125 |
| 115 err := error(nil) | 126 err := error(nil) |
| 116 c.parser, err = newParser(&p, &b.prefixCounter) | 127 c.parser, err = newParser(&p, &b.prefixCounter) |
| 117 if err != nil { | 128 if err != nil { |
| 118 return nil, fmt.Errorf("failed to create stream parser: %s", err
) | 129 return nil, fmt.Errorf("failed to create stream parser: %s", err
) |
| 119 } | 130 } |
| 120 | 131 |
| 121 // Generate a secret for this Stream instance. | |
| 122 c.template.Secret, err = types.NewPrefixSecret() | |
| 123 if err != nil { | |
| 124 return nil, fmt.Errorf("failed to generate stream secret: %s", e
rr) | |
| 125 } | |
| 126 | |
| 127 b.streamsLock.Lock() | 132 b.streamsLock.Lock() |
| 128 defer b.streamsLock.Unlock() | 133 defer b.streamsLock.Unlock() |
| 129 | 134 |
| 130 // Ensure that this is not a duplicate stream name. | 135 // Ensure that this is not a duplicate stream name. |
| 131 if s := b.streams[p.Name]; s != nil { | 136 if s := b.streams[p.Name]; s != nil { |
| 132 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) | 137 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) |
| 133 } | 138 } |
| 134 | 139 |
| 135 // Create a new stream. This will kick off its processing goroutine, whi
ch | 140 // Create a new stream. This will kick off its processing goroutine, whi
ch |
| 136 // will not stop until it is closed. | 141 // will not stop until it is closed. |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 183 b.bundleC <- bb.bundle() | 188 b.bundleC <- bb.bundle() |
| 184 } | 189 } |
| 185 }() | 190 }() |
| 186 | 191 |
| 187 for { | 192 for { |
| 188 bb = &builder{ | 193 bb = &builder{ |
| 189 size: b.c.MaxBundleSize, | 194 size: b.c.MaxBundleSize, |
| 190 template: logpb.ButlerLogBundle{ | 195 template: logpb.ButlerLogBundle{ |
| 191 Source: b.c.Source, | 196 Source: b.c.Source, |
| 192 Timestamp: google.NewTimestamp(b.getClock().Now(
)), | 197 Timestamp: google.NewTimestamp(b.getClock().Now(
)), |
| 198 Project: string(b.c.Project), |
| 199 Prefix: string(b.c.Prefix), |
| 200 Secret: b.c.Secret, |
| 193 }, | 201 }, |
| 194 } | 202 } |
| 195 var oldestContentTime time.Time | 203 var oldestContentTime time.Time |
| 196 | 204 |
| 197 for { | 205 for { |
| 198 state := b.getStreamStateLocked() | 206 state := b.getStreamStateLocked() |
| 199 | 207 |
| 200 // Attempt to create more bundles. | 208 // Attempt to create more bundles. |
| 201 sendNow := b.bundleRoundLocked(bb, state) | 209 sendNow := b.bundleRoundLocked(bb, state) |
| 202 | 210 |
| (...skipping 287 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 490 | 498 |
| 491 func (s *streamState) Push(x interface{}) { | 499 func (s *streamState) Push(x interface{}) { |
| 492 s.streams = append(s.streams, x.(bundlerStream)) | 500 s.streams = append(s.streams, x.(bundlerStream)) |
| 493 } | 501 } |
| 494 | 502 |
| 495 func (s *streamState) Pop() interface{} { | 503 func (s *streamState) Pop() interface{} { |
| 496 last := s.streams[len(s.streams)-1] | 504 last := s.streams[len(s.streams)-1] |
| 497 s.streams = s.streams[:len(s.streams)-1] | 505 s.streams = s.streams[:len(s.streams)-1] |
| 498 return last | 506 return last |
| 499 } | 507 } |
| OLD | NEW |