Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(168)

Unified Diff: infra_libs/ts_mon/common/test/interface_test.py

Issue 2213143002: Add infra_libs as a bootstrap dependency. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed the ugly import hack Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: infra_libs/ts_mon/common/test/interface_test.py
diff --git a/infra_libs/ts_mon/common/test/interface_test.py b/infra_libs/ts_mon/common/test/interface_test.py
deleted file mode 100644
index 81875c35d9e846eb7cfc883b61911184d995f240..0000000000000000000000000000000000000000
--- a/infra_libs/ts_mon/common/test/interface_test.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import functools
-import threading
-import time
-import unittest
-
-import mock
-
-from testing_support import auto_stub
-
-from infra_libs.ts_mon.common import errors
-from infra_libs.ts_mon.common import interface
-from infra_libs.ts_mon.common import metrics
-from infra_libs.ts_mon.common import targets
-from infra_libs.ts_mon.common.test import stubs
-
-
-class GlobalsTest(unittest.TestCase):
-
- def setUp(self):
- target = targets.TaskTarget('test_service', 'test_job',
- 'test_region', 'test_host')
- self.mock_state = interface.State(target=target)
- self.state_patcher = mock.patch('infra_libs.ts_mon.common.interface.state',
- new=self.mock_state)
- self.state_patcher.start()
-
- def tearDown(self):
- # It's important to call close() before un-setting the mock state object,
- # because any FlushThread started by the test is stored in that mock state
- # and needs to be stopped before running any other tests.
- interface.close()
- self.state_patcher.stop()
-
- def test_flush(self):
- interface.state.global_monitor = stubs.MockMonitor()
- interface.state.target = stubs.MockTarget()
-
- # pylint: disable=unused-argument
- def serialize_to(pb, start_time, fields, value, target):
- pb.data.add().name = 'foo'
-
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
- fake_metric.name = 'fake'
- fake_metric.serialize_to.side_effect = serialize_to
- interface.register(fake_metric)
- interface.state.store.set('fake', (), None, 123)
-
- interface.flush()
- interface.state.global_monitor.send.assert_called_once()
- proto = interface.state.global_monitor.send.call_args[0][0]
- self.assertEqual(1, len(proto.data))
- self.assertEqual('foo', proto.data[0].name)
-
- def test_flush_disabled(self):
- interface.reset_for_unittest(disable=True)
- interface.state.global_monitor = stubs.MockMonitor()
- interface.state.target = stubs.MockTarget()
- interface.flush()
- self.assertFalse(interface.state.global_monitor.send.called)
-
- def test_flush_raises(self):
- self.assertIsNone(interface.state.global_monitor)
- with self.assertRaises(errors.MonitoringNoConfiguredMonitorError):
- interface.flush()
-
- def test_flush_many(self):
- interface.state.global_monitor = stubs.MockMonitor()
- interface.state.target = stubs.MockTarget()
-
- # pylint: disable=unused-argument
- def serialize_to(pb, start_time, fields, value, target):
- pb.data.add().name = 'foo'
-
- # We can't use the mock's call_args_list here because the same object is
- # reused as the argument to both calls and cleared inbetween.
- data_lengths = []
- def send(proto):
- data_lengths.append(len(proto.data))
- interface.state.global_monitor.send.side_effect = send
-
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
- fake_metric.name = 'fake'
- fake_metric.serialize_to.side_effect = serialize_to
- interface.register(fake_metric)
-
- for i in xrange(1001):
- interface.state.store.set('fake', ('field', i), None, 123)
-
- interface.flush()
- self.assertEquals(2, interface.state.global_monitor.send.call_count)
- self.assertEqual(1000, data_lengths[0])
- self.assertEqual(1, data_lengths[1])
-
- def test_send_modifies_metric_values(self):
- interface.state.global_monitor = stubs.MockMonitor()
- interface.state.target = stubs.MockTarget()
-
- # pylint: disable=unused-argument
- def serialize_to(pb, start_time, fields, value, target):
- pb.data.add().name = 'foo'
-
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
- fake_metric.name = 'fake'
- fake_metric.serialize_to.side_effect = serialize_to
- interface.register(fake_metric)
-
- # Setting this will modify store._values in the middle of iteration.
- delayed_metric = metrics.CounterMetric('foo')
- def send(proto):
- delayed_metric.increment_by(1)
- interface.state.global_monitor.send.side_effect = send
-
- for i in xrange(1001):
- interface.state.store.set('fake', ('field', i), None, 123)
-
- # Shouldn't raise an exception.
- interface.flush()
-
- def test_register_unregister(self):
- fake_metric = mock.create_autospec(metrics.Metric, spec_set=True)
- self.assertEqual(0, len(interface.state.metrics))
- interface.register(fake_metric)
- self.assertEqual(1, len(interface.state.metrics))
- interface.unregister(fake_metric)
- self.assertEqual(0, len(interface.state.metrics))
-
- def test_identical_register(self):
- fake_metric = mock.Mock(_name='foo')
- interface.register(fake_metric)
- interface.register(fake_metric)
- self.assertEqual(1, len(interface.state.metrics))
-
- def test_duplicate_register_raises(self):
- fake_metric = mock.Mock()
- fake_metric.name = 'foo'
- phake_metric = mock.Mock()
- phake_metric.name = 'foo'
- interface.register(fake_metric)
- with self.assertRaises(errors.MonitoringDuplicateRegistrationError):
- interface.register(phake_metric)
- self.assertEqual(1, len(interface.state.metrics))
-
- def test_unregister_missing_raises(self):
- fake_metric = mock.Mock(_name='foo')
- self.assertEqual(0, len(interface.state.metrics))
- with self.assertRaises(KeyError):
- interface.unregister(fake_metric)
-
- def test_close_stops_flush_thread(self):
- interface.state.flush_thread = interface._FlushThread(10)
- interface.state.flush_thread.start()
-
- self.assertTrue(interface.state.flush_thread.is_alive())
- interface.close()
- self.assertFalse(interface.state.flush_thread.is_alive())
-
- def test_reset_for_unittest(self):
- metric = metrics.CounterMetric('foo')
- metric.increment()
- self.assertEquals(1, metric.get())
-
- interface.reset_for_unittest()
- self.assertIsNone(metric.get())
-
-
-class FakeThreadingEvent(object):
- """A fake threading.Event that doesn't use the clock for timeouts."""
-
- def __init__(self):
- # If not None, called inside wait() with the timeout (in seconds) to
- # increment a fake clock.
- self.increment_time_func = None
-
- self._is_set = False # Return value of the next call to wait.
- self._last_wait_timeout = None # timeout argument of the last call to wait.
-
- self._wait_enter_semaphore = threading.Semaphore(0)
- self._wait_exit_semaphore = threading.Semaphore(0)
-
- def timeout_wait(self):
- """Blocks until the next time the code under test calls wait().
-
- Makes the wait() call return False (indicating a timeout), and this call
- returns the timeout argument given to the wait() method.
-
- Called by the test.
- """
-
- self._wait_enter_semaphore.release()
- self._wait_exit_semaphore.acquire()
- return self._last_wait_timeout
-
- def set(self, blocking=True):
- """Makes the next wait() call return True.
-
- By default this blocks until the next call to wait(), but you can pass
- blocking=False to just set the flag, wake up any wait() in progress (if any)
- and return immediately.
- """
-
- self._is_set = True
- self._wait_enter_semaphore.release()
- if blocking:
- self._wait_exit_semaphore.acquire()
-
- def wait(self, timeout):
- """Block until either set() or timeout_wait() is called by the test."""
-
- self._wait_enter_semaphore.acquire()
- self._last_wait_timeout = timeout
- if self.increment_time_func is not None: # pragma: no cover
- self.increment_time_func(timeout)
- ret = self._is_set
- self._wait_exit_semaphore.release()
- return ret
-
-
-class FlushThreadTest(unittest.TestCase):
-
- def setUp(self):
- mock.patch('infra_libs.ts_mon.common.interface.flush',
- autospec=True).start()
- mock.patch('time.time', autospec=True).start()
-
- self.fake_time = 0
- time.time.side_effect = lambda: self.fake_time
-
- self.stop_event = FakeThreadingEvent()
- self.stop_event.increment_time_func = self.increment_time
-
- self.t = interface._FlushThread(60, stop_event=self.stop_event)
-
- def increment_time(self, delta):
- self.fake_time += delta
-
- def assertInRange(self, lower, upper, value):
- self.assertGreaterEqual(value, lower)
- self.assertLessEqual(value, upper)
-
- def tearDown(self):
- # Ensure the thread exits.
- self.stop_event.set(blocking=False)
- self.t.join()
-
- mock.patch.stopall()
-
- def test_run_calls_flush(self):
- self.t.start()
-
- self.assertEqual(0, interface.flush.call_count)
-
- # The wait is for the whole interval (with jitter).
- self.assertInRange(30, 60, self.stop_event.timeout_wait())
-
- # Return from the second wait, which exits the thread.
- self.stop_event.set()
- self.t.join()
- self.assertEqual(2, interface.flush.call_count)
-
- def test_run_catches_exceptions(self):
- interface.flush.side_effect = Exception()
- self.t.start()
-
- self.stop_event.timeout_wait()
- # flush is called now and raises an exception. The exception is caught, so
- # wait is called again.
-
- # Do it again to make sure the exception doesn't terminate the loop.
- self.stop_event.timeout_wait()
-
- # Return from the third wait, which exits the thread.
- self.stop_event.set()
- self.t.join()
- self.assertEqual(3, interface.flush.call_count)
-
- def test_stop_stops(self):
- self.t.start()
-
- self.assertTrue(self.t.is_alive())
-
- self.t.stop()
- self.assertFalse(self.t.is_alive())
- self.assertEqual(1, interface.flush.call_count)
-
- def test_sleeps_for_exact_interval(self):
- self.t.start()
-
- # Flush takes 5 seconds.
- interface.flush.side_effect = functools.partial(self.increment_time, 5)
-
- self.assertInRange(30, 60, self.stop_event.timeout_wait())
- self.assertAlmostEqual(55, self.stop_event.timeout_wait())
- self.assertAlmostEqual(55, self.stop_event.timeout_wait())
-
- def test_sleeps_for_minimum_zero_secs(self):
- self.t.start()
-
- # Flush takes 65 seconds.
- interface.flush.side_effect = functools.partial(self.increment_time, 65)
-
- self.assertInRange(30, 60, self.stop_event.timeout_wait())
- self.assertAlmostEqual(0, self.stop_event.timeout_wait())
- self.assertAlmostEqual(0, self.stop_event.timeout_wait())

Powered by Google App Engine
This is Rietveld 408576698