| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import contextlib |
| 6 import collections |
| 7 import fcntl |
| 8 import os |
| 9 import sys |
| 10 import unittest |
| 11 |
| 12 from testing_support import auto_stub |
| 13 from infra.libs.service_utils import daemon |
| 14 |
| 15 |
| 16 Stat = collections.namedtuple('Stat', ['st_ino']) |
| 17 |
| 18 |
| 19 class TestFlock(auto_stub.TestCase): |
| 20 def setUp(self): |
| 21 super(TestFlock, self).setUp() |
| 22 |
| 23 # daemon.flock() only works on linux/osx, so set 'linux' here if we're |
| 24 # testing in windows. The OS calls are mocked so it will still work. If |
| 25 # windows support is added, remove this mock entirely. |
| 26 self.mock(sys, 'platform', 'linux2') |
| 27 |
| 28 @contextlib.contextmanager |
| 29 def _assert_reached(self): |
| 30 reached = {'yup': False} |
| 31 yield reached |
| 32 self.assertTrue(reached['yup']) |
| 33 |
| 34 def _mock_basic_fs_calls(self): |
| 35 """Mocks os.open, os.close as well as os.fstat.""" |
| 36 def _noop_handler(*_args, **_kwargs): |
| 37 return 1 |
| 38 |
| 39 def _noop_os_close(*_args, **_kwargs): |
| 40 pass |
| 41 |
| 42 def _noop_fstat(*_args, **_kwargs): |
| 43 return Stat(st_ino=45678) |
| 44 |
| 45 self.mock(os, 'open', _noop_handler) |
| 46 self.mock(os, 'close', _noop_os_close) |
| 47 self.mock(os, 'fstat', _noop_fstat) |
| 48 |
| 49 def _set_lock_status(self, success=True): |
| 50 """Mocks os.fcntl and whether the mock succeeds or not.""" |
| 51 def _lock_status(_fd, flags, **_kwargs): |
| 52 if flags != fcntl.LOCK_UN: # We don't care if unlock fails. |
| 53 if not success: |
| 54 raise IOError('Couldn\'t get lock.') |
| 55 |
| 56 self.mock(fcntl, 'lockf', _lock_status) |
| 57 |
| 58 def _set_stat_status(self, success=True, matching=True): |
| 59 """Mocks os.stat, sets its success and if st_ino matches os.fstat mock.""" |
| 60 def _stat_handler(*_args, **_kwargs): |
| 61 if not success: |
| 62 raise OSError('Not found.') |
| 63 if matching: |
| 64 return Stat(st_ino=45678) |
| 65 return Stat(st_ino=67890) |
| 66 |
| 67 self.mock(os, 'stat', _stat_handler) |
| 68 |
| 69 def _set_unlink_status(self, success=True): |
| 70 """Mocks os.unlink and sets whether it succeeds or not.""" |
| 71 def _unlink_handler(*_args, **_kwargs): |
| 72 if not success: |
| 73 raise OSError('Not found.') |
| 74 |
| 75 self.mock(os, 'unlink', _unlink_handler) |
| 76 |
| 77 #### Tests. |
| 78 |
| 79 def testGetLock(self): |
| 80 self._mock_basic_fs_calls() |
| 81 self._set_lock_status() |
| 82 self._set_stat_status() |
| 83 self._set_unlink_status() |
| 84 with self._assert_reached() as reached: |
| 85 with daemon.flock('bogus'): |
| 86 reached['yup'] = True |
| 87 |
| 88 def testDontGetLock(self): |
| 89 self._mock_basic_fs_calls() |
| 90 self._set_lock_status(success=False) |
| 91 self._set_stat_status() |
| 92 self._set_unlink_status() |
| 93 with self.assertRaises(daemon.LockAlreadyLocked): |
| 94 with daemon.flock('bogus'): |
| 95 # Should never reach this. |
| 96 # pylint: disable=redundant-unittest-assert |
| 97 self.assertTrue(False) # pragma: no cover |
| 98 |
| 99 def testFileDeletedAfterLockAcquired(self): |
| 100 """Test that we abort if we acquire a lock but the file has been deleted.""" |
| 101 self._mock_basic_fs_calls() |
| 102 self._set_lock_status() |
| 103 self._set_stat_status(success=False) |
| 104 self._set_unlink_status() |
| 105 with self.assertRaises(daemon.LockAlreadyLocked): |
| 106 with daemon.flock('bogus'): |
| 107 # Should never reach this. |
| 108 # pylint: disable=redundant-unittest-assert |
| 109 self.assertTrue(False) # pragma: no cover |
| 110 |
| 111 def testLockfileRecreated(self): |
| 112 """Test that we abort if a new lockfile is created under us.""" |
| 113 self._mock_basic_fs_calls() |
| 114 self._set_lock_status() |
| 115 self._set_stat_status(matching=False) |
| 116 self._set_unlink_status() |
| 117 with self.assertRaises(daemon.LockAlreadyLocked): |
| 118 with daemon.flock('bogus'): |
| 119 # Should never reach this. |
| 120 # pylint: disable=redundant-unittest-assert |
| 121 self.assertTrue(False) # pragma: no cover |
| 122 |
| 123 def testDeleteWhenDone(self): |
| 124 """Test that we delete the lockfile when we're done.""" |
| 125 data = {'count': 0} |
| 126 def _mock_unlink(*_args, **_kwargs): |
| 127 data['count'] += 1 |
| 128 self.mock(os, 'unlink', _mock_unlink) |
| 129 self._mock_basic_fs_calls() |
| 130 self._set_lock_status() |
| 131 self._set_stat_status() |
| 132 with self._assert_reached() as reached: |
| 133 with daemon.flock('bogus'): |
| 134 reached['yup'] = True |
| 135 self.assertEqual(data['count'], 1) |
| 136 |
| 137 |
| 138 def testUnlinkFailureDoesntBreak(self): |
| 139 """Test that a failing unlink doesn't break us.""" |
| 140 self._mock_basic_fs_calls() |
| 141 self._set_lock_status() |
| 142 self._set_stat_status() |
| 143 self._set_unlink_status(success=False) |
| 144 with self._assert_reached() as reached: |
| 145 with daemon.flock('bogus'): |
| 146 reached['yup'] = True |
| 147 |
| 148 |
| 149 class TestTimeout(auto_stub.TestCase): |
| 150 def setUp(self): |
| 151 super(TestTimeout, self).setUp() |
| 152 |
| 153 # daemon.add_timeout() only works on linux, so set 'linux' here if we're |
| 154 # testing in windows/osx. If windows or osx support is added, change |
| 155 # accordingly. |
| 156 self.mock(sys, 'platform', 'linux2') |
| 157 |
| 158 def testAddTimeout(self): |
| 159 self.assertEqual( |
| 160 ['timeout', '600', 'echo', 'hey'], |
| 161 daemon.add_timeout(['echo', 'hey'], 600)) |
| OLD | NEW |