Index: infra_libs/ts_mon/common/monitors.py |
diff --git a/infra_libs/ts_mon/common/monitors.py b/infra_libs/ts_mon/common/monitors.py |
index 110508a6edf83340b69b66fedfb53af31792590c..d782ab0f02db4caad6cd78a9d2cfd78488382c9b 100644 |
--- a/infra_libs/ts_mon/common/monitors.py |
+++ b/infra_libs/ts_mon/common/monitors.py |
@@ -6,6 +6,7 @@ |
import base64 |
+import httplib2 |
import json |
import logging |
import socket |
@@ -13,15 +14,13 @@ import traceback |
from googleapiclient import discovery |
from googleapiclient import errors |
-from oauth2client import gce |
-from oauth2client.client import GoogleCredentials |
-from oauth2client.file import Storage |
-import httplib2 |
- |
from infra_libs import httplib2_utils |
from infra_libs.ts_mon.common import http_metrics |
+from infra_libs.ts_mon.common import pb_to_popo |
from infra_libs.ts_mon.protos import metrics_pb2 |
- |
+from oauth2client import gce |
+from oauth2client.client import GoogleCredentials |
+from oauth2client.file import Storage |
# Special string that can be passed through as the credentials path to use the |
# default Appengine or GCE service account. |
@@ -38,6 +37,9 @@ class Monitor(object): |
Metrics may be initialized before the underlying Monitor. If it does not exist |
at the time that a Metric is sent, an exception will be raised. |
""" |
+ |
+ _SCOPES = [] |
+ |
@staticmethod |
def _wrap_proto(data): |
"""Normalize MetricsData, list(MetricsData), and MetricsCollection. |
@@ -56,19 +58,6 @@ class Monitor(object): |
ret = metrics_pb2.MetricsCollection(data=[data]) |
return ret |
- def send(self, metric_pb): |
- raise NotImplementedError() |
- |
- |
-class PubSubMonitor(Monitor): |
- """Class which publishes metrics to a Cloud Pub/Sub topic.""" |
- |
- _SCOPES = [ |
- 'https://www.googleapis.com/auth/pubsub', |
- ] |
- |
- TIMEOUT = 10 # seconds |
- |
def _load_credentials(self, credentials_file_path): |
if credentials_file_path == GCE_CREDENTIALS: |
return gce.AppAssertionCredentials(self._SCOPES) |
@@ -88,6 +77,49 @@ class PubSubMonitor(Monitor): |
return credentials |
return Storage(credentials_file_path).get() |
+ def send(self, metric_pb): |
+ raise NotImplementedError() |
+ |
+class HttpsMonitor(Monitor): |
+ |
+ _SCOPES = [ |
+ 'https://www.googleapis.com/auth/prodxmon' |
+ ] |
+ |
+ def __init__(self, endpoint, credentials_file_path, |
+ http=httplib2_utils.InstrumentedHttp('acq-mon-api')): |
+ self._endpoint = endpoint |
+ credentials = self._load_credentials(credentials_file_path) |
+ self._http = credentials.authorize(http) |
+ |
+ def encodeToJson(self, metric_pb): |
+ return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) }) |
+ |
+ def send(self, metric_pb): |
+ body = self.encodeToJson(self._wrap_proto(metric_pb)) |
+ |
+ try: |
+ resp, content = self._http.request(self._endpoint, method='POST', |
Sergey Berezin
2016/07/01 22:05:21
You also want to retry this automatically a few ti
tnn
2016/07/01 22:31:39
For transient network failures, it retries 2 times
Sergey Berezin
2016/07/06 17:18:39
For a shared implementation that would indeed be g
tnn
2016/07/06 19:28:06
Done.
|
+ body=body) |
+ if resp.status != 200: |
+ logging.warning("HttpsMonitor.send doesn't get the status 200: %s %s\n", |
Sergey Berezin
2016/07/01 22:05:21
nit: I'd rephrase this as "HttpsMonitor.send recei
tnn
2016/07/06 19:28:06
Done.
|
+ resp, content) |
+ except (ValueError, errors.Error, |
+ socket.timeout, socket.error, socket.herror, socket.gaierror, |
+ httplib2.HttpLib2Error): |
+ logging.warning('HttpsMonitor.send failed: %s\n', |
+ traceback.format_exc()) |
+ |
+ |
+class PubSubMonitor(Monitor): |
+ """Class which publishes metrics to a Cloud Pub/Sub topic.""" |
+ |
+ _SCOPES = [ |
+ 'https://www.googleapis.com/auth/pubsub', |
+ ] |
+ |
+ TIMEOUT = 10 # seconds |
+ |
def _initialize(self): |
creds = self._load_credentials(self._credsfile) |
creds.authorize(self._http) |