| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 """Utilities for PyAuto.""" | |
| 6 | |
| 7 import httplib | |
| 8 import logging | |
| 9 import os | |
| 10 import shutil | |
| 11 import socket | |
| 12 import sys | |
| 13 import tempfile | |
| 14 import unittest | |
| 15 import urlparse | |
| 16 import zipfile | |
| 17 | |
| 18 | |
| 19 class ExistingPathReplacer(object): | |
| 20 """Facilitates backing up a given path (file or dir).. | |
| 21 | |
| 22 Often you want to manipulate a directory or file for testing but don't want to | |
| 23 meddle with the existing contents. This class lets you make a backup, and | |
| 24 reinstate the backup when done. A backup is made in an adjacent directory, | |
| 25 so you need to make sure you have write permissions to the parent directory. | |
| 26 | |
| 27 Works seemlessly in cases where the requested path already exists, or not. | |
| 28 | |
| 29 Automatically reinstates the backed up path (if any) when object is deleted. | |
| 30 """ | |
| 31 _path = '' | |
| 32 _backup_dir = None # dir to which existing content is backed up | |
| 33 _backup_basename = '' | |
| 34 | |
| 35 def __init__(self, path, path_type='dir'): | |
| 36 """Initialize the object, making backups if necessary. | |
| 37 | |
| 38 Args: | |
| 39 path: the requested path to file or directory | |
| 40 path_type: path type. Options: 'file', 'dir'. Default: 'dir' | |
| 41 """ | |
| 42 assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type | |
| 43 self._path_type = path_type | |
| 44 self._path = path | |
| 45 if os.path.exists(self._path): | |
| 46 if 'dir' == self._path_type: | |
| 47 assert os.path.isdir(self._path), '%s is not a directory' % self._path | |
| 48 else: | |
| 49 assert os.path.isfile(self._path), '%s is not a file' % self._path | |
| 50 # take a backup | |
| 51 self._backup_basename = os.path.basename(self._path) | |
| 52 self._backup_dir = tempfile.mkdtemp(dir=os.path.dirname(self._path), | |
| 53 prefix='bkp-' + self._backup_basename) | |
| 54 logging.info('Backing up %s in %s' % (self._path, self._backup_dir)) | |
| 55 shutil.move(self._path, | |
| 56 os.path.join(self._backup_dir, self._backup_basename)) | |
| 57 self._CreateRequestedPath() | |
| 58 | |
| 59 def __del__(self): | |
| 60 """Cleanup. Reinstate backup.""" | |
| 61 self._CleanupRequestedPath() | |
| 62 if self._backup_dir: # Reinstate, if backed up. | |
| 63 from_path = os.path.join(self._backup_dir, self._backup_basename) | |
| 64 logging.info('Reinstating backup from %s to %s' % (from_path, self._path)) | |
| 65 shutil.move(from_path, self._path) | |
| 66 self._RemoveBackupDir() | |
| 67 | |
| 68 def _CreateRequestedPath(self): | |
| 69 # Create intermediate dirs if needed. | |
| 70 if not os.path.exists(os.path.dirname(self._path)): | |
| 71 os.makedirs(os.path.dirname(self._path)) | |
| 72 if 'dir' == self._path_type: | |
| 73 os.mkdir(self._path) | |
| 74 else: | |
| 75 open(self._path, 'w').close() | |
| 76 | |
| 77 def _CleanupRequestedPath(self): | |
| 78 if os.path.exists(self._path): | |
| 79 if os.path.isdir(self._path): | |
| 80 shutil.rmtree(self._path, ignore_errors=True) | |
| 81 else: | |
| 82 os.remove(self._path) | |
| 83 | |
| 84 def _RemoveBackupDir(self): | |
| 85 if self._backup_dir and os.path.isdir(self._backup_dir): | |
| 86 shutil.rmtree(self._backup_dir, ignore_errors=True) | |
| 87 | |
| 88 | |
| 89 def RemovePath(path): | |
| 90 """Remove the given path (file or dir).""" | |
| 91 if os.path.isdir(path): | |
| 92 shutil.rmtree(path, ignore_errors=True) | |
| 93 return | |
| 94 try: | |
| 95 os.remove(path) | |
| 96 except OSError: | |
| 97 pass | |
| 98 | |
| 99 | |
| 100 def UnzipFilenameToDir(filename, dir): | |
| 101 """Unzip |filename| to directory |dir|. | |
| 102 | |
| 103 This works with as low as python2.4 (used on win). | |
| 104 """ | |
| 105 zf = zipfile.ZipFile(filename) | |
| 106 pushd = os.getcwd() | |
| 107 if not os.path.isdir(dir): | |
| 108 os.mkdir(dir) | |
| 109 os.chdir(dir) | |
| 110 # Extract files. | |
| 111 for info in zf.infolist(): | |
| 112 name = info.filename | |
| 113 if name.endswith('/'): # dir | |
| 114 if not os.path.isdir(name): | |
| 115 os.makedirs(name) | |
| 116 else: # file | |
| 117 dir = os.path.dirname(name) | |
| 118 if not os.path.isdir(dir): | |
| 119 os.makedirs(dir) | |
| 120 out = open(name, 'wb') | |
| 121 out.write(zf.read(name)) | |
| 122 out.close() | |
| 123 # Set permissions. Permission info in external_attr is shifted 16 bits. | |
| 124 os.chmod(name, info.external_attr >> 16L) | |
| 125 os.chdir(pushd) | |
| 126 | |
| 127 | |
| 128 def GetCurrentPlatform(): | |
| 129 """Get a string representation for the current platform. | |
| 130 | |
| 131 Returns: | |
| 132 'mac', 'win' or 'linux' | |
| 133 """ | |
| 134 if sys.platform == 'darwin': | |
| 135 return 'mac' | |
| 136 if sys.platform == 'win32': | |
| 137 return 'win' | |
| 138 if sys.platform.startswith('linux'): | |
| 139 return 'linux' | |
| 140 raise RuntimeError('Unknown platform') | |
| 141 | |
| 142 | |
| 143 def PrintPerfResult(graph_name, series_name, data_point, units, | |
| 144 show_on_waterfall=False): | |
| 145 """Prints a line to stdout that is specially formatted for the perf bots. | |
| 146 | |
| 147 Args: | |
| 148 graph_name: String name for the graph on which to plot the data. | |
| 149 series_name: String name for the series (line on the graph) associated with | |
| 150 the data. This is also the string displayed on the waterfall | |
| 151 if |show_on_waterfall| is True. | |
| 152 data_point: Numeric data value to plot on the graph for the current build. | |
| 153 This can be a single value or an array of values. If an array, | |
| 154 the graph will plot the average of the values, along with error | |
| 155 bars. | |
| 156 units: The string unit of measurement for the given |data_point|. | |
| 157 show_on_waterfall: Whether or not to display this result directly on the | |
| 158 buildbot waterfall itself (in the buildbot step running | |
| 159 this test on the waterfall page, not the stdio page). | |
| 160 """ | |
| 161 waterfall_indicator = ['', '*'][show_on_waterfall] | |
| 162 print '%sRESULT %s: %s= %s %s' % ( | |
| 163 waterfall_indicator, graph_name, series_name, | |
| 164 str(data_point).replace(' ', ''), units) | |
| 165 sys.stdout.flush() | |
| 166 | |
| 167 | |
| 168 def Shard(ilist, shard_index, num_shards): | |
| 169 """Shard a given list and return the group at index |shard_index|. | |
| 170 | |
| 171 Args: | |
| 172 ilist: input list | |
| 173 shard_index: 0-based sharding index | |
| 174 num_shards: shard count | |
| 175 """ | |
| 176 chunk_size = len(ilist) / num_shards | |
| 177 chunk_start = shard_index * chunk_size | |
| 178 if shard_index == num_shards - 1: # Exhaust the remainder in the last shard. | |
| 179 chunk_end = len(ilist) | |
| 180 else: | |
| 181 chunk_end = chunk_start + chunk_size | |
| 182 return ilist[chunk_start:chunk_end] | |
| 183 | |
| 184 | |
| 185 def WaitForDomElement(pyauto, driver, xpath): | |
| 186 """Wait for the UI element to appear. | |
| 187 | |
| 188 Args: | |
| 189 pyauto: an instance of pyauto.PyUITest. | |
| 190 driver: an instance of chrome driver or a web element. | |
| 191 xpath: the xpath of the element to wait for. | |
| 192 | |
| 193 Returns: | |
| 194 The element if it is found. | |
| 195 NoSuchElementException if it is not found. | |
| 196 """ | |
| 197 pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0) | |
| 198 return driver.find_element_by_xpath(xpath) | |
| 199 | |
| 200 | |
| 201 def DoesUrlExist(url): | |
| 202 """Determines whether a resource exists at the given URL. | |
| 203 | |
| 204 Args: | |
| 205 url: URL to be verified. | |
| 206 | |
| 207 Returns: | |
| 208 True if url exists, otherwise False. | |
| 209 """ | |
| 210 parsed = urlparse.urlparse(url) | |
| 211 try: | |
| 212 conn = httplib.HTTPConnection(parsed.netloc) | |
| 213 conn.request('HEAD', parsed.path) | |
| 214 response = conn.getresponse() | |
| 215 except (socket.gaierror, socket.error): | |
| 216 return False | |
| 217 finally: | |
| 218 conn.close() | |
| 219 # Follow both permanent (301) and temporary (302) redirects. | |
| 220 if response.status == 302 or response.status == 301: | |
| 221 return DoesUrlExist(response.getheader('location')) | |
| 222 return response.status == 200 | |
| 223 | |
| 224 | |
| 225 class _GTestTextTestResult(unittest._TextTestResult): | |
| 226 """A test result class that can print formatted text results to a stream. | |
| 227 | |
| 228 Results printed in conformance with gtest output format, like: | |
| 229 [ RUN ] autofill.AutofillTest.testAutofillInvalid: "test desc." | |
| 230 [ OK ] autofill.AutofillTest.testAutofillInvalid | |
| 231 [ RUN ] autofill.AutofillTest.testFillProfile: "test desc." | |
| 232 [ OK ] autofill.AutofillTest.testFillProfile | |
| 233 [ RUN ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test." | |
| 234 [ OK ] autofill.AutofillTest.testFillProfileCrazyCharacters | |
| 235 """ | |
| 236 | |
| 237 def __init__(self, stream, descriptions, verbosity): | |
| 238 unittest._TextTestResult.__init__(self, stream, descriptions, verbosity) | |
| 239 | |
| 240 def _GetTestURI(self, test): | |
| 241 if sys.version_info[:2] <= (2, 4): | |
| 242 return '%s.%s' % (unittest._strclass(test.__class__), | |
| 243 test._TestCase__testMethodName) | |
| 244 return '%s.%s' % (unittest._strclass(test.__class__), test._testMethodName) | |
| 245 | |
| 246 def getDescription(self, test): | |
| 247 return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription()) | |
| 248 | |
| 249 def startTest(self, test): | |
| 250 unittest.TestResult.startTest(self, test) | |
| 251 self.stream.writeln('[ RUN ] %s' % self.getDescription(test)) | |
| 252 | |
| 253 def addSuccess(self, test): | |
| 254 unittest.TestResult.addSuccess(self, test) | |
| 255 self.stream.writeln('[ OK ] %s' % self._GetTestURI(test)) | |
| 256 | |
| 257 def addError(self, test, err): | |
| 258 unittest.TestResult.addError(self, test, err) | |
| 259 self.stream.writeln('[ ERROR ] %s' % self._GetTestURI(test)) | |
| 260 | |
| 261 def addFailure(self, test, err): | |
| 262 unittest.TestResult.addFailure(self, test, err) | |
| 263 self.stream.writeln('[ FAILED ] %s' % self._GetTestURI(test)) | |
| 264 | |
| 265 | |
| 266 class GTestTextTestRunner(unittest.TextTestRunner): | |
| 267 """Test Runner for displaying test results in textual format. | |
| 268 | |
| 269 Results are displayed in conformance with gtest output. | |
| 270 """ | |
| 271 | |
| 272 def __init__(self, verbosity=1): | |
| 273 unittest.TextTestRunner.__init__(self, stream=sys.stderr, | |
| 274 verbosity=verbosity) | |
| 275 | |
| 276 def _makeResult(self): | |
| 277 return _GTestTextTestResult(self.stream, self.descriptions, self.verbosity) | |
| OLD | NEW |