| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bundler | 5 package bundler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "container/heap" | 8 "container/heap" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 86 streams: map[string]bundlerStream{}, | 86 streams: map[string]bundlerStream{}, |
| 87 } | 87 } |
| 88 b.streamsCond = cancelcond.New(&b.streamsLock) | 88 b.streamsCond = cancelcond.New(&b.streamsLock) |
| 89 | 89 |
| 90 go b.makeBundles() | 90 go b.makeBundles() |
| 91 return &b | 91 return &b |
| 92 } | 92 } |
| 93 | 93 |
| 94 // Register adds a new stream to the Bundler, returning a reference to the | 94 // Register adds a new stream to the Bundler, returning a reference to the |
| 95 // registered stream. | 95 // registered stream. |
| 96 func (b *Bundler) Register(p streamproto.Properties) (Stream, error) { | 96 // |
| 97 // The Bundler takes ownership of the supplied Properties, and may modify them |
| 98 // as needed. |
| 99 func (b *Bundler) Register(p *streamproto.Properties) (Stream, error) { |
| 97 // Our Properties must validate. | 100 // Our Properties must validate. |
| 98 if err := p.Validate(); err != nil { | 101 if err := p.Validate(); err != nil { |
| 99 return nil, err | 102 return nil, err |
| 100 } | 103 } |
| 101 | 104 |
| 102 // Enforce that the log stream descriptor's Prefix is empty. | 105 // Enforce that the log stream descriptor's Prefix is empty. |
| 103 p.Prefix = "" | 106 p.Prefix = "" |
| 104 | 107 |
| 105 // Construct a parser for this stream. | 108 // Construct a parser for this stream. |
| 106 c := streamConfig{ | 109 c := streamConfig{ |
| 107 name: p.Name, | 110 name: p.Name, |
| 108 template: logpb.ButlerLogBundle_Entry{ | 111 template: logpb.ButlerLogBundle_Entry{ |
| 109 » » » Desc: &p.LogStreamDescriptor, | 112 » » » Desc: p.LogStreamDescriptor, |
| 110 }, | 113 }, |
| 111 maximumBufferDuration: b.c.MaxBufferDelay, | 114 maximumBufferDuration: b.c.MaxBufferDelay, |
| 112 maximumBufferedBytes: b.c.MaxBufferedBytes, | 115 maximumBufferedBytes: b.c.MaxBufferedBytes, |
| 113 onAppend: func(appended bool) { | 116 onAppend: func(appended bool) { |
| 114 if appended { | 117 if appended { |
| 115 b.signalStreamUpdate() | 118 b.signalStreamUpdate() |
| 116 } | 119 } |
| 117 }, | 120 }, |
| 118 } | 121 } |
| 119 | 122 |
| 120 err := error(nil) | 123 err := error(nil) |
| 121 » c.parser, err = newParser(&p, &b.prefixCounter) | 124 » c.parser, err = newParser(p, &b.prefixCounter) |
| 122 if err != nil { | 125 if err != nil { |
| 123 return nil, fmt.Errorf("failed to create stream parser: %s", err
) | 126 return nil, fmt.Errorf("failed to create stream parser: %s", err
) |
| 124 } | 127 } |
| 125 | 128 |
| 126 b.streamsLock.Lock() | 129 b.streamsLock.Lock() |
| 127 defer b.streamsLock.Unlock() | 130 defer b.streamsLock.Unlock() |
| 128 | 131 |
| 129 // Ensure that this is not a duplicate stream name. | 132 // Ensure that this is not a duplicate stream name. |
| 130 if s := b.streams[p.Name]; s != nil { | 133 if s := b.streams[p.Name]; s != nil { |
| 131 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) | 134 return nil, fmt.Errorf("a Stream is already registered for %q",
p.Name) |
| (...skipping 358 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 490 | 493 |
| 491 func (s *streamState) Push(x interface{}) { | 494 func (s *streamState) Push(x interface{}) { |
| 492 s.streams = append(s.streams, x.(bundlerStream)) | 495 s.streams = append(s.streams, x.(bundlerStream)) |
| 493 } | 496 } |
| 494 | 497 |
| 495 func (s *streamState) Pop() interface{} { | 498 func (s *streamState) Pop() interface{} { |
| 496 last := s.streams[len(s.streams)-1] | 499 last := s.streams[len(s.streams)-1] |
| 497 s.streams = s.streams[:len(s.streams)-1] | 500 s.streams = s.streams[:len(s.streams)-1] |
| 498 return last | 501 return last |
| 499 } | 502 } |
| OLD | NEW |