Index: infra_libs/ts_mon/common/test/interface_test.py |
diff --git a/infra_libs/ts_mon/common/test/interface_test.py b/infra_libs/ts_mon/common/test/interface_test.py |
deleted file mode 100644 |
index 81875c35d9e846eb7cfc883b61911184d995f240..0000000000000000000000000000000000000000 |
--- a/infra_libs/ts_mon/common/test/interface_test.py |
+++ /dev/null |
@@ -1,307 +0,0 @@ |
-# Copyright 2015 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-import functools |
-import threading |
-import time |
-import unittest |
- |
-import mock |
- |
-from testing_support import auto_stub |
- |
-from infra_libs.ts_mon.common import errors |
-from infra_libs.ts_mon.common import interface |
-from infra_libs.ts_mon.common import metrics |
-from infra_libs.ts_mon.common import targets |
-from infra_libs.ts_mon.common.test import stubs |
- |
- |
-class GlobalsTest(unittest.TestCase): |
- |
- def setUp(self): |
- target = targets.TaskTarget('test_service', 'test_job', |
- 'test_region', 'test_host') |
- self.mock_state = interface.State(target=target) |
- self.state_patcher = mock.patch('infra_libs.ts_mon.common.interface.state', |
- new=self.mock_state) |
- self.state_patcher.start() |
- |
- def tearDown(self): |
- # It's important to call close() before un-setting the mock state object, |
- # because any FlushThread started by the test is stored in that mock state |
- # and needs to be stopped before running any other tests. |
- interface.close() |
- self.state_patcher.stop() |
- |
- def test_flush(self): |
- interface.state.global_monitor = stubs.MockMonitor() |
- interface.state.target = stubs.MockTarget() |
- |
- # pylint: disable=unused-argument |
- def serialize_to(pb, start_time, fields, value, target): |
- pb.data.add().name = 'foo' |
- |
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) |
- fake_metric.name = 'fake' |
- fake_metric.serialize_to.side_effect = serialize_to |
- interface.register(fake_metric) |
- interface.state.store.set('fake', (), None, 123) |
- |
- interface.flush() |
- interface.state.global_monitor.send.assert_called_once() |
- proto = interface.state.global_monitor.send.call_args[0][0] |
- self.assertEqual(1, len(proto.data)) |
- self.assertEqual('foo', proto.data[0].name) |
- |
- def test_flush_disabled(self): |
- interface.reset_for_unittest(disable=True) |
- interface.state.global_monitor = stubs.MockMonitor() |
- interface.state.target = stubs.MockTarget() |
- interface.flush() |
- self.assertFalse(interface.state.global_monitor.send.called) |
- |
- def test_flush_raises(self): |
- self.assertIsNone(interface.state.global_monitor) |
- with self.assertRaises(errors.MonitoringNoConfiguredMonitorError): |
- interface.flush() |
- |
- def test_flush_many(self): |
- interface.state.global_monitor = stubs.MockMonitor() |
- interface.state.target = stubs.MockTarget() |
- |
- # pylint: disable=unused-argument |
- def serialize_to(pb, start_time, fields, value, target): |
- pb.data.add().name = 'foo' |
- |
- # We can't use the mock's call_args_list here because the same object is |
- # reused as the argument to both calls and cleared inbetween. |
- data_lengths = [] |
- def send(proto): |
- data_lengths.append(len(proto.data)) |
- interface.state.global_monitor.send.side_effect = send |
- |
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) |
- fake_metric.name = 'fake' |
- fake_metric.serialize_to.side_effect = serialize_to |
- interface.register(fake_metric) |
- |
- for i in xrange(1001): |
- interface.state.store.set('fake', ('field', i), None, 123) |
- |
- interface.flush() |
- self.assertEquals(2, interface.state.global_monitor.send.call_count) |
- self.assertEqual(1000, data_lengths[0]) |
- self.assertEqual(1, data_lengths[1]) |
- |
- def test_send_modifies_metric_values(self): |
- interface.state.global_monitor = stubs.MockMonitor() |
- interface.state.target = stubs.MockTarget() |
- |
- # pylint: disable=unused-argument |
- def serialize_to(pb, start_time, fields, value, target): |
- pb.data.add().name = 'foo' |
- |
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) |
- fake_metric.name = 'fake' |
- fake_metric.serialize_to.side_effect = serialize_to |
- interface.register(fake_metric) |
- |
- # Setting this will modify store._values in the middle of iteration. |
- delayed_metric = metrics.CounterMetric('foo') |
- def send(proto): |
- delayed_metric.increment_by(1) |
- interface.state.global_monitor.send.side_effect = send |
- |
- for i in xrange(1001): |
- interface.state.store.set('fake', ('field', i), None, 123) |
- |
- # Shouldn't raise an exception. |
- interface.flush() |
- |
- def test_register_unregister(self): |
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True) |
- self.assertEqual(0, len(interface.state.metrics)) |
- interface.register(fake_metric) |
- self.assertEqual(1, len(interface.state.metrics)) |
- interface.unregister(fake_metric) |
- self.assertEqual(0, len(interface.state.metrics)) |
- |
- def test_identical_register(self): |
- fake_metric = mock.Mock(_name='foo') |
- interface.register(fake_metric) |
- interface.register(fake_metric) |
- self.assertEqual(1, len(interface.state.metrics)) |
- |
- def test_duplicate_register_raises(self): |
- fake_metric = mock.Mock() |
- fake_metric.name = 'foo' |
- phake_metric = mock.Mock() |
- phake_metric.name = 'foo' |
- interface.register(fake_metric) |
- with self.assertRaises(errors.MonitoringDuplicateRegistrationError): |
- interface.register(phake_metric) |
- self.assertEqual(1, len(interface.state.metrics)) |
- |
- def test_unregister_missing_raises(self): |
- fake_metric = mock.Mock(_name='foo') |
- self.assertEqual(0, len(interface.state.metrics)) |
- with self.assertRaises(KeyError): |
- interface.unregister(fake_metric) |
- |
- def test_close_stops_flush_thread(self): |
- interface.state.flush_thread = interface._FlushThread(10) |
- interface.state.flush_thread.start() |
- |
- self.assertTrue(interface.state.flush_thread.is_alive()) |
- interface.close() |
- self.assertFalse(interface.state.flush_thread.is_alive()) |
- |
- def test_reset_for_unittest(self): |
- metric = metrics.CounterMetric('foo') |
- metric.increment() |
- self.assertEquals(1, metric.get()) |
- |
- interface.reset_for_unittest() |
- self.assertIsNone(metric.get()) |
- |
- |
-class FakeThreadingEvent(object): |
- """A fake threading.Event that doesn't use the clock for timeouts.""" |
- |
- def __init__(self): |
- # If not None, called inside wait() with the timeout (in seconds) to |
- # increment a fake clock. |
- self.increment_time_func = None |
- |
- self._is_set = False # Return value of the next call to wait. |
- self._last_wait_timeout = None # timeout argument of the last call to wait. |
- |
- self._wait_enter_semaphore = threading.Semaphore(0) |
- self._wait_exit_semaphore = threading.Semaphore(0) |
- |
- def timeout_wait(self): |
- """Blocks until the next time the code under test calls wait(). |
- |
- Makes the wait() call return False (indicating a timeout), and this call |
- returns the timeout argument given to the wait() method. |
- |
- Called by the test. |
- """ |
- |
- self._wait_enter_semaphore.release() |
- self._wait_exit_semaphore.acquire() |
- return self._last_wait_timeout |
- |
- def set(self, blocking=True): |
- """Makes the next wait() call return True. |
- |
- By default this blocks until the next call to wait(), but you can pass |
- blocking=False to just set the flag, wake up any wait() in progress (if any) |
- and return immediately. |
- """ |
- |
- self._is_set = True |
- self._wait_enter_semaphore.release() |
- if blocking: |
- self._wait_exit_semaphore.acquire() |
- |
- def wait(self, timeout): |
- """Block until either set() or timeout_wait() is called by the test.""" |
- |
- self._wait_enter_semaphore.acquire() |
- self._last_wait_timeout = timeout |
- if self.increment_time_func is not None: # pragma: no cover |
- self.increment_time_func(timeout) |
- ret = self._is_set |
- self._wait_exit_semaphore.release() |
- return ret |
- |
- |
-class FlushThreadTest(unittest.TestCase): |
- |
- def setUp(self): |
- mock.patch('infra_libs.ts_mon.common.interface.flush', |
- autospec=True).start() |
- mock.patch('time.time', autospec=True).start() |
- |
- self.fake_time = 0 |
- time.time.side_effect = lambda: self.fake_time |
- |
- self.stop_event = FakeThreadingEvent() |
- self.stop_event.increment_time_func = self.increment_time |
- |
- self.t = interface._FlushThread(60, stop_event=self.stop_event) |
- |
- def increment_time(self, delta): |
- self.fake_time += delta |
- |
- def assertInRange(self, lower, upper, value): |
- self.assertGreaterEqual(value, lower) |
- self.assertLessEqual(value, upper) |
- |
- def tearDown(self): |
- # Ensure the thread exits. |
- self.stop_event.set(blocking=False) |
- self.t.join() |
- |
- mock.patch.stopall() |
- |
- def test_run_calls_flush(self): |
- self.t.start() |
- |
- self.assertEqual(0, interface.flush.call_count) |
- |
- # The wait is for the whole interval (with jitter). |
- self.assertInRange(30, 60, self.stop_event.timeout_wait()) |
- |
- # Return from the second wait, which exits the thread. |
- self.stop_event.set() |
- self.t.join() |
- self.assertEqual(2, interface.flush.call_count) |
- |
- def test_run_catches_exceptions(self): |
- interface.flush.side_effect = Exception() |
- self.t.start() |
- |
- self.stop_event.timeout_wait() |
- # flush is called now and raises an exception. The exception is caught, so |
- # wait is called again. |
- |
- # Do it again to make sure the exception doesn't terminate the loop. |
- self.stop_event.timeout_wait() |
- |
- # Return from the third wait, which exits the thread. |
- self.stop_event.set() |
- self.t.join() |
- self.assertEqual(3, interface.flush.call_count) |
- |
- def test_stop_stops(self): |
- self.t.start() |
- |
- self.assertTrue(self.t.is_alive()) |
- |
- self.t.stop() |
- self.assertFalse(self.t.is_alive()) |
- self.assertEqual(1, interface.flush.call_count) |
- |
- def test_sleeps_for_exact_interval(self): |
- self.t.start() |
- |
- # Flush takes 5 seconds. |
- interface.flush.side_effect = functools.partial(self.increment_time, 5) |
- |
- self.assertInRange(30, 60, self.stop_event.timeout_wait()) |
- self.assertAlmostEqual(55, self.stop_event.timeout_wait()) |
- self.assertAlmostEqual(55, self.stop_event.timeout_wait()) |
- |
- def test_sleeps_for_minimum_zero_secs(self): |
- self.t.start() |
- |
- # Flush takes 65 seconds. |
- interface.flush.side_effect = functools.partial(self.increment_time, 65) |
- |
- self.assertInRange(30, 60, self.stop_event.timeout_wait()) |
- self.assertAlmostEqual(0, self.stop_event.timeout_wait()) |
- self.assertAlmostEqual(0, self.stop_event.timeout_wait()) |