| Index: chrome/test/pyautolib/pyauto_utils.py
|
| ===================================================================
|
| --- chrome/test/pyautolib/pyauto_utils.py (revision 261231)
|
| +++ chrome/test/pyautolib/pyauto_utils.py (working copy)
|
| @@ -1,277 +0,0 @@
|
| -# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Utilities for PyAuto."""
|
| -
|
| -import httplib
|
| -import logging
|
| -import os
|
| -import shutil
|
| -import socket
|
| -import sys
|
| -import tempfile
|
| -import unittest
|
| -import urlparse
|
| -import zipfile
|
| -
|
| -
|
| -class ExistingPathReplacer(object):
|
| - """Facilitates backing up a given path (file or dir)..
|
| -
|
| - Often you want to manipulate a directory or file for testing but don't want to
|
| - meddle with the existing contents. This class lets you make a backup, and
|
| - reinstate the backup when done. A backup is made in an adjacent directory,
|
| - so you need to make sure you have write permissions to the parent directory.
|
| -
|
| - Works seemlessly in cases where the requested path already exists, or not.
|
| -
|
| - Automatically reinstates the backed up path (if any) when object is deleted.
|
| - """
|
| - _path = ''
|
| - _backup_dir = None # dir to which existing content is backed up
|
| - _backup_basename = ''
|
| -
|
| - def __init__(self, path, path_type='dir'):
|
| - """Initialize the object, making backups if necessary.
|
| -
|
| - Args:
|
| - path: the requested path to file or directory
|
| - path_type: path type. Options: 'file', 'dir'. Default: 'dir'
|
| - """
|
| - assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type
|
| - self._path_type = path_type
|
| - self._path = path
|
| - if os.path.exists(self._path):
|
| - if 'dir' == self._path_type:
|
| - assert os.path.isdir(self._path), '%s is not a directory' % self._path
|
| - else:
|
| - assert os.path.isfile(self._path), '%s is not a file' % self._path
|
| - # take a backup
|
| - self._backup_basename = os.path.basename(self._path)
|
| - self._backup_dir = tempfile.mkdtemp(dir=os.path.dirname(self._path),
|
| - prefix='bkp-' + self._backup_basename)
|
| - logging.info('Backing up %s in %s' % (self._path, self._backup_dir))
|
| - shutil.move(self._path,
|
| - os.path.join(self._backup_dir, self._backup_basename))
|
| - self._CreateRequestedPath()
|
| -
|
| - def __del__(self):
|
| - """Cleanup. Reinstate backup."""
|
| - self._CleanupRequestedPath()
|
| - if self._backup_dir: # Reinstate, if backed up.
|
| - from_path = os.path.join(self._backup_dir, self._backup_basename)
|
| - logging.info('Reinstating backup from %s to %s' % (from_path, self._path))
|
| - shutil.move(from_path, self._path)
|
| - self._RemoveBackupDir()
|
| -
|
| - def _CreateRequestedPath(self):
|
| - # Create intermediate dirs if needed.
|
| - if not os.path.exists(os.path.dirname(self._path)):
|
| - os.makedirs(os.path.dirname(self._path))
|
| - if 'dir' == self._path_type:
|
| - os.mkdir(self._path)
|
| - else:
|
| - open(self._path, 'w').close()
|
| -
|
| - def _CleanupRequestedPath(self):
|
| - if os.path.exists(self._path):
|
| - if os.path.isdir(self._path):
|
| - shutil.rmtree(self._path, ignore_errors=True)
|
| - else:
|
| - os.remove(self._path)
|
| -
|
| - def _RemoveBackupDir(self):
|
| - if self._backup_dir and os.path.isdir(self._backup_dir):
|
| - shutil.rmtree(self._backup_dir, ignore_errors=True)
|
| -
|
| -
|
| -def RemovePath(path):
|
| - """Remove the given path (file or dir)."""
|
| - if os.path.isdir(path):
|
| - shutil.rmtree(path, ignore_errors=True)
|
| - return
|
| - try:
|
| - os.remove(path)
|
| - except OSError:
|
| - pass
|
| -
|
| -
|
| -def UnzipFilenameToDir(filename, dir):
|
| - """Unzip |filename| to directory |dir|.
|
| -
|
| - This works with as low as python2.4 (used on win).
|
| - """
|
| - zf = zipfile.ZipFile(filename)
|
| - pushd = os.getcwd()
|
| - if not os.path.isdir(dir):
|
| - os.mkdir(dir)
|
| - os.chdir(dir)
|
| - # Extract files.
|
| - for info in zf.infolist():
|
| - name = info.filename
|
| - if name.endswith('/'): # dir
|
| - if not os.path.isdir(name):
|
| - os.makedirs(name)
|
| - else: # file
|
| - dir = os.path.dirname(name)
|
| - if not os.path.isdir(dir):
|
| - os.makedirs(dir)
|
| - out = open(name, 'wb')
|
| - out.write(zf.read(name))
|
| - out.close()
|
| - # Set permissions. Permission info in external_attr is shifted 16 bits.
|
| - os.chmod(name, info.external_attr >> 16L)
|
| - os.chdir(pushd)
|
| -
|
| -
|
| -def GetCurrentPlatform():
|
| - """Get a string representation for the current platform.
|
| -
|
| - Returns:
|
| - 'mac', 'win' or 'linux'
|
| - """
|
| - if sys.platform == 'darwin':
|
| - return 'mac'
|
| - if sys.platform == 'win32':
|
| - return 'win'
|
| - if sys.platform.startswith('linux'):
|
| - return 'linux'
|
| - raise RuntimeError('Unknown platform')
|
| -
|
| -
|
| -def PrintPerfResult(graph_name, series_name, data_point, units,
|
| - show_on_waterfall=False):
|
| - """Prints a line to stdout that is specially formatted for the perf bots.
|
| -
|
| - Args:
|
| - graph_name: String name for the graph on which to plot the data.
|
| - series_name: String name for the series (line on the graph) associated with
|
| - the data. This is also the string displayed on the waterfall
|
| - if |show_on_waterfall| is True.
|
| - data_point: Numeric data value to plot on the graph for the current build.
|
| - This can be a single value or an array of values. If an array,
|
| - the graph will plot the average of the values, along with error
|
| - bars.
|
| - units: The string unit of measurement for the given |data_point|.
|
| - show_on_waterfall: Whether or not to display this result directly on the
|
| - buildbot waterfall itself (in the buildbot step running
|
| - this test on the waterfall page, not the stdio page).
|
| - """
|
| - waterfall_indicator = ['', '*'][show_on_waterfall]
|
| - print '%sRESULT %s: %s= %s %s' % (
|
| - waterfall_indicator, graph_name, series_name,
|
| - str(data_point).replace(' ', ''), units)
|
| - sys.stdout.flush()
|
| -
|
| -
|
| -def Shard(ilist, shard_index, num_shards):
|
| - """Shard a given list and return the group at index |shard_index|.
|
| -
|
| - Args:
|
| - ilist: input list
|
| - shard_index: 0-based sharding index
|
| - num_shards: shard count
|
| - """
|
| - chunk_size = len(ilist) / num_shards
|
| - chunk_start = shard_index * chunk_size
|
| - if shard_index == num_shards - 1: # Exhaust the remainder in the last shard.
|
| - chunk_end = len(ilist)
|
| - else:
|
| - chunk_end = chunk_start + chunk_size
|
| - return ilist[chunk_start:chunk_end]
|
| -
|
| -
|
| -def WaitForDomElement(pyauto, driver, xpath):
|
| - """Wait for the UI element to appear.
|
| -
|
| - Args:
|
| - pyauto: an instance of pyauto.PyUITest.
|
| - driver: an instance of chrome driver or a web element.
|
| - xpath: the xpath of the element to wait for.
|
| -
|
| - Returns:
|
| - The element if it is found.
|
| - NoSuchElementException if it is not found.
|
| - """
|
| - pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0)
|
| - return driver.find_element_by_xpath(xpath)
|
| -
|
| -
|
| -def DoesUrlExist(url):
|
| - """Determines whether a resource exists at the given URL.
|
| -
|
| - Args:
|
| - url: URL to be verified.
|
| -
|
| - Returns:
|
| - True if url exists, otherwise False.
|
| - """
|
| - parsed = urlparse.urlparse(url)
|
| - try:
|
| - conn = httplib.HTTPConnection(parsed.netloc)
|
| - conn.request('HEAD', parsed.path)
|
| - response = conn.getresponse()
|
| - except (socket.gaierror, socket.error):
|
| - return False
|
| - finally:
|
| - conn.close()
|
| - # Follow both permanent (301) and temporary (302) redirects.
|
| - if response.status == 302 or response.status == 301:
|
| - return DoesUrlExist(response.getheader('location'))
|
| - return response.status == 200
|
| -
|
| -
|
| -class _GTestTextTestResult(unittest._TextTestResult):
|
| - """A test result class that can print formatted text results to a stream.
|
| -
|
| - Results printed in conformance with gtest output format, like:
|
| - [ RUN ] autofill.AutofillTest.testAutofillInvalid: "test desc."
|
| - [ OK ] autofill.AutofillTest.testAutofillInvalid
|
| - [ RUN ] autofill.AutofillTest.testFillProfile: "test desc."
|
| - [ OK ] autofill.AutofillTest.testFillProfile
|
| - [ RUN ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
|
| - [ OK ] autofill.AutofillTest.testFillProfileCrazyCharacters
|
| - """
|
| -
|
| - def __init__(self, stream, descriptions, verbosity):
|
| - unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
|
| -
|
| - def _GetTestURI(self, test):
|
| - if sys.version_info[:2] <= (2, 4):
|
| - return '%s.%s' % (unittest._strclass(test.__class__),
|
| - test._TestCase__testMethodName)
|
| - return '%s.%s' % (unittest._strclass(test.__class__), test._testMethodName)
|
| -
|
| - def getDescription(self, test):
|
| - return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())
|
| -
|
| - def startTest(self, test):
|
| - unittest.TestResult.startTest(self, test)
|
| - self.stream.writeln('[ RUN ] %s' % self.getDescription(test))
|
| -
|
| - def addSuccess(self, test):
|
| - unittest.TestResult.addSuccess(self, test)
|
| - self.stream.writeln('[ OK ] %s' % self._GetTestURI(test))
|
| -
|
| - def addError(self, test, err):
|
| - unittest.TestResult.addError(self, test, err)
|
| - self.stream.writeln('[ ERROR ] %s' % self._GetTestURI(test))
|
| -
|
| - def addFailure(self, test, err):
|
| - unittest.TestResult.addFailure(self, test, err)
|
| - self.stream.writeln('[ FAILED ] %s' % self._GetTestURI(test))
|
| -
|
| -
|
| -class GTestTextTestRunner(unittest.TextTestRunner):
|
| - """Test Runner for displaying test results in textual format.
|
| -
|
| - Results are displayed in conformance with gtest output.
|
| - """
|
| -
|
| - def __init__(self, verbosity=1):
|
| - unittest.TextTestRunner.__init__(self, stream=sys.stderr,
|
| - verbosity=verbosity)
|
| -
|
| - def _makeResult(self):
|
| - return _GTestTextTestResult(self.stream, self.descriptions, self.verbosity)
|
|
|