| OLD | NEW |
| (Empty) |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import functools | |
| 6 import threading | |
| 7 import time | |
| 8 import unittest | |
| 9 | |
| 10 import mock | |
| 11 | |
| 12 from testing_support import auto_stub | |
| 13 | |
| 14 from infra_libs.ts_mon.common import errors | |
| 15 from infra_libs.ts_mon.common import interface | |
| 16 from infra_libs.ts_mon.common import metrics | |
| 17 from infra_libs.ts_mon.common import targets | |
| 18 from infra_libs.ts_mon.common.test import stubs | |
| 19 | |
| 20 | |
| 21 class GlobalsTest(unittest.TestCase): | |
| 22 | |
| 23 def setUp(self): | |
| 24 target = targets.TaskTarget('test_service', 'test_job', | |
| 25 'test_region', 'test_host') | |
| 26 self.mock_state = interface.State(target=target) | |
| 27 self.state_patcher = mock.patch('infra_libs.ts_mon.common.interface.state', | |
| 28 new=self.mock_state) | |
| 29 self.state_patcher.start() | |
| 30 | |
| 31 def tearDown(self): | |
| 32 # It's important to call close() before un-setting the mock state object, | |
| 33 # because any FlushThread started by the test is stored in that mock state | |
| 34 # and needs to be stopped before running any other tests. | |
| 35 interface.close() | |
| 36 self.state_patcher.stop() | |
| 37 | |
| 38 def test_flush(self): | |
| 39 interface.state.global_monitor = stubs.MockMonitor() | |
| 40 interface.state.target = stubs.MockTarget() | |
| 41 | |
| 42 # pylint: disable=unused-argument | |
| 43 def serialize_to(pb, start_time, fields, value, target): | |
| 44 pb.data.add().name = 'foo' | |
| 45 | |
| 46 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
| 47 fake_metric.name = 'fake' | |
| 48 fake_metric.serialize_to.side_effect = serialize_to | |
| 49 interface.register(fake_metric) | |
| 50 interface.state.store.set('fake', (), None, 123) | |
| 51 | |
| 52 interface.flush() | |
| 53 interface.state.global_monitor.send.assert_called_once() | |
| 54 proto = interface.state.global_monitor.send.call_args[0][0] | |
| 55 self.assertEqual(1, len(proto.data)) | |
| 56 self.assertEqual('foo', proto.data[0].name) | |
| 57 | |
| 58 def test_flush_disabled(self): | |
| 59 interface.reset_for_unittest(disable=True) | |
| 60 interface.state.global_monitor = stubs.MockMonitor() | |
| 61 interface.state.target = stubs.MockTarget() | |
| 62 interface.flush() | |
| 63 self.assertFalse(interface.state.global_monitor.send.called) | |
| 64 | |
| 65 def test_flush_raises(self): | |
| 66 self.assertIsNone(interface.state.global_monitor) | |
| 67 with self.assertRaises(errors.MonitoringNoConfiguredMonitorError): | |
| 68 interface.flush() | |
| 69 | |
| 70 def test_flush_many(self): | |
| 71 interface.state.global_monitor = stubs.MockMonitor() | |
| 72 interface.state.target = stubs.MockTarget() | |
| 73 | |
| 74 # pylint: disable=unused-argument | |
| 75 def serialize_to(pb, start_time, fields, value, target): | |
| 76 pb.data.add().name = 'foo' | |
| 77 | |
| 78 # We can't use the mock's call_args_list here because the same object is | |
| 79 # reused as the argument to both calls and cleared inbetween. | |
| 80 data_lengths = [] | |
| 81 def send(proto): | |
| 82 data_lengths.append(len(proto.data)) | |
| 83 interface.state.global_monitor.send.side_effect = send | |
| 84 | |
| 85 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
| 86 fake_metric.name = 'fake' | |
| 87 fake_metric.serialize_to.side_effect = serialize_to | |
| 88 interface.register(fake_metric) | |
| 89 | |
| 90 for i in xrange(1001): | |
| 91 interface.state.store.set('fake', ('field', i), None, 123) | |
| 92 | |
| 93 interface.flush() | |
| 94 self.assertEquals(2, interface.state.global_monitor.send.call_count) | |
| 95 self.assertEqual(1000, data_lengths[0]) | |
| 96 self.assertEqual(1, data_lengths[1]) | |
| 97 | |
| 98 def test_send_modifies_metric_values(self): | |
| 99 interface.state.global_monitor = stubs.MockMonitor() | |
| 100 interface.state.target = stubs.MockTarget() | |
| 101 | |
| 102 # pylint: disable=unused-argument | |
| 103 def serialize_to(pb, start_time, fields, value, target): | |
| 104 pb.data.add().name = 'foo' | |
| 105 | |
| 106 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
| 107 fake_metric.name = 'fake' | |
| 108 fake_metric.serialize_to.side_effect = serialize_to | |
| 109 interface.register(fake_metric) | |
| 110 | |
| 111 # Setting this will modify store._values in the middle of iteration. | |
| 112 delayed_metric = metrics.CounterMetric('foo') | |
| 113 def send(proto): | |
| 114 delayed_metric.increment_by(1) | |
| 115 interface.state.global_monitor.send.side_effect = send | |
| 116 | |
| 117 for i in xrange(1001): | |
| 118 interface.state.store.set('fake', ('field', i), None, 123) | |
| 119 | |
| 120 # Shouldn't raise an exception. | |
| 121 interface.flush() | |
| 122 | |
| 123 def test_register_unregister(self): | |
| 124 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) | |
| 125 self.assertEqual(0, len(interface.state.metrics)) | |
| 126 interface.register(fake_metric) | |
| 127 self.assertEqual(1, len(interface.state.metrics)) | |
| 128 interface.unregister(fake_metric) | |
| 129 self.assertEqual(0, len(interface.state.metrics)) | |
| 130 | |
| 131 def test_identical_register(self): | |
| 132 fake_metric = mock.Mock(_name='foo') | |
| 133 interface.register(fake_metric) | |
| 134 interface.register(fake_metric) | |
| 135 self.assertEqual(1, len(interface.state.metrics)) | |
| 136 | |
| 137 def test_duplicate_register_raises(self): | |
| 138 fake_metric = mock.Mock() | |
| 139 fake_metric.name = 'foo' | |
| 140 phake_metric = mock.Mock() | |
| 141 phake_metric.name = 'foo' | |
| 142 interface.register(fake_metric) | |
| 143 with self.assertRaises(errors.MonitoringDuplicateRegistrationError): | |
| 144 interface.register(phake_metric) | |
| 145 self.assertEqual(1, len(interface.state.metrics)) | |
| 146 | |
| 147 def test_unregister_missing_raises(self): | |
| 148 fake_metric = mock.Mock(_name='foo') | |
| 149 self.assertEqual(0, len(interface.state.metrics)) | |
| 150 with self.assertRaises(KeyError): | |
| 151 interface.unregister(fake_metric) | |
| 152 | |
| 153 def test_close_stops_flush_thread(self): | |
| 154 interface.state.flush_thread = interface._FlushThread(10) | |
| 155 interface.state.flush_thread.start() | |
| 156 | |
| 157 self.assertTrue(interface.state.flush_thread.is_alive()) | |
| 158 interface.close() | |
| 159 self.assertFalse(interface.state.flush_thread.is_alive()) | |
| 160 | |
| 161 def test_reset_for_unittest(self): | |
| 162 metric = metrics.CounterMetric('foo') | |
| 163 metric.increment() | |
| 164 self.assertEquals(1, metric.get()) | |
| 165 | |
| 166 interface.reset_for_unittest() | |
| 167 self.assertIsNone(metric.get()) | |
| 168 | |
| 169 | |
| 170 class FakeThreadingEvent(object): | |
| 171 """A fake threading.Event that doesn't use the clock for timeouts.""" | |
| 172 | |
| 173 def __init__(self): | |
| 174 # If not None, called inside wait() with the timeout (in seconds) to | |
| 175 # increment a fake clock. | |
| 176 self.increment_time_func = None | |
| 177 | |
| 178 self._is_set = False # Return value of the next call to wait. | |
| 179 self._last_wait_timeout = None # timeout argument of the last call to wait. | |
| 180 | |
| 181 self._wait_enter_semaphore = threading.Semaphore(0) | |
| 182 self._wait_exit_semaphore = threading.Semaphore(0) | |
| 183 | |
| 184 def timeout_wait(self): | |
| 185 """Blocks until the next time the code under test calls wait(). | |
| 186 | |
| 187 Makes the wait() call return False (indicating a timeout), and this call | |
| 188 returns the timeout argument given to the wait() method. | |
| 189 | |
| 190 Called by the test. | |
| 191 """ | |
| 192 | |
| 193 self._wait_enter_semaphore.release() | |
| 194 self._wait_exit_semaphore.acquire() | |
| 195 return self._last_wait_timeout | |
| 196 | |
| 197 def set(self, blocking=True): | |
| 198 """Makes the next wait() call return True. | |
| 199 | |
| 200 By default this blocks until the next call to wait(), but you can pass | |
| 201 blocking=False to just set the flag, wake up any wait() in progress (if any) | |
| 202 and return immediately. | |
| 203 """ | |
| 204 | |
| 205 self._is_set = True | |
| 206 self._wait_enter_semaphore.release() | |
| 207 if blocking: | |
| 208 self._wait_exit_semaphore.acquire() | |
| 209 | |
| 210 def wait(self, timeout): | |
| 211 """Block until either set() or timeout_wait() is called by the test.""" | |
| 212 | |
| 213 self._wait_enter_semaphore.acquire() | |
| 214 self._last_wait_timeout = timeout | |
| 215 if self.increment_time_func is not None: # pragma: no cover | |
| 216 self.increment_time_func(timeout) | |
| 217 ret = self._is_set | |
| 218 self._wait_exit_semaphore.release() | |
| 219 return ret | |
| 220 | |
| 221 | |
| 222 class FlushThreadTest(unittest.TestCase): | |
| 223 | |
| 224 def setUp(self): | |
| 225 mock.patch('infra_libs.ts_mon.common.interface.flush', | |
| 226 autospec=True).start() | |
| 227 mock.patch('time.time', autospec=True).start() | |
| 228 | |
| 229 self.fake_time = 0 | |
| 230 time.time.side_effect = lambda: self.fake_time | |
| 231 | |
| 232 self.stop_event = FakeThreadingEvent() | |
| 233 self.stop_event.increment_time_func = self.increment_time | |
| 234 | |
| 235 self.t = interface._FlushThread(60, stop_event=self.stop_event) | |
| 236 | |
| 237 def increment_time(self, delta): | |
| 238 self.fake_time += delta | |
| 239 | |
| 240 def assertInRange(self, lower, upper, value): | |
| 241 self.assertGreaterEqual(value, lower) | |
| 242 self.assertLessEqual(value, upper) | |
| 243 | |
| 244 def tearDown(self): | |
| 245 # Ensure the thread exits. | |
| 246 self.stop_event.set(blocking=False) | |
| 247 self.t.join() | |
| 248 | |
| 249 mock.patch.stopall() | |
| 250 | |
| 251 def test_run_calls_flush(self): | |
| 252 self.t.start() | |
| 253 | |
| 254 self.assertEqual(0, interface.flush.call_count) | |
| 255 | |
| 256 # The wait is for the whole interval (with jitter). | |
| 257 self.assertInRange(30, 60, self.stop_event.timeout_wait()) | |
| 258 | |
| 259 # Return from the second wait, which exits the thread. | |
| 260 self.stop_event.set() | |
| 261 self.t.join() | |
| 262 self.assertEqual(2, interface.flush.call_count) | |
| 263 | |
| 264 def test_run_catches_exceptions(self): | |
| 265 interface.flush.side_effect = Exception() | |
| 266 self.t.start() | |
| 267 | |
| 268 self.stop_event.timeout_wait() | |
| 269 # flush is called now and raises an exception. The exception is caught, so | |
| 270 # wait is called again. | |
| 271 | |
| 272 # Do it again to make sure the exception doesn't terminate the loop. | |
| 273 self.stop_event.timeout_wait() | |
| 274 | |
| 275 # Return from the third wait, which exits the thread. | |
| 276 self.stop_event.set() | |
| 277 self.t.join() | |
| 278 self.assertEqual(3, interface.flush.call_count) | |
| 279 | |
| 280 def test_stop_stops(self): | |
| 281 self.t.start() | |
| 282 | |
| 283 self.assertTrue(self.t.is_alive()) | |
| 284 | |
| 285 self.t.stop() | |
| 286 self.assertFalse(self.t.is_alive()) | |
| 287 self.assertEqual(1, interface.flush.call_count) | |
| 288 | |
| 289 def test_sleeps_for_exact_interval(self): | |
| 290 self.t.start() | |
| 291 | |
| 292 # Flush takes 5 seconds. | |
| 293 interface.flush.side_effect = functools.partial(self.increment_time, 5) | |
| 294 | |
| 295 self.assertInRange(30, 60, self.stop_event.timeout_wait()) | |
| 296 self.assertAlmostEqual(55, self.stop_event.timeout_wait()) | |
| 297 self.assertAlmostEqual(55, self.stop_event.timeout_wait()) | |
| 298 | |
| 299 def test_sleeps_for_minimum_zero_secs(self): | |
| 300 self.t.start() | |
| 301 | |
| 302 # Flush takes 65 seconds. | |
| 303 interface.flush.side_effect = functools.partial(self.increment_time, 65) | |
| 304 | |
| 305 self.assertInRange(30, 60, self.stop_event.timeout_wait()) | |
| 306 self.assertAlmostEqual(0, self.stop_event.timeout_wait()) | |
| 307 self.assertAlmostEqual(0, self.stop_event.timeout_wait()) | |
| OLD | NEW |