OLD | NEW |
| (Empty) |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import functools | |
6 import threading | |
7 import time | |
8 import unittest | |
9 | |
10 import mock | |
11 | |
12 from testing_support import auto_stub | |
13 | |
14 from infra_libs.ts_mon.common import errors | |
15 from infra_libs.ts_mon.common import interface | |
16 from infra_libs.ts_mon.common import metrics | |
17 from infra_libs.ts_mon.common import targets | |
18 from infra_libs.ts_mon.common.test import stubs | |
19 | |
20 | |
21 class GlobalsTest(unittest.TestCase): | |
22 | |
23 def setUp(self): | |
24 target = targets.TaskTarget('test_service', 'test_job', | |
25 'test_region', 'test_host') | |
26 self.mock_state = interface.State(target=target) | |
27 self.state_patcher = mock.patch('infra_libs.ts_mon.common.interface.state', | |
28 new=self.mock_state) | |
29 self.state_patcher.start() | |
30 | |
31 def tearDown(self): | |
32 # It's important to call close() before un-setting the mock state object, | |
33 # because any FlushThread started by the test is stored in that mock state | |
34 # and needs to be stopped before running any other tests. | |
35 interface.close() | |
36 self.state_patcher.stop() | |
37 | |
38 def test_flush(self): | |
39 interface.state.global_monitor = stubs.MockMonitor() | |
40 interface.state.target = stubs.MockTarget() | |
41 | |
42 # pylint: disable=unused-argument | |
43 def serialize_to(pb, start_time, fields, value, target): | |
44 pb.data.add().name = 'foo' | |
45 | |
46 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
47 fake_metric.name = 'fake' | |
48 fake_metric.serialize_to.side_effect = serialize_to | |
49 interface.register(fake_metric) | |
50 interface.state.store.set('fake', (), None, 123) | |
51 | |
52 interface.flush() | |
53 interface.state.global_monitor.send.assert_called_once() | |
54 proto = interface.state.global_monitor.send.call_args[0][0] | |
55 self.assertEqual(1, len(proto.data)) | |
56 self.assertEqual('foo', proto.data[0].name) | |
57 | |
58 def test_flush_disabled(self): | |
59 interface.reset_for_unittest(disable=True) | |
60 interface.state.global_monitor = stubs.MockMonitor() | |
61 interface.state.target = stubs.MockTarget() | |
62 interface.flush() | |
63 self.assertFalse(interface.state.global_monitor.send.called) | |
64 | |
65 def test_flush_raises(self): | |
66 self.assertIsNone(interface.state.global_monitor) | |
67 with self.assertRaises(errors.MonitoringNoConfiguredMonitorError): | |
68 interface.flush() | |
69 | |
70 def test_flush_many(self): | |
71 interface.state.global_monitor = stubs.MockMonitor() | |
72 interface.state.target = stubs.MockTarget() | |
73 | |
74 # pylint: disable=unused-argument | |
75 def serialize_to(pb, start_time, fields, value, target): | |
76 pb.data.add().name = 'foo' | |
77 | |
78 # We can't use the mock's call_args_list here because the same object is | |
79 # reused as the argument to both calls and cleared inbetween. | |
80 data_lengths = [] | |
81 def send(proto): | |
82 data_lengths.append(len(proto.data)) | |
83 interface.state.global_monitor.send.side_effect = send | |
84 | |
85 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
86 fake_metric.name = 'fake' | |
87 fake_metric.serialize_to.side_effect = serialize_to | |
88 interface.register(fake_metric) | |
89 | |
90 for i in xrange(1001): | |
91 interface.state.store.set('fake', ('field', i), None, 123) | |
92 | |
93 interface.flush() | |
94 self.assertEquals(2, interface.state.global_monitor.send.call_count) | |
95 self.assertEqual(1000, data_lengths[0]) | |
96 self.assertEqual(1, data_lengths[1]) | |
97 | |
98 def test_send_modifies_metric_values(self): | |
99 interface.state.global_monitor = stubs.MockMonitor() | |
100 interface.state.target = stubs.MockTarget() | |
101 | |
102 # pylint: disable=unused-argument | |
103 def serialize_to(pb, start_time, fields, value, target): | |
104 pb.data.add().name = 'foo' | |
105 | |
106 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
107 fake_metric.name = 'fake' | |
108 fake_metric.serialize_to.side_effect = serialize_to | |
109 interface.register(fake_metric) | |
110 | |
111 # Setting this will modify store._values in the middle of iteration. | |
112 delayed_metric = metrics.CounterMetric('foo') | |
113 def send(proto): | |
114 delayed_metric.increment_by(1) | |
115 interface.state.global_monitor.send.side_effect = send | |
116 | |
117 for i in xrange(1001): | |
118 interface.state.store.set('fake', ('field', i), None, 123) | |
119 | |
120 # Shouldn't raise an exception. | |
121 interface.flush() | |
122 | |
123 def test_register_unregister(self): | |
124 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
125 self.assertEqual(0, len(interface.state.metrics)) | |
126 interface.register(fake_metric) | |
127 self.assertEqual(1, len(interface.state.metrics)) | |
128 interface.unregister(fake_metric) | |
129 self.assertEqual(0, len(interface.state.metrics)) | |
130 | |
131 def test_identical_register(self): | |
132 fake_metric = mock.Mock(_name='foo') | |
133 interface.register(fake_metric) | |
134 interface.register(fake_metric) | |
135 self.assertEqual(1, len(interface.state.metrics)) | |
136 | |
137 def test_duplicate_register_raises(self): | |
138 fake_metric = mock.Mock() | |
139 fake_metric.name = 'foo' | |
140 phake_metric = mock.Mock() | |
141 phake_metric.name = 'foo' | |
142 interface.register(fake_metric) | |
143 with self.assertRaises(errors.MonitoringDuplicateRegistrationError): | |
144 interface.register(phake_metric) | |
145 self.assertEqual(1, len(interface.state.metrics)) | |
146 | |
147 def test_unregister_missing_raises(self): | |
148 fake_metric = mock.Mock(_name='foo') | |
149 self.assertEqual(0, len(interface.state.metrics)) | |
150 with self.assertRaises(KeyError): | |
151 interface.unregister(fake_metric) | |
152 | |
153 def test_close_stops_flush_thread(self): | |
154 interface.state.flush_thread = interface._FlushThread(10) | |
155 interface.state.flush_thread.start() | |
156 | |
157 self.assertTrue(interface.state.flush_thread.is_alive()) | |
158 interface.close() | |
159 self.assertFalse(interface.state.flush_thread.is_alive()) | |
160 | |
161 def test_reset_for_unittest(self): | |
162 metric = metrics.CounterMetric('foo') | |
163 metric.increment() | |
164 self.assertEquals(1, metric.get()) | |
165 | |
166 interface.reset_for_unittest() | |
167 self.assertIsNone(metric.get()) | |
168 | |
169 | |
170 class FakeThreadingEvent(object): | |
171 """A fake threading.Event that doesn't use the clock for timeouts.""" | |
172 | |
173 def __init__(self): | |
174 # If not None, called inside wait() with the timeout (in seconds) to | |
175 # increment a fake clock. | |
176 self.increment_time_func = None | |
177 | |
178 self._is_set = False # Return value of the next call to wait. | |
179 self._last_wait_timeout = None # timeout argument of the last call to wait. | |
180 | |
181 self._wait_enter_semaphore = threading.Semaphore(0) | |
182 self._wait_exit_semaphore = threading.Semaphore(0) | |
183 | |
184 def timeout_wait(self): | |
185 """Blocks until the next time the code under test calls wait(). | |
186 | |
187 Makes the wait() call return False (indicating a timeout), and this call | |
188 returns the timeout argument given to the wait() method. | |
189 | |
190 Called by the test. | |
191 """ | |
192 | |
193 self._wait_enter_semaphore.release() | |
194 self._wait_exit_semaphore.acquire() | |
195 return self._last_wait_timeout | |
196 | |
197 def set(self, blocking=True): | |
198 """Makes the next wait() call return True. | |
199 | |
200 By default this blocks until the next call to wait(), but you can pass | |
201 blocking=False to just set the flag, wake up any wait() in progress (if any) | |
202 and return immediately. | |
203 """ | |
204 | |
205 self._is_set = True | |
206 self._wait_enter_semaphore.release() | |
207 if blocking: | |
208 self._wait_exit_semaphore.acquire() | |
209 | |
210 def wait(self, timeout): | |
211 """Block until either set() or timeout_wait() is called by the test.""" | |
212 | |
213 self._wait_enter_semaphore.acquire() | |
214 self._last_wait_timeout = timeout | |
215 if self.increment_time_func is not None: # pragma: no cover | |
216 self.increment_time_func(timeout) | |
217 ret = self._is_set | |
218 self._wait_exit_semaphore.release() | |
219 return ret | |
220 | |
221 | |
222 class FlushThreadTest(unittest.TestCase): | |
223 | |
224 def setUp(self): | |
225 mock.patch('infra_libs.ts_mon.common.interface.flush', | |
226 autospec=True).start() | |
227 mock.patch('time.time', autospec=True).start() | |
228 | |
229 self.fake_time = 0 | |
230 time.time.side_effect = lambda: self.fake_time | |
231 | |
232 self.stop_event = FakeThreadingEvent() | |
233 self.stop_event.increment_time_func = self.increment_time | |
234 | |
235 self.t = interface._FlushThread(60, stop_event=self.stop_event) | |
236 | |
237 def increment_time(self, delta): | |
238 self.fake_time += delta | |
239 | |
240 def assertInRange(self, lower, upper, value): | |
241 self.assertGreaterEqual(value, lower) | |
242 self.assertLessEqual(value, upper) | |
243 | |
244 def tearDown(self): | |
245 # Ensure the thread exits. | |
246 self.stop_event.set(blocking=False) | |
247 self.t.join() | |
248 | |
249 mock.patch.stopall() | |
250 | |
251 def test_run_calls_flush(self): | |
252 self.t.start() | |
253 | |
254 self.assertEqual(0, interface.flush.call_count) | |
255 | |
256 # The wait is for the whole interval (with jitter). | |
257 self.assertInRange(30, 60, self.stop_event.timeout_wait()) | |
258 | |
259 # Return from the second wait, which exits the thread. | |
260 self.stop_event.set() | |
261 self.t.join() | |
262 self.assertEqual(2, interface.flush.call_count) | |
263 | |
264 def test_run_catches_exceptions(self): | |
265 interface.flush.side_effect = Exception() | |
266 self.t.start() | |
267 | |
268 self.stop_event.timeout_wait() | |
269 # flush is called now and raises an exception. The exception is caught, so | |
270 # wait is called again. | |
271 | |
272 # Do it again to make sure the exception doesn't terminate the loop. | |
273 self.stop_event.timeout_wait() | |
274 | |
275 # Return from the third wait, which exits the thread. | |
276 self.stop_event.set() | |
277 self.t.join() | |
278 self.assertEqual(3, interface.flush.call_count) | |
279 | |
280 def test_stop_stops(self): | |
281 self.t.start() | |
282 | |
283 self.assertTrue(self.t.is_alive()) | |
284 | |
285 self.t.stop() | |
286 self.assertFalse(self.t.is_alive()) | |
287 self.assertEqual(1, interface.flush.call_count) | |
288 | |
289 def test_sleeps_for_exact_interval(self): | |
290 self.t.start() | |
291 | |
292 # Flush takes 5 seconds. | |
293 interface.flush.side_effect = functools.partial(self.increment_time, 5) | |
294 | |
295 self.assertInRange(30, 60, self.stop_event.timeout_wait()) | |
296 self.assertAlmostEqual(55, self.stop_event.timeout_wait()) | |
297 self.assertAlmostEqual(55, self.stop_event.timeout_wait()) | |
298 | |
299 def test_sleeps_for_minimum_zero_secs(self): | |
300 self.t.start() | |
301 | |
302 # Flush takes 65 seconds. | |
303 interface.flush.side_effect = functools.partial(self.increment_time, 65) | |
304 | |
305 self.assertInRange(30, 60, self.stop_event.timeout_wait()) | |
306 self.assertAlmostEqual(0, self.stop_event.timeout_wait()) | |
307 self.assertAlmostEqual(0, self.stop_event.timeout_wait()) | |
OLD | NEW |