Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(94)

Side by Side Diff: infra_libs/event_mon/router.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import logging
6 import os
7 import random
8 import sys
9 import time
10
11 import httplib2
12
13 import infra_libs
14 from infra_libs.event_mon.protos.log_request_lite_pb2 import LogRequestLite
15 from infra_libs.event_mon.protos.chrome_infra_log_pb2 import ChromeInfraEvent
16
17
18 def time_ms():
19 """Return current timestamp in milliseconds."""
20 return int(1000 * time.time())
21
22
23 def backoff_time(attempt, retry_backoff=2., max_delay=30.):
24 """Compute randomized exponential backoff time.
25
26 Args:
27 attempt (int): attempt number, starting at zero.
28
29 Keyword Args:
30 retry_backoff(float): backoff time on the first attempt.
31 max_delay(float): maximum returned value.
32 """
33 delay = retry_backoff * (2 ** attempt)
34 # Add +-25% of variation.
35 delay += delay * ((random.random() - 0.5) / 2.)
36 return min(delay, max_delay)
37
38
39 class _Router(object):
40 """Route events to the right destination. Base class.
41
42 This object is meant to be a singleton, and is not part of the API.
43 Subclasses must implement _send_to_endpoint().
44
45 Usage:
46 router = _Router()
47 event = ChromeInfraEvent.LogEventLite(...)
48 ... fill in event ...
49 router.push_event(event)
50 """
51 def push_event(self, log_events):
52 """Enqueue event to push to the collection service.
53
54 This method offers no guarantee on return that the event have been pushed
55 externally, as some buffering can take place.
56
57 Args:
58 log_events (LogRequestLite.LogEventLite or list/tuple of): events.
59
60 Returns:
61 success (bool): False if an error happened. True means 'event accepted',
62 but NOT 'event successfully pushed to the remote'.
63 """
64 if isinstance(log_events, LogRequestLite.LogEventLite):
65 log_events = (log_events,)
66
67 if not isinstance(log_events, (list, tuple)):
68 logging.error('Invalid type for "event", should be LogEventLite or '
69 'list of. Got %s' % str(type(log_events)))
70 return False
71
72 request_p = LogRequestLite()
73 request_p.log_source_name = 'CHROME_INFRA'
74 request_p.log_event.extend(log_events) # copies the protobuf
75 # Sets the sending time here for safety, _send_to_endpoint should change it
76 # if needed.
77 request_p.request_time_ms = time_ms()
78 return self._send_to_endpoint(request_p)
79
80 def _send_to_endpoint(self, events):
81 """Send a protobuf to wherever it should be sent.
82
83 This method is called by push_event.
84 If some computation is require, make sure to update events.request_time_ms
85 right before sending.
86
87 Args:
88 events(LogRequestLite): protobuf to send.
89
90 Returns:
91 success(bool): whether POSTing/writing succeeded or not.
92 """
93 raise NotImplementedError('Please implement _send_to_endpoint().')
94
95
96 class _LocalFileRouter(_Router):
97 def __init__(self, output_file, dry_run=False):
98 """Initialize the router.
99
100 Writes a serialized LogRequestLite protobuf in a local file. File is
101 created/truncated before writing (no append).
102
103 Args:
104 output_file(str): path to file where to write the protobuf.
105
106 Keyword Args:
107 dry_run(bool): if True, the file is not written.
108 """
109 _Router.__init__(self)
110 self.output_file = output_file
111 self._dry_run = dry_run
112
113 def _send_to_endpoint(self, events):
114 try:
115 if not os.path.isdir(os.path.dirname(self.output_file)):
116 logging.error('File cannot be written, parent directory does '
117 'not exist: %s' % os.path.dirname(self.output_file))
118 if self._dry_run:
119 logging.info('Would have written in %s', self.output_file)
120 else:
121 with open(self.output_file, 'wb') as f:
122 f.write(events.SerializeToString()) # pragma: no branch
123 except Exception:
124 logging.exception('Failed to write in file: %s', self.output_file)
125 return False
126
127 return True
128
129
130 class _TextStreamRouter(_Router):
131 def __init__(self, stream=sys.stdout):
132 """Initialize the router.
133
134 Args:
135 stream(File): where to write the output.
136 """
137 _Router.__init__(self)
138 self.stream = stream
139
140 def _send_to_endpoint(self, events):
141 # Prints individual events because it's what we're usually interested in
142 # in that case.
143 infra_events = [str(ChromeInfraEvent.FromString(
144 ev.source_extension)) for ev in events.log_event]
145 try:
146 self.stream.write('%s\n' % '\n'.join(infra_events))
147 except Exception:
148 logging.exception('Unable to write to provided stream')
149 return False
150 return True
151
152
153 class _LoggingStreamRouter(_Router):
154 def __init__(self, severity=logging.INFO):
155 """Initialize the router.
156
157 Args:
158 severity: severity of the messages for reporting events
159 """
160 _Router.__init__(self)
161 self.severity = severity
162
163 def _send_to_endpoint(self, events):
164 try:
165 for ev in events.log_event:
166 ev_str = str(ChromeInfraEvent.FromString(ev.source_extension))
167 logging.log(self.severity, 'Sending event_mon event:\n%s' % ev_str)
168 except Exception:
169 logging.exception('Unable to log the events')
170 return False
171 return True
172
173
174 class _HttpRouter(_Router):
175 def __init__(self, cache, endpoint, timeout=10, try_num=3, retry_backoff=2.,
176 dry_run=False, _sleep_fn=time.sleep):
177 """Initialize the router.
178
179 Args:
180 cache(dict): This must be config._cache. Passed as a parameter to
181 avoid a circular import.
182 endpoint(str or None): None means 'dry run'. What would be sent is printed
183 on stdout. If endpoint starts with 'https://' data is POSTed there.
184 Otherwise it is interpreted as a file where to write serialized
185 LogEventLite protos.
186 try_num(int): max number of http requests send to the endpoint.
187 retry_backoff(float): time in seconds before retrying posting to the
188 endpoint. Randomized exponential backoff is applied on subsequent
189 retries.
190 dry_run(boolean): if True, no http request is sent. Instead a message is
191 printed.
192 _sleep_fn (function): function to wait specified number of seconds. This
193 argument is provided for testing purposes.
194 """
195 HTTP_IDENTIFIER = 'event_mon'
196 _Router.__init__(self)
197 self.endpoint = endpoint
198 self.try_num = try_num
199 self.retry_backoff = retry_backoff
200 self._cache = cache
201 self._http = infra_libs.InstrumentedHttp(HTTP_IDENTIFIER, timeout=timeout)
202 self._dry_run = dry_run
203 self._sleep_fn = _sleep_fn
204
205 # TODO(pgervais) pass this as parameters instead.
206 if self._cache.get('service_account_creds'):
207 try:
208 logging.debug('Activating OAuth2 authentication.')
209 self._http = infra_libs.get_authenticated_http(
210 self._cache['service_account_creds'],
211 service_accounts_creds_root=
212 self._cache['service_accounts_creds_root'],
213 scope='https://www.googleapis.com/auth/cclog',
214 http_identifier=HTTP_IDENTIFIER,
215 timeout=timeout
216 )
217 except IOError:
218 logging.error('Unable to read credentials, requests will be '
219 'unauthenticated. File: %s',
220 self._cache.get('service_account_creds'))
221
222 def _send_to_endpoint(self, events):
223 """Send protobuf to endpoint
224
225 Args:
226 events(LogRequestLite): the protobuf to send.
227
228 Returns:
229 success(bool): whether POSTing/writing succeeded or not.
230 """
231 if not self.endpoint.startswith('https://'):
232 logging.error("Received invalid https endpoint: %s", self.endpoint)
233 return False
234
235 logging.debug('event_mon: POSTing events to %s', self.endpoint)
236
237 attempt = 0 # silencing pylint
238 for attempt in xrange(self.try_num): # pragma: no branch
239 # (re)set this time at the very last moment
240 events.request_time_ms = time_ms()
241 response = None
242 try:
243 if self._dry_run:
244 logging.info('Http requests disabled. Not sending anything')
245 else: # pragma: no cover
246 response, _ = self._http.request(
247 uri=self.endpoint,
248 method='POST',
249 headers={'Content-Type': 'application/octet-stream'},
250 body=events.SerializeToString()
251 )
252
253 if self._dry_run or response.status == 200:
254 logging.debug('Succeeded POSTing data after %d attempts', attempt + 1)
255 return True
256
257 except Exception:
258 logging.exception('exception when POSTing data')
259
260 if response:
261 logging.error('failed to POST data to %s Status: %d (attempt %d)',
262 self.endpoint, response.status, attempt)
263
264 if attempt == 0:
265 logging.error('data: %s', str(events)[:2000])
266
267 self._sleep_fn(backoff_time(attempt, retry_backoff=self.retry_backoff))
268
269 logging.error('failed to POST data after %d attempts, giving up.',
270 attempt+1)
271 return False
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698