 Chromium Code Reviews
 Chromium Code Reviews Issue 1722163002:
  Metrics2: Store a registry of Counters  (Closed) 
  Base URL: https://skia.googlesource.com/buildbot@master
    
  
    Issue 1722163002:
  Metrics2: Store a registry of Counters  (Closed) 
  Base URL: https://skia.googlesource.com/buildbot@master| OLD | NEW | 
|---|---|
| 1 package metrics2 | 1 package metrics2 | 
| 2 | 2 | 
| 3 /* | 3 /* | 
| 4 Convenience utilities for working with InfluxDB. | 4 Convenience utilities for working with InfluxDB. | 
| 5 */ | 5 */ | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8 "fmt" | 8 "fmt" | 
| 9 "os" | 9 "os" | 
| 10 "sync" | 10 "sync" | 
| 11 "time" | 11 "time" | 
| 12 | 12 | 
| 13 "github.com/skia-dev/glog" | 13 "github.com/skia-dev/glog" | 
| 14 "go.skia.org/infra/go/influxdb" | 14 "go.skia.org/infra/go/influxdb" | 
| 15 "go.skia.org/infra/go/util" | 15 "go.skia.org/infra/go/util" | 
| 16 ) | 16 ) | 
| 17 | 17 | 
| 18 const ( | 18 const ( | 
| 19 DEFAULT_REPORT_FREQUENCY = time.Minute | 19 DEFAULT_REPORT_FREQUENCY = time.Minute | 
| 20 PUSH_FREQUENCY = time.Minute | 20 PUSH_FREQUENCY = time.Minute | 
| 21 ) | 21 ) | 
| 22 | 22 | 
| 23 var ( | 23 var ( | 
| 24 DefaultClient *Client = &Client{ | 24 DefaultClient *Client = &Client{ | 
| 25 aggMetrics: map[string]*aggregateMetric{}, | 25 aggMetrics: map[string]*aggregateMetric{}, | 
| 26 counters: map[string]*Counter{}, | |
| 26 metrics: map[string]*rawMetric{}, | 27 metrics: map[string]*rawMetric{}, | 
| 27 } | 28 } | 
| 28 ) | 29 ) | 
| 29 | 30 | 
| 30 // Init() initializes the metrics package. | 31 // Init() initializes the metrics package. | 
| 31 func Init(appName string, influxClient *influxdb.Client) error { | 32 func Init(appName string, influxClient *influxdb.Client) error { | 
| 32 hostName, err := os.Hostname() | 33 hostName, err := os.Hostname() | 
| 33 if err != nil { | 34 if err != nil { | 
| 34 return fmt.Errorf("Unable to retrieve hostname: %s", err) | 35 return fmt.Errorf("Unable to retrieve hostname: %s", err) | 
| 35 } | 36 } | 
| 36 tags := map[string]string{ | 37 tags := map[string]string{ | 
| 37 "app": appName, | 38 "app": appName, | 
| 38 "host": hostName, | 39 "host": hostName, | 
| 39 } | 40 } | 
| 40 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) | 41 c, err := NewClient(influxClient, tags, DEFAULT_REPORT_FREQUENCY) | 
| 41 if err != nil { | 42 if err != nil { | 
| 42 return err | 43 return err | 
| 43 } | 44 } | 
| 44 // Some metrics may already be registered with DefaultClient. Copy them | 45 // Some metrics may already be registered with DefaultClient. Copy them | 
| 45 // over. | 46 // over. | 
| 46 c.aggMetrics = DefaultClient.aggMetrics | 47 c.aggMetrics = DefaultClient.aggMetrics | 
| 48 c.counters = DefaultClient.counters | |
| 47 c.metrics = DefaultClient.metrics | 49 c.metrics = DefaultClient.metrics | 
| 48 | 50 | 
| 49 // Set the default client. | 51 // Set the default client. | 
| 50 DefaultClient = c | 52 DefaultClient = c | 
| 51 return nil | 53 return nil | 
| 52 } | 54 } | 
| 53 | 55 | 
| 54 // Client is a struct used for communicating with an InfluxDB instance. | 56 // Client is a struct used for communicating with an InfluxDB instance. | 
| 55 type Client struct { | 57 type Client struct { | 
| 56 » aggMetrics map[string]*aggregateMetric | 58 » aggMetrics map[string]*aggregateMetric | 
| 57 » aggMetricsMtx sync.Mutex | 59 » aggMetricsMtx sync.Mutex | 
| 58 » influxClient *influxdb.Client | 60 | 
| 59 » defaultTags map[string]string | 61 » counters map[string]*Counter | 
| 60 » metrics map[string]*rawMetric | 62 » countersMtx sync.Mutex | 
| 61 » metricsMtx sync.Mutex | 63 | 
| 64 » influxClient *influxdb.Client | |
| 65 » defaultTags map[string]string | |
| 66 | |
| 67 » metrics map[string]*rawMetric | |
| 68 » metricsMtx sync.Mutex | |
| 69 | |
| 62 reportFrequency time.Duration | 70 reportFrequency time.Duration | 
| 63 values *influxdb.BatchPoints | 71 values *influxdb.BatchPoints | 
| 64 valuesMtx sync.Mutex | 72 valuesMtx sync.Mutex | 
| 65 } | 73 } | 
| 66 | 74 | 
| 67 // NewClient returns a Client which uses the given influxdb.Client to push data. | 75 // NewClient returns a Client which uses the given influxdb.Client to push data. | 
| 68 // defaultTags specifies a set of default tag keys and values which are applied | 76 // defaultTags specifies a set of default tag keys and values which are applied | 
| 69 // to all data points. reportFrequency specifies how often metrics should create | 77 // to all data points. reportFrequency specifies how often metrics should create | 
| 70 // data points. | 78 // data points. | 
| 71 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep ortFrequency time.Duration) (*Client, error) { | 79 func NewClient(influxClient *influxdb.Client, defaultTags map[string]string, rep ortFrequency time.Duration) (*Client, error) { | 
| 72 values, err := influxClient.NewBatchPoints() | 80 values, err := influxClient.NewBatchPoints() | 
| 73 if err != nil { | 81 if err != nil { | 
| 74 return nil, err | 82 return nil, err | 
| 75 } | 83 } | 
| 76 c := &Client{ | 84 c := &Client{ | 
| 77 aggMetrics: map[string]*aggregateMetric{}, | 85 aggMetrics: map[string]*aggregateMetric{}, | 
| 78 aggMetricsMtx: sync.Mutex{}, | 86 aggMetricsMtx: sync.Mutex{}, | 
| 87 counters: map[string]*Counter{}, | |
| 88 countersMtx: sync.Mutex{}, | |
| 79 influxClient: influxClient, | 89 influxClient: influxClient, | 
| 80 defaultTags: defaultTags, | 90 defaultTags: defaultTags, | 
| 81 metrics: map[string]*rawMetric{}, | 91 metrics: map[string]*rawMetric{}, | 
| 82 metricsMtx: sync.Mutex{}, | 92 metricsMtx: sync.Mutex{}, | 
| 83 reportFrequency: reportFrequency, | 93 reportFrequency: reportFrequency, | 
| 84 values: values, | 94 values: values, | 
| 85 valuesMtx: sync.Mutex{}, | 95 valuesMtx: sync.Mutex{}, | 
| 86 } | 96 } | 
| 87 go func() { | 97 go func() { | 
| 88 for _ = range time.Tick(PUSH_FREQUENCY) { | 98 for _ = range time.Tick(PUSH_FREQUENCY) { | 
| (...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 257 aggFn: aggFn, | 267 aggFn: aggFn, | 
| 258 measurement: measurement, | 268 measurement: measurement, | 
| 259 mtx: sync.RWMutex{}, | 269 mtx: sync.RWMutex{}, | 
| 260 tags: tags, | 270 tags: tags, | 
| 261 values: []interface{}{}, | 271 values: []interface{}{}, | 
| 262 } | 272 } | 
| 263 c.aggMetrics[key] = m | 273 c.aggMetrics[key] = m | 
| 264 } | 274 } | 
| 265 return m | 275 return m | 
| 266 } | 276 } | 
| 277 | |
| 278 // getCounter creates or retrieves a Counter with the given name and tag set and | |
| 
stephana
2016/02/24 14:48:16
This should be in counter.go file, all the other G
 
borenet
2016/02/24 15:09:35
Done.
 | |
| 279 // returns it. | |
| 280 func (c *Client) GetCounter(name string, tagsList ...map[string]string) *Counter { | |
| 281 c.countersMtx.Lock() | |
| 282 defer c.countersMtx.Unlock() | |
| 283 | |
| 284 // Make a copy of the concatenation of all provided tags. | |
| 285 tags := util.AddParams(map[string]string{}, tagsList...) | |
| 286 tags["name"] = name | |
| 287 md5, err := util.MD5Params(tags) | |
| 288 if err != nil { | |
| 289 glog.Errorf("Failed to encode measurement tags: %s", err) | |
| 290 } | |
| 291 key := fmt.Sprintf("%s_%s", MEASUREMENT_COUNTER, md5) | |
| 292 m, ok := c.counters[key] | |
| 293 if !ok { | |
| 294 m = &Counter{ | |
| 295 m: c.GetInt64Metric(MEASUREMENT_COUNTER, tags), | |
| 296 mtx: sync.Mutex{}, | |
| 
stephana
2016/02/24 14:48:16
nit: Mutex doesn't need to be initialized, it's un
 
borenet
2016/02/24 15:09:35
Done, here and elsewhere.
 | |
| 297 } | |
| 298 c.counters[key] = m | |
| 299 } | |
| 300 return m | |
| 301 | |
| 302 } | |
| OLD | NEW |