OLD | NEW |
---|---|
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Classes representing the monitoring interface for tasks or devices. | 5 """Classes representing the monitoring interface for tasks or devices. |
6 | 6 |
7 Usage: | 7 Usage: |
8 import argparse | 8 import argparse |
9 from infra_libs import ts_mon | 9 from infra_libs import ts_mon |
10 | 10 |
(...skipping 16 matching lines...) Expand all Loading... | |
27 # No need to set up Monitor or Target, assume calling code did that. | 27 # No need to set up Monitor or Target, assume calling code did that. |
28 c = CounterMetric('/my/counter', fields={'source': 'mylibrary'}) | 28 c = CounterMetric('/my/counter', fields={'source': 'mylibrary'}) |
29 c.set(0) | 29 c.set(0) |
30 for x in range(100): | 30 for x in range(100): |
31 c.increment() | 31 c.increment() |
32 """ | 32 """ |
33 | 33 |
34 | 34 |
35 import base64 | 35 import base64 |
36 import json | 36 import json |
37 import logging | |
38 import os | 37 import os |
39 | 38 |
40 from monacq import acquisition_api | 39 from proto import metrics_pb2 |
41 from monacq.proto import metrics_pb2 | |
42 | |
43 from infra_libs import logs | |
44 import infra_libs | |
45 | 40 |
46 import httplib2 | 41 import httplib2 |
47 from apiclient import discovery | 42 from apiclient import discovery |
48 from oauth2client.client import GoogleCredentials | 43 from oauth2client.client import GoogleCredentials |
49 from oauth2client.file import Storage | |
50 | |
51 | |
52 def _logging_callback(resp, content): # pragma: no cover | |
53 logging.debug(repr(resp)) | |
54 logging.debug(content) | |
55 | 44 |
56 | 45 |
57 class Monitor(object): | 46 class Monitor(object): |
58 """Abstract base class encapsulating the ability to collect and send metrics. | 47 """Abstract base class encapsulating the ability to collect and send metrics. |
59 | 48 |
60 This is a singleton class. There should only be one instance of a Monitor at | 49 This is a singleton class. There should only be one instance of a Monitor at |
61 a time. It will be created and initialized by process_argparse_options. It | 50 a time. It will be created and initialized by process_argparse_options. It |
62 must exist in order for any metrics to be sent, although both Targets and | 51 must exist in order for any metrics to be sent, although both Targets and |
63 Metrics may be initialized before the underlying Monitor. If it does not exist | 52 Metrics may be initialized before the underlying Monitor. If it does not exist |
64 at the time that a Metric is sent, an exception will be raised. | 53 at the time that a Metric is sent, an exception will be raised. |
(...skipping 13 matching lines...) Expand all Loading... | |
78 elif isinstance(data, list): | 67 elif isinstance(data, list): |
79 ret = metrics_pb2.MetricsCollection(data=data) | 68 ret = metrics_pb2.MetricsCollection(data=data) |
80 else: | 69 else: |
81 ret = metrics_pb2.MetricsCollection(data=[data]) | 70 ret = metrics_pb2.MetricsCollection(data=[data]) |
82 return ret | 71 return ret |
83 | 72 |
84 def send(self, metric_pb): | 73 def send(self, metric_pb): |
85 raise NotImplementedError() | 74 raise NotImplementedError() |
86 | 75 |
87 | 76 |
88 class ApiMonitor(Monitor): | |
89 """Class which sends metrics to the monitoring api, the default behavior.""" | |
90 def __init__(self, credsfile, endpoint, use_instrumented_http=True): | |
91 """Process monitoring related command line flags and initialize api. | |
92 | |
93 Args: | |
94 credsfile (str): path to the credentials json file | |
95 endpoint (str): url of the monitoring endpoint to hit | |
96 """ | |
97 | |
98 creds = acquisition_api.AcquisitionCredential.Load( | |
99 os.path.abspath(credsfile)) | |
100 api = acquisition_api.AcquisitionApi(creds, endpoint) | |
101 api.SetResponseCallback(_logging_callback) | |
102 | |
103 if use_instrumented_http: | |
104 api.SetHttp(infra_libs.InstrumentedHttp('acq-mon-api')) | |
105 | |
106 self._api = api | |
107 | |
108 def send(self, metric_pb): | |
109 """Send a metric proto to the monitoring api. | |
110 | |
111 Args: | |
112 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send | |
113 """ | |
114 try: | |
115 self._api.Send(self._wrap_proto(metric_pb)) | |
116 except acquisition_api.AcquisitionApiRequestException as e: | |
117 logging.error('Failed to send the metrics: %s', e) | |
118 | |
119 | |
120 class PubSubMonitor(Monitor): | 77 class PubSubMonitor(Monitor): |
121 """Class which publishes metrics to a Cloud Pub/Sub topic.""" | 78 """Class which publishes metrics to a Cloud Pub/Sub topic.""" |
122 | 79 |
123 _SCOPES = [ | 80 _SCOPES = [ |
124 'https://www.googleapis.com/auth/pubsub', | 81 'https://www.googleapis.com/auth/pubsub', |
125 ] | 82 ] |
126 | 83 |
127 @classmethod | 84 def _initialize(self, project, topic): |
128 def _load_credentials(cls, credentials_file_path): | |
129 with open(credentials_file_path, 'r') as credentials_file: | |
130 credentials_json = json.load(credentials_file) | |
131 if credentials_json.get('type', None): | |
132 credentials = GoogleCredentials.from_stream(credentials_file_path) | |
133 credentials = credentials.create_scoped(cls._SCOPES) | |
134 return credentials | |
135 return Storage(credentials_file_path).get() | |
136 | |
137 def _initialize(self, credsfile, project, topic): | |
138 # Copied from acquisition_api.AcquisitionCredential.Load. | 85 # Copied from acquisition_api.AcquisitionCredential.Load. |
139 creds = self._load_credentials(credsfile) | 86 creds = GoogleCredentials.get_application_default() |
agable
2015/08/10 23:04:38
And the pubsub pipeline is accepting these credent
jshu
2015/08/11 21:57:18
Sergey had to add it to the list of whitelisted ap
agable
2015/08/12 22:18:08
Whitelisting is fine for now.
| |
87 creds = creds.create_scoped(self._SCOPES) | |
140 self._http = httplib2.Http() | 88 self._http = httplib2.Http() |
141 creds.authorize(self._http) | 89 creds.authorize(self._http) |
142 self._api = discovery.build('pubsub', 'v1', http=self._http) | 90 self._api = discovery.build('pubsub', 'v1', http=self._http) |
143 self._topic = 'projects/%s/topics/%s' % (project, topic) | 91 self._topic = 'projects/%s/topics/%s' % (project, topic) |
144 | 92 |
145 def __init__(self, credsfile, project, topic): | 93 def __init__(self, project, topic): |
146 """Process monitoring related command line flags and initialize api. | 94 """Process monitoring related command line flags and initialize api. |
147 | 95 |
148 Args: | 96 Args: |
149 credsfile (str): path to the credentials json file | |
150 project (str): the name of the Pub/Sub project to publish to. | 97 project (str): the name of the Pub/Sub project to publish to. |
151 topic (str): the name of the Pub/Sub topic to publish to. | 98 topic (str): the name of the Pub/Sub topic to publish to. |
152 """ | 99 """ |
153 self._initialize(credsfile, project, topic) | 100 self._initialize(project, topic) |
154 | 101 |
155 def send(self, metric_pb): | 102 def send(self, metric_pb): |
156 """Send a metric proto to the monitoring api. | 103 """Send a metric proto to the monitoring api. |
157 | 104 |
158 Args: | 105 Args: |
159 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send | 106 metric_pb (MetricsData or MetricsCollection): the metric protobuf to send |
160 """ | 107 """ |
161 proto = self._wrap_proto(metric_pb) | 108 proto = self._wrap_proto(metric_pb) |
162 body = { | 109 body = { |
163 'messages': [ | 110 'messages': [ |
164 {'data': base64.b64encode(proto.SerializeToString())}, | 111 {'data': base64.b64encode(proto.SerializeToString())}, |
165 ], | 112 ], |
166 } | 113 } |
167 self._api.projects().topics().publish( | 114 self._api.projects().topics().publish( |
168 topic=self._topic, | 115 topic=self._topic, |
169 body=body).execute(num_retries=5) | 116 body=body).execute(num_retries=5) |
170 | 117 |
171 | 118 |
172 class DiskMonitor(Monitor): | |
173 """Class which writes metrics to a local file for debugging.""" | |
174 def __init__(self, filepath): | |
175 self._logger = logging.getLogger('__name__') | |
176 filehandler = logging.FileHandler(filepath, 'a') | |
177 logs.add_handler(self._logger, handler=filehandler, level=logging.INFO) | |
178 | |
179 def send(self, metric_pb): | |
180 self._logger.info('\n' + str(self._wrap_proto(metric_pb))) | |
181 | |
182 | |
183 class NullMonitor(Monitor): | 119 class NullMonitor(Monitor): |
184 """Class that doesn't send metrics anywhere.""" | 120 """Class that doesn't send metrics anywhere.""" |
185 def send(self, metric_pb): | 121 def send(self, metric_pb): |
186 pass | 122 pass |
OLD | NEW |