Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(611)

Side by Side Diff: go/metrics2/metrics.go

Issue 1722163002: Metrics2: Store a registry of Counters (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Address comments Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « go/metrics2/docs.go ('k') | go/trace/service/impl.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package metrics2 1 package metrics2
2 2
3 /* 3 /*
4 Convenience utilities for working with InfluxDB. 4 Convenience utilities for working with InfluxDB.
5 */ 5 */
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "os" 9 "os"
10 "sync" 10 "sync"
11 "time" 11 "time"
12 12
13 "github.com/skia-dev/glog" 13 "github.com/skia-dev/glog"
14 "go.skia.org/infra/go/influxdb" 14 "go.skia.org/infra/go/influxdb"
15 "go.skia.org/infra/go/util" 15 "go.skia.org/infra/go/util"
16 ) 16 )
17 17
18 const ( 18 const (
19 DEFAULT_REPORT_FREQUENCY = time.Minute 19 DEFAULT_REPORT_FREQUENCY = time.Minute
20 PUSH_FREQUENCY = time.Minute 20 PUSH_FREQUENCY = time.Minute
21 ) 21 )
22 22
23 var ( 23 var (
24 DefaultClient *Client = &Client{ 24 DefaultClient *Client = &Client{
25 aggMetrics: map[string]*aggregateMetric{}, 25 aggMetrics: map[string]*aggregateMetric{},
26 counters: map[string]*Counter{},
26 metrics: map[string]*rawMetric{}, 27 metrics: map[string]*rawMetric{},
27 } 28 }
28 ) 29 )
29 30
30 // Init() initializes the metrics package. 31 // Init() initializes the metrics package.
31 func Init(appName string, influxClient *influxdb.Client) error { 32 func Init(appName string, influxClient *influxdb.Client) error {
32 hostName, err := os.Hostname() 33 hostName, err := os.Hostname()
33 if err != nil { 34 if err != nil {
34 return fmt.Errorf("Unable to retrieve hostname: %s", err) 35 return fmt.Errorf("Unable to retrieve hostname: %s", err)
35 } 36 }
36 tags := map[string]string{ 37 tags := map[string]string{
37 "app": appName, 38 "app": appName,
38 "host": hostName, 39 "host": hostName,
39 } 40 }
40 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) 41 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY)
41 if err != nil { 42 if err != nil {
42 return err 43 return err
43 } 44 }
44 // Some metrics may already be registered with DefaultClient. Copy them 45 // Some metrics may already be registered with DefaultClient. Copy them
45 // over. 46 // over.
46 c.aggMetrics = DefaultClient.aggMetrics 47 c.aggMetrics = DefaultClient.aggMetrics
48 c.counters = DefaultClient.counters
47 c.metrics = DefaultClient.metrics 49 c.metrics = DefaultClient.metrics
48 50
49 // Set the default client. 51 // Set the default client.
50 DefaultClient = c 52 DefaultClient = c
51 return nil 53 return nil
52 } 54 }
53 55
54 // Client is a struct used for communicating with an InfluxDB instance. 56 // Client is a struct used for communicating with an InfluxDB instance.
55 type Client struct { 57 type Client struct {
56 » aggMetrics map[string]*aggregateMetric 58 » aggMetrics map[string]*aggregateMetric
57 » aggMetricsMtx sync.Mutex 59 » aggMetricsMtx sync.Mutex
58 » influxClient *influxdb.Client 60
59 » defaultTags map[string]string 61 » counters map[string]*Counter
60 » metrics map[string]*rawMetric 62 » countersMtx sync.Mutex
61 » metricsMtx sync.Mutex 63
64 » influxClient *influxdb.Client
65 » defaultTags map[string]string
66
67 » metrics map[string]*rawMetric
68 » metricsMtx sync.Mutex
69
62 reportFrequency time.Duration 70 reportFrequency time.Duration
63 values *influxdb.BatchPoints 71 values *influxdb.BatchPoints
64 valuesMtx sync.Mutex 72 valuesMtx sync.Mutex
65 } 73 }
66 74
67 // NewClient returns a Client which uses the given influxdb.Client to push data. 75 // NewClient returns a Client which uses the given influxdb.Client to push data.
68 // defaultTags specifies a set of default tag keys and values which are applied 76 // defaultTags specifies a set of default tag keys and values which are applied
69 // to all data points. reportFrequency specifies how often metrics should create 77 // to all data points. reportFrequency specifies how often metrics should create
70 // data points. 78 // data points.
71 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep ortFrequency time.Duration) (*Client, error) { 79 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep ortFrequency time.Duration) (*Client, error) {
72 values, err := influxClient.NewBatchPoints() 80 values, err := influxClient.NewBatchPoints()
73 if err != nil { 81 if err != nil {
74 return nil, err 82 return nil, err
75 } 83 }
76 c := &Client{ 84 c := &Client{
77 aggMetrics: map[string]*aggregateMetric{}, 85 aggMetrics: map[string]*aggregateMetric{},
78 » » aggMetricsMtx: sync.Mutex{}, 86 » » counters: map[string]*Counter{},
79 influxClient: influxClient, 87 influxClient: influxClient,
80 defaultTags: defaultTags, 88 defaultTags: defaultTags,
81 metrics: map[string]*rawMetric{}, 89 metrics: map[string]*rawMetric{},
82 metricsMtx: sync.Mutex{},
83 reportFrequency: reportFrequency, 90 reportFrequency: reportFrequency,
84 values: values, 91 values: values,
85 valuesMtx: sync.Mutex{},
86 } 92 }
87 go func() { 93 go func() {
88 for _ = range time.Tick(PUSH_FREQUENCY) { 94 for _ = range time.Tick(PUSH_FREQUENCY) {
89 byMeasurement, err := c.pushData() 95 byMeasurement, err := c.pushData()
90 if err != nil { 96 if err != nil {
91 glog.Errorf("Failed to push data into InfluxDB: %s", err) 97 glog.Errorf("Failed to push data into InfluxDB: %s", err)
92 } else { 98 } else {
93 total := int64(0) 99 total := int64(0)
94 for k, v := range byMeasurement { 100 for k, v := range byMeasurement {
95 c.GetInt64Metric("metrics.points-pushed. by-measurement", map[string]string{"measurement": k}).Update(v) 101 c.GetInt64Metric("metrics.points-pushed. by-measurement", map[string]string{"measurement": k}).Update(v)
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
222 tags := util.AddParams(map[string]string{}, tagsList...) 228 tags := util.AddParams(map[string]string{}, tagsList...)
223 md5, err := util.MD5Params(tags) 229 md5, err := util.MD5Params(tags)
224 if err != nil { 230 if err != nil {
225 glog.Errorf("Failed to encode measurement tags: %s", err) 231 glog.Errorf("Failed to encode measurement tags: %s", err)
226 } 232 }
227 key := fmt.Sprintf("%s_%s", measurement, md5) 233 key := fmt.Sprintf("%s_%s", measurement, md5)
228 m, ok := c.metrics[key] 234 m, ok := c.metrics[key]
229 if !ok { 235 if !ok {
230 m = &rawMetric{ 236 m = &rawMetric{
231 measurement: measurement, 237 measurement: measurement,
232 mtx: sync.RWMutex{},
233 tags: tags, 238 tags: tags,
234 value: initial, 239 value: initial,
235 } 240 }
236 c.metrics[key] = m 241 c.metrics[key] = m
237 } 242 }
238 return m 243 return m
239 } 244 }
240 245
241 // getAggregateMetric creates or retrieves an aggregateMetric with the given 246 // getAggregateMetric creates or retrieves an aggregateMetric with the given
242 // measurement name and tag set and returns it. 247 // measurement name and tag set and returns it.
243 func (c *Client) getAggregateMetric(measurement string, tagsList []map[string]st ring, aggFn func([]interface{}) interface{}) *aggregateMetric { 248 func (c *Client) getAggregateMetric(measurement string, tagsList []map[string]st ring, aggFn func([]interface{}) interface{}) *aggregateMetric {
244 c.aggMetricsMtx.Lock() 249 c.aggMetricsMtx.Lock()
245 defer c.aggMetricsMtx.Unlock() 250 defer c.aggMetricsMtx.Unlock()
246 251
247 // Make a copy of the concatenation of all provided tags. 252 // Make a copy of the concatenation of all provided tags.
248 tags := util.AddParams(map[string]string{}, tagsList...) 253 tags := util.AddParams(map[string]string{}, tagsList...)
249 md5, err := util.MD5Params(tags) 254 md5, err := util.MD5Params(tags)
250 if err != nil { 255 if err != nil {
251 glog.Errorf("Failed to encode measurement tags: %s", err) 256 glog.Errorf("Failed to encode measurement tags: %s", err)
252 } 257 }
253 key := fmt.Sprintf("%s_%s", measurement, md5) 258 key := fmt.Sprintf("%s_%s", measurement, md5)
254 m, ok := c.aggMetrics[key] 259 m, ok := c.aggMetrics[key]
255 if !ok { 260 if !ok {
256 m = &aggregateMetric{ 261 m = &aggregateMetric{
257 aggFn: aggFn, 262 aggFn: aggFn,
258 measurement: measurement, 263 measurement: measurement,
259 mtx: sync.RWMutex{},
260 tags: tags, 264 tags: tags,
261 values: []interface{}{}, 265 values: []interface{}{},
262 } 266 }
263 c.aggMetrics[key] = m 267 c.aggMetrics[key] = m
264 } 268 }
265 return m 269 return m
266 } 270 }
OLDNEW
« no previous file with comments | « go/metrics2/docs.go ('k') | go/trace/service/impl.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698