| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/config" | 13 "github.com/luci/luci-go/common/config" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/logdog/butlerproto" | 15 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/common/parallel" | 18 "github.com/luci/luci-go/common/parallel" |
| 19 "github.com/luci/luci-go/common/proto/logdog/logpb" | 19 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 20 "github.com/luci/luci-go/common/tsmon/distribution" | 20 "github.com/luci/luci-go/common/tsmon/distribution" |
| 21 "github.com/luci/luci-go/common/tsmon/field" | 21 "github.com/luci/luci-go/common/tsmon/field" |
| 22 "github.com/luci/luci-go/common/tsmon/metric" | 22 "github.com/luci/luci-go/common/tsmon/metric" |
| 23 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
| 23 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 24 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 24 "github.com/luci/luci-go/server/logdog/storage" | 25 "github.com/luci/luci-go/server/logdog/storage" |
| 25 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 26 ) | 27 ) |
| 27 | 28 |
| 28 const ( | 29 const ( |
| 29 // DefaultMaxMessageWorkers is the default number of concurrent worker | 30 // DefaultMaxMessageWorkers is the default number of concurrent worker |
| 30 // goroutones to employ for a single message. | 31 // goroutones to employ for a single message. |
| 31 DefaultMaxMessageWorkers = 4 | 32 DefaultMaxMessageWorkers = 4 |
| 32 ) | 33 ) |
| 33 | 34 |
| 34 var ( | 35 var ( |
| 35 // tsBundles tracks the total number of logpb.ButlerLogBundle entries th
at | 36 // tsBundles tracks the total number of logpb.ButlerLogBundle entries th
at |
| 36 // have been submitted for collection. | 37 // have been submitted for collection. |
| 37 tsBundles = metric.NewCounter("logdog/collector/bundles", | 38 tsBundles = metric.NewCounter("logdog/collector/bundles", |
| 38 » » "The number of individual log entry bundles that have been inges
ted.") | 39 » » "The number of individual log entry bundles that have been inges
ted.", |
| 40 » » tsmon_types.MetricMetadata{}) |
| 39 // tsLogs tracks the number of logpb.LogEntry entries that have been | 41 // tsLogs tracks the number of logpb.LogEntry entries that have been |
| 40 // written to intermediate storage. | 42 // written to intermediate storage. |
| 41 tsLogs = metric.NewCounter("logdog/collector/logs", | 43 tsLogs = metric.NewCounter("logdog/collector/logs", |
| 42 » » "The number of individual log entries that have been ingested.") | 44 » » "The number of individual log entries that have been ingested.", |
| 45 » » tsmon_types.MetricMetadata{}) |
| 43 | 46 |
| 44 // tsBundleSize tracks the size, in bytes, of a given log bundle. | 47 // tsBundleSize tracks the size, in bytes, of a given log bundle. |
| 45 tsBundleSize = metric.NewCumulativeDistribution("logdog/collector/bundle
/size", | 48 tsBundleSize = metric.NewCumulativeDistribution("logdog/collector/bundle
/size", |
| 46 "The size (in bytes) of the bundle.", | 49 "The size (in bytes) of the bundle.", |
| 50 tsmon_types.MetricMetadata{Units: tsmon_types.Bytes}, |
| 47 distribution.DefaultBucketer) | 51 distribution.DefaultBucketer) |
| 48 // tsBundleEntriesPerBundle tracks the number of ButlerLogBundle.Entry e
ntries | 52 // tsBundleEntriesPerBundle tracks the number of ButlerLogBundle.Entry e
ntries |
| 49 // in each bundle that have been collected. | 53 // in each bundle that have been collected. |
| 50 tsBundleEntriesPerBundle = metric.NewCumulativeDistribution("logdog/coll
ector/bundle/entries_per_bundle", | 54 tsBundleEntriesPerBundle = metric.NewCumulativeDistribution("logdog/coll
ector/bundle/entries_per_bundle", |
| 51 "The number of log bundle entries per bundle.", | 55 "The number of log bundle entries per bundle.", |
| 56 tsmon_types.MetricMetadata{}, |
| 52 distribution.DefaultBucketer) | 57 distribution.DefaultBucketer) |
| 53 | 58 |
| 54 // tsBundleEntries tracks the total number of ButlerLogBundle.Entry entr
ies | 59 // tsBundleEntries tracks the total number of ButlerLogBundle.Entry entr
ies |
| 55 // that have been collected. | 60 // that have been collected. |
| 56 // | 61 // |
| 57 // The "stream" field is the type of log stream for each tracked bundle
entry. | 62 // The "stream" field is the type of log stream for each tracked bundle
entry. |
| 58 tsBundleEntries = metric.NewCounter("logdog/collector/bundle/entries", | 63 tsBundleEntries = metric.NewCounter("logdog/collector/bundle/entries", |
| 59 "The number of Butler bundle entries pulled.", | 64 "The number of Butler bundle entries pulled.", |
| 65 tsmon_types.MetricMetadata{}, |
| 60 field.String("stream")) | 66 field.String("stream")) |
| 61 // tsBundleEntryLogs tracks the number of LogEntry ingested per bundle. | 67 // tsBundleEntryLogs tracks the number of LogEntry ingested per bundle. |
| 62 // | 68 // |
| 63 // The "stream" field is the type of log stream. | 69 // The "stream" field is the type of log stream. |
| 64 tsBundleEntryLogs = metric.NewCumulativeDistribution("logdog/collector/b
undle/entry/logs", | 70 tsBundleEntryLogs = metric.NewCumulativeDistribution("logdog/collector/b
undle/entry/logs", |
| 65 "The number of log entries per bundle.", | 71 "The number of log entries per bundle.", |
| 72 tsmon_types.MetricMetadata{}, |
| 66 distribution.DefaultBucketer, | 73 distribution.DefaultBucketer, |
| 67 field.String("stream")) | 74 field.String("stream")) |
| 68 tsBundleEntryProcessingTime = metric.NewCumulativeDistribution("logdog/c
ollector/bundle/entry/processing_time_ms", | 75 tsBundleEntryProcessingTime = metric.NewCumulativeDistribution("logdog/c
ollector/bundle/entry/processing_time_ms", |
| 69 "The amount of time in milliseconds that a bundle entry takes to
process.", | 76 "The amount of time in milliseconds that a bundle entry takes to
process.", |
| 77 tsmon_types.MetricMetadata{Units: tsmon_types.Milliseconds}, |
| 70 distribution.DefaultBucketer, | 78 distribution.DefaultBucketer, |
| 71 field.String("stream")) | 79 field.String("stream")) |
| 72 ) | 80 ) |
| 73 | 81 |
| 74 // Collector is a stateful object responsible for ingesting LogDog logs, | 82 // Collector is a stateful object responsible for ingesting LogDog logs, |
| 75 // registering them with a Coordinator, and stowing them in short-term storage | 83 // registering them with a Coordinator, and stowing them in short-term storage |
| 76 // for streaming and processing. | 84 // for streaming and processing. |
| 77 // | 85 // |
| 78 // A Collector's Close should be called when finished to release any internal | 86 // A Collector's Close should be called when finished to release any internal |
| 79 // resources. | 87 // resources. |
| (...skipping 358 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 438 } | 446 } |
| 439 }) | 447 }) |
| 440 } | 448 } |
| 441 | 449 |
| 442 func streamType(desc *logpb.LogStreamDescriptor) string { | 450 func streamType(desc *logpb.LogStreamDescriptor) string { |
| 443 if desc == nil { | 451 if desc == nil { |
| 444 return "UNKNOWN" | 452 return "UNKNOWN" |
| 445 } | 453 } |
| 446 return desc.StreamType.String() | 454 return desc.StreamType.String() |
| 447 } | 455 } |
| OLD | NEW |