| OLD | NEW |
| 1 package metrics2 | 1 package metrics2 |
| 2 | 2 |
| 3 /* | 3 /* |
| 4 Convenience utilities for working with InfluxDB. | 4 Convenience utilities for working with InfluxDB. |
| 5 */ | 5 */ |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "os" | 9 "os" |
| 10 "sync" | 10 "sync" |
| 11 "time" | 11 "time" |
| 12 | 12 |
| 13 "github.com/skia-dev/glog" | 13 "github.com/skia-dev/glog" |
| 14 "go.skia.org/infra/go/influxdb" | 14 "go.skia.org/infra/go/influxdb" |
| 15 "go.skia.org/infra/go/util" | 15 "go.skia.org/infra/go/util" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 const ( | 18 const ( |
| 19 DEFAULT_REPORT_FREQUENCY = time.Minute | 19 DEFAULT_REPORT_FREQUENCY = time.Minute |
| 20 PUSH_FREQUENCY = time.Minute | 20 PUSH_FREQUENCY = time.Minute |
| 21 ) | 21 ) |
| 22 | 22 |
| 23 var ( | 23 var ( |
| 24 DefaultClient *Client = &Client{ | 24 DefaultClient *Client = &Client{ |
| 25 aggMetrics: map[string]*aggregateMetric{}, | 25 aggMetrics: map[string]*aggregateMetric{}, |
| 26 counters: map[string]*Counter{}, |
| 26 metrics: map[string]*rawMetric{}, | 27 metrics: map[string]*rawMetric{}, |
| 27 } | 28 } |
| 28 ) | 29 ) |
| 29 | 30 |
| 30 // Init() initializes the metrics package. | 31 // Init() initializes the metrics package. |
| 31 func Init(appName string, influxClient *influxdb.Client) error { | 32 func Init(appName string, influxClient *influxdb.Client) error { |
| 32 hostName, err := os.Hostname() | 33 hostName, err := os.Hostname() |
| 33 if err != nil { | 34 if err != nil { |
| 34 return fmt.Errorf("Unable to retrieve hostname: %s", err) | 35 return fmt.Errorf("Unable to retrieve hostname: %s", err) |
| 35 } | 36 } |
| 36 tags := map[string]string{ | 37 tags := map[string]string{ |
| 37 "app": appName, | 38 "app": appName, |
| 38 "host": hostName, | 39 "host": hostName, |
| 39 } | 40 } |
| 40 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) | 41 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) |
| 41 if err != nil { | 42 if err != nil { |
| 42 return err | 43 return err |
| 43 } | 44 } |
| 44 // Some metrics may already be registered with DefaultClient. Copy them | 45 // Some metrics may already be registered with DefaultClient. Copy them |
| 45 // over. | 46 // over. |
| 46 c.aggMetrics = DefaultClient.aggMetrics | 47 c.aggMetrics = DefaultClient.aggMetrics |
| 48 c.counters = DefaultClient.counters |
| 47 c.metrics = DefaultClient.metrics | 49 c.metrics = DefaultClient.metrics |
| 48 | 50 |
| 49 // Set the default client. | 51 // Set the default client. |
| 50 DefaultClient = c | 52 DefaultClient = c |
| 51 return nil | 53 return nil |
| 52 } | 54 } |
| 53 | 55 |
| 54 // Client is a struct used for communicating with an InfluxDB instance. | 56 // Client is a struct used for communicating with an InfluxDB instance. |
| 55 type Client struct { | 57 type Client struct { |
| 56 » aggMetrics map[string]*aggregateMetric | 58 » aggMetrics map[string]*aggregateMetric |
| 57 » aggMetricsMtx sync.Mutex | 59 » aggMetricsMtx sync.Mutex |
| 58 » influxClient *influxdb.Client | 60 |
| 59 » defaultTags map[string]string | 61 » counters map[string]*Counter |
| 60 » metrics map[string]*rawMetric | 62 » countersMtx sync.Mutex |
| 61 » metricsMtx sync.Mutex | 63 |
| 64 » influxClient *influxdb.Client |
| 65 » defaultTags map[string]string |
| 66 |
| 67 » metrics map[string]*rawMetric |
| 68 » metricsMtx sync.Mutex |
| 69 |
| 62 reportFrequency time.Duration | 70 reportFrequency time.Duration |
| 63 values *influxdb.BatchPoints | 71 values *influxdb.BatchPoints |
| 64 valuesMtx sync.Mutex | 72 valuesMtx sync.Mutex |
| 65 } | 73 } |
| 66 | 74 |
| 67 // NewClient returns a Client which uses the given influxdb.Client to push data. | 75 // NewClient returns a Client which uses the given influxdb.Client to push data. |
| 68 // defaultTags specifies a set of default tag keys and values which are applied | 76 // defaultTags specifies a set of default tag keys and values which are applied |
| 69 // to all data points. reportFrequency specifies how often metrics should create | 77 // to all data points. reportFrequency specifies how often metrics should create |
| 70 // data points. | 78 // data points. |
| 71 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep
ortFrequency time.Duration) (*Client, error) { | 79 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep
ortFrequency time.Duration) (*Client, error) { |
| 72 values, err := influxClient.NewBatchPoints() | 80 values, err := influxClient.NewBatchPoints() |
| 73 if err != nil { | 81 if err != nil { |
| 74 return nil, err | 82 return nil, err |
| 75 } | 83 } |
| 76 c := &Client{ | 84 c := &Client{ |
| 77 aggMetrics: map[string]*aggregateMetric{}, | 85 aggMetrics: map[string]*aggregateMetric{}, |
| 78 » » aggMetricsMtx: sync.Mutex{}, | 86 » » counters: map[string]*Counter{}, |
| 79 influxClient: influxClient, | 87 influxClient: influxClient, |
| 80 defaultTags: defaultTags, | 88 defaultTags: defaultTags, |
| 81 metrics: map[string]*rawMetric{}, | 89 metrics: map[string]*rawMetric{}, |
| 82 metricsMtx: sync.Mutex{}, | |
| 83 reportFrequency: reportFrequency, | 90 reportFrequency: reportFrequency, |
| 84 values: values, | 91 values: values, |
| 85 valuesMtx: sync.Mutex{}, | |
| 86 } | 92 } |
| 87 go func() { | 93 go func() { |
| 88 for _ = range time.Tick(PUSH_FREQUENCY) { | 94 for _ = range time.Tick(PUSH_FREQUENCY) { |
| 89 byMeasurement, err := c.pushData() | 95 byMeasurement, err := c.pushData() |
| 90 if err != nil { | 96 if err != nil { |
| 91 glog.Errorf("Failed to push data into InfluxDB:
%s", err) | 97 glog.Errorf("Failed to push data into InfluxDB:
%s", err) |
| 92 } else { | 98 } else { |
| 93 total := int64(0) | 99 total := int64(0) |
| 94 for k, v := range byMeasurement { | 100 for k, v := range byMeasurement { |
| 95 c.GetInt64Metric("metrics.points-pushed.
by-measurement", map[string]string{"measurement": k}).Update(v) | 101 c.GetInt64Metric("metrics.points-pushed.
by-measurement", map[string]string{"measurement": k}).Update(v) |
| (...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 222 tags := util.AddParams(map[string]string{}, tagsList...) | 228 tags := util.AddParams(map[string]string{}, tagsList...) |
| 223 md5, err := util.MD5Params(tags) | 229 md5, err := util.MD5Params(tags) |
| 224 if err != nil { | 230 if err != nil { |
| 225 glog.Errorf("Failed to encode measurement tags: %s", err) | 231 glog.Errorf("Failed to encode measurement tags: %s", err) |
| 226 } | 232 } |
| 227 key := fmt.Sprintf("%s_%s", measurement, md5) | 233 key := fmt.Sprintf("%s_%s", measurement, md5) |
| 228 m, ok := c.metrics[key] | 234 m, ok := c.metrics[key] |
| 229 if !ok { | 235 if !ok { |
| 230 m = &rawMetric{ | 236 m = &rawMetric{ |
| 231 measurement: measurement, | 237 measurement: measurement, |
| 232 mtx: sync.RWMutex{}, | |
| 233 tags: tags, | 238 tags: tags, |
| 234 value: initial, | 239 value: initial, |
| 235 } | 240 } |
| 236 c.metrics[key] = m | 241 c.metrics[key] = m |
| 237 } | 242 } |
| 238 return m | 243 return m |
| 239 } | 244 } |
| 240 | 245 |
| 241 // getAggregateMetric creates or retrieves an aggregateMetric with the given | 246 // getAggregateMetric creates or retrieves an aggregateMetric with the given |
| 242 // measurement name and tag set and returns it. | 247 // measurement name and tag set and returns it. |
| 243 func (c *Client) getAggregateMetric(measurement string, tagsList []map[string]st
ring, aggFn func([]interface{}) interface{}) *aggregateMetric { | 248 func (c *Client) getAggregateMetric(measurement string, tagsList []map[string]st
ring, aggFn func([]interface{}) interface{}) *aggregateMetric { |
| 244 c.aggMetricsMtx.Lock() | 249 c.aggMetricsMtx.Lock() |
| 245 defer c.aggMetricsMtx.Unlock() | 250 defer c.aggMetricsMtx.Unlock() |
| 246 | 251 |
| 247 // Make a copy of the concatenation of all provided tags. | 252 // Make a copy of the concatenation of all provided tags. |
| 248 tags := util.AddParams(map[string]string{}, tagsList...) | 253 tags := util.AddParams(map[string]string{}, tagsList...) |
| 249 md5, err := util.MD5Params(tags) | 254 md5, err := util.MD5Params(tags) |
| 250 if err != nil { | 255 if err != nil { |
| 251 glog.Errorf("Failed to encode measurement tags: %s", err) | 256 glog.Errorf("Failed to encode measurement tags: %s", err) |
| 252 } | 257 } |
| 253 key := fmt.Sprintf("%s_%s", measurement, md5) | 258 key := fmt.Sprintf("%s_%s", measurement, md5) |
| 254 m, ok := c.aggMetrics[key] | 259 m, ok := c.aggMetrics[key] |
| 255 if !ok { | 260 if !ok { |
| 256 m = &aggregateMetric{ | 261 m = &aggregateMetric{ |
| 257 aggFn: aggFn, | 262 aggFn: aggFn, |
| 258 measurement: measurement, | 263 measurement: measurement, |
| 259 mtx: sync.RWMutex{}, | |
| 260 tags: tags, | 264 tags: tags, |
| 261 values: []interface{}{}, | 265 values: []interface{}{}, |
| 262 } | 266 } |
| 263 c.aggMetrics[key] = m | 267 c.aggMetrics[key] = m |
| 264 } | 268 } |
| 265 return m | 269 return m |
| 266 } | 270 } |
| OLD | NEW |