Index: infra_libs/ts_mon/common/interface.py |
diff --git a/infra_libs/ts_mon/common/interface.py b/infra_libs/ts_mon/common/interface.py |
deleted file mode 100644 |
index df3f50286c7abadf96eb0f6d77d6b2a94555aea4..0000000000000000000000000000000000000000 |
--- a/infra_libs/ts_mon/common/interface.py |
+++ /dev/null |
@@ -1,196 +0,0 @@ |
-# Copyright 2015 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Classes representing the monitoring interface for tasks or devices. |
- |
-Usage: |
- import argparse |
- from infra_libs import ts_mon |
- |
- p = argparse.ArgumentParser() |
- ts_mon.add_argparse_options(p) |
- args = p.parse_args() # Must contain info for Monitor (and optionally Target) |
- ts_mon.process_argparse_options(args) |
- |
- # Will use the default Target set up via command line args: |
- m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'}) |
- m.set(True) |
- |
- # Use a custom Target: |
- t = ts_mon.TaskTarget('service', 'job', 'region', 'host') # or DeviceTarget |
- m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t) |
- m2.set(5) |
- |
-Library usage: |
- from infra_libs.ts_mon import CounterMetric |
- # No need to set up Monitor or Target, assume calling code did that. |
- c = CounterMetric('/my/counter', fields={'source': 'mylibrary'}) |
- c.set(0) |
- for x in range(100): |
- c.increment() |
-""" |
- |
-import datetime |
-import logging |
-import random |
-import threading |
-import time |
- |
-from infra_libs.ts_mon.common import errors |
-from infra_libs.ts_mon.common import metric_store |
-from infra_libs.ts_mon.protos import metrics_pb2 |
- |
-# The maximum number of MetricsData messages to include in each HTTP request. |
-# MetricsCollections larger than this will be split into multiple requests. |
-METRICS_DATA_LENGTH_LIMIT = 1000 |
- |
- |
-class State(object): |
- """Package-level state is stored here so that it is easily accessible. |
- |
- Configuration is kept in this one object at the global level so that all |
- libraries in use by the same tool or service can all take advantage of the |
- same configuration. |
- """ |
- |
- def __init__(self, store_ctor=None, target=None): |
- """Optional arguments are for unit tests.""" |
- if store_ctor is None: # pragma: no branch |
- store_ctor = metric_store.InProcessMetricStore |
- # The Monitor object that will be used to send all metrics. |
- self.global_monitor = None |
- # The Target object that will be paired with all metrics that don't supply |
- # their own. |
- self.target = target |
- # The flush mode being used to control when metrics are pushed. |
- self.flush_mode = None |
- # A predicate to determine if metrics should be sent. |
- self.flush_enabled_fn = lambda: True |
- # The background thread that flushes metrics every |
- # --ts-mon-flush-interval-secs seconds. May be None if |
- # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0. |
- self.flush_thread = None |
- # All metrics created by this application. |
- self.metrics = {} |
- # The MetricStore object that holds the actual metric values. |
- self.store = store_ctor(self) |
- # Cached time of the last flush. Useful mostly in AppEngine apps. |
- self.last_flushed = datetime.datetime.utcfromtimestamp(0) |
- # Metric name prefix |
- self.metric_name_prefix = '/chrome/infra/' |
- |
- def reset_for_unittest(self): |
- self.metrics = {} |
- self.last_flushed = datetime.datetime.utcfromtimestamp(0) |
- self.store.reset_for_unittest() |
- |
-state = State() |
- |
- |
-def flush(): |
- """Send all metrics that are registered in the application.""" |
- |
- if not state.flush_enabled_fn(): |
- logging.debug('ts_mon: sending metrics is disabled.') |
- return |
- |
- if not state.global_monitor or not state.target: |
- raise errors.MonitoringNoConfiguredMonitorError(None) |
- |
- proto = metrics_pb2.MetricsCollection() |
- |
- for target, metric, start_time, fields_values in state.store.get_all(): |
- for fields, value in fields_values.iteritems(): |
- if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT: |
- state.global_monitor.send(proto) |
- del proto.data[:] |
- |
- metric.serialize_to(proto, start_time, fields, value, target) |
- |
- state.global_monitor.send(proto) |
- state.last_flushed = datetime.datetime.utcnow() |
- |
- |
-def register(metric): |
- """Adds the metric to the list of metrics sent by flush(). |
- |
- This is called automatically by Metric's constructor. |
- """ |
- # If someone is registering the same metric object twice, that's okay, but |
- # registering two different metric objects with the same metric name is not. |
- for m in state.metrics.values(): |
- if metric == m: |
- state.metrics[metric.name] = metric |
- return |
- if metric.name in state.metrics: |
- raise errors.MonitoringDuplicateRegistrationError(metric.name) |
- |
- state.metrics[metric.name] = metric |
- |
- |
-def unregister(metric): |
- """Removes the metric from the list of metrics sent by flush().""" |
- del state.metrics[metric.name] |
- |
- |
-def close(): |
- """Stops any background threads and waits for them to exit.""" |
- if state.flush_thread is not None: |
- state.flush_thread.stop() |
- |
- |
-def reset_for_unittest(disable=False): |
- state.reset_for_unittest() |
- if disable: |
- state.flush_enabled_fn = lambda: False |
- |
- |
-class _FlushThread(threading.Thread): |
- """Background thread that flushes metrics on an interval.""" |
- |
- def __init__(self, interval_secs, stop_event=None): |
- super(_FlushThread, self).__init__(name='ts_mon') |
- |
- if stop_event is None: |
- stop_event = threading.Event() |
- |
- self.daemon = True |
- self.interval_secs = interval_secs |
- self.stop_event = stop_event |
- |
- def _flush_and_log_exceptions(self): |
- try: |
- flush() |
- except Exception: |
- logging.exception('Automatic monitoring flush failed.') |
- |
- def run(self): |
- # Jitter the first interval so tasks started at the same time (say, by cron) |
- # on different machines don't all send metrics simultaneously. |
- next_timeout = random.uniform(self.interval_secs / 2.0, self.interval_secs) |
- |
- while True: |
- if self.stop_event.wait(next_timeout): |
- self._flush_and_log_exceptions() |
- return |
- |
- # Try to flush every N seconds exactly so rate calculations are more |
- # consistent. |
- start = time.time() |
- self._flush_and_log_exceptions() |
- flush_duration = time.time() - start |
- next_timeout = self.interval_secs - flush_duration |
- |
- if next_timeout < 0: |
- logging.warning( |
- 'Last monitoring flush took %f seconds (longer than ' |
- '--ts-mon-flush-interval-secs = %f seconds)', |
- flush_duration, self.interval_secs) |
- next_timeout = 0 |
- |
- def stop(self): |
- """Stops the background thread and performs a final flush.""" |
- |
- self.stop_event.set() |
- self.join() |