OLD | NEW |
---|---|
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Classes representing the monitoring interface for tasks or devices.""" | 5 """Classes representing the monitoring interface for tasks or devices.""" |
6 | 6 |
7 | 7 |
8 import base64 | 8 import base64 |
9 import httplib2 | |
9 import json | 10 import json |
10 import logging | 11 import logging |
11 import socket | 12 import socket |
12 import traceback | 13 import traceback |
13 | 14 |
14 from googleapiclient import discovery | 15 from googleapiclient import discovery |
15 from googleapiclient import errors | 16 from googleapiclient import errors |
17 from infra_libs import httplib2_utils | |
18 from infra_libs.ts_mon.common import http_metrics | |
19 from infra_libs.ts_mon.common import pb_to_popo | |
20 from infra_libs.ts_mon.protos import metrics_pb2 | |
16 from oauth2client import gce | 21 from oauth2client import gce |
17 from oauth2client.client import GoogleCredentials | 22 from oauth2client.client import GoogleCredentials |
18 from oauth2client.file import Storage | 23 from oauth2client.file import Storage |
19 import httplib2 | |
20 | |
21 from infra_libs import httplib2_utils | |
22 from infra_libs.ts_mon.common import http_metrics | |
23 from infra_libs.ts_mon.protos import metrics_pb2 | |
24 | |
25 | 24 |
26 # Special string that can be passed through as the credentials path to use the | 25 # Special string that can be passed through as the credentials path to use the |
27 # default Appengine or GCE service account. | 26 # default Appengine or GCE service account. |
28 APPENGINE_CREDENTIALS = ':appengine' | 27 APPENGINE_CREDENTIALS = ':appengine' |
29 GCE_CREDENTIALS = ':gce' | 28 GCE_CREDENTIALS = ':gce' |
30 | 29 |
31 | 30 |
32 class Monitor(object): | 31 class Monitor(object): |
33 """Abstract base class encapsulating the ability to collect and send metrics. | 32 """Abstract base class encapsulating the ability to collect and send metrics. |
34 | 33 |
35 This is a singleton class. There should only be one instance of a Monitor at | 34 This is a singleton class. There should only be one instance of a Monitor at |
36 a time. It will be created and initialized by process_argparse_options. It | 35 a time. It will be created and initialized by process_argparse_options. It |
37 must exist in order for any metrics to be sent, although both Targets and | 36 must exist in order for any metrics to be sent, although both Targets and |
38 Metrics may be initialized before the underlying Monitor. If it does not exist | 37 Metrics may be initialized before the underlying Monitor. If it does not exist |
39 at the time that a Metric is sent, an exception will be raised. | 38 at the time that a Metric is sent, an exception will be raised. |
40 """ | 39 """ |
40 | |
41 _SCOPES = [] | |
42 | |
41 @staticmethod | 43 @staticmethod |
42 def _wrap_proto(data): | 44 def _wrap_proto(data): |
43 """Normalize MetricsData, list(MetricsData), and MetricsCollection. | 45 """Normalize MetricsData, list(MetricsData), and MetricsCollection. |
44 | 46 |
45 Args: | 47 Args: |
46 input: A MetricsData, list of MetricsData, or a MetricsCollection. | 48 input: A MetricsData, list of MetricsData, or a MetricsCollection. |
47 | 49 |
48 Returns: | 50 Returns: |
49 A MetricsCollection with the appropriate data attribute set. | 51 A MetricsCollection with the appropriate data attribute set. |
50 """ | 52 """ |
51 if isinstance(data, metrics_pb2.MetricsCollection): | 53 if isinstance(data, metrics_pb2.MetricsCollection): |
52 ret = data | 54 ret = data |
53 elif isinstance(data, list): | 55 elif isinstance(data, list): |
54 ret = metrics_pb2.MetricsCollection(data=data) | 56 ret = metrics_pb2.MetricsCollection(data=data) |
55 else: | 57 else: |
56 ret = metrics_pb2.MetricsCollection(data=[data]) | 58 ret = metrics_pb2.MetricsCollection(data=[data]) |
57 return ret | 59 return ret |
58 | 60 |
59 def send(self, metric_pb): | |
60 raise NotImplementedError() | |
61 | |
62 | |
63 class PubSubMonitor(Monitor): | |
64 """Class which publishes metrics to a Cloud Pub/Sub topic.""" | |
65 | |
66 _SCOPES = [ | |
67 'https://www.googleapis.com/auth/pubsub', | |
68 ] | |
69 | |
70 TIMEOUT = 10 # seconds | |
71 | |
72 def _load_credentials(self, credentials_file_path): | 61 def _load_credentials(self, credentials_file_path): |
73 if credentials_file_path == GCE_CREDENTIALS: | 62 if credentials_file_path == GCE_CREDENTIALS: |
74 return gce.AppAssertionCredentials(self._SCOPES) | 63 return gce.AppAssertionCredentials(self._SCOPES) |
75 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover | 64 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover |
76 # This import doesn't work outside appengine, so delay it until it's used. | 65 # This import doesn't work outside appengine, so delay it until it's used. |
77 from oauth2client import appengine | 66 from oauth2client import appengine |
78 from google.appengine.api import app_identity | 67 from google.appengine.api import app_identity |
79 logging.info('Initializing with service account %s', | 68 logging.info('Initializing with service account %s', |
80 app_identity.get_service_account_name()) | 69 app_identity.get_service_account_name()) |
81 return appengine.AppAssertionCredentials(self._SCOPES) | 70 return appengine.AppAssertionCredentials(self._SCOPES) |
82 | 71 |
83 with open(credentials_file_path, 'r') as credentials_file: | 72 with open(credentials_file_path, 'r') as credentials_file: |
84 credentials_json = json.load(credentials_file) | 73 credentials_json = json.load(credentials_file) |
85 if credentials_json.get('type', None): | 74 if credentials_json.get('type', None): |
86 credentials = GoogleCredentials.from_stream(credentials_file_path) | 75 credentials = GoogleCredentials.from_stream(credentials_file_path) |
87 credentials = credentials.create_scoped(self._SCOPES) | 76 credentials = credentials.create_scoped(self._SCOPES) |
88 return credentials | 77 return credentials |
89 return Storage(credentials_file_path).get() | 78 return Storage(credentials_file_path).get() |
90 | 79 |
80 def send(self, metric_pb): | |
81 raise NotImplementedError() | |
82 | |
83 class HttpsMonitor(Monitor): | |
84 | |
85 _SCOPES = [ | |
86 'https://www.googleapis.com/auth/prodxmon' | |
87 ] | |
88 | |
89 def __init__(self, endpoint, credentials_file_path, http=None): | |
90 self._endpoint = endpoint | |
91 credentials = self._load_credentials(credentials_file_path) | |
92 if http is None: | |
93 http = httplib2_utils.RetriableHttp( | |
94 httplib2_utils.InstrumentedHttp('acq-mon-api')) | |
Sergey Berezin
2016/07/06 22:35:49
nit: 4 spaces from the start of the previous line.
tnn
2016/07/06 22:44:36
Done.
| |
95 self._http = credentials.authorize(http) | |
96 | |
97 def encodeToJson(self, metric_pb): | |
98 return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) }) | |
99 | |
100 def send(self, metric_pb): | |
101 body = self.encodeToJson(self._wrap_proto(metric_pb)) | |
102 | |
103 try: | |
104 resp, content = self._http.request(self._endpoint, method='POST', | |
105 body=body) | |
106 if resp.status != 200: | |
107 logging.warning('HttpsMonitor.send received status %d: %s', resp, | |
108 content) | |
109 except (ValueError, errors.Error, | |
110 socket.timeout, socket.error, socket.herror, socket.gaierror, | |
111 httplib2.HttpLib2Error): | |
112 logging.warning('HttpsMonitor.send failed: %s\n', | |
113 traceback.format_exc()) | |
114 | |
115 | |
116 class PubSubMonitor(Monitor): | |
117 """Class which publishes metrics to a Cloud Pub/Sub topic.""" | |
118 | |
119 _SCOPES = [ | |
120 'https://www.googleapis.com/auth/pubsub', | |
121 ] | |
122 | |
123 TIMEOUT = 10 # seconds | |
124 | |
91 def _initialize(self): | 125 def _initialize(self): |
92 creds = self._load_credentials(self._credsfile) | 126 creds = self._load_credentials(self._credsfile) |
93 creds.authorize(self._http) | 127 creds.authorize(self._http) |
94 self._api = discovery.build('pubsub', 'v1', http=self._http) | 128 self._api = discovery.build('pubsub', 'v1', http=self._http) |
95 | 129 |
96 def _update_init_metrics(self, status): | 130 def _update_init_metrics(self, status): |
97 if not self._use_instrumented_http: | 131 if not self._use_instrumented_http: |
98 return | 132 return |
99 fields = {'name': 'acq-mon-api-pubsub', | 133 fields = {'name': 'acq-mon-api-pubsub', |
100 'client': 'discovery', | 134 'client': 'discovery', |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
186 logging.info('Flushing monitoring metrics:\n%s', text) | 220 logging.info('Flushing monitoring metrics:\n%s', text) |
187 if self._fh is not None: | 221 if self._fh is not None: |
188 self._fh.write(text + '\n\n') | 222 self._fh.write(text + '\n\n') |
189 self._fh.flush() | 223 self._fh.flush() |
190 | 224 |
191 | 225 |
192 class NullMonitor(Monitor): | 226 class NullMonitor(Monitor): |
193 """Class that doesn't send metrics anywhere.""" | 227 """Class that doesn't send metrics anywhere.""" |
194 def send(self, metric_pb): | 228 def send(self, metric_pb): |
195 pass | 229 pass |
OLD | NEW |