Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package butler | 5 package butler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| 11 "sort" | 11 "sort" |
| 12 "sync" | 12 "sync" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/config" | 16 "github.com/luci/luci-go/common/config" |
| 17 "github.com/luci/luci-go/common/data/stringset" | 17 "github.com/luci/luci-go/common/data/stringset" |
| 18 "github.com/luci/luci-go/common/iotools" | 18 "github.com/luci/luci-go/common/iotools" |
| 19 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/runtime/paniccatcher" | 21 "github.com/luci/luci-go/common/runtime/paniccatcher" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | 22 "github.com/luci/luci-go/common/sync/parallel" |
| 23 "github.com/luci/luci-go/logdog/client/butler/bundler" | 23 "github.com/luci/luci-go/logdog/client/butler/bundler" |
| 24 "github.com/luci/luci-go/logdog/client/butler/output" | 24 "github.com/luci/luci-go/logdog/client/butler/output" |
| 25 "github.com/luci/luci-go/logdog/client/butler/streamserver" | 25 "github.com/luci/luci-go/logdog/client/butler/streamserver" |
| 26 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | 26 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" |
| 27 "github.com/luci/luci-go/logdog/common/types" | 27 "github.com/luci/luci-go/logdog/common/types" |
| 28 | |
| 28 "golang.org/x/net/context" | 29 "golang.org/x/net/context" |
| 29 ) | 30 ) |
| 30 | 31 |
| 31 const ( | 32 const ( |
| 32 // DefaultMaxBufferAge is the default amount of time that a log entry ma y | 33 // DefaultMaxBufferAge is the default amount of time that a log entry ma y |
| 33 // be buffered before being dispatched. | 34 // be buffered before being dispatched. |
| 34 DefaultMaxBufferAge = time.Duration(5 * time.Second) | 35 DefaultMaxBufferAge = time.Duration(5 * time.Second) |
| 35 | 36 |
| 36 // DefaultOutputWorkers is the default number of output workers to use. | 37 // DefaultOutputWorkers is the default number of output workers to use. |
| 37 DefaultOutputWorkers = 16 | 38 DefaultOutputWorkers = 16 |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 49 Output output.Output | 50 Output output.Output |
| 50 // OutputWorkers is the number of simultaneous goroutines that will be u sed | 51 // OutputWorkers is the number of simultaneous goroutines that will be u sed |
| 51 // to output Butler log data. If zero, DefaultOutputWorkers will be used . | 52 // to output Butler log data. If zero, DefaultOutputWorkers will be used . |
| 52 OutputWorkers int | 53 OutputWorkers int |
| 53 | 54 |
| 54 // Project is the project that the log stream will be bound to. | 55 // Project is the project that the log stream will be bound to. |
| 55 Project config.ProjectName | 56 Project config.ProjectName |
| 56 // Prefix is the log stream common prefix value. | 57 // Prefix is the log stream common prefix value. |
| 57 Prefix types.StreamName | 58 Prefix types.StreamName |
| 58 | 59 |
| 60 // GlobalTags are a set of global log stream tags to apply to individual | |
| 61 // streams on registration. Individual stream tags will override tags wi th | |
| 62 // the same key. | |
| 63 GlobalTags streamproto.TagMap | |
| 64 | |
| 59 // BufferLogs, if true, instructs the butler to buffer collected log dat a | 65 // BufferLogs, if true, instructs the butler to buffer collected log dat a |
| 60 // before sending it to Output. | 66 // before sending it to Output. |
| 61 BufferLogs bool | 67 BufferLogs bool |
| 62 // If buffering logs, this is the maximum amount of time that a log will | 68 // If buffering logs, this is the maximum amount of time that a log will |
| 63 // be buffered before being marked for dispatch. If this is zero, | 69 // be buffered before being marked for dispatch. If this is zero, |
| 64 // DefaultMaxBufferAge will be used. | 70 // DefaultMaxBufferAge will be used. |
| 65 MaxBufferAge time.Duration | 71 MaxBufferAge time.Duration |
| 66 | 72 |
| 67 // TeeStdout, if not nil, is the Writer that will be used for streams | 73 // TeeStdout, if not nil, is the Writer that will be used for streams |
| 68 // requesting STDOUT tee. | 74 // requesting STDOUT tee. |
| (...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 391 // is responsible for closing it. | 397 // is responsible for closing it. |
| 392 func (b *Butler) AddStream(rc io.ReadCloser, p *streamproto.Properties) error { | 398 func (b *Butler) AddStream(rc io.ReadCloser, p *streamproto.Properties) error { |
| 393 p = p.Clone() | 399 p = p.Clone() |
| 394 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { | 400 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { |
| 395 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) | 401 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) |
| 396 } | 402 } |
| 397 if err := p.Validate(); err != nil { | 403 if err := p.Validate(); err != nil { |
| 398 return err | 404 return err |
| 399 } | 405 } |
| 400 | 406 |
| 407 // Build per-stream tag map. | |
| 408 if l := len(b.c.GlobalTags); l > 0 { | |
| 409 streamTags := make(map[string]string, l+len(p.Tags)) | |
|
seanmccullough1
2016/12/22 01:46:10
why not just assign to/mutate p.Tags?
dnj
2016/12/22 01:49:38
Hmm good point, will do that.
| |
| 410 for k, v := range b.c.GlobalTags { | |
| 411 streamTags[k] = v | |
| 412 } | |
| 413 for k, v := range p.Tags { | |
| 414 streamTags[k] = v | |
| 415 } | |
| 416 p.Tags = streamTags | |
| 417 } | |
| 418 | |
| 401 if p.Timeout > 0 { | 419 if p.Timeout > 0 { |
| 402 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { | 420 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { |
| 403 if err := rts.SetReadTimeout(p.Timeout); err != nil { | 421 if err := rts.SetReadTimeout(p.Timeout); err != nil { |
| 404 log.Fields{ | 422 log.Fields{ |
| 405 log.ErrorKey: err, | 423 log.ErrorKey: err, |
| 406 "timeout": p.Timeout, | 424 "timeout": p.Timeout, |
| 407 }.Warningf(b.ctx, "Failed to set stream timeout. ") | 425 }.Warningf(b.ctx, "Failed to set stream timeout. ") |
| 408 } | 426 } |
| 409 } else { | 427 } else { |
| 410 log.Fields{ | 428 log.Fields{ |
| (...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 641 // shutdown prematurely, so this should be reasonably quick. | 659 // shutdown prematurely, so this should be reasonably quick. |
| 642 b.Activate() | 660 b.Activate() |
| 643 } | 661 } |
| 644 | 662 |
| 645 // Returns the configured Butler error. | 663 // Returns the configured Butler error. |
| 646 func (b *Butler) getRunErr() error { | 664 func (b *Butler) getRunErr() error { |
| 647 b.shutdownMu.Lock() | 665 b.shutdownMu.Lock() |
| 648 defer b.shutdownMu.Unlock() | 666 defer b.shutdownMu.Unlock() |
| 649 return b.runErr | 667 return b.runErr |
| 650 } | 668 } |
| OLD | NEW |