OLD | NEW |
| (Empty) |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Module containing a test suite that is run to test auto updates.""" | |
6 | |
7 import os | |
8 import tempfile | |
9 import time | |
10 import unittest | |
11 | |
12 import cros_build_lib as cros_lib | |
13 | |
14 import cros_test_proxy | |
15 import dummy_au_worker | |
16 import real_au_worker | |
17 import vm_au_worker | |
18 | |
19 | |
20 class AUTest(unittest.TestCase): | |
21 """Test harness that uses an au_worker to perform and validate updates. | |
22 | |
23 Defines a test suite that is run using an au_worker. An au_worker can | |
24 be created to perform and validates updates on both virtual and real devices. | |
25 See documentation for au_worker for more information. | |
26 """ | |
27 test_results_root = None | |
28 public_key_managers = [] | |
29 | |
30 @classmethod | |
31 def ProcessOptions(cls, options, use_dummy_worker): | |
32 """Processes options for the test suite and sets up the worker class. | |
33 | |
34 Args: | |
35 options: options class to be parsed from main class. | |
36 use_dummy_worker: If True, use a dummy_worker_class rather than deriving | |
37 one from options.type. | |
38 """ | |
39 cls.base_image_path = options.base_image | |
40 cls.target_image_path = options.target_image | |
41 cls.clean = options.clean | |
42 | |
43 assert options.type in ['real', 'vm'], 'Failed to specify either real|vm.' | |
44 if use_dummy_worker: | |
45 cls.worker_class = dummy_au_worker.DummyAUWorker | |
46 elif options.type == 'vm': | |
47 cls.worker_class = vm_au_worker.VMAUWorker | |
48 else: | |
49 cls.worker_class = real_au_worker.RealAUWorker | |
50 | |
51 # Sanity checks. | |
52 if not cls.base_image_path: | |
53 cros_lib.Die('Need path to base image for vm.') | |
54 elif not os.path.exists(cls.base_image_path): | |
55 cros_lib.Die('%s does not exist' % cls.base_image_path) | |
56 | |
57 if not cls.target_image_path: | |
58 cros_lib.Die('Need path to target image to update with.') | |
59 elif not os.path.exists(cls.target_image_path): | |
60 cros_lib.Die('%s does not exist' % cls.target_image_path) | |
61 | |
62 # Initialize test root. Test root path must be in the chroot. | |
63 if not cls.test_results_root: | |
64 if options.test_results_root: | |
65 assert 'chroot/tmp' in options.test_results_root, \ | |
66 'Must specify a test results root inside tmp in a chroot.' | |
67 cls.test_results_root = options.test_results_root | |
68 else: | |
69 cls.test_results_root = tempfile.mkdtemp( | |
70 prefix='au_test_harness', | |
71 dir=cros_lib.PrependChrootPath('/tmp')) | |
72 | |
73 cros_lib.Info('Using %s as the test results root' % cls.test_results_root) | |
74 | |
75 # Cache away options to instantiate workers later. | |
76 cls.options = options | |
77 | |
78 def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg): | |
79 """Attempt a payload update, expect it to fail with expected log""" | |
80 try: | |
81 self.worker.UpdateUsingPayload(payload) | |
82 except UpdateException as err: | |
83 # Will raise ValueError if expected is not found. | |
84 if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE): | |
85 return | |
86 else: | |
87 cros_lib.Warning("Didn't find '%s' in:" % expected_msg) | |
88 cros_lib.Warning(err.stdout) | |
89 | |
90 self.fail('We managed to update when failure was expected') | |
91 | |
92 def AttemptUpdateWithFilter(self, filter, proxy_port=8081): | |
93 """Update through a proxy, with a specified filter, and expect success.""" | |
94 self.worker.PrepareBase(self.target_image_path) | |
95 | |
96 # The devserver runs at port 8080 by default. We assume that here, and | |
97 # start our proxy at a different one. We then tell our update tools to | |
98 # have the client connect to our proxy_port instead of 8080. | |
99 proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port, | |
100 address_out='127.0.0.1', | |
101 port_out=8080, | |
102 filter=filter) | |
103 proxy.serve_forever_in_thread() | |
104 try: | |
105 self.worker.PerformUpdate(self.target_image_path, self.target_image_path, | |
106 proxy_port=proxy_port) | |
107 finally: | |
108 proxy.shutdown() | |
109 | |
110 # --- UNITTEST SPECIFIC METHODS --- | |
111 | |
112 def setUp(self): | |
113 """Overrides unittest.TestCase.setUp and called before every test. | |
114 | |
115 Sets instance specific variables and initializes worker. | |
116 """ | |
117 unittest.TestCase.setUp(self) | |
118 self.worker = self.worker_class(self.options, AUTest.test_results_root) | |
119 self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..') | |
120 self.download_folder = os.path.join(self.crosutils, 'latest_download') | |
121 if not os.path.exists(self.download_folder): | |
122 os.makedirs(self.download_folder) | |
123 | |
124 def tearDown(self): | |
125 """Overrides unittest.TestCase.tearDown and called after every test.""" | |
126 self.worker.CleanUp() | |
127 | |
128 def testUpdateKeepStateful(self): | |
129 """Tests if we can update normally. | |
130 | |
131 This test checks that we can update by updating the stateful partition | |
132 rather than wiping it. | |
133 """ | |
134 self.worker.InitializeResultsDirectory() | |
135 # Just make sure some tests pass on original image. Some old images | |
136 # don't pass many tests. | |
137 self.worker.PrepareBase(self.base_image_path) | |
138 # TODO(sosa): move to 100% once we start testing using the autotest paired | |
139 # with the dev channel. | |
140 percent_passed = self.worker.VerifyImage(self, 10) | |
141 | |
142 # Update to - all tests should pass on new image. | |
143 self.worker.PerformUpdate(self.target_image_path, self.base_image_path) | |
144 percent_passed = self.worker.VerifyImage(self) | |
145 | |
146 # Update from - same percentage should pass that originally passed. | |
147 self.worker.PerformUpdate(self.base_image_path, self.target_image_path) | |
148 self.worker.VerifyImage(self, percent_passed) | |
149 | |
150 def testUpdateWipeStateful(self): | |
151 """Tests if we can update after cleaning the stateful partition. | |
152 | |
153 This test checks that we can update successfully after wiping the | |
154 stateful partition. | |
155 """ | |
156 self.worker.InitializeResultsDirectory() | |
157 # Just make sure some tests pass on original image. Some old images | |
158 # don't pass many tests. | |
159 self.worker.PrepareBase(self.base_image_path) | |
160 percent_passed = self.worker.VerifyImage(self, 10) | |
161 | |
162 # Update to - all tests should pass on new image. | |
163 self.worker.PerformUpdate(self.target_image_path, self.base_image_path, | |
164 'clean') | |
165 self.worker.VerifyImage(self) | |
166 | |
167 # Update from - same percentage should pass that originally passed. | |
168 self.worker.PerformUpdate(self.base_image_path, self.target_image_path, | |
169 'clean') | |
170 self.worker.VerifyImage(self, percent_passed) | |
171 | |
172 def testInterruptedUpdate(self): | |
173 """Tests what happens if we interrupt payload delivery 3 times.""" | |
174 | |
175 class InterruptionFilter(cros_test_proxy.Filter): | |
176 """This filter causes the proxy to interrupt the download 3 times | |
177 | |
178 It does this by closing the first three connections to transfer | |
179 2M total in the outbound connection after they transfer the | |
180 2M. | |
181 """ | |
182 def __init__(self): | |
183 """Defines variable shared across all connections""" | |
184 self.close_count = 0 | |
185 | |
186 def setup(self): | |
187 """Called once at the start of each connection.""" | |
188 self.data_size = 0 | |
189 | |
190 def OutBound(self, data): | |
191 """Called once per packet for outgoing data. | |
192 | |
193 The first three connections transferring more than 2M | |
194 outbound will be closed. | |
195 """ | |
196 if self.close_count < 3: | |
197 if self.data_size > (2 * 1024 * 1024): | |
198 self.close_count += 1 | |
199 return None | |
200 | |
201 self.data_size += len(data) | |
202 return data | |
203 | |
204 self.worker.InitializeResultsDirectory() | |
205 self.AttemptUpdateWithFilter(InterruptionFilter(), proxy_port=8082) | |
206 | |
207 def testDelayedUpdate(self): | |
208 """Tests what happens if some data is delayed during update delivery""" | |
209 | |
210 class DelayedFilter(cros_test_proxy.Filter): | |
211 """Causes intermittent delays in data transmission. | |
212 | |
213 It does this by inserting 3 20 second delays when transmitting | |
214 data after 2M has been sent. | |
215 """ | |
216 def setup(self): | |
217 """Called once at the start of each connection.""" | |
218 self.data_size = 0 | |
219 self.delay_count = 0 | |
220 | |
221 def OutBound(self, data): | |
222 """Called once per packet for outgoing data. | |
223 | |
224 The first three packets after we reach 2M transferred | |
225 are delayed by 20 seconds. | |
226 """ | |
227 if self.delay_count < 3: | |
228 if self.data_size > (2 * 1024 * 1024): | |
229 self.delay_count += 1 | |
230 time.sleep(20) | |
231 | |
232 self.data_size += len(data) | |
233 return data | |
234 | |
235 self.worker.InitializeResultsDirectory() | |
236 self.AttemptUpdateWithFilter(DelayedFilter(), proxy_port=8083) | |
237 | |
238 def SimpleTest(self): | |
239 """A simple update that updates once from a base image to a target. | |
240 | |
241 We explicitly don't use test prefix so that isn't run by default. Can be | |
242 run using test_prefix option. | |
243 """ | |
244 self.worker.InitializeResultsDirectory() | |
245 self.worker.PrepareBase(self.base_image_path) | |
246 self.worker.PerformUpdate(self.target_image_path, self.base_image_path) | |
247 self.worker.VerifyImage(self) | |
248 | |
249 # --- DISABLED TESTS --- | |
250 | |
251 # TODO(sosa): Get test to work with verbose. | |
252 def NotestPartialUpdate(self): | |
253 """Tests what happens if we attempt to update with a truncated payload.""" | |
254 self.worker.InitializeResultsDirectory() | |
255 # Preload with the version we are trying to test. | |
256 self.worker.PrepareBase(self.target_image_path) | |
257 | |
258 # Image can be updated at: | |
259 # ~chrome-eng/chromeos/localmirror/autest-images | |
260 url = 'http://gsdview.appspot.com/chromeos-localmirror/' \ | |
261 'autest-images/truncated_image.gz' | |
262 payload = os.path.join(self.download_folder, 'truncated_image.gz') | |
263 | |
264 # Read from the URL and write to the local file | |
265 urllib.urlretrieve(url, payload) | |
266 | |
267 expected_msg = 'download_hash_data == update_check_response_hash failed' | |
268 self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg) | |
269 | |
270 # TODO(sosa): Get test to work with verbose. | |
271 def NotestCorruptedUpdate(self): | |
272 """Tests what happens if we attempt to update with a corrupted payload.""" | |
273 self.worker.InitializeResultsDirectory() | |
274 # Preload with the version we are trying to test. | |
275 self.worker.PrepareBase(self.target_image_path) | |
276 | |
277 # Image can be updated at: | |
278 # ~chrome-eng/chromeos/localmirror/autest-images | |
279 url = 'http://gsdview.appspot.com/chromeos-localmirror/' \ | |
280 'autest-images/corrupted_image.gz' | |
281 payload = os.path.join(self.download_folder, 'corrupted.gz') | |
282 | |
283 # Read from the URL and write to the local file | |
284 urllib.urlretrieve(url, payload) | |
285 | |
286 # This update is expected to fail... | |
287 expected_msg = 'zlib inflate() error:-3' | |
288 self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg) | |
OLD | NEW |