OLD | NEW |
| (Empty) |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Utilities for PyAuto.""" | |
6 | |
7 import httplib | |
8 import logging | |
9 import os | |
10 import shutil | |
11 import socket | |
12 import sys | |
13 import tempfile | |
14 import unittest | |
15 import urlparse | |
16 import zipfile | |
17 | |
18 | |
19 class ExistingPathReplacer(object): | |
20 """Facilitates backing up a given path (file or dir).. | |
21 | |
22 Often you want to manipulate a directory or file for testing but don't want to | |
23 meddle with the existing contents. This class lets you make a backup, and | |
24 reinstate the backup when done. A backup is made in an adjacent directory, | |
25 so you need to make sure you have write permissions to the parent directory. | |
26 | |
27 Works seemlessly in cases where the requested path already exists, or not. | |
28 | |
29 Automatically reinstates the backed up path (if any) when object is deleted. | |
30 """ | |
31 _path = '' | |
32 _backup_dir = None # dir to which existing content is backed up | |
33 _backup_basename = '' | |
34 | |
35 def __init__(self, path, path_type='dir'): | |
36 """Initialize the object, making backups if necessary. | |
37 | |
38 Args: | |
39 path: the requested path to file or directory | |
40 path_type: path type. Options: 'file', 'dir'. Default: 'dir' | |
41 """ | |
42 assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type | |
43 self._path_type = path_type | |
44 self._path = path | |
45 if os.path.exists(self._path): | |
46 if 'dir' == self._path_type: | |
47 assert os.path.isdir(self._path), '%s is not a directory' % self._path | |
48 else: | |
49 assert os.path.isfile(self._path), '%s is not a file' % self._path | |
50 # take a backup | |
51 self._backup_basename = os.path.basename(self._path) | |
52 self._backup_dir = tempfile.mkdtemp(dir=os.path.dirname(self._path), | |
53 prefix='bkp-' + self._backup_basename) | |
54 logging.info('Backing up %s in %s' % (self._path, self._backup_dir)) | |
55 shutil.move(self._path, | |
56 os.path.join(self._backup_dir, self._backup_basename)) | |
57 self._CreateRequestedPath() | |
58 | |
59 def __del__(self): | |
60 """Cleanup. Reinstate backup.""" | |
61 self._CleanupRequestedPath() | |
62 if self._backup_dir: # Reinstate, if backed up. | |
63 from_path = os.path.join(self._backup_dir, self._backup_basename) | |
64 logging.info('Reinstating backup from %s to %s' % (from_path, self._path)) | |
65 shutil.move(from_path, self._path) | |
66 self._RemoveBackupDir() | |
67 | |
68 def _CreateRequestedPath(self): | |
69 # Create intermediate dirs if needed. | |
70 if not os.path.exists(os.path.dirname(self._path)): | |
71 os.makedirs(os.path.dirname(self._path)) | |
72 if 'dir' == self._path_type: | |
73 os.mkdir(self._path) | |
74 else: | |
75 open(self._path, 'w').close() | |
76 | |
77 def _CleanupRequestedPath(self): | |
78 if os.path.exists(self._path): | |
79 if os.path.isdir(self._path): | |
80 shutil.rmtree(self._path, ignore_errors=True) | |
81 else: | |
82 os.remove(self._path) | |
83 | |
84 def _RemoveBackupDir(self): | |
85 if self._backup_dir and os.path.isdir(self._backup_dir): | |
86 shutil.rmtree(self._backup_dir, ignore_errors=True) | |
87 | |
88 | |
89 def RemovePath(path): | |
90 """Remove the given path (file or dir).""" | |
91 if os.path.isdir(path): | |
92 shutil.rmtree(path, ignore_errors=True) | |
93 return | |
94 try: | |
95 os.remove(path) | |
96 except OSError: | |
97 pass | |
98 | |
99 | |
100 def UnzipFilenameToDir(filename, dir): | |
101 """Unzip |filename| to directory |dir|. | |
102 | |
103 This works with as low as python2.4 (used on win). | |
104 """ | |
105 zf = zipfile.ZipFile(filename) | |
106 pushd = os.getcwd() | |
107 if not os.path.isdir(dir): | |
108 os.mkdir(dir) | |
109 os.chdir(dir) | |
110 # Extract files. | |
111 for info in zf.infolist(): | |
112 name = info.filename | |
113 if name.endswith('/'): # dir | |
114 if not os.path.isdir(name): | |
115 os.makedirs(name) | |
116 else: # file | |
117 dir = os.path.dirname(name) | |
118 if not os.path.isdir(dir): | |
119 os.makedirs(dir) | |
120 out = open(name, 'wb') | |
121 out.write(zf.read(name)) | |
122 out.close() | |
123 # Set permissions. Permission info in external_attr is shifted 16 bits. | |
124 os.chmod(name, info.external_attr >> 16L) | |
125 os.chdir(pushd) | |
126 | |
127 | |
128 def GetCurrentPlatform(): | |
129 """Get a string representation for the current platform. | |
130 | |
131 Returns: | |
132 'mac', 'win' or 'linux' | |
133 """ | |
134 if sys.platform == 'darwin': | |
135 return 'mac' | |
136 if sys.platform == 'win32': | |
137 return 'win' | |
138 if sys.platform.startswith('linux'): | |
139 return 'linux' | |
140 raise RuntimeError('Unknown platform') | |
141 | |
142 | |
143 def PrintPerfResult(graph_name, series_name, data_point, units, | |
144 show_on_waterfall=False): | |
145 """Prints a line to stdout that is specially formatted for the perf bots. | |
146 | |
147 Args: | |
148 graph_name: String name for the graph on which to plot the data. | |
149 series_name: String name for the series (line on the graph) associated with | |
150 the data. This is also the string displayed on the waterfall | |
151 if |show_on_waterfall| is True. | |
152 data_point: Numeric data value to plot on the graph for the current build. | |
153 This can be a single value or an array of values. If an array, | |
154 the graph will plot the average of the values, along with error | |
155 bars. | |
156 units: The string unit of measurement for the given |data_point|. | |
157 show_on_waterfall: Whether or not to display this result directly on the | |
158 buildbot waterfall itself (in the buildbot step running | |
159 this test on the waterfall page, not the stdio page). | |
160 """ | |
161 waterfall_indicator = ['', '*'][show_on_waterfall] | |
162 print '%sRESULT %s: %s= %s %s' % ( | |
163 waterfall_indicator, graph_name, series_name, | |
164 str(data_point).replace(' ', ''), units) | |
165 sys.stdout.flush() | |
166 | |
167 | |
168 def Shard(ilist, shard_index, num_shards): | |
169 """Shard a given list and return the group at index |shard_index|. | |
170 | |
171 Args: | |
172 ilist: input list | |
173 shard_index: 0-based sharding index | |
174 num_shards: shard count | |
175 """ | |
176 chunk_size = len(ilist) / num_shards | |
177 chunk_start = shard_index * chunk_size | |
178 if shard_index == num_shards - 1: # Exhaust the remainder in the last shard. | |
179 chunk_end = len(ilist) | |
180 else: | |
181 chunk_end = chunk_start + chunk_size | |
182 return ilist[chunk_start:chunk_end] | |
183 | |
184 | |
185 def WaitForDomElement(pyauto, driver, xpath): | |
186 """Wait for the UI element to appear. | |
187 | |
188 Args: | |
189 pyauto: an instance of pyauto.PyUITest. | |
190 driver: an instance of chrome driver or a web element. | |
191 xpath: the xpath of the element to wait for. | |
192 | |
193 Returns: | |
194 The element if it is found. | |
195 NoSuchElementException if it is not found. | |
196 """ | |
197 pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0) | |
198 return driver.find_element_by_xpath(xpath) | |
199 | |
200 | |
201 def DoesUrlExist(url): | |
202 """Determines whether a resource exists at the given URL. | |
203 | |
204 Args: | |
205 url: URL to be verified. | |
206 | |
207 Returns: | |
208 True if url exists, otherwise False. | |
209 """ | |
210 parsed = urlparse.urlparse(url) | |
211 try: | |
212 conn = httplib.HTTPConnection(parsed.netloc) | |
213 conn.request('HEAD', parsed.path) | |
214 response = conn.getresponse() | |
215 except (socket.gaierror, socket.error): | |
216 return False | |
217 finally: | |
218 conn.close() | |
219 # Follow both permanent (301) and temporary (302) redirects. | |
220 if response.status == 302 or response.status == 301: | |
221 return DoesUrlExist(response.getheader('location')) | |
222 return response.status == 200 | |
223 | |
224 | |
225 class _GTestTextTestResult(unittest._TextTestResult): | |
226 """A test result class that can print formatted text results to a stream. | |
227 | |
228 Results printed in conformance with gtest output format, like: | |
229 [ RUN ] autofill.AutofillTest.testAutofillInvalid: "test desc." | |
230 [ OK ] autofill.AutofillTest.testAutofillInvalid | |
231 [ RUN ] autofill.AutofillTest.testFillProfile: "test desc." | |
232 [ OK ] autofill.AutofillTest.testFillProfile | |
233 [ RUN ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test." | |
234 [ OK ] autofill.AutofillTest.testFillProfileCrazyCharacters | |
235 """ | |
236 | |
237 def __init__(self, stream, descriptions, verbosity): | |
238 unittest._TextTestResult.__init__(self, stream, descriptions, verbosity) | |
239 | |
240 def _GetTestURI(self, test): | |
241 if sys.version_info[:2] <= (2, 4): | |
242 return '%s.%s' % (unittest._strclass(test.__class__), | |
243 test._TestCase__testMethodName) | |
244 return '%s.%s' % (unittest._strclass(test.__class__), test._testMethodName) | |
245 | |
246 def getDescription(self, test): | |
247 return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription()) | |
248 | |
249 def startTest(self, test): | |
250 unittest.TestResult.startTest(self, test) | |
251 self.stream.writeln('[ RUN ] %s' % self.getDescription(test)) | |
252 | |
253 def addSuccess(self, test): | |
254 unittest.TestResult.addSuccess(self, test) | |
255 self.stream.writeln('[ OK ] %s' % self._GetTestURI(test)) | |
256 | |
257 def addError(self, test, err): | |
258 unittest.TestResult.addError(self, test, err) | |
259 self.stream.writeln('[ ERROR ] %s' % self._GetTestURI(test)) | |
260 | |
261 def addFailure(self, test, err): | |
262 unittest.TestResult.addFailure(self, test, err) | |
263 self.stream.writeln('[ FAILED ] %s' % self._GetTestURI(test)) | |
264 | |
265 | |
266 class GTestTextTestRunner(unittest.TextTestRunner): | |
267 """Test Runner for displaying test results in textual format. | |
268 | |
269 Results are displayed in conformance with gtest output. | |
270 """ | |
271 | |
272 def __init__(self, verbosity=1): | |
273 unittest.TextTestRunner.__init__(self, stream=sys.stderr, | |
274 verbosity=verbosity) | |
275 | |
276 def _makeResult(self): | |
277 return _GTestTextTestResult(self.stream, self.descriptions, self.verbosity) | |
OLD | NEW |