Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: infra_libs/ts_mon/common/test/interface_test.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import functools
6 import threading
7 import time
8 import unittest
9
10 import mock
11
12 from testing_support import auto_stub
13
14 from infra_libs.ts_mon.common import errors
15 from infra_libs.ts_mon.common import interface
16 from infra_libs.ts_mon.common import metrics
17 from infra_libs.ts_mon.common import targets
18 from infra_libs.ts_mon.common.test import stubs
19
20
21 class GlobalsTest(unittest.TestCase):
22
23 def setUp(self):
24 target = targets.TaskTarget('test_service', 'test_job',
25 'test_region', 'test_host')
26 self.mock_state = interface.State(target=target)
27 self.state_patcher = mock.patch('infra_libs.ts_mon.common.interface.state',
28 new=self.mock_state)
29 self.state_patcher.start()
30
31 def tearDown(self):
32 # It's important to call close() before un-setting the mock state object,
33 # because any FlushThread started by the test is stored in that mock state
34 # and needs to be stopped before running any other tests.
35 interface.close()
36 self.state_patcher.stop()
37
38 def test_flush(self):
39 interface.state.global_monitor = stubs.MockMonitor()
40 interface.state.target = stubs.MockTarget()
41
42 # pylint: disable=unused-argument
43 def serialize_to(pb, start_time, fields, value, target):
44 pb.data.add().name = 'foo'
45
46 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
47 fake_metric.name = 'fake'
48 fake_metric.serialize_to.side_effect = serialize_to
49 interface.register(fake_metric)
50 interface.state.store.set('fake', (), None, 123)
51
52 interface.flush()
53 interface.state.global_monitor.send.assert_called_once()
54 proto = interface.state.global_monitor.send.call_args[0][0]
55 self.assertEqual(1, len(proto.data))
56 self.assertEqual('foo', proto.data[0].name)
57
58 def test_flush_disabled(self):
59 interface.reset_for_unittest(disable=True)
60 interface.state.global_monitor = stubs.MockMonitor()
61 interface.state.target = stubs.MockTarget()
62 interface.flush()
63 self.assertFalse(interface.state.global_monitor.send.called)
64
65 def test_flush_raises(self):
66 self.assertIsNone(interface.state.global_monitor)
67 with self.assertRaises(errors.MonitoringNoConfiguredMonitorError):
68 interface.flush()
69
70 def test_flush_many(self):
71 interface.state.global_monitor = stubs.MockMonitor()
72 interface.state.target = stubs.MockTarget()
73
74 # pylint: disable=unused-argument
75 def serialize_to(pb, start_time, fields, value, target):
76 pb.data.add().name = 'foo'
77
78 # We can't use the mock's call_args_list here because the same object is
79 # reused as the argument to both calls and cleared inbetween.
80 data_lengths = []
81 def send(proto):
82 data_lengths.append(len(proto.data))
83 interface.state.global_monitor.send.side_effect = send
84
85 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
86 fake_metric.name = 'fake'
87 fake_metric.serialize_to.side_effect = serialize_to
88 interface.register(fake_metric)
89
90 for i in xrange(1001):
91 interface.state.store.set('fake', ('field', i), None, 123)
92
93 interface.flush()
94 self.assertEquals(2, interface.state.global_monitor.send.call_count)
95 self.assertEqual(1000, data_lengths[0])
96 self.assertEqual(1, data_lengths[1])
97
98 def test_send_modifies_metric_values(self):
99 interface.state.global_monitor = stubs.MockMonitor()
100 interface.state.target = stubs.MockTarget()
101
102 # pylint: disable=unused-argument
103 def serialize_to(pb, start_time, fields, value, target):
104 pb.data.add().name = 'foo'
105
106 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
107 fake_metric.name = 'fake'
108 fake_metric.serialize_to.side_effect = serialize_to
109 interface.register(fake_metric)
110
111 # Setting this will modify store._values in the middle of iteration.
112 delayed_metric = metrics.CounterMetric('foo')
113 def send(proto):
114 delayed_metric.increment_by(1)
115 interface.state.global_monitor.send.side_effect = send
116
117 for i in xrange(1001):
118 interface.state.store.set('fake', ('field', i), None, 123)
119
120 # Shouldn't raise an exception.
121 interface.flush()
122
123 def test_register_unregister(self):
124 fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
125 self.assertEqual(0, len(interface.state.metrics))
126 interface.register(fake_metric)
127 self.assertEqual(1, len(interface.state.metrics))
128 interface.unregister(fake_metric)
129 self.assertEqual(0, len(interface.state.metrics))
130
131 def test_identical_register(self):
132 fake_metric = mock.Mock(_name='foo')
133 interface.register(fake_metric)
134 interface.register(fake_metric)
135 self.assertEqual(1, len(interface.state.metrics))
136
137 def test_duplicate_register_raises(self):
138 fake_metric = mock.Mock()
139 fake_metric.name = 'foo'
140 phake_metric = mock.Mock()
141 phake_metric.name = 'foo'
142 interface.register(fake_metric)
143 with self.assertRaises(errors.MonitoringDuplicateRegistrationError):
144 interface.register(phake_metric)
145 self.assertEqual(1, len(interface.state.metrics))
146
147 def test_unregister_missing_raises(self):
148 fake_metric = mock.Mock(_name='foo')
149 self.assertEqual(0, len(interface.state.metrics))
150 with self.assertRaises(KeyError):
151 interface.unregister(fake_metric)
152
153 def test_close_stops_flush_thread(self):
154 interface.state.flush_thread = interface._FlushThread(10)
155 interface.state.flush_thread.start()
156
157 self.assertTrue(interface.state.flush_thread.is_alive())
158 interface.close()
159 self.assertFalse(interface.state.flush_thread.is_alive())
160
161 def test_reset_for_unittest(self):
162 metric = metrics.CounterMetric('foo')
163 metric.increment()
164 self.assertEquals(1, metric.get())
165
166 interface.reset_for_unittest()
167 self.assertIsNone(metric.get())
168
169
170 class FakeThreadingEvent(object):
171 """A fake threading.Event that doesn't use the clock for timeouts."""
172
173 def __init__(self):
174 # If not None, called inside wait() with the timeout (in seconds) to
175 # increment a fake clock.
176 self.increment_time_func = None
177
178 self._is_set = False # Return value of the next call to wait.
179 self._last_wait_timeout = None # timeout argument of the last call to wait.
180
181 self._wait_enter_semaphore = threading.Semaphore(0)
182 self._wait_exit_semaphore = threading.Semaphore(0)
183
184 def timeout_wait(self):
185 """Blocks until the next time the code under test calls wait().
186
187 Makes the wait() call return False (indicating a timeout), and this call
188 returns the timeout argument given to the wait() method.
189
190 Called by the test.
191 """
192
193 self._wait_enter_semaphore.release()
194 self._wait_exit_semaphore.acquire()
195 return self._last_wait_timeout
196
197 def set(self, blocking=True):
198 """Makes the next wait() call return True.
199
200 By default this blocks until the next call to wait(), but you can pass
201 blocking=False to just set the flag, wake up any wait() in progress (if any)
202 and return immediately.
203 """
204
205 self._is_set = True
206 self._wait_enter_semaphore.release()
207 if blocking:
208 self._wait_exit_semaphore.acquire()
209
210 def wait(self, timeout):
211 """Block until either set() or timeout_wait() is called by the test."""
212
213 self._wait_enter_semaphore.acquire()
214 self._last_wait_timeout = timeout
215 if self.increment_time_func is not None: # pragma: no cover
216 self.increment_time_func(timeout)
217 ret = self._is_set
218 self._wait_exit_semaphore.release()
219 return ret
220
221
222 class FlushThreadTest(unittest.TestCase):
223
224 def setUp(self):
225 mock.patch('infra_libs.ts_mon.common.interface.flush',
226 autospec=True).start()
227 mock.patch('time.time', autospec=True).start()
228
229 self.fake_time = 0
230 time.time.side_effect = lambda: self.fake_time
231
232 self.stop_event = FakeThreadingEvent()
233 self.stop_event.increment_time_func = self.increment_time
234
235 self.t = interface._FlushThread(60, stop_event=self.stop_event)
236
237 def increment_time(self, delta):
238 self.fake_time += delta
239
240 def assertInRange(self, lower, upper, value):
241 self.assertGreaterEqual(value, lower)
242 self.assertLessEqual(value, upper)
243
244 def tearDown(self):
245 # Ensure the thread exits.
246 self.stop_event.set(blocking=False)
247 self.t.join()
248
249 mock.patch.stopall()
250
251 def test_run_calls_flush(self):
252 self.t.start()
253
254 self.assertEqual(0, interface.flush.call_count)
255
256 # The wait is for the whole interval (with jitter).
257 self.assertInRange(30, 60, self.stop_event.timeout_wait())
258
259 # Return from the second wait, which exits the thread.
260 self.stop_event.set()
261 self.t.join()
262 self.assertEqual(2, interface.flush.call_count)
263
264 def test_run_catches_exceptions(self):
265 interface.flush.side_effect = Exception()
266 self.t.start()
267
268 self.stop_event.timeout_wait()
269 # flush is called now and raises an exception. The exception is caught, so
270 # wait is called again.
271
272 # Do it again to make sure the exception doesn't terminate the loop.
273 self.stop_event.timeout_wait()
274
275 # Return from the third wait, which exits the thread.
276 self.stop_event.set()
277 self.t.join()
278 self.assertEqual(3, interface.flush.call_count)
279
280 def test_stop_stops(self):
281 self.t.start()
282
283 self.assertTrue(self.t.is_alive())
284
285 self.t.stop()
286 self.assertFalse(self.t.is_alive())
287 self.assertEqual(1, interface.flush.call_count)
288
289 def test_sleeps_for_exact_interval(self):
290 self.t.start()
291
292 # Flush takes 5 seconds.
293 interface.flush.side_effect = functools.partial(self.increment_time, 5)
294
295 self.assertInRange(30, 60, self.stop_event.timeout_wait())
296 self.assertAlmostEqual(55, self.stop_event.timeout_wait())
297 self.assertAlmostEqual(55, self.stop_event.timeout_wait())
298
299 def test_sleeps_for_minimum_zero_secs(self):
300 self.t.start()
301
302 # Flush takes 65 seconds.
303 interface.flush.side_effect = functools.partial(self.increment_time, 65)
304
305 self.assertInRange(30, 60, self.stop_event.timeout_wait())
306 self.assertAlmostEqual(0, self.stop_event.timeout_wait())
307 self.assertAlmostEqual(0, self.stop_event.timeout_wait())
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698