OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package collector | 5 package collector |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
13 "github.com/luci/luci-go/common/config" | 13 "github.com/luci/luci-go/common/config" |
14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
15 "github.com/luci/luci-go/common/logdog/butlerproto" | 15 "github.com/luci/luci-go/common/logdog/butlerproto" |
16 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
18 "github.com/luci/luci-go/common/parallel" | 18 "github.com/luci/luci-go/common/parallel" |
19 "github.com/luci/luci-go/common/proto/logdog/logpb" | 19 "github.com/luci/luci-go/common/proto/logdog/logpb" |
20 "github.com/luci/luci-go/common/tsmon/distribution" | 20 "github.com/luci/luci-go/common/tsmon/distribution" |
21 "github.com/luci/luci-go/common/tsmon/field" | 21 "github.com/luci/luci-go/common/tsmon/field" |
22 "github.com/luci/luci-go/common/tsmon/metric" | 22 "github.com/luci/luci-go/common/tsmon/metric" |
| 23 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
23 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 24 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
24 "github.com/luci/luci-go/server/logdog/storage" | 25 "github.com/luci/luci-go/server/logdog/storage" |
25 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
26 ) | 27 ) |
27 | 28 |
28 const ( | 29 const ( |
29 // DefaultMaxMessageWorkers is the default number of concurrent worker | 30 // DefaultMaxMessageWorkers is the default number of concurrent worker |
30 // goroutones to employ for a single message. | 31 // goroutones to employ for a single message. |
31 DefaultMaxMessageWorkers = 4 | 32 DefaultMaxMessageWorkers = 4 |
32 ) | 33 ) |
33 | 34 |
34 var ( | 35 var ( |
35 // tsBundles tracks the total number of logpb.ButlerLogBundle entries th
at | 36 // tsBundles tracks the total number of logpb.ButlerLogBundle entries th
at |
36 // have been submitted for collection. | 37 // have been submitted for collection. |
37 tsBundles = metric.NewCounter("logdog/collector/bundles", | 38 tsBundles = metric.NewCounter("logdog/collector/bundles", |
38 » » "The number of individual log entry bundles that have been inges
ted.") | 39 » » "The number of individual log entry bundles that have been inges
ted.", |
| 40 » » tsmon_types.MetricMetadata{}) |
39 // tsLogs tracks the number of logpb.LogEntry entries that have been | 41 // tsLogs tracks the number of logpb.LogEntry entries that have been |
40 // written to intermediate storage. | 42 // written to intermediate storage. |
41 tsLogs = metric.NewCounter("logdog/collector/logs", | 43 tsLogs = metric.NewCounter("logdog/collector/logs", |
42 » » "The number of individual log entries that have been ingested.") | 44 » » "The number of individual log entries that have been ingested.", |
| 45 » » tsmon_types.MetricMetadata{}) |
43 | 46 |
44 // tsBundleSize tracks the size, in bytes, of a given log bundle. | 47 // tsBundleSize tracks the size, in bytes, of a given log bundle. |
45 tsBundleSize = metric.NewCumulativeDistribution("logdog/collector/bundle
/size", | 48 tsBundleSize = metric.NewCumulativeDistribution("logdog/collector/bundle
/size", |
46 "The size (in bytes) of the bundle.", | 49 "The size (in bytes) of the bundle.", |
| 50 tsmon_types.MetricMetadata{Units: tsmon_types.Bytes}, |
47 distribution.DefaultBucketer) | 51 distribution.DefaultBucketer) |
48 // tsBundleEntriesPerBundle tracks the number of ButlerLogBundle.Entry e
ntries | 52 // tsBundleEntriesPerBundle tracks the number of ButlerLogBundle.Entry e
ntries |
49 // in each bundle that have been collected. | 53 // in each bundle that have been collected. |
50 tsBundleEntriesPerBundle = metric.NewCumulativeDistribution("logdog/coll
ector/bundle/entries_per_bundle", | 54 tsBundleEntriesPerBundle = metric.NewCumulativeDistribution("logdog/coll
ector/bundle/entries_per_bundle", |
51 "The number of log bundle entries per bundle.", | 55 "The number of log bundle entries per bundle.", |
| 56 tsmon_types.MetricMetadata{}, |
52 distribution.DefaultBucketer) | 57 distribution.DefaultBucketer) |
53 | 58 |
54 // tsBundleEntries tracks the total number of ButlerLogBundle.Entry entr
ies | 59 // tsBundleEntries tracks the total number of ButlerLogBundle.Entry entr
ies |
55 // that have been collected. | 60 // that have been collected. |
56 // | 61 // |
57 // The "stream" field is the type of log stream for each tracked bundle
entry. | 62 // The "stream" field is the type of log stream for each tracked bundle
entry. |
58 tsBundleEntries = metric.NewCounter("logdog/collector/bundle/entries", | 63 tsBundleEntries = metric.NewCounter("logdog/collector/bundle/entries", |
59 "The number of Butler bundle entries pulled.", | 64 "The number of Butler bundle entries pulled.", |
| 65 tsmon_types.MetricMetadata{}, |
60 field.String("stream")) | 66 field.String("stream")) |
61 // tsBundleEntryLogs tracks the number of LogEntry ingested per bundle. | 67 // tsBundleEntryLogs tracks the number of LogEntry ingested per bundle. |
62 // | 68 // |
63 // The "stream" field is the type of log stream. | 69 // The "stream" field is the type of log stream. |
64 tsBundleEntryLogs = metric.NewCumulativeDistribution("logdog/collector/b
undle/entry/logs", | 70 tsBundleEntryLogs = metric.NewCumulativeDistribution("logdog/collector/b
undle/entry/logs", |
65 "The number of log entries per bundle.", | 71 "The number of log entries per bundle.", |
| 72 tsmon_types.MetricMetadata{}, |
66 distribution.DefaultBucketer, | 73 distribution.DefaultBucketer, |
67 field.String("stream")) | 74 field.String("stream")) |
68 tsBundleEntryProcessingTime = metric.NewCumulativeDistribution("logdog/c
ollector/bundle/entry/processing_time_ms", | 75 tsBundleEntryProcessingTime = metric.NewCumulativeDistribution("logdog/c
ollector/bundle/entry/processing_time_ms", |
69 "The amount of time in milliseconds that a bundle entry takes to
process.", | 76 "The amount of time in milliseconds that a bundle entry takes to
process.", |
| 77 tsmon_types.MetricMetadata{Units: tsmon_types.Milliseconds}, |
70 distribution.DefaultBucketer, | 78 distribution.DefaultBucketer, |
71 field.String("stream")) | 79 field.String("stream")) |
72 ) | 80 ) |
73 | 81 |
74 // Collector is a stateful object responsible for ingesting LogDog logs, | 82 // Collector is a stateful object responsible for ingesting LogDog logs, |
75 // registering them with a Coordinator, and stowing them in short-term storage | 83 // registering them with a Coordinator, and stowing them in short-term storage |
76 // for streaming and processing. | 84 // for streaming and processing. |
77 // | 85 // |
78 // A Collector's Close should be called when finished to release any internal | 86 // A Collector's Close should be called when finished to release any internal |
79 // resources. | 87 // resources. |
(...skipping 358 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
438 } | 446 } |
439 }) | 447 }) |
440 } | 448 } |
441 | 449 |
442 func streamType(desc *logpb.LogStreamDescriptor) string { | 450 func streamType(desc *logpb.LogStreamDescriptor) string { |
443 if desc == nil { | 451 if desc == nil { |
444 return "UNKNOWN" | 452 return "UNKNOWN" |
445 } | 453 } |
446 return desc.StreamType.String() | 454 return desc.StreamType.String() |
447 } | 455 } |
OLD | NEW |