Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Side by Side Diff: infra_libs/ts_mon/common/monitors.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Classes representing the monitoring interface for tasks or devices."""
6
7
8 import base64
9 import httplib2
10 import json
11 import logging
12 import socket
13 import traceback
14
15 from googleapiclient import discovery
16 from googleapiclient import errors
17 from infra_libs import httplib2_utils
18 from infra_libs.ts_mon.common import http_metrics
19 from infra_libs.ts_mon.common import pb_to_popo
20 from infra_libs.ts_mon.protos import metrics_pb2
21 from oauth2client import gce
22 from oauth2client.client import GoogleCredentials
23 from oauth2client.file import Storage
24
25 # Special string that can be passed through as the credentials path to use the
26 # default Appengine or GCE service account.
27 APPENGINE_CREDENTIALS = ':appengine'
28 GCE_CREDENTIALS = ':gce'
29
30
31 class Monitor(object):
32 """Abstract base class encapsulating the ability to collect and send metrics.
33
34 This is a singleton class. There should only be one instance of a Monitor at
35 a time. It will be created and initialized by process_argparse_options. It
36 must exist in order for any metrics to be sent, although both Targets and
37 Metrics may be initialized before the underlying Monitor. If it does not exist
38 at the time that a Metric is sent, an exception will be raised.
39 """
40
41 _SCOPES = []
42
43 @staticmethod
44 def _wrap_proto(data):
45 """Normalize MetricsData, list(MetricsData), and MetricsCollection.
46
47 Args:
48 input: A MetricsData, list of MetricsData, or a MetricsCollection.
49
50 Returns:
51 A MetricsCollection with the appropriate data attribute set.
52 """
53 if isinstance(data, metrics_pb2.MetricsCollection):
54 ret = data
55 elif isinstance(data, list):
56 ret = metrics_pb2.MetricsCollection(data=data)
57 else:
58 ret = metrics_pb2.MetricsCollection(data=[data])
59 return ret
60
61 def _load_credentials(self, credentials_file_path):
62 if credentials_file_path == GCE_CREDENTIALS:
63 return gce.AppAssertionCredentials(self._SCOPES)
64 if credentials_file_path == APPENGINE_CREDENTIALS: # pragma: no cover
65 # This import doesn't work outside appengine, so delay it until it's used.
66 from oauth2client import appengine
67 from google.appengine.api import app_identity
68 logging.info('Initializing with service account %s',
69 app_identity.get_service_account_name())
70 return appengine.AppAssertionCredentials(self._SCOPES)
71
72 with open(credentials_file_path, 'r') as credentials_file:
73 credentials_json = json.load(credentials_file)
74 if credentials_json.get('type', None):
75 credentials = GoogleCredentials.from_stream(credentials_file_path)
76 credentials = credentials.create_scoped(self._SCOPES)
77 return credentials
78 return Storage(credentials_file_path).get()
79
80 def send(self, metric_pb):
81 raise NotImplementedError()
82
83 class HttpsMonitor(Monitor):
84
85 _SCOPES = [
86 'https://www.googleapis.com/auth/prodxmon'
87 ]
88
89 def __init__(self, endpoint, credentials_file_path, http=None):
90 self._endpoint = endpoint
91 credentials = self._load_credentials(credentials_file_path)
92 if http is None:
93 http = httplib2_utils.RetriableHttp(
94 httplib2_utils.InstrumentedHttp('acq-mon-api'))
95 self._http = credentials.authorize(http)
96
97 def encodeToJson(self, metric_pb):
98 return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) })
99
100 def send(self, metric_pb):
101 body = self.encodeToJson(self._wrap_proto(metric_pb))
102
103 try:
104 resp, content = self._http.request(self._endpoint, method='POST',
105 body=body)
106 if resp.status != 200:
107 logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
108 content)
109 except (ValueError, errors.Error,
110 socket.timeout, socket.error, socket.herror, socket.gaierror,
111 httplib2.HttpLib2Error):
112 logging.warning('HttpsMonitor.send failed: %s\n',
113 traceback.format_exc())
114
115
116 class PubSubMonitor(Monitor):
117 """Class which publishes metrics to a Cloud Pub/Sub topic."""
118
119 _SCOPES = [
120 'https://www.googleapis.com/auth/pubsub',
121 ]
122
123 TIMEOUT = 10 # seconds
124
125 def _initialize(self):
126 creds = self._load_credentials(self._credsfile)
127 creds.authorize(self._http)
128 self._api = discovery.build('pubsub', 'v1', http=self._http)
129
130 def _update_init_metrics(self, status):
131 if not self._use_instrumented_http:
132 return
133 fields = {'name': 'acq-mon-api-pubsub',
134 'client': 'discovery',
135 'status': status}
136 http_metrics.response_status.increment(fields=fields)
137
138 def _check_initialize(self):
139 if self._api:
140 return True
141 try:
142 self._initialize()
143 except (ValueError, errors.Error,
144 socket.timeout, socket.error, socket.herror, socket.gaierror,
145 httplib2.HttpLib2Error, EnvironmentError):
146 # Log a warning, not error, to avoid false alarms in AppEngine apps.
147 logging.warning('PubSubMonitor._initialize failed:\n%s',
148 traceback.format_exc())
149 self._api = None
150 self._update_init_metrics(http_metrics.STATUS_ERROR)
151 return False
152
153 self._update_init_metrics(http_metrics.STATUS_OK)
154 return True
155
156 def __init__(self, credsfile, project, topic, use_instrumented_http=True):
157 """Process monitoring related command line flags and initialize api.
158
159 Args:
160 credsfile (str): path to the credentials json file
161 project (str): the name of the Pub/Sub project to publish to.
162 topic (str): the name of the Pub/Sub topic to publish to.
163 use_instrumented_http (bool): whether to record monitoring metrics for
164 HTTP requests made to the pubsub API.
165 """
166 # Do not call self._check_initialize() in the constructor. This
167 # class is constructed during app initialization on AppEngine, and
168 # network calls are especially flaky during that time.
169 self._api = None
170 self._use_instrumented_http = use_instrumented_http
171 if use_instrumented_http:
172 self._http = httplib2_utils.InstrumentedHttp(
173 'acq-mon-api-pubsub', timeout=self.TIMEOUT)
174 else:
175 self._http = httplib2.Http(timeout=self.TIMEOUT)
176 self._credsfile = credsfile
177 self._topic = 'projects/%s/topics/%s' % (project, topic)
178
179 def send(self, metric_pb):
180 """Send a metric proto to the monitoring api.
181
182 Args:
183 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
184 """
185 if not self._check_initialize():
186 return
187 proto = self._wrap_proto(metric_pb)
188 logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data))
189 body = {
190 'messages': [
191 {'data': base64.b64encode(proto.SerializeToString())},
192 ],
193 }
194 # Occasionally, client fails to receive a proper internal JSON
195 # from the server and raises ValueError trying to parse it. Other
196 # times we may fail with a network error. This is not fatal, we'll
197 # resend metrics next time.
198 try:
199 self._api.projects().topics().publish(
200 topic=self._topic,
201 body=body).execute(num_retries=5)
202 except (ValueError, errors.Error,
203 socket.timeout, socket.error, socket.herror, socket.gaierror,
204 httplib2.HttpLib2Error):
205 # Log a warning, not error, to avoid false alarms in AppEngine apps.
206 logging.warning('PubSubMonitor.send failed:\n%s',
207 traceback.format_exc())
208
209
210 class DebugMonitor(Monitor):
211 """Class which writes metrics to logs or a local file for debugging."""
212 def __init__(self, filepath=None):
213 if filepath is None:
214 self._fh = None
215 else:
216 self._fh = open(filepath, 'a')
217
218 def send(self, metric_pb):
219 text = str(self._wrap_proto(metric_pb))
220 logging.info('Flushing monitoring metrics:\n%s', text)
221 if self._fh is not None:
222 self._fh.write(text + '\n\n')
223 self._fh.flush()
224
225
226 class NullMonitor(Monitor):
227 """Class that doesn't send metrics anywhere."""
228 def send(self, metric_pb):
229 pass
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698