Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package metrics2 | 1 package metrics2 |
| 2 | 2 |
| 3 /* | 3 /* |
| 4 Convenience utilities for working with InfluxDB. | 4 Convenience utilities for working with InfluxDB. |
| 5 */ | 5 */ |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "os" | 9 "os" |
| 10 "sync" | 10 "sync" |
| 11 "time" | 11 "time" |
| 12 | 12 |
| 13 "github.com/skia-dev/glog" | 13 "github.com/skia-dev/glog" |
| 14 "go.skia.org/infra/go/influxdb" | 14 "go.skia.org/infra/go/influxdb" |
| 15 "go.skia.org/infra/go/util" | 15 "go.skia.org/infra/go/util" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 const ( | 18 const ( |
| 19 DEFAULT_REPORT_FREQUENCY = time.Minute | 19 DEFAULT_REPORT_FREQUENCY = time.Minute |
| 20 PUSH_FREQUENCY = time.Minute | 20 PUSH_FREQUENCY = time.Minute |
| 21 ) | 21 ) |
| 22 | 22 |
| 23 var ( | 23 var ( |
| 24 DefaultClient *Client = &Client{ | 24 DefaultClient *Client = &Client{ |
| 25 aggMetrics: map[string]*aggregateMetric{}, | 25 aggMetrics: map[string]*aggregateMetric{}, |
| 26 counters: map[string]*Counter{}, | |
| 26 metrics: map[string]*rawMetric{}, | 27 metrics: map[string]*rawMetric{}, |
| 27 } | 28 } |
| 28 ) | 29 ) |
| 29 | 30 |
| 30 // Init() initializes the metrics package. | 31 // Init() initializes the metrics package. |
| 31 func Init(appName string, influxClient *influxdb.Client) error { | 32 func Init(appName string, influxClient *influxdb.Client) error { |
| 32 hostName, err := os.Hostname() | 33 hostName, err := os.Hostname() |
| 33 if err != nil { | 34 if err != nil { |
| 34 return fmt.Errorf("Unable to retrieve hostname: %s", err) | 35 return fmt.Errorf("Unable to retrieve hostname: %s", err) |
| 35 } | 36 } |
| 36 tags := map[string]string{ | 37 tags := map[string]string{ |
| 37 "app": appName, | 38 "app": appName, |
| 38 "host": hostName, | 39 "host": hostName, |
| 39 } | 40 } |
| 40 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) | 41 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) |
| 41 if err != nil { | 42 if err != nil { |
| 42 return err | 43 return err |
| 43 } | 44 } |
| 44 // Some metrics may already be registered with DefaultClient. Copy them | 45 // Some metrics may already be registered with DefaultClient. Copy them |
| 45 // over. | 46 // over. |
| 46 c.aggMetrics = DefaultClient.aggMetrics | 47 c.aggMetrics = DefaultClient.aggMetrics |
| 48 c.counters = DefaultClient.counters | |
| 47 c.metrics = DefaultClient.metrics | 49 c.metrics = DefaultClient.metrics |
| 48 | 50 |
| 49 // Set the default client. | 51 // Set the default client. |
| 50 DefaultClient = c | 52 DefaultClient = c |
| 51 return nil | 53 return nil |
| 52 } | 54 } |
| 53 | 55 |
| 54 // Client is a struct used for communicating with an InfluxDB instance. | 56 // Client is a struct used for communicating with an InfluxDB instance. |
| 55 type Client struct { | 57 type Client struct { |
| 56 » aggMetrics map[string]*aggregateMetric | 58 » aggMetrics map[string]*aggregateMetric |
| 57 » aggMetricsMtx sync.Mutex | 59 » aggMetricsMtx sync.Mutex |
| 58 » influxClient *influxdb.Client | 60 |
| 59 » defaultTags map[string]string | 61 » counters map[string]*Counter |
| 60 » metrics map[string]*rawMetric | 62 » countersMtx sync.Mutex |
| 61 » metricsMtx sync.Mutex | 63 |
| 64 » influxClient *influxdb.Client | |
| 65 » defaultTags map[string]string | |
| 66 | |
| 67 » metrics map[string]*rawMetric | |
| 68 » metricsMtx sync.Mutex | |
| 69 | |
| 62 reportFrequency time.Duration | 70 reportFrequency time.Duration |
| 63 values *influxdb.BatchPoints | 71 values *influxdb.BatchPoints |
| 64 valuesMtx sync.Mutex | 72 valuesMtx sync.Mutex |
| 65 } | 73 } |
| 66 | 74 |
| 67 // NewClient returns a Client which uses the given influxdb.Client to push data. | 75 // NewClient returns a Client which uses the given influxdb.Client to push data. |
| 68 // defaultTags specifies a set of default tag keys and values which are applied | 76 // defaultTags specifies a set of default tag keys and values which are applied |
| 69 // to all data points. reportFrequency specifies how often metrics should create | 77 // to all data points. reportFrequency specifies how often metrics should create |
| 70 // data points. | 78 // data points. |
| 71 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep ortFrequency time.Duration) (*Client, error) { | 79 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep ortFrequency time.Duration) (*Client, error) { |
| 72 values, err := influxClient.NewBatchPoints() | 80 values, err := influxClient.NewBatchPoints() |
| 73 if err != nil { | 81 if err != nil { |
| 74 return nil, err | 82 return nil, err |
| 75 } | 83 } |
| 76 c := &Client{ | 84 c := &Client{ |
| 77 aggMetrics: map[string]*aggregateMetric{}, | 85 aggMetrics: map[string]*aggregateMetric{}, |
| 78 aggMetricsMtx: sync.Mutex{}, | 86 aggMetricsMtx: sync.Mutex{}, |
| 87 counters: map[string]*Counter{}, | |
| 88 countersMtx: sync.Mutex{}, | |
| 79 influxClient: influxClient, | 89 influxClient: influxClient, |
| 80 defaultTags: defaultTags, | 90 defaultTags: defaultTags, |
| 81 metrics: map[string]*rawMetric{}, | 91 metrics: map[string]*rawMetric{}, |
| 82 metricsMtx: sync.Mutex{}, | 92 metricsMtx: sync.Mutex{}, |
| 83 reportFrequency: reportFrequency, | 93 reportFrequency: reportFrequency, |
| 84 values: values, | 94 values: values, |
| 85 valuesMtx: sync.Mutex{}, | 95 valuesMtx: sync.Mutex{}, |
| 86 } | 96 } |
| 87 go func() { | 97 go func() { |
| 88 for _ = range time.Tick(PUSH_FREQUENCY) { | 98 for _ = range time.Tick(PUSH_FREQUENCY) { |
| (...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 257 aggFn: aggFn, | 267 aggFn: aggFn, |
| 258 measurement: measurement, | 268 measurement: measurement, |
| 259 mtx: sync.RWMutex{}, | 269 mtx: sync.RWMutex{}, |
| 260 tags: tags, | 270 tags: tags, |
| 261 values: []interface{}{}, | 271 values: []interface{}{}, |
| 262 } | 272 } |
| 263 c.aggMetrics[key] = m | 273 c.aggMetrics[key] = m |
| 264 } | 274 } |
| 265 return m | 275 return m |
| 266 } | 276 } |
| 277 | |
| 278 // getCounter creates or retrieves a Counter with the given name and tag set and | |
|
stephana
2016/02/24 14:48:16
This should be in counter.go file, all the other G
borenet
2016/02/24 15:09:35
Done.
| |
| 279 // returns it. | |
| 280 func (c *Client) GetCounter(name string, tagsList ...map[string]string) *Counter { | |
| 281 c.countersMtx.Lock() | |
| 282 defer c.countersMtx.Unlock() | |
| 283 | |
| 284 // Make a copy of the concatenation of all provided tags. | |
| 285 tags := util.AddParams(map[string]string{}, tagsList...) | |
| 286 tags["name"] = name | |
| 287 md5, err := util.MD5Params(tags) | |
| 288 if err != nil { | |
| 289 glog.Errorf("Failed to encode measurement tags: %s", err) | |
| 290 } | |
| 291 key := fmt.Sprintf("%s_%s", MEASUREMENT_COUNTER, md5) | |
| 292 m, ok := c.counters[key] | |
| 293 if !ok { | |
| 294 m = &Counter{ | |
| 295 m: c.GetInt64Metric(MEASUREMENT_COUNTER, tags), | |
| 296 mtx: sync.Mutex{}, | |
|
stephana
2016/02/24 14:48:16
nit: Mutex doesn't need to be initialized, it's un
borenet
2016/02/24 15:09:35
Done, here and elsewhere.
| |
| 297 } | |
| 298 c.counters[key] = m | |
| 299 } | |
| 300 return m | |
| 301 | |
| 302 } | |
| OLD | NEW |