OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/python | |
dgarrett
2011/03/03 02:17:21
According to the new guide this should be:
#!/usr
sosa
2011/03/03 03:11:41
This hasn't been agreed upon and differs from goog
| |
2 | |
3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """This module runs a suite of Auto Update tests. | |
8 | |
9 The tests can be run on either a virtual machine or actual device depending | |
10 on parameters given. Specific tests can be run by invoking --test_prefix. | |
11 Verbose is useful for many of the tests if you want to see individual commands | |
12 being run during the update process. | |
13 """ | |
14 | |
15 import optparse | |
16 import os | |
17 import re | |
18 import sys | |
19 import tempfile | |
20 import unittest | |
21 | |
22 sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) | |
23 import cros_build_lib as cros_lib | |
24 | |
25 import au_test | |
26 import au_worker | |
27 import dummy_au_worker | |
28 import dev_server_wrapper | |
29 import parallel_test_job | |
30 import update_exception | |
31 | |
32 | |
33 def _PrepareTestSuite(options, use_dummy_worker=False): | |
34 """Returns a prepared test suite given by the options and test class.""" | |
35 au_test.AUTest.ProcessOptions(options, use_dummy_worker) | |
36 test_loader = unittest.TestLoader() | |
37 test_loader.testMethodPrefix = options.test_prefix | |
38 return test_loader.loadTestsFromTestCase(au_test.AUTest) | |
39 | |
40 | |
41 def _PregenerateUpdates(options): | |
42 """Determines all deltas that will be generated and generates them. | |
43 | |
44 This method effectively pre-generates the dev server cache for all tests. | |
45 | |
46 Args: | |
47 options: options from parsed parser. | |
48 Returns: | |
49 Dictionary of Update Identifiers->Relative cache locations. | |
50 Raises: | |
51 update_exception.UpdateException if we fail to generate an update. | |
52 """ | |
53 def _GenerateVMUpdate(target, src, private_key_path): | |
54 """Generates an update using the devserver.""" | |
55 command = ['./enter_chroot.sh', | |
56 '--nogit_config', | |
57 '--', | |
58 'sudo', | |
59 'start_devserver', | |
60 '--pregenerate_update', | |
61 '--exit', | |
62 ] | |
63 # Add actual args to command. | |
64 command.append('--image=%s' % cros_lib.ReinterpretPathForChroot(target)) | |
65 if src: command.append('--src_image=%s' % | |
66 cros_lib.ReinterpretPathForChroot(src)) | |
67 if options.type == 'vm': command.append('--for_vm') | |
68 if private_key_path: | |
69 command.append('--private_key=%s' % | |
70 cros_lib.ReinterpretPathForChroot(private_key_path)) | |
71 | |
72 return cros_lib.RunCommandCaptureOutput(command, combine_stdout_stderr=True, | |
73 print_cmd=True) | |
74 | |
75 # Use dummy class to mock out updates that would be run as part of a test. | |
76 test_suite = _PrepareTestSuite(options, use_dummy_worker=True) | |
77 test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) | |
78 if not test_result.wasSuccessful(): | |
79 raise update_exception.UpdateException(1, | |
80 'Error finding updates to generate.') | |
81 | |
82 cros_lib.Info('The following delta updates are required.') | |
83 update_ids = [] | |
84 jobs = [] | |
85 args = [] | |
86 for target, srcs in dummy_au_worker.DummyAUWorker.delta_list.items(): | |
87 for src_key in srcs: | |
88 (src, _ , key) = src_key.partition('+') | |
89 # TODO(sosa): Add private key as part of caching name once devserver can | |
90 # handle it its own cache. | |
91 update_id = dev_server_wrapper.GenerateUpdateId(target, src, key) | |
92 print >> sys.stderr, 'AU: %s' % update_id | |
93 update_ids.append(update_id) | |
94 jobs.append(_GenerateVMUpdate) | |
95 args.append((target, src, key)) | |
96 | |
97 raw_results = parallel_test_job.RunParallelJobs(options.jobs, jobs, args, | |
98 print_status=True) | |
99 results = [] | |
100 | |
101 # Looking for this line in the output. | |
102 key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)') | |
103 for result in raw_results: | |
104 (return_code, output, _) = result | |
105 if return_code != 0: | |
106 cros_lib.Warning(output) | |
107 raise update_exception.UpdateException(return_code, | |
108 'Failed to generate all updates.') | |
109 else: | |
110 for line in output.splitlines(): | |
111 match = key_line_re.search(line) | |
112 if match: | |
113 # Convert blah/blah/update.gz -> update/blah/blah. | |
114 path_to_update_gz = match.group(1).rstrip() | |
115 (path_to_update_dir, _, _) = path_to_update_gz.rpartition( | |
116 '/update.gz') | |
117 results.append('/'.join(['update', path_to_update_dir])) | |
118 break | |
119 | |
120 # Make sure all generation of updates returned cached locations. | |
121 if len(raw_results) != len(results): | |
122 raise update_exception.UpdateException( | |
123 1, 'Insufficient number cache directories returned.') | |
124 | |
125 # Build the dictionary from our id's and returned cache paths. | |
126 cache_dictionary = {} | |
127 for index, id in enumerate(update_ids): | |
128 cache_dictionary[id] = results[index] | |
129 | |
130 return cache_dictionary | |
131 | |
132 | |
133 def _RunTestsInParallel(options): | |
134 """Runs the tests given by the options in parallel.""" | |
135 threads = [] | |
136 args = [] | |
137 test_suite = _PrepareTestSuite(options) | |
138 for test in test_suite: | |
139 test_name = test.id() | |
140 test_case = unittest.TestLoader().loadTestsFromName(test_name) | |
141 threads.append(unittest.TextTestRunner().run) | |
142 args.append(test_case) | |
143 | |
144 results = parallel_test_job.RunParallelJobs(options.jobs, threads, args, | |
145 print_status=False) | |
146 for test_result in results: | |
147 if not test_result.wasSuccessful(): | |
148 cros_lib.Die('Test harness was not successful') | |
149 | |
150 | |
151 def _InsertPublicKeyIntoImage(image_path, key_path): | |
152 """Inserts public key into image @ static update_engine location.""" | |
153 from_dir = os.path.dirname(image_path) | |
154 image = os.path.basename(image_path) | |
155 crosutils_dir = os.path.abspath(__file__).rsplit('/', 2)[0] | |
156 target_key_path = 'usr/share/update_engine/update-payload-key.pub.pem' | |
157 | |
158 # Temporary directories for this function. | |
159 rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp') | |
160 stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp') | |
161 | |
162 cros_lib.Info('Copying %s into %s' % (key_path, image_path)) | |
163 try: | |
164 cros_lib.RunCommand(['./mount_gpt_image.sh', | |
165 '--from=%s' % from_dir, | |
166 '--image=%s' % image, | |
167 '--rootfs_mountpt=%s' % rootfs_dir, | |
168 '--stateful_mountpt=%s' % stateful_dir, | |
169 ], print_cmd=False, redirect_stdout=True, | |
170 redirect_stderr=True, cwd=crosutils_dir) | |
171 path = os.path.join(rootfs_dir, target_key_path) | |
172 dir_path = os.path.dirname(path) | |
173 cros_lib.RunCommand(['sudo', 'mkdir', '--parents', dir_path], | |
174 print_cmd=False) | |
175 cros_lib.RunCommand(['sudo', 'cp', '--force', '-p', key_path, path], | |
176 print_cmd=False) | |
177 finally: | |
178 # Unmount best effort regardless. | |
179 cros_lib.RunCommand(['./mount_gpt_image.sh', | |
180 '--unmount', | |
181 '--rootfs_mountpt=%s' % rootfs_dir, | |
182 '--stateful_mountpt=%s' % stateful_dir, | |
183 ], print_cmd=False, redirect_stdout=True, | |
184 redirect_stderr=True, cwd=crosutils_dir) | |
185 # Clean up our directories. | |
186 os.rmdir(rootfs_dir) | |
187 os.rmdir(stateful_dir) | |
188 | |
189 cros_lib.RunCommand(['bin/cros_make_image_bootable', | |
190 cros_lib.ReinterpretPathForChroot(from_dir), | |
191 image], | |
192 print_cmd=False, redirect_stdout=True, | |
193 redirect_stderr=True, enter_chroot=True, | |
194 cwd=crosutils_dir) | |
195 | |
196 | |
197 def _CleanPreviousWork(options): | |
198 """Cleans up previous work from the devserver cache and local image cache.""" | |
199 cros_lib.Info('Cleaning up previous work.') | |
200 # Wipe devserver cache. | |
201 cros_lib.RunCommandCaptureOutput( | |
202 ['sudo', 'start_devserver', '--clear_cache', '--exit', ], | |
203 enter_chroot=True, print_cmd=False, combine_stdout_stderr=True) | |
204 | |
205 # Clean previous vm images if they exist. | |
206 if options.type == 'vm': | |
207 target_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname( | |
208 options.target_image) | |
209 base_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname( | |
210 options.base_image) | |
211 if os.path.exists(target_vm_image_path): os.remove(target_vm_image_path) | |
212 if os.path.exists(base_vm_image_path): os.remove(base_vm_image_path) | |
213 | |
214 | |
215 def main(): | |
216 parser = optparse.OptionParser() | |
217 parser.add_option('-b', '--base_image', | |
218 help='path to the base image.') | |
219 parser.add_option('-r', '--board', | |
220 help='board for the images.') | |
221 parser.add_option('--clean', default=False, dest='clean', action='store_true', | |
222 help='Clean all previous state') | |
223 parser.add_option('--no_delta', action='store_false', default=True, | |
224 dest='delta', | |
225 help='Disable using delta updates.') | |
226 parser.add_option('--no_graphics', action='store_true', | |
227 help='Disable graphics for the vm test.') | |
228 parser.add_option('-j', '--jobs', default=8, type=int, | |
229 help='Number of simultaneous jobs') | |
230 parser.add_option('--public_key', default=None, | |
231 help='Public key to use on images and updates.') | |
232 parser.add_option('--private_key', default=None, | |
233 help='Private key to use on images and updates.') | |
234 parser.add_option('-q', '--quick_test', default=False, action='store_true', | |
235 help='Use a basic test to verify image.') | |
236 parser.add_option('-m', '--remote', | |
237 help='Remote address for real test.') | |
238 parser.add_option('-t', '--target_image', | |
239 help='path to the target image.') | |
240 parser.add_option('--test_prefix', default='test', | |
241 help='Only runs tests with specific prefix i.e. ' | |
242 'testFullUpdateWipeStateful.') | |
243 parser.add_option('-p', '--type', default='vm', | |
244 help='type of test to run: [vm, real]. Default: vm.') | |
245 parser.add_option('--verbose', default=True, action='store_true', | |
246 help='Print out rather than capture output as much as ' | |
247 'possible.') | |
248 (options, leftover_args) = parser.parse_args() | |
249 | |
250 if leftover_args: parser.error('Found unsupported flags: %s' % leftover_args) | |
251 | |
252 assert options.target_image and os.path.exists(options.target_image), \ | |
253 'Target image path does not exist' | |
254 if not options.base_image: | |
255 cros_lib.Info('Base image not specified. Using target as base image.') | |
256 options.base_image = options.target_image | |
257 | |
258 # Sanity checks on keys and insert them onto the image. The caches must be | |
259 # cleaned so we know that the vm images and payloads match the possibly new | |
260 # key. | |
261 if options.private_key or options.public_key: | |
262 error_msg = ('Could not find %s key. Both private and public keys must be ' | |
263 'specified if either is specified.') | |
264 assert options.private_key and os.path.exists(options.private_key), \ | |
265 error_msg % 'private' | |
266 assert options.public_key and os.path.exists(options.public_key), \ | |
267 error_msg % 'public' | |
268 _InsertPublicKeyIntoImage(options.target_image, options.public_key) | |
269 if options.target_image != options.base_image: | |
270 _InsertPublicKeyIntoImage(options.base_image, options.public_key) | |
271 options.clean = True | |
272 | |
273 # Clean up previous work if requested. | |
274 if options.clean: _CleanPreviousWork(options) | |
275 | |
276 # Generate cache of updates to use during test harness. | |
277 update_cache = _PregenerateUpdates(options) | |
278 au_worker.AUWorker.SetUpdateCache(update_cache) | |
279 | |
280 my_server = dev_server_wrapper.DevServerWrapper() | |
281 my_server.start() | |
282 try: | |
283 if options.type == 'vm': | |
284 _RunTestsInParallel(options) | |
285 else: | |
286 # TODO(sosa) - Take in a machine pool for a real test. | |
287 # Can't run in parallel with only one remote device. | |
288 test_suite = _PrepareTestSuite(options) | |
289 test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) | |
290 if not test_result.wasSuccessful(): cros_lib.Die('Test harness failed.') | |
291 finally: | |
292 my_server.Stop() | |
293 | |
294 | |
295 if __name__ == '__main__': | |
296 main() | |
OLD | NEW |