Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(346)

Unified Diff: infra_libs/ts_mon/common/interface.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: infra_libs/ts_mon/common/interface.py
diff --git a/infra_libs/ts_mon/common/interface.py b/infra_libs/ts_mon/common/interface.py
deleted file mode 100644
index df3f50286c7abadf96eb0f6d77d6b2a94555aea4..0000000000000000000000000000000000000000
--- a/infra_libs/ts_mon/common/interface.py
+++ /dev/null
@@ -1,196 +0,0 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Classes representing the monitoring interface for tasks or devices.
-
-Usage:
- import argparse
- from infra_libs import ts_mon
-
- p = argparse.ArgumentParser()
- ts_mon.add_argparse_options(p)
- args = p.parse_args() # Must contain info for Monitor (and optionally Target)
- ts_mon.process_argparse_options(args)
-
- # Will use the default Target set up via command line args:
- m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'})
- m.set(True)
-
- # Use a custom Target:
- t = ts_mon.TaskTarget('service', 'job', 'region', 'host') # or DeviceTarget
- m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t)
- m2.set(5)
-
-Library usage:
- from infra_libs.ts_mon import CounterMetric
- # No need to set up Monitor or Target, assume calling code did that.
- c = CounterMetric('/my/counter', fields={'source': 'mylibrary'})
- c.set(0)
- for x in range(100):
- c.increment()
-"""
-
-import datetime
-import logging
-import random
-import threading
-import time
-
-from infra_libs.ts_mon.common import errors
-from infra_libs.ts_mon.common import metric_store
-from infra_libs.ts_mon.protos import metrics_pb2
-
-# The maximum number of MetricsData messages to include in each HTTP request.
-# MetricsCollections larger than this will be split into multiple requests.
-METRICS_DATA_LENGTH_LIMIT = 1000
-
-
-class State(object):
- """Package-level state is stored here so that it is easily accessible.
-
- Configuration is kept in this one object at the global level so that all
- libraries in use by the same tool or service can all take advantage of the
- same configuration.
- """
-
- def __init__(self, store_ctor=None, target=None):
- """Optional arguments are for unit tests."""
- if store_ctor is None: # pragma: no branch
- store_ctor = metric_store.InProcessMetricStore
- # The Monitor object that will be used to send all metrics.
- self.global_monitor = None
- # The Target object that will be paired with all metrics that don't supply
- # their own.
- self.target = target
- # The flush mode being used to control when metrics are pushed.
- self.flush_mode = None
- # A predicate to determine if metrics should be sent.
- self.flush_enabled_fn = lambda: True
- # The background thread that flushes metrics every
- # --ts-mon-flush-interval-secs seconds. May be None if
- # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0.
- self.flush_thread = None
- # All metrics created by this application.
- self.metrics = {}
- # The MetricStore object that holds the actual metric values.
- self.store = store_ctor(self)
- # Cached time of the last flush. Useful mostly in AppEngine apps.
- self.last_flushed = datetime.datetime.utcfromtimestamp(0)
- # Metric name prefix
- self.metric_name_prefix = '/chrome/infra/'
-
- def reset_for_unittest(self):
- self.metrics = {}
- self.last_flushed = datetime.datetime.utcfromtimestamp(0)
- self.store.reset_for_unittest()
-
-state = State()
-
-
-def flush():
- """Send all metrics that are registered in the application."""
-
- if not state.flush_enabled_fn():
- logging.debug('ts_mon: sending metrics is disabled.')
- return
-
- if not state.global_monitor or not state.target:
- raise errors.MonitoringNoConfiguredMonitorError(None)
-
- proto = metrics_pb2.MetricsCollection()
-
- for target, metric, start_time, fields_values in state.store.get_all():
- for fields, value in fields_values.iteritems():
- if len(proto.data) >= METRICS_DATA_LENGTH_LIMIT:
- state.global_monitor.send(proto)
- del proto.data[:]
-
- metric.serialize_to(proto, start_time, fields, value, target)
-
- state.global_monitor.send(proto)
- state.last_flushed = datetime.datetime.utcnow()
-
-
-def register(metric):
- """Adds the metric to the list of metrics sent by flush().
-
- This is called automatically by Metric's constructor.
- """
- # If someone is registering the same metric object twice, that's okay, but
- # registering two different metric objects with the same metric name is not.
- for m in state.metrics.values():
- if metric == m:
- state.metrics[metric.name] = metric
- return
- if metric.name in state.metrics:
- raise errors.MonitoringDuplicateRegistrationError(metric.name)
-
- state.metrics[metric.name] = metric
-
-
-def unregister(metric):
- """Removes the metric from the list of metrics sent by flush()."""
- del state.metrics[metric.name]
-
-
-def close():
- """Stops any background threads and waits for them to exit."""
- if state.flush_thread is not None:
- state.flush_thread.stop()
-
-
-def reset_for_unittest(disable=False):
- state.reset_for_unittest()
- if disable:
- state.flush_enabled_fn = lambda: False
-
-
-class _FlushThread(threading.Thread):
- """Background thread that flushes metrics on an interval."""
-
- def __init__(self, interval_secs, stop_event=None):
- super(_FlushThread, self).__init__(name='ts_mon')
-
- if stop_event is None:
- stop_event = threading.Event()
-
- self.daemon = True
- self.interval_secs = interval_secs
- self.stop_event = stop_event
-
- def _flush_and_log_exceptions(self):
- try:
- flush()
- except Exception:
- logging.exception('Automatic monitoring flush failed.')
-
- def run(self):
- # Jitter the first interval so tasks started at the same time (say, by cron)
- # on different machines don't all send metrics simultaneously.
- next_timeout = random.uniform(self.interval_secs / 2.0, self.interval_secs)
-
- while True:
- if self.stop_event.wait(next_timeout):
- self._flush_and_log_exceptions()
- return
-
- # Try to flush every N seconds exactly so rate calculations are more
- # consistent.
- start = time.time()
- self._flush_and_log_exceptions()
- flush_duration = time.time() - start
- next_timeout = self.interval_secs - flush_duration
-
- if next_timeout < 0:
- logging.warning(
- 'Last monitoring flush took %f seconds (longer than '
- '--ts-mon-flush-interval-secs = %f seconds)',
- flush_duration, self.interval_secs)
- next_timeout = 0
-
- def stop(self):
- """Stops the background thread and performs a final flush."""
-
- self.stop_event.set()
- self.join()

Powered by Google App Engine
This is Rietveld 408576698