Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Side by Side Diff: client/third_party/infra_libs/ts_mon/common/monitors.py

Issue 2991803002: Update infra_libs to 1.1.15 / 0b44aba87c1c6538439df6d24a409870810747ab (Closed)
Patch Set: fix Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Classes representing the monitoring interface for tasks or devices.""" 5 """Classes representing the monitoring interface for tasks or devices."""
6 6
7 7
8 import base64 8 import base64
9 import httplib2 9 import httplib2
10 import json 10 import json
11 import logging 11 import logging
12 import socket 12 import socket
13 import traceback 13 import traceback
14 14
15 from googleapiclient import discovery 15 from googleapiclient import discovery
16 from googleapiclient import errors 16 from googleapiclient import errors
17 from infra_libs import httplib2_utils 17 from infra_libs import httplib2_utils
18 from infra_libs.ts_mon.common import interface 18 from infra_libs.ts_mon.common import interface
19 from infra_libs.ts_mon.common import http_metrics 19 from infra_libs.ts_mon.common import http_metrics
20 from infra_libs.ts_mon.common import pb_to_popo 20 from infra_libs.ts_mon.common import pb_to_popo
21 from infra_libs.ts_mon.protos.current import metrics_pb2 21 from infra_libs.ts_mon.protos import metrics_pb2
22 try: # pragma: no cover 22 try: # pragma: no cover
23 from oauth2client import gce 23 from oauth2client import gce
24 except ImportError: # pragma: no cover 24 except ImportError: # pragma: no cover
25 from oauth2client.contrib import gce 25 from oauth2client.contrib import gce
26 from oauth2client.client import GoogleCredentials 26 from oauth2client.client import GoogleCredentials
27 from oauth2client.file import Storage 27 from oauth2client.file import Storage
28 28
29 # Special string that can be passed through as the credentials path to use the 29 # Special string that can be passed through as the credentials path to use the
30 # default Appengine or GCE service account. 30 # default Appengine or GCE service account.
31 APPENGINE_CREDENTIALS = ':appengine' 31 APPENGINE_CREDENTIALS = ':appengine'
(...skipping 19 matching lines...) Expand all
51 51
52 class GCECredentials(CredentialFactory): 52 class GCECredentials(CredentialFactory):
53 def create(self, scopes): 53 def create(self, scopes):
54 return gce.AppAssertionCredentials(scopes) 54 return gce.AppAssertionCredentials(scopes)
55 55
56 56
57 class AppengineCredentials(CredentialFactory): 57 class AppengineCredentials(CredentialFactory):
58 def create(self, scopes): # pragma: no cover 58 def create(self, scopes): # pragma: no cover
59 # This import doesn't work outside appengine, so delay it until it's used. 59 # This import doesn't work outside appengine, so delay it until it's used.
60 from oauth2client import appengine 60 from oauth2client import appengine
61 from google.appengine.api import app_identity
62 logging.info('Initializing with service account %s',
63 app_identity.get_service_account_name())
64 return appengine.AppAssertionCredentials(scopes) 61 return appengine.AppAssertionCredentials(scopes)
65 62
66 63
67 class FileCredentials(CredentialFactory): 64 class FileCredentials(CredentialFactory):
68 def __init__(self, path): 65 def __init__(self, path):
69 self.path = path 66 self.path = path
70 67
71 def create(self, scopes): 68 def create(self, scopes):
72 with open(self.path, 'r') as fh: 69 with open(self.path, 'r') as fh:
73 data = json.load(fh) 70 data = json.load(fh)
(...skipping 20 matching lines...) Expand all
94 91
95 92
96 class Monitor(object): 93 class Monitor(object):
97 """Abstract base class encapsulating the ability to collect and send metrics. 94 """Abstract base class encapsulating the ability to collect and send metrics.
98 95
99 This is a singleton class. There should only be one instance of a Monitor at 96 This is a singleton class. There should only be one instance of a Monitor at
100 a time. It will be created and initialized by process_argparse_options. It 97 a time. It will be created and initialized by process_argparse_options. It
101 must exist in order for any metrics to be sent, although both Targets and 98 must exist in order for any metrics to be sent, although both Targets and
102 Metrics may be initialized before the underlying Monitor. If it does not exist 99 Metrics may be initialized before the underlying Monitor. If it does not exist
103 at the time that a Metric is sent, an exception will be raised. 100 at the time that a Metric is sent, an exception will be raised.
101
102 send() can be either synchronous or asynchronous. If synchronous, it needs to
103 make the HTTP request, wait for a response and return None.
104 If asynchronous, send() should start the request and immediately return some
105 object which is later passed to wait() once all requests have been started.
104 """ 106 """
105 107
106 _SCOPES = [] 108 _SCOPES = []
107 109
108 @staticmethod
109 def _wrap_proto(data):
110 """Normalize MetricsData, list(MetricsData), and MetricsCollection.
111
112 Args:
113 input: A MetricsData, list of MetricsData, or a MetricsCollection.
114
115 Returns:
116 A MetricsCollection with the appropriate data attribute set.
117 """
118 if isinstance(data, metrics_pb2.MetricsCollection):
119 ret = data
120 elif isinstance(data, list):
121 ret = metrics_pb2.MetricsCollection(data=data)
122 else:
123 ret = metrics_pb2.MetricsCollection(data=[data])
124 return ret
125
126 def send(self, metric_pb): 110 def send(self, metric_pb):
127 raise NotImplementedError() 111 raise NotImplementedError()
128 112
113 def wait(self, state): # pragma: no cover
114 pass
115
129 116
130 class HttpsMonitor(Monitor): 117 class HttpsMonitor(Monitor):
131 118
132 _SCOPES = ['https://www.googleapis.com/auth/prodxmon'] 119 _SCOPES = ['https://www.googleapis.com/auth/prodxmon']
133 120
134 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None): 121 def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
135 self._endpoint = endpoint 122 self._endpoint = endpoint
136 credentials = credential_factory.create(self._SCOPES) 123 credentials = credential_factory.create(self._SCOPES)
137 if http is None: 124 if http is None:
138 http = httplib2_utils.RetriableHttp( 125 http = httplib2_utils.RetriableHttp(
139 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs)) 126 httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs))
140 self._http = credentials.authorize(http) 127 self._http = credentials.authorize(http)
141 128
142 def encode_to_json(self, metric_pb): 129 def encode_to_json(self, metric_pb):
143 if interface.state.use_new_proto: 130 return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
144 return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
145 else:
146 return json.dumps({'resource': pb_to_popo.convert(metric_pb)})
147 131
148 def send(self, metric_pb): 132 def send(self, metric_pb):
149 logging.info('ts_mon: serializing metrics') 133 body = self.encode_to_json(metric_pb)
150 if interface.state.use_new_proto:
151 body = self.encode_to_json(metric_pb)
152 else:
153 body = self.encode_to_json(self._wrap_proto(metric_pb))
154 134
155 try: 135 try:
156 logging.info('ts_mon: sending %d bytes', len(body))
157 resp, content = self._http.request(self._endpoint, 136 resp, content = self._http.request(self._endpoint,
158 method='POST', 137 method='POST',
159 body=body, 138 body=body,
160 headers={'Content-Type': 'application/json'}) 139 headers={'Content-Type': 'application/json'})
161 logging.info('ts_mon: request finished')
162 if resp.status != 200: 140 if resp.status != 200:
163 logging.warning('HttpsMonitor.send received status %d: %s', resp.status, 141 logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
164 content) 142 content)
165 except (ValueError, errors.Error, 143 except (ValueError, errors.Error,
166 socket.timeout, socket.error, socket.herror, socket.gaierror, 144 socket.timeout, socket.error, socket.herror, socket.gaierror,
167 httplib2.HttpLib2Error): 145 httplib2.HttpLib2Error):
168 logging.warning('HttpsMonitor.send failed: %s\n', 146 logging.exception('HttpsMonitor.send failed')
169 traceback.format_exc())
170
171
172 class PubSubMonitor(Monitor):
173 """Class which publishes metrics to a Cloud Pub/Sub topic."""
174
175 _SCOPES = ['https://www.googleapis.com/auth/pubsub']
176
177 TIMEOUT = 10 # seconds
178
179 def _initialize(self):
180 creds = self._credential_factory.create(self._SCOPES)
181 creds.authorize(self._http)
182 self._api = discovery.build('pubsub', 'v1', http=self._http)
183
184 def _update_init_metrics(self, status):
185 if not self._use_instrumented_http:
186 return
187 fields = {'name': 'acq-mon-api-pubsub',
188 'client': 'discovery',
189 'status': status}
190 http_metrics.response_status.increment(fields=fields)
191
192 def _check_initialize(self):
193 if self._api:
194 return True
195 try:
196 self._initialize()
197 except (ValueError, errors.Error,
198 socket.timeout, socket.error, socket.herror, socket.gaierror,
199 httplib2.HttpLib2Error, EnvironmentError):
200 # Log a warning, not error, to avoid false alarms in AppEngine apps.
201 logging.warning('PubSubMonitor._initialize failed:\n%s',
202 traceback.format_exc())
203 self._api = None
204 self._update_init_metrics(http_metrics.STATUS_ERROR)
205 return False
206
207 self._update_init_metrics(http_metrics.STATUS_OK)
208 return True
209
210 def __init__(self, credential_factory, project, topic,
211 use_instrumented_http=True, ca_certs=None):
212 """Process monitoring related command line flags and initialize api.
213
214 Args:
215 credential_factory (CredentialFactory instance): factory that creates
216 oauth2 credentials.
217 project (str): the name of the Pub/Sub project to publish to.
218 topic (str): the name of the Pub/Sub topic to publish to.
219 use_instrumented_http (bool): whether to record monitoring metrics for
220 HTTP requests made to the pubsub API.
221 ca_certs (str): path to file containing root CA certificates for SSL
222 server certificate validation. If not set, a CA cert
223 file bundled with httplib2 is used.
224 """
225 # Do not call self._check_initialize() in the constructor. This
226 # class is constructed during app initialization on AppEngine, and
227 # network calls are especially flaky during that time.
228 self._api = None
229 self._use_instrumented_http = use_instrumented_http
230 if use_instrumented_http:
231 self._http = httplib2_utils.InstrumentedHttp(
232 'acq-mon-api-pubsub', timeout=self.TIMEOUT, ca_certs=ca_certs)
233 else:
234 self._http = httplib2.Http(timeout=self.TIMEOUT, ca_certs=ca_certs)
235 self._credential_factory = credential_factory
236 self._topic = 'projects/%s/topics/%s' % (project, topic)
237
238 def send(self, metric_pb):
239 """Send a metric proto to the monitoring api.
240
241 Args:
242 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
243 """
244 if not self._check_initialize():
245 return
246 proto = self._wrap_proto(metric_pb)
247 logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data))
248 body = {
249 'messages': [
250 {'data': base64.b64encode(proto.SerializeToString())},
251 ],
252 }
253 # Occasionally, client fails to receive a proper internal JSON
254 # from the server and raises ValueError trying to parse it. Other
255 # times we may fail with a network error. This is not fatal, we'll
256 # resend metrics next time.
257 try:
258 self._api.projects().topics().publish(
259 topic=self._topic,
260 body=body).execute(num_retries=5)
261 except (ValueError, errors.Error,
262 socket.timeout, socket.error, socket.herror, socket.gaierror,
263 httplib2.HttpLib2Error):
264 # Log a warning, not error, to avoid false alarms in AppEngine apps.
265 logging.warning('PubSubMonitor.send failed:\n%s',
266 traceback.format_exc())
267 147
268 148
269 class DebugMonitor(Monitor): 149 class DebugMonitor(Monitor):
270 """Class which writes metrics to logs or a local file for debugging.""" 150 """Class which writes metrics to logs or a local file for debugging."""
271 def __init__(self, filepath=None): 151 def __init__(self, filepath=None):
272 if filepath is None: 152 if filepath is None:
273 self._fh = None 153 self._fh = None
274 else: 154 else:
275 self._fh = open(filepath, 'a') 155 self._fh = open(filepath, 'a')
276 156
277 def send(self, metric_pb): 157 def send(self, metric_pb):
278 text = str(self._wrap_proto(metric_pb)) 158 text = str(metric_pb)
279 logging.info('Flushing monitoring metrics:\n%s', text) 159 logging.info('Flushing monitoring metrics:\n%s', text)
280 if self._fh is not None: 160 if self._fh is not None:
281 self._fh.write(text + '\n\n') 161 self._fh.write(text + '\n\n')
282 self._fh.flush() 162 self._fh.flush()
283 163
284 164
285 class NullMonitor(Monitor): 165 class NullMonitor(Monitor):
286 """Class that doesn't send metrics anywhere.""" 166 """Class that doesn't send metrics anywhere."""
287 def send(self, metric_pb): 167 def send(self, metric_pb):
288 pass 168 pass
OLDNEW
« no previous file with comments | « client/third_party/infra_libs/ts_mon/common/metrics.py ('k') | client/third_party/infra_libs/ts_mon/common/targets.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698