OLD | NEW |
---|---|
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import Queue | 5 import Queue |
6 import logging | 6 import logging |
7 import requests | |
7 import threading | 8 import threading |
8 import time | 9 import time |
9 | 10 |
10 from infra.libs.event_mon.log_request_lite_pb2 import LogRequestLite | 11 from infra.libs.event_mon.log_request_lite_pb2 import LogRequestLite |
11 | 12 |
12 | 13 |
13 def time_ms(): | 14 def time_ms(): |
14 """Return current timestamp in milliseconds.""" | 15 """Return current timestamp in milliseconds.""" |
15 return int(1000 * time.time()) | 16 return int(1000 * time.time()) |
16 | 17 |
17 | 18 |
18 class _Router(object): | 19 class _Router(object): |
19 """Route events to the right destination. | 20 """Route events to the right destination. |
20 | 21 |
21 This object is meant to be a singleton, and is not part of the API. | 22 This object is meant to be a singleton, and is not part of the API. |
22 | 23 |
23 Usage: | 24 Usage: |
24 router = _Router() | 25 router = _Router() |
25 event = ChromeInfraEvent.LogEventLite(...) | 26 event = ChromeInfraEvent.LogEventLite(...) |
26 ... fill in event ... | 27 ... fill in event ... |
27 router.push_event(event) | 28 router.push_event(event) |
28 """ | 29 """ |
29 def __init__(self, dry_run=True): | 30 def __init__(self, endpoint=None): |
30 # dry_run is meant for local testing and unit testing. When True, this | 31 # endpoint == None means 'dry run'. No data is sent. |
31 # object should have no side effect. | 32 self.endpoint = endpoint |
32 self.dry_run = dry_run | |
33 | 33 |
34 self.event_queue = Queue.Queue() | 34 self.event_queue = Queue.Queue() |
35 self._thread = threading.Thread(target=self._router) | 35 self._thread = threading.Thread(target=self._router) |
36 self._thread.daemon = True | 36 self._thread.daemon = True |
37 self._thread.start() | 37 self._thread.start() |
38 | 38 |
39 def _router(self): | 39 def _router(self): |
40 while(True): # pragma: no branch | 40 while(True): # pragma: no branch |
41 events = self.event_queue.get() | 41 events = self.event_queue.get() |
42 if events is None: | 42 if events is None: |
43 break | 43 break |
44 | 44 |
45 # Set this time at the very last moment | 45 # Set this time at the very last moment |
46 events.request_time_ms = time_ms() | 46 events.request_time_ms = time_ms() |
47 | 47 if self.endpoint: # pragma: no cover |
48 if not self.dry_run: # pragma: no cover | 48 # TODO(pgervais): log when something fails |
Sergey Berezin
2015/02/07 03:16:47
nit: dot at the end of the sentence.
pgervais
2015/02/10 10:36:37
Done.
| |
49 # TODO(pgervais): Actually do something | 49 requests.post(self.endpoint, data=events.SerializeToString()) |
50 pass | 50 else: |
51 print('fake post request') | |
51 | 52 |
52 def close(self, timeout=None): | 53 def close(self, timeout=None): |
53 """ | 54 """ |
54 Returns: | 55 Returns: |
55 success (bool): True if everything went well. Otherwise, there is no | 56 success (bool): True if everything went well. Otherwise, there is no |
56 guarantee that all events have been properly sent to the remote. | 57 guarantee that all events have been properly sent to the remote. |
57 """ | 58 """ |
58 timeout = timeout or 5 | 59 timeout = timeout or 5 |
59 self.event_queue.put(None) | 60 self.event_queue.put(None) |
60 self._thread.join(timeout) | 61 self._thread.join(timeout) |
(...skipping 12 matching lines...) Expand all Loading... | |
73 event (LogRequestLite.LogEventLite): one single event. | 74 event (LogRequestLite.LogEventLite): one single event. |
74 Returns: | 75 Returns: |
75 success (bool): False if an error happened. True means 'event accepted', | 76 success (bool): False if an error happened. True means 'event accepted', |
76 but NOT 'event successfully pushed to the remote'. | 77 but NOT 'event successfully pushed to the remote'. |
77 """ | 78 """ |
78 if not isinstance(event, LogRequestLite.LogEventLite): | 79 if not isinstance(event, LogRequestLite.LogEventLite): |
79 logging.error('Invalid type for "event": %s (should be LogEventLite)' | 80 logging.error('Invalid type for "event": %s (should be LogEventLite)' |
80 % str(type(event))) | 81 % str(type(event))) |
81 return False | 82 return False |
82 | 83 |
83 # Dumb implementation, can be made more sophisticated (batching) | 84 # Dumb implementation, can be made more sophisticated (batching) |
Sergey Berezin
2015/02/07 03:16:47
nit: this sounds like a TODO. And is missing a dot
pgervais
2015/02/10 10:36:37
Done.
| |
84 request = LogRequestLite() | 85 request_p = LogRequestLite() |
85 request.log_event.extend((event,)) # copies the protobuf | 86 request_p.log_source_name = 'CHROME_INFRA' |
86 self.event_queue.put(request) | 87 request_p.log_event.extend((event,)) # copies the protobuf |
88 self.event_queue.put(request_p) | |
87 return True | 89 return True |
OLD | NEW |