| Index: appengine_module/gae_ts_mon/monitors.py
|
| diff --git a/appengine_module/gae_ts_mon/monitors.py b/appengine_module/gae_ts_mon/monitors.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..8bf998ad733ead5d8d6ddfca43a79ce3dcec5f31
|
| --- /dev/null
|
| +++ b/appengine_module/gae_ts_mon/monitors.py
|
| @@ -0,0 +1,99 @@
|
| +# Copyright 2015 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +"""Classes representing the monitoring interface for tasks or devices.
|
| +
|
| +In appengine, a PubSubMonitor will be automatically initialized when initialize()
|
| +is called, and there is no need to initialize it directly from this class.
|
| +"""
|
| +
|
| +
|
| +import base64
|
| +import json
|
| +import os
|
| +
|
| +from proto import metrics_pb2
|
| +
|
| +import httplib2
|
| +from apiclient import discovery
|
| +from oauth2client.client import GoogleCredentials
|
| +
|
| +
|
| +class Monitor(object):
|
| + """Abstract base class encapsulating the ability to collect and send metrics.
|
| +
|
| + This is a singleton class. There should only be one instance of a Monitor at
|
| + a time. It will be created and initialized by process_argparse_options. It
|
| + must exist in order for any metrics to be sent, although both Targets and
|
| + Metrics may be initialized before the underlying Monitor. If it does not exist
|
| + at the time that a Metric is sent, an exception will be raised.
|
| + """
|
| + @staticmethod
|
| + def _wrap_proto(data):
|
| + """Normalize MetricsData, list(MetricsData), and MetricsCollection.
|
| +
|
| + Args:
|
| + input: A MetricsData, list of MetricsData, or a MetricsCollection.
|
| +
|
| + Returns:
|
| + A MetricsCollection with the appropriate data attribute set.
|
| + """
|
| + if isinstance(data, metrics_pb2.MetricsCollection):
|
| + ret = data
|
| + elif isinstance(data, list):
|
| + ret = metrics_pb2.MetricsCollection(data=data)
|
| + else:
|
| + ret = metrics_pb2.MetricsCollection(data=[data])
|
| + return ret
|
| +
|
| + def send(self, metric_pb):
|
| + raise NotImplementedError()
|
| +
|
| +
|
| +class PubSubMonitor(Monitor):
|
| + """Class which publishes metrics to a Cloud Pub/Sub topic."""
|
| +
|
| + _SCOPES = [
|
| + 'https://www.googleapis.com/auth/pubsub',
|
| + ]
|
| +
|
| + def _initialize(self, project, topic):
|
| + # Copied from acquisition_api.AcquisitionCredential.Load.
|
| + creds = GoogleCredentials.get_application_default()
|
| + creds = creds.create_scoped(self._SCOPES)
|
| + self._http = httplib2.Http()
|
| + creds.authorize(self._http)
|
| + self._api = discovery.build('pubsub', 'v1', http=self._http)
|
| + self._topic = 'projects/%s/topics/%s' % (project, topic)
|
| +
|
| + def __init__(self, project, topic):
|
| + """Process monitoring related command line flags and initialize api.
|
| +
|
| + Args:
|
| + project (str): the name of the Pub/Sub project to publish to.
|
| + topic (str): the name of the Pub/Sub topic to publish to.
|
| + """
|
| + self._initialize(project, topic)
|
| +
|
| + def send(self, metric_pb):
|
| + """Send a metric proto to the monitoring api.
|
| +
|
| + Args:
|
| + metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
|
| + """
|
| + proto = self._wrap_proto(metric_pb)
|
| + body = {
|
| + 'messages': [
|
| + {'data': base64.b64encode(proto.SerializeToString())},
|
| + ],
|
| + }
|
| + self._api.projects().topics().publish(
|
| + topic=self._topic,
|
| + body=body).execute(num_retries=5)
|
| +
|
| +
|
| +class NullMonitor(Monitor):
|
| + """Class that doesn't send metrics anywhere."""
|
| + def send(self, metric_pb):
|
| + pass
|
|
|