OLD | NEW |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Module containing a test suite that is run to test auto updates.""" | 5 """Module containing a test suite that is run to test auto updates.""" |
6 | 6 |
7 import os | 7 import os |
8 import tempfile | 8 import tempfile |
9 import time | 9 import time |
10 import unittest | 10 import unittest |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
74 | 74 |
75 # Cache away options to instantiate workers later. | 75 # Cache away options to instantiate workers later. |
76 cls.options = options | 76 cls.options = options |
77 | 77 |
78 def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg): | 78 def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg): |
79 """Attempt a payload update, expect it to fail with expected log""" | 79 """Attempt a payload update, expect it to fail with expected log""" |
80 try: | 80 try: |
81 self.worker.UpdateUsingPayload(payload) | 81 self.worker.UpdateUsingPayload(payload) |
82 except UpdateException as err: | 82 except UpdateException as err: |
83 # Will raise ValueError if expected is not found. | 83 # Will raise ValueError if expected is not found. |
84 if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE): | 84 if re.search(re.escape(expected_msg), err.output, re.MULTILINE): |
85 return | 85 return |
86 else: | 86 else: |
87 cros_lib.Warning("Didn't find '%s' in:" % expected_msg) | 87 cros_lib.Warning("Didn't find '%s' in:" % expected_msg) |
88 cros_lib.Warning(err.stdout) | 88 cros_lib.Warning(err.output) |
89 | 89 |
90 self.fail('We managed to update when failure was expected') | 90 self.fail('We managed to update when failure was expected') |
91 | 91 |
92 def AttemptUpdateWithFilter(self, filter, proxy_port=8081): | 92 def AttemptUpdateWithFilter(self, filter, proxy_port=8081): |
93 """Update through a proxy, with a specified filter, and expect success.""" | 93 """Update through a proxy, with a specified filter, and expect success.""" |
94 self.worker.PrepareBase(self.target_image_path) | 94 self.worker.PrepareBase(self.target_image_path) |
95 | 95 |
96 # The devserver runs at port 8080 by default. We assume that here, and | 96 # The devserver runs at port 8080 by default. We assume that here, and |
97 # start our proxy at a different one. We then tell our update tools to | 97 # start our proxy at a different one. We then tell our update tools to |
98 # have the client connect to our proxy_port instead of 8080. | 98 # have the client connect to our proxy_port instead of 8080. |
99 proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port, | 99 proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port, |
100 address_out='127.0.0.1', | 100 address_out='127.0.0.1', |
101 port_out=8080, | 101 port_out=8080, |
102 filter=filter) | 102 filter=filter) |
103 proxy.serve_forever_in_thread() | 103 proxy.serve_forever_in_thread() |
104 try: | 104 try: |
105 self.worker.PerformUpdate(self.target_image_path, self.target_image_path, | 105 self.worker.PerformUpdate(self.target_image_path, self.target_image_path, |
106 proxy_port=proxy_port) | 106 proxy_port=proxy_port) |
107 finally: | 107 finally: |
108 proxy.shutdown() | 108 proxy.shutdown() |
109 | 109 |
110 # --- UNITTEST SPECIFIC METHODS --- | 110 # --- UNITTEST SPECIFIC METHODS --- |
111 | 111 |
112 def setUp(self): | 112 def setUp(self): |
113 """Overrides unittest.TestCase.setUp and called before every test. | 113 """Overrides unittest.TestCase.setUp and called before every test. |
114 | 114 |
115 Sets instance specific variables and initializes worker. | 115 Sets instance specific variables and initializes worker. |
116 """ | 116 """ |
117 unittest.TestCase.setUp(self) | 117 super(AUTest, self).setUp() |
118 self.worker = self.worker_class(self.options, AUTest.test_results_root) | 118 self.worker = self.worker_class(self.options, AUTest.test_results_root) |
119 self.download_folder = os.path.join(os.path.realpath(os.path.curdir), | 119 self.download_folder = os.path.join(os.path.realpath(os.path.curdir), |
120 'latest_download') | 120 'latest_download') |
121 if not os.path.exists(self.download_folder): | 121 if not os.path.exists(self.download_folder): |
122 os.makedirs(self.download_folder) | 122 os.makedirs(self.download_folder) |
123 | 123 |
124 def tearDown(self): | 124 def tearDown(self): |
125 """Overrides unittest.TestCase.tearDown and called after every test.""" | 125 """Overrides unittest.TestCase.tearDown and called after every test.""" |
126 self.worker.CleanUp() | 126 self.worker.CleanUp() |
127 | 127 |
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
275 url = 'http://gsdview.appspot.com/chromeos-localmirror/' \ | 275 url = 'http://gsdview.appspot.com/chromeos-localmirror/' \ |
276 'autest-images/corrupted_image.gz' | 276 'autest-images/corrupted_image.gz' |
277 payload = os.path.join(self.download_folder, 'corrupted.gz') | 277 payload = os.path.join(self.download_folder, 'corrupted.gz') |
278 | 278 |
279 # Read from the URL and write to the local file | 279 # Read from the URL and write to the local file |
280 urllib.urlretrieve(url, payload) | 280 urllib.urlretrieve(url, payload) |
281 | 281 |
282 # This update is expected to fail... | 282 # This update is expected to fail... |
283 expected_msg = 'zlib inflate() error:-3' | 283 expected_msg = 'zlib inflate() error:-3' |
284 self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg) | 284 self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg) |
OLD | NEW |