| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package butler | 5 package butler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| 11 "sort" | 11 "sort" |
| 12 "sync" | 12 "sync" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/config" | 16 "github.com/luci/luci-go/common/config" |
| 17 "github.com/luci/luci-go/common/data/stringset" | 17 "github.com/luci/luci-go/common/data/stringset" |
| 18 "github.com/luci/luci-go/common/iotools" | 18 "github.com/luci/luci-go/common/iotools" |
| 19 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/runtime/paniccatcher" | 21 "github.com/luci/luci-go/common/runtime/paniccatcher" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | 22 "github.com/luci/luci-go/common/sync/parallel" |
| 23 "github.com/luci/luci-go/logdog/client/butler/bundler" | 23 "github.com/luci/luci-go/logdog/client/butler/bundler" |
| 24 "github.com/luci/luci-go/logdog/client/butler/output" | 24 "github.com/luci/luci-go/logdog/client/butler/output" |
| 25 "github.com/luci/luci-go/logdog/client/butler/streamserver" | 25 "github.com/luci/luci-go/logdog/client/butler/streamserver" |
| 26 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | 26 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" |
| 27 "github.com/luci/luci-go/logdog/common/types" | 27 "github.com/luci/luci-go/logdog/common/types" |
| 28 |
| 28 "golang.org/x/net/context" | 29 "golang.org/x/net/context" |
| 29 ) | 30 ) |
| 30 | 31 |
| 31 const ( | 32 const ( |
| 32 // DefaultMaxBufferAge is the default amount of time that a log entry ma
y | 33 // DefaultMaxBufferAge is the default amount of time that a log entry ma
y |
| 33 // be buffered before being dispatched. | 34 // be buffered before being dispatched. |
| 34 DefaultMaxBufferAge = time.Duration(5 * time.Second) | 35 DefaultMaxBufferAge = time.Duration(5 * time.Second) |
| 35 | 36 |
| 36 // DefaultOutputWorkers is the default number of output workers to use. | 37 // DefaultOutputWorkers is the default number of output workers to use. |
| 37 DefaultOutputWorkers = 16 | 38 DefaultOutputWorkers = 16 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 49 Output output.Output | 50 Output output.Output |
| 50 // OutputWorkers is the number of simultaneous goroutines that will be u
sed | 51 // OutputWorkers is the number of simultaneous goroutines that will be u
sed |
| 51 // to output Butler log data. If zero, DefaultOutputWorkers will be used
. | 52 // to output Butler log data. If zero, DefaultOutputWorkers will be used
. |
| 52 OutputWorkers int | 53 OutputWorkers int |
| 53 | 54 |
| 54 // Project is the project that the log stream will be bound to. | 55 // Project is the project that the log stream will be bound to. |
| 55 Project config.ProjectName | 56 Project config.ProjectName |
| 56 // Prefix is the log stream common prefix value. | 57 // Prefix is the log stream common prefix value. |
| 57 Prefix types.StreamName | 58 Prefix types.StreamName |
| 58 | 59 |
| 60 // GlobalTags are a set of global log stream tags to apply to individual |
| 61 // streams on registration. Individual stream tags will override tags wi
th |
| 62 // the same key. |
| 63 GlobalTags streamproto.TagMap |
| 64 |
| 59 // BufferLogs, if true, instructs the butler to buffer collected log dat
a | 65 // BufferLogs, if true, instructs the butler to buffer collected log dat
a |
| 60 // before sending it to Output. | 66 // before sending it to Output. |
| 61 BufferLogs bool | 67 BufferLogs bool |
| 62 // If buffering logs, this is the maximum amount of time that a log will | 68 // If buffering logs, this is the maximum amount of time that a log will |
| 63 // be buffered before being marked for dispatch. If this is zero, | 69 // be buffered before being marked for dispatch. If this is zero, |
| 64 // DefaultMaxBufferAge will be used. | 70 // DefaultMaxBufferAge will be used. |
| 65 MaxBufferAge time.Duration | 71 MaxBufferAge time.Duration |
| 66 | 72 |
| 67 // TeeStdout, if not nil, is the Writer that will be used for streams | 73 // TeeStdout, if not nil, is the Writer that will be used for streams |
| 68 // requesting STDOUT tee. | 74 // requesting STDOUT tee. |
| (...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 391 // is responsible for closing it. | 397 // is responsible for closing it. |
| 392 func (b *Butler) AddStream(rc io.ReadCloser, p *streamproto.Properties) error { | 398 func (b *Butler) AddStream(rc io.ReadCloser, p *streamproto.Properties) error { |
| 393 p = p.Clone() | 399 p = p.Clone() |
| 394 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { | 400 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { |
| 395 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) | 401 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) |
| 396 } | 402 } |
| 397 if err := p.Validate(); err != nil { | 403 if err := p.Validate(); err != nil { |
| 398 return err | 404 return err |
| 399 } | 405 } |
| 400 | 406 |
| 407 // Build per-stream tag map. |
| 408 if l := len(b.c.GlobalTags); l > 0 { |
| 409 for k, v := range b.c.GlobalTags { |
| 410 // Add only global flags that aren't already present (ov
erridden) in |
| 411 // stream tags. |
| 412 if _, ok := p.Tags[k]; !ok { |
| 413 p.Tags[k] = v |
| 414 } |
| 415 } |
| 416 } |
| 417 |
| 401 if p.Timeout > 0 { | 418 if p.Timeout > 0 { |
| 402 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { | 419 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { |
| 403 if err := rts.SetReadTimeout(p.Timeout); err != nil { | 420 if err := rts.SetReadTimeout(p.Timeout); err != nil { |
| 404 log.Fields{ | 421 log.Fields{ |
| 405 log.ErrorKey: err, | 422 log.ErrorKey: err, |
| 406 "timeout": p.Timeout, | 423 "timeout": p.Timeout, |
| 407 }.Warningf(b.ctx, "Failed to set stream timeout.
") | 424 }.Warningf(b.ctx, "Failed to set stream timeout.
") |
| 408 } | 425 } |
| 409 } else { | 426 } else { |
| 410 log.Fields{ | 427 log.Fields{ |
| (...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 641 // shutdown prematurely, so this should be reasonably quick. | 658 // shutdown prematurely, so this should be reasonably quick. |
| 642 b.Activate() | 659 b.Activate() |
| 643 } | 660 } |
| 644 | 661 |
| 645 // Returns the configured Butler error. | 662 // Returns the configured Butler error. |
| 646 func (b *Butler) getRunErr() error { | 663 func (b *Butler) getRunErr() error { |
| 647 b.shutdownMu.Lock() | 664 b.shutdownMu.Lock() |
| 648 defer b.shutdownMu.Unlock() | 665 defer b.shutdownMu.Unlock() |
| 649 return b.runErr | 666 return b.runErr |
| 650 } | 667 } |
| OLD | NEW |