OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # | 2 # |
3 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | 3 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
4 # Use of this source code is governed by a BSD-style license that can be | 4 # Use of this source code is governed by a BSD-style license that can be |
5 # found in the LICENSE file. | 5 # found in the LICENSE file. |
6 | 6 |
7 import errno | 7 import errno |
8 import os | 8 import os |
9 import shutil | 9 import shutil |
| 10 import signal |
10 import subprocess | 11 import subprocess |
11 import tempfile | 12 import tempfile |
12 import unittest | 13 import unittest |
13 import cros_build_lib | 14 import cros_build_lib |
14 import mox | 15 import mox |
15 | 16 |
16 | 17 |
17 class TestRunCommand(unittest.TestCase): | 18 class TestRunCommand(unittest.TestCase): |
18 | 19 |
19 def setUp(self): | 20 def setUp(self): |
| 21 # Get the original value for SIGINT so our signal() mock can return the |
| 22 # correct thing. |
| 23 self._old_sigint = signal.getsignal(signal.SIGINT) |
| 24 |
20 self.mox = mox.Mox() | 25 self.mox = mox.Mox() |
21 self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True) | 26 self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True) |
| 27 self.mox.StubOutWithMock(signal, 'signal') |
22 self.proc_mock = self.mox.CreateMockAnything() | 28 self.proc_mock = self.mox.CreateMockAnything() |
23 self.error = 'test error' | 29 self.error = 'test error' |
24 self.output = 'test output' | 30 self.output = 'test output' |
25 | 31 |
26 def tearDown(self): | 32 def tearDown(self): |
| 33 # Unset anything that we set with mox. |
27 self.mox.UnsetStubs() | 34 self.mox.UnsetStubs() |
28 | 35 |
29 def _AssertCrEqual(self, expected, actual): | 36 def _AssertCrEqual(self, expected, actual): |
30 """Helper method to compare two CommandResult objects. | 37 """Helper method to compare two CommandResult objects. |
31 | 38 |
32 This is needed since assertEqual does not know how to compare two | 39 This is needed since assertEqual does not know how to compare two |
33 CommandResult objects. | 40 CommandResult objects. |
34 | 41 |
35 Args: | 42 Args: |
36 expected: a CommandResult object, expected result. | 43 expected: a CommandResult object, expected result. |
(...skipping 15 matching lines...) Expand all Loading... |
52 rc_kv: key-value pairs passed to RunCommand(). | 59 rc_kv: key-value pairs passed to RunCommand(). |
53 """ | 60 """ |
54 expected_result = cros_build_lib.CommandResult() | 61 expected_result = cros_build_lib.CommandResult() |
55 expected_result.cmd = real_cmd | 62 expected_result.cmd = real_cmd |
56 expected_result.error = self.error | 63 expected_result.error = self.error |
57 expected_result.output = self.output | 64 expected_result.output = self.output |
58 if 'exit_code' in rc_kv: | 65 if 'exit_code' in rc_kv: |
59 expected_result.returncode = self.proc_mock.returncode | 66 expected_result.returncode = self.proc_mock.returncode |
60 | 67 |
61 arg_dict = dict() | 68 arg_dict = dict() |
62 for attr in 'cwd stdin stdout stderr shell'.split(): | 69 for attr in 'cwd env stdin stdout stderr shell'.split(): |
63 if attr in sp_kv: | 70 if attr in sp_kv: |
64 arg_dict[attr] = sp_kv[attr] | 71 arg_dict[attr] = sp_kv[attr] |
65 else: | 72 else: |
66 if attr == 'shell': | 73 if attr == 'shell': |
67 arg_dict[attr] = False | 74 arg_dict[attr] = False |
68 else: | 75 else: |
69 arg_dict[attr] = None | 76 arg_dict[attr] = None |
70 | 77 |
| 78 # If requested, RunCommand will ignore sigints; record that. |
| 79 if rc_kv.get('ignore_sigint'): |
| 80 signal.signal(signal.SIGINT, signal.SIG_IGN).AndReturn(self._old_sigint) |
| 81 |
71 subprocess.Popen(real_cmd, **arg_dict).AndReturn(self.proc_mock) | 82 subprocess.Popen(real_cmd, **arg_dict).AndReturn(self.proc_mock) |
72 self.proc_mock.communicate(None).AndReturn((self.output, self.error)) | 83 self.proc_mock.communicate(None).AndReturn((self.output, self.error)) |
73 | 84 |
| 85 # If it ignored them, RunCommand will restore sigints; record that. |
| 86 if rc_kv.get('ignore_sigint'): |
| 87 signal.signal(signal.SIGINT, self._old_sigint).AndReturn(signal.SIG_IGN) |
| 88 |
74 self.mox.ReplayAll() | 89 self.mox.ReplayAll() |
75 actual_result = cros_build_lib.RunCommand(cmd, **rc_kv) | 90 actual_result = cros_build_lib.RunCommand(cmd, **rc_kv) |
76 self.mox.VerifyAll() | 91 self.mox.VerifyAll() |
77 | 92 |
78 self._AssertCrEqual(expected_result, actual_result) | 93 self._AssertCrEqual(expected_result, actual_result) |
79 | 94 |
80 def testReturnCodeZeroWithArrayCmd(self): | 95 def testReturnCodeZeroWithArrayCmd(self, ignore_sigint=False): |
81 """--enter_chroot=False and --cmd is an array of strings.""" | 96 """--enter_chroot=False and --cmd is an array of strings. |
| 97 |
| 98 Parameterized so this can also be used by some other tests w/ alternate |
| 99 params to RunCommand(). |
| 100 |
| 101 Args: |
| 102 ignore_sigint: If True, we'll tell RunCommand to ignore sigint. |
| 103 """ |
82 self.proc_mock.returncode = 0 | 104 self.proc_mock.returncode = 0 |
83 cmd_list = ['foo', 'bar', 'roger'] | 105 cmd_list = ['foo', 'bar', 'roger'] |
84 self._TestCmd(cmd_list, cmd_list, rc_kv=dict(exit_code=True)) | 106 self._TestCmd(cmd_list, cmd_list, rc_kv=dict(exit_code=True, |
| 107 ignore_sigint=ignore_sigint)) |
| 108 |
| 109 def testSignalRestoreNormalCase(self): |
| 110 """Test RunCommand() properly sets/restores sigint. Normal case.""" |
| 111 self.testReturnCodeZeroWithArrayCmd(ignore_sigint=True) |
| 112 |
85 | 113 |
86 def testReturnCodeZeroWithArrayCmdEnterChroot(self): | 114 def testReturnCodeZeroWithArrayCmdEnterChroot(self): |
87 """--enter_chroot=True and --cmd is an array of strings.""" | 115 """--enter_chroot=True and --cmd is an array of strings.""" |
88 self.proc_mock.returncode = 0 | 116 self.proc_mock.returncode = 0 |
89 cmd_list = ['foo', 'bar', 'roger'] | 117 cmd_list = ['foo', 'bar', 'roger'] |
90 real_cmd = ['./enter_chroot.sh', '--'] + cmd_list | 118 real_cmd = ['./enter_chroot.sh', '--'] + cmd_list |
91 self._TestCmd(cmd_list, real_cmd, rc_kv=dict(enter_chroot=True)) | 119 self._TestCmd(cmd_list, real_cmd, rc_kv=dict(enter_chroot=True)) |
92 | 120 |
93 def testReturnCodeNotZeroErrorOkNotRaisesError(self): | 121 def testReturnCodeNotZeroErrorOkNotRaisesError(self): |
94 """Raise error when proc.communicate() returns non-zero.""" | 122 """Raise error when proc.communicate() returns non-zero.""" |
95 self.proc_mock.returncode = 1 | 123 self.proc_mock.returncode = 1 |
96 cmd = 'test cmd' | 124 cmd = 'test cmd' |
97 self._TestCmd(cmd, cmd, rc_kv=dict(error_ok=True)) | 125 self._TestCmd(cmd, cmd, rc_kv=dict(error_ok=True)) |
98 | 126 |
99 def testSubprocessCommunicateExceptionRaisesError(self): | 127 def testSubprocessCommunicateExceptionRaisesError(self, ignore_sigint=False): |
100 """Verify error raised by communicate() is caught.""" | 128 """Verify error raised by communicate() is caught. |
| 129 |
| 130 Parameterized so this can also be used by some other tests w/ alternate |
| 131 params to RunCommand(). |
| 132 |
| 133 Args: |
| 134 ignore_sigint: If True, we'll tell RunCommand to ignore sigint. |
| 135 """ |
101 cmd = 'test cmd' | 136 cmd = 'test cmd' |
102 subprocess.Popen(cmd, cwd=None, stdin=None, stdout=None, stderr=None, | 137 |
| 138 # If requested, RunCommand will ignore sigints; record that. |
| 139 if ignore_sigint: |
| 140 signal.signal(signal.SIGINT, signal.SIG_IGN).AndReturn(self._old_sigint) |
| 141 |
| 142 subprocess.Popen(cmd, cwd=None, env=None, |
| 143 stdin=None, stdout=None, stderr=None, |
103 shell=False).AndReturn(self.proc_mock) | 144 shell=False).AndReturn(self.proc_mock) |
104 self.proc_mock.communicate(None).AndRaise(ValueError) | 145 self.proc_mock.communicate(None).AndRaise(ValueError) |
105 | 146 |
| 147 # If it ignored them, RunCommand will restore sigints; record that. |
| 148 if ignore_sigint: |
| 149 signal.signal(signal.SIGINT, self._old_sigint).AndReturn(signal.SIG_IGN) |
| 150 |
106 self.mox.ReplayAll() | 151 self.mox.ReplayAll() |
107 self.assertRaises(ValueError, cros_build_lib.RunCommand, cmd) | 152 self.assertRaises(ValueError, cros_build_lib.RunCommand, cmd, |
| 153 ignore_sigint=ignore_sigint) |
108 self.mox.VerifyAll() | 154 self.mox.VerifyAll() |
109 | 155 |
| 156 def testSignalRestoreExceptionCase(self): |
| 157 """Test RunCommand() properly sets/restores sigint. Exception case.""" |
| 158 self.testSubprocessCommunicateExceptionRaisesError(ignore_sigint=True) |
| 159 |
110 def testSubprocessCommunicateExceptionNotRaisesError(self): | 160 def testSubprocessCommunicateExceptionNotRaisesError(self): |
111 """Don't re-raise error from communicate() when --error_ok=True.""" | 161 """Don't re-raise error from communicate() when --error_ok=True.""" |
112 cmd = 'test cmd' | 162 cmd = 'test cmd' |
113 real_cmd = './enter_chroot.sh -- %s' % cmd | 163 real_cmd = './enter_chroot.sh -- %s' % cmd |
114 expected_result = cros_build_lib.CommandResult() | 164 expected_result = cros_build_lib.CommandResult() |
115 expected_result.cmd = real_cmd | 165 expected_result.cmd = real_cmd |
116 | 166 |
117 subprocess.Popen(real_cmd, cwd=None, stdin=None, stdout=None, stderr=None, | 167 subprocess.Popen(real_cmd, cwd=None, env=None, |
| 168 stdin=None, stdout=None, stderr=None, |
118 shell=False).AndReturn(self.proc_mock) | 169 shell=False).AndReturn(self.proc_mock) |
119 self.proc_mock.communicate(None).AndRaise(ValueError) | 170 self.proc_mock.communicate(None).AndRaise(ValueError) |
120 | 171 |
121 self.mox.ReplayAll() | 172 self.mox.ReplayAll() |
122 actual_result = cros_build_lib.RunCommand(cmd, error_ok=True, | 173 actual_result = cros_build_lib.RunCommand(cmd, error_ok=True, |
123 enter_chroot=True) | 174 enter_chroot=True) |
124 self.mox.VerifyAll() | 175 self.mox.VerifyAll() |
125 | 176 |
126 self._AssertCrEqual(expected_result, actual_result) | 177 self._AssertCrEqual(expected_result, actual_result) |
127 | 178 |
| 179 def testEnvWorks(self): |
| 180 """Test RunCommand(..., env=xyz) works.""" |
| 181 # We'll put this bogus environment together, just to make sure |
| 182 # subprocess.Popen gets passed it. |
| 183 env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'} |
| 184 |
| 185 # This is a simple case, copied from testReturnCodeZeroWithArrayCmd() |
| 186 self.proc_mock.returncode = 0 |
| 187 cmd_list = ['foo', 'bar', 'roger'] |
| 188 |
| 189 # Run. We expect the env= to be passed through from sp (subprocess.Popen) |
| 190 # to rc (RunCommand). |
| 191 self._TestCmd(cmd_list, cmd_list, |
| 192 sp_kv=dict(env=env), |
| 193 rc_kv=dict(env=env, exit_code=True)) |
| 194 |
128 | 195 |
129 class TestListFiles(unittest.TestCase): | 196 class TestListFiles(unittest.TestCase): |
130 | 197 |
131 def setUp(self): | 198 def setUp(self): |
132 self.root_dir = tempfile.mkdtemp(prefix='listfiles_unittest') | 199 self.root_dir = tempfile.mkdtemp(prefix='listfiles_unittest') |
133 | 200 |
134 def tearDown(self): | 201 def tearDown(self): |
135 shutil.rmtree(self.root_dir) | 202 shutil.rmtree(self.root_dir) |
136 | 203 |
137 def _CreateNestedDir(self, dir_structure): | 204 def _CreateNestedDir(self, dir_structure): |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
231 | 298 |
232 def testGetChromeosVersionWithEmptyInputReturnsDefault(self): | 299 def testGetChromeosVersionWithEmptyInputReturnsDefault(self): |
233 self._TestChromeosVersion('') | 300 self._TestChromeosVersion('') |
234 | 301 |
235 def testGetChromeosVersionWithNoneInputReturnsDefault(self): | 302 def testGetChromeosVersionWithNoneInputReturnsDefault(self): |
236 self._TestChromeosVersion(None) | 303 self._TestChromeosVersion(None) |
237 | 304 |
238 | 305 |
239 if __name__ == '__main__': | 306 if __name__ == '__main__': |
240 unittest.main() | 307 unittest.main() |
OLD | NEW |