| Index: infra_libs/ts_mon/common/monitors.py
|
| diff --git a/infra_libs/ts_mon/common/monitors.py b/infra_libs/ts_mon/common/monitors.py
|
| deleted file mode 100644
|
| index e39cdfe63503584c8622f1c96bc837f7d77fdfc3..0000000000000000000000000000000000000000
|
| --- a/infra_libs/ts_mon/common/monitors.py
|
| +++ /dev/null
|
| @@ -1,229 +0,0 @@
|
| -# Copyright 2015 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Classes representing the monitoring interface for tasks or devices."""
|
| -
|
| -
|
| -import base64
|
| -import httplib2
|
| -import json
|
| -import logging
|
| -import socket
|
| -import traceback
|
| -
|
| -from googleapiclient import discovery
|
| -from googleapiclient import errors
|
| -from infra_libs import httplib2_utils
|
| -from infra_libs.ts_mon.common import http_metrics
|
| -from infra_libs.ts_mon.common import pb_to_popo
|
| -from infra_libs.ts_mon.protos import metrics_pb2
|
| -from oauth2client import gce
|
| -from oauth2client.client import GoogleCredentials
|
| -from oauth2client.file import Storage
|
| -
|
| -# Special string that can be passed through as the credentials path to use the
|
| -# default Appengine or GCE service account.
|
| -APPENGINE_CREDENTIALS = ':appengine'
|
| -GCE_CREDENTIALS = ':gce'
|
| -
|
| -
|
| -class Monitor(object):
|
| - """Abstract base class encapsulating the ability to collect and send metrics.
|
| -
|
| - This is a singleton class. There should only be one instance of a Monitor at
|
| - a time. It will be created and initialized by process_argparse_options. It
|
| - must exist in order for any metrics to be sent, although both Targets and
|
| - Metrics may be initialized before the underlying Monitor. If it does not exist
|
| - at the time that a Metric is sent, an exception will be raised.
|
| - """
|
| -
|
| - _SCOPES = []
|
| -
|
| - @staticmethod
|
| - def _wrap_proto(data):
|
| - """Normalize MetricsData, list(MetricsData), and MetricsCollection.
|
| -
|
| - Args:
|
| - input: A MetricsData, list of MetricsData, or a MetricsCollection.
|
| -
|
| - Returns:
|
| - A MetricsCollection with the appropriate data attribute set.
|
| - """
|
| - if isinstance(data, metrics_pb2.MetricsCollection):
|
| - ret = data
|
| - elif isinstance(data, list):
|
| - ret = metrics_pb2.MetricsCollection(data=data)
|
| - else:
|
| - ret = metrics_pb2.MetricsCollection(data=[data])
|
| - return ret
|
| -
|
| - def _load_credentials(self, credentials_file_path):
|
| - if credentials_file_path == GCE_CREDENTIALS:
|
| - return gce.AppAssertionCredentials(self._SCOPES)
|
| - if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
|
| - # This import doesn't work outside appengine, so delay it until it's used.
|
| - from oauth2client import appengine
|
| - from google.appengine.api import app_identity
|
| - logging.info('Initializing with service account %s',
|
| - app_identity.get_service_account_name())
|
| - return appengine.AppAssertionCredentials(self._SCOPES)
|
| -
|
| - with open(credentials_file_path, 'r') as credentials_file:
|
| - credentials_json = json.load(credentials_file)
|
| - if credentials_json.get('type', None):
|
| - credentials = GoogleCredentials.from_stream(credentials_file_path)
|
| - credentials = credentials.create_scoped(self._SCOPES)
|
| - return credentials
|
| - return Storage(credentials_file_path).get()
|
| -
|
| - def send(self, metric_pb):
|
| - raise NotImplementedError()
|
| -
|
| -class HttpsMonitor(Monitor):
|
| -
|
| - _SCOPES = [
|
| - 'https://www.googleapis.com/auth/prodxmon'
|
| - ]
|
| -
|
| - def __init__(self, endpoint, credentials_file_path, http=None):
|
| - self._endpoint = endpoint
|
| - credentials = self._load_credentials(credentials_file_path)
|
| - if http is None:
|
| - http = httplib2_utils.RetriableHttp(
|
| - httplib2_utils.InstrumentedHttp('acq-mon-api'))
|
| - self._http = credentials.authorize(http)
|
| -
|
| - def encodeToJson(self, metric_pb):
|
| - return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) })
|
| -
|
| - def send(self, metric_pb):
|
| - body = self.encodeToJson(self._wrap_proto(metric_pb))
|
| -
|
| - try:
|
| - resp, content = self._http.request(self._endpoint, method='POST',
|
| - body=body)
|
| - if resp.status != 200:
|
| - logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
|
| - content)
|
| - except (ValueError, errors.Error,
|
| - socket.timeout, socket.error, socket.herror, socket.gaierror,
|
| - httplib2.HttpLib2Error):
|
| - logging.warning('HttpsMonitor.send failed: %s\n',
|
| - traceback.format_exc())
|
| -
|
| -
|
| -class PubSubMonitor(Monitor):
|
| - """Class which publishes metrics to a Cloud Pub/Sub topic."""
|
| -
|
| - _SCOPES = [
|
| - 'https://www.googleapis.com/auth/pubsub',
|
| - ]
|
| -
|
| - TIMEOUT = 10 # seconds
|
| -
|
| - def _initialize(self):
|
| - creds = self._load_credentials(self._credsfile)
|
| - creds.authorize(self._http)
|
| - self._api = discovery.build('pubsub', 'v1', http=self._http)
|
| -
|
| - def _update_init_metrics(self, status):
|
| - if not self._use_instrumented_http:
|
| - return
|
| - fields = {'name': 'acq-mon-api-pubsub',
|
| - 'client': 'discovery',
|
| - 'status': status}
|
| - http_metrics.response_status.increment(fields=fields)
|
| -
|
| - def _check_initialize(self):
|
| - if self._api:
|
| - return True
|
| - try:
|
| - self._initialize()
|
| - except (ValueError, errors.Error,
|
| - socket.timeout, socket.error, socket.herror, socket.gaierror,
|
| - httplib2.HttpLib2Error, EnvironmentError):
|
| - # Log a warning, not error, to avoid false alarms in AppEngine apps.
|
| - logging.warning('PubSubMonitor._initialize failed:\n%s',
|
| - traceback.format_exc())
|
| - self._api = None
|
| - self._update_init_metrics(http_metrics.STATUS_ERROR)
|
| - return False
|
| -
|
| - self._update_init_metrics(http_metrics.STATUS_OK)
|
| - return True
|
| -
|
| - def __init__(self, credsfile, project, topic, use_instrumented_http=True):
|
| - """Process monitoring related command line flags and initialize api.
|
| -
|
| - Args:
|
| - credsfile (str): path to the credentials json file
|
| - project (str): the name of the Pub/Sub project to publish to.
|
| - topic (str): the name of the Pub/Sub topic to publish to.
|
| - use_instrumented_http (bool): whether to record monitoring metrics for
|
| - HTTP requests made to the pubsub API.
|
| - """
|
| - # Do not call self._check_initialize() in the constructor. This
|
| - # class is constructed during app initialization on AppEngine, and
|
| - # network calls are especially flaky during that time.
|
| - self._api = None
|
| - self._use_instrumented_http = use_instrumented_http
|
| - if use_instrumented_http:
|
| - self._http = httplib2_utils.InstrumentedHttp(
|
| - 'acq-mon-api-pubsub', timeout=self.TIMEOUT)
|
| - else:
|
| - self._http = httplib2.Http(timeout=self.TIMEOUT)
|
| - self._credsfile = credsfile
|
| - self._topic = 'projects/%s/topics/%s' % (project, topic)
|
| -
|
| - def send(self, metric_pb):
|
| - """Send a metric proto to the monitoring api.
|
| -
|
| - Args:
|
| - metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
|
| - """
|
| - if not self._check_initialize():
|
| - return
|
| - proto = self._wrap_proto(metric_pb)
|
| - logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data))
|
| - body = {
|
| - 'messages': [
|
| - {'data': base64.b64encode(proto.SerializeToString())},
|
| - ],
|
| - }
|
| - # Occasionally, client fails to receive a proper internal JSON
|
| - # from the server and raises ValueError trying to parse it. Other
|
| - # times we may fail with a network error. This is not fatal, we'll
|
| - # resend metrics next time.
|
| - try:
|
| - self._api.projects().topics().publish(
|
| - topic=self._topic,
|
| - body=body).execute(num_retries=5)
|
| - except (ValueError, errors.Error,
|
| - socket.timeout, socket.error, socket.herror, socket.gaierror,
|
| - httplib2.HttpLib2Error):
|
| - # Log a warning, not error, to avoid false alarms in AppEngine apps.
|
| - logging.warning('PubSubMonitor.send failed:\n%s',
|
| - traceback.format_exc())
|
| -
|
| -
|
| -class DebugMonitor(Monitor):
|
| - """Class which writes metrics to logs or a local file for debugging."""
|
| - def __init__(self, filepath=None):
|
| - if filepath is None:
|
| - self._fh = None
|
| - else:
|
| - self._fh = open(filepath, 'a')
|
| -
|
| - def send(self, metric_pb):
|
| - text = str(self._wrap_proto(metric_pb))
|
| - logging.info('Flushing monitoring metrics:\n%s', text)
|
| - if self._fh is not None:
|
| - self._fh.write(text + '\n\n')
|
| - self._fh.flush()
|
| -
|
| -
|
| -class NullMonitor(Monitor):
|
| - """Class that doesn't send metrics anywhere."""
|
| - def send(self, metric_pb):
|
| - pass
|
|
|