Index: infra_libs/ts_mon/common/monitors.py |
diff --git a/infra_libs/ts_mon/common/monitors.py b/infra_libs/ts_mon/common/monitors.py |
deleted file mode 100644 |
index e39cdfe63503584c8622f1c96bc837f7d77fdfc3..0000000000000000000000000000000000000000 |
--- a/infra_libs/ts_mon/common/monitors.py |
+++ /dev/null |
@@ -1,229 +0,0 @@ |
-# Copyright 2015 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Classes representing the monitoring interface for tasks or devices.""" |
- |
- |
-import base64 |
-import httplib2 |
-import json |
-import logging |
-import socket |
-import traceback |
- |
-from googleapiclient import discovery |
-from googleapiclient import errors |
-from infra_libs import httplib2_utils |
-from infra_libs.ts_mon.common import http_metrics |
-from infra_libs.ts_mon.common import pb_to_popo |
-from infra_libs.ts_mon.protos import metrics_pb2 |
-from oauth2client import gce |
-from oauth2client.client import GoogleCredentials |
-from oauth2client.file import Storage |
- |
-# Special string that can be passed through as the credentials path to use the |
-# default Appengine or GCE service account. |
-APPENGINE_CREDENTIALS = ':appengine' |
-GCE_CREDENTIALS = ':gce' |
- |
- |
-class Monitor(object): |
- """Abstract base class encapsulating the ability to collect and send metrics. |
- |
- This is a singleton class. There should only be one instance of a Monitor at |
- a time. It will be created and initialized by process_argparse_options. It |
- must exist in order for any metrics to be sent, although both Targets and |
- Metrics may be initialized before the underlying Monitor. If it does not exist |
- at the time that a Metric is sent, an exception will be raised. |
- """ |
- |
- _SCOPES = [] |
- |
- @staticmethod |
- def _wrap_proto(data): |
- """Normalize MetricsData, list(MetricsData), and MetricsCollection. |
- |
- Args: |
- input: A MetricsData, list of MetricsData, or a MetricsCollection. |
- |
- Returns: |
- A MetricsCollection with the appropriate data attribute set. |
- """ |
- if isinstance(data, metrics_pb2.MetricsCollection): |
- ret = data |
- elif isinstance(data, list): |
- ret = metrics_pb2.MetricsCollection(data=data) |
- else: |
- ret = metrics_pb2.MetricsCollection(data=[data]) |
- return ret |
- |
- def _load_credentials(self, credentials_file_path): |
- if credentials_file_path == GCE_CREDENTIALS: |
- return gce.AppAssertionCredentials(self._SCOPES) |
- if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover |
- # This import doesn't work outside appengine, so delay it until it's used. |
- from oauth2client import appengine |
- from google.appengine.api import app_identity |
- logging.info('Initializing with service account %s', |
- app_identity.get_service_account_name()) |
- return appengine.AppAssertionCredentials(self._SCOPES) |
- |
- with open(credentials_file_path, 'r') as credentials_file: |
- credentials_json = json.load(credentials_file) |
- if credentials_json.get('type', None): |
- credentials = GoogleCredentials.from_stream(credentials_file_path) |
- credentials = credentials.create_scoped(self._SCOPES) |
- return credentials |
- return Storage(credentials_file_path).get() |
- |
- def send(self, metric_pb): |
- raise NotImplementedError() |
- |
-class HttpsMonitor(Monitor): |
- |
- _SCOPES = [ |
- 'https://www.googleapis.com/auth/prodxmon' |
- ] |
- |
- def __init__(self, endpoint, credentials_file_path, http=None): |
- self._endpoint = endpoint |
- credentials = self._load_credentials(credentials_file_path) |
- if http is None: |
- http = httplib2_utils.RetriableHttp( |
- httplib2_utils.InstrumentedHttp('acq-mon-api')) |
- self._http = credentials.authorize(http) |
- |
- def encodeToJson(self, metric_pb): |
- return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) }) |
- |
- def send(self, metric_pb): |
- body = self.encodeToJson(self._wrap_proto(metric_pb)) |
- |
- try: |
- resp, content = self._http.request(self._endpoint, method='POST', |
- body=body) |
- if resp.status != 200: |
- logging.warning('HttpsMonitor.send received status %d: %s', resp.status, |
- content) |
- except (ValueError, errors.Error, |
- socket.timeout, socket.error, socket.herror, socket.gaierror, |
- httplib2.HttpLib2Error): |
- logging.warning('HttpsMonitor.send failed: %s\n', |
- traceback.format_exc()) |
- |
- |
-class PubSubMonitor(Monitor): |
- """Class which publishes metrics to a Cloud Pub/Sub topic.""" |
- |
- _SCOPES = [ |
- 'https://www.googleapis.com/auth/pubsub', |
- ] |
- |
- TIMEOUT = 10 # seconds |
- |
- def _initialize(self): |
- creds = self._load_credentials(self._credsfile) |
- creds.authorize(self._http) |
- self._api = discovery.build('pubsub', 'v1', http=self._http) |
- |
- def _update_init_metrics(self, status): |
- if not self._use_instrumented_http: |
- return |
- fields = {'name': 'acq-mon-api-pubsub', |
- 'client': 'discovery', |
- 'status': status} |
- http_metrics.response_status.increment(fields=fields) |
- |
- def _check_initialize(self): |
- if self._api: |
- return True |
- try: |
- self._initialize() |
- except (ValueError, errors.Error, |
- socket.timeout, socket.error, socket.herror, socket.gaierror, |
- httplib2.HttpLib2Error, EnvironmentError): |
- # Log a warning, not error, to avoid false alarms in AppEngine apps. |
- logging.warning('PubSubMonitor._initialize failed:\n%s', |
- traceback.format_exc()) |
- self._api = None |
- self._update_init_metrics(http_metrics.STATUS_ERROR) |
- return False |
- |
- self._update_init_metrics(http_metrics.STATUS_OK) |
- return True |
- |
- def __init__(self, credsfile, project, topic, use_instrumented_http=True): |
- """Process monitoring related command line flags and initialize api. |
- |
- Args: |
- credsfile (str): path to the credentials json file |
- project (str): the name of the Pub/Sub project to publish to. |
- topic (str): the name of the Pub/Sub topic to publish to. |
- use_instrumented_http (bool): whether to record monitoring metrics for |
- HTTP requests made to the pubsub API. |
- """ |
- # Do not call self._check_initialize() in the constructor. This |
- # class is constructed during app initialization on AppEngine, and |
- # network calls are especially flaky during that time. |
- self._api = None |
- self._use_instrumented_http = use_instrumented_http |
- if use_instrumented_http: |
- self._http = httplib2_utils.InstrumentedHttp( |
- 'acq-mon-api-pubsub', timeout=self.TIMEOUT) |
- else: |
- self._http = httplib2.Http(timeout=self.TIMEOUT) |
- self._credsfile = credsfile |
- self._topic = 'projects/%s/topics/%s' % (project, topic) |
- |
- def send(self, metric_pb): |
- """Send a metric proto to the monitoring api. |
- |
- Args: |
- metric_pb (MetricsData or MetricsCollection): the metric protobuf to send |
- """ |
- if not self._check_initialize(): |
- return |
- proto = self._wrap_proto(metric_pb) |
- logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data)) |
- body = { |
- 'messages': [ |
- {'data': base64.b64encode(proto.SerializeToString())}, |
- ], |
- } |
- # Occasionally, client fails to receive a proper internal JSON |
- # from the server and raises ValueError trying to parse it. Other |
- # times we may fail with a network error. This is not fatal, we'll |
- # resend metrics next time. |
- try: |
- self._api.projects().topics().publish( |
- topic=self._topic, |
- body=body).execute(num_retries=5) |
- except (ValueError, errors.Error, |
- socket.timeout, socket.error, socket.herror, socket.gaierror, |
- httplib2.HttpLib2Error): |
- # Log a warning, not error, to avoid false alarms in AppEngine apps. |
- logging.warning('PubSubMonitor.send failed:\n%s', |
- traceback.format_exc()) |
- |
- |
-class DebugMonitor(Monitor): |
- """Class which writes metrics to logs or a local file for debugging.""" |
- def __init__(self, filepath=None): |
- if filepath is None: |
- self._fh = None |
- else: |
- self._fh = open(filepath, 'a') |
- |
- def send(self, metric_pb): |
- text = str(self._wrap_proto(metric_pb)) |
- logging.info('Flushing monitoring metrics:\n%s', text) |
- if self._fh is not None: |
- self._fh.write(text + '\n\n') |
- self._fh.flush() |
- |
- |
-class NullMonitor(Monitor): |
- """Class that doesn't send metrics anywhere.""" |
- def send(self, metric_pb): |
- pass |