Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1655)

Unified Diff: bin/cros_au_test_harness.py

Issue 6597122: Refactor au_test_harness into modules and refactor to use worker design. (Closed) Base URL: http://git.chromium.org/git/crosutils.git@master
Patch Set: 80 char Created 9 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« bin/au_test_harness/vm_au_worker.py ('K') | « bin/cros_au_test_harness ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: bin/cros_au_test_harness.py
diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py
deleted file mode 100755
index 5822a772989eadfffaa6102f97da37d7bd7eecb8..0000000000000000000000000000000000000000
--- a/bin/cros_au_test_harness.py
+++ /dev/null
@@ -1,1079 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""This module runs a suite of Auto Update tests.
-
- The tests can be run on either a virtual machine or actual device depending
- on parameters given. Specific tests can be run by invoking --test_prefix.
- Verbose is useful for many of the tests if you want to see individual commands
- being run during the update process.
-"""
-
-import optparse
-import os
-import re
-import shutil
-import subprocess
-import sys
-import tempfile
-import threading
-import time
-import unittest
-import urllib
-
-sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
-from cros_build_lib import Die
-from cros_build_lib import GetIPAddress
-from cros_build_lib import Info
-from cros_build_lib import ReinterpretPathForChroot
-from cros_build_lib import RunCommand
-from cros_build_lib import RunCommandCaptureOutput
-from cros_build_lib import Warning
-
-import cros_test_proxy
-
-global dev_server_cache
-
-
-class UpdateException(Exception):
- """Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
- def __init__(self, code, stdout):
- self.code = code
- self.stdout = stdout
-
-
-class AUTest(object):
- """Abstract interface that defines an Auto Update test."""
- verbose = False
-
- def setUp(self):
- unittest.TestCase.setUp(self)
- # Set these up as they are used often.
- self.crosutils = os.path.join(os.path.dirname(__file__), '..')
- self.crosutilsbin = os.path.join(os.path.dirname(__file__))
- self.download_folder = os.path.join(self.crosutils, 'latest_download')
- self.vm_image_path = None
- if not os.path.exists(self.download_folder):
- os.makedirs(self.download_folder)
-
- # -------- Helper functions ---------
-
- def _PrepareRealBase(self, image_path):
- self.PerformUpdate(image_path)
-
- def _PrepareVMBase(self, image_path):
- # VM Constants.
- FULL_VDISK_SIZE = 6072
- FULL_STATEFULFS_SIZE = 3074
- # Needed for VM delta updates. We need to use the qemu image rather
- # than the base image on a first update. By tracking the first_update
- # we can set src_image to the qemu form of the base image when
- # performing generating the delta payload.
- self._first_update = True
- self.vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
- image_path)
- if not os.path.exists(self.vm_image_path):
- Info('Creating %s' % self.vm_image_path)
- RunCommand(['%s/image_to_vm.sh' % self.crosutils,
- '--full',
- '--from=%s' % ReinterpretPathForChroot(
- os.path.dirname(image_path)),
- '--vdisk_size=%s' % FULL_VDISK_SIZE,
- '--statefulfs_size=%s' % FULL_STATEFULFS_SIZE,
- '--board=%s' % self.board,
- '--test_image'], enter_chroot=True)
-
- Info('Using %s as base' % self.vm_image_path)
- self.assertTrue(os.path.exists(self.vm_image_path))
-
- def AppendUpdateFlags(self, cmd, image_path, src_image_path, proxy_port,
- private_key_path):
- """Appends common args to an update cmd defined by an array.
-
- Modifies cmd in places by appending appropriate items given args.
- """
- if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
-
- # Get pregenerated update if we have one.
- update_id = _GenerateUpdateId(target=image_path, src=src_image_path,
- key=private_key_path)
- cache_path = dev_server_cache[update_id]
- if cache_path:
- update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path)
- cmd.append('--update_url=%s' % update_url)
- else:
- cmd.append('--image=%s' % image_path)
- if src_image_path: cmd.append('--src_image=%s' % src_image_path)
-
- def RunUpdateCmd(self, cmd):
- """Runs the given update cmd given verbose options.
-
- Raises an UpdateException if the update fails.
- """
- if self.verbose:
- try:
- RunCommand(cmd)
- except Exception, e:
- raise UpdateException(1, e.message)
- else:
- (code, stdout, stderr) = RunCommandCaptureOutput(cmd)
- if code != 0:
- raise UpdateException(code, stdout)
-
- def GetStatefulChangeFlag(self, stateful_change):
- """Returns the flag to pass to image_to_vm for the stateful change."""
- stateful_change_flag = ''
- if stateful_change:
- stateful_change_flag = '--stateful_update_flag=%s' % stateful_change
-
- return stateful_change_flag
-
- def _ParseGenerateTestReportOutput(self, output):
- """Returns the percentage of tests that passed based on output."""
- percent_passed = 0
- lines = output.split('\n')
-
- for line in lines:
- if line.startswith("Total PASS:"):
- # FORMAT: ^TOTAL PASS: num_passed/num_total (percent%)$
- percent_passed = line.split()[3].strip('()%')
- Info('Percent of tests passed %s' % percent_passed)
- break
-
- return int(percent_passed)
-
- def AssertEnoughTestsPassed(self, unittest, output, percent_required_to_pass):
- """Helper function that asserts a sufficient number of tests passed.
-
- Args:
- unittest: Handle to the unittest.
- output: stdout from a test run.
- percent_required_to_pass: percentage required to pass. This should be
- fall between 0-100.
- Returns:
- percent that passed.
- """
- Info('Output from VerifyImage():')
- print >> sys.stderr, output
- sys.stderr.flush()
- percent_passed = self._ParseGenerateTestReportOutput(output)
- Info('Percent passed: %d vs. Percent required: %d' % (
- percent_passed, percent_required_to_pass))
- unittest.assertTrue(percent_passed >= percent_required_to_pass)
- return percent_passed
-
- def PerformUpdate(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Performs an update using _UpdateImage and reports any error.
-
- Subclasses should not override this method but override _UpdateImage
- instead.
-
- Args:
- image_path: Path to the image to update with. This image must be a test
- image.
- src_image_path: Optional. If set, perform a delta update using the
- image specified by the path as the source image.
- stateful_change: How to modify the stateful partition. Values are:
- 'old': Don't modify stateful partition. Just update normally.
- 'clean': Uses clobber-state to wipe the stateful partition with the
- exception of code needed for ssh.
- proxy_port: Port to have the client connect to. For use with
- CrosTestProxy.
- Raises an UpdateException if _UpdateImage returns an error.
- """
- try:
- if not self.use_delta_updates: src_image_path = ''
- if private_key_path:
- key_to_use = private_key_path
- else:
- key_to_use = self.private_key
-
- self._UpdateImage(image_path, src_image_path, stateful_change, proxy_port,
- key_to_use)
- except UpdateException as err:
- # If the update fails, print it out
- Warning(err.stdout)
- raise
-
- def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg):
- """Attempt a payload update, expect it to fail with expected log"""
- try:
- self._UpdateUsingPayload(payload)
- except UpdateException as err:
- # Will raise ValueError if expected is not found.
- if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE):
- return
- else:
- Warning("Didn't find '%s' in:" % expected_msg)
- Warning(err.stdout)
-
- self.fail('We managed to update when failure was expected')
-
- def AttemptUpdateWithFilter(self, filter, proxy_port=8081):
- """Update through a proxy, with a specified filter, and expect success."""
-
- self.PrepareBase(self.target_image_path)
-
- # The devserver runs at port 8080 by default. We assume that here, and
- # start our proxy at a different. We then tell our update tools to
- # have the client connect to our proxy_port instead of 8080.
- proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port,
- address_out='127.0.0.1',
- port_out=8080,
- filter=filter)
- proxy.serve_forever_in_thread()
-
- # This update is expected to fail...
- try:
- self.PerformUpdate(self.target_image_path, self.target_image_path,
- proxy_port=proxy_port)
- finally:
- proxy.shutdown()
-
- # -------- Functions that subclasses should override ---------
-
- @classmethod
- def ProcessOptions(cls, parser, options):
- """Processes options.
-
- Static method that should be called from main. Subclasses should also
- call their parent method if they override it.
- """
- cls.verbose = options.verbose
- cls.base_image_path = options.base_image
- cls.target_image_path = options.target_image
- cls.use_delta_updates = options.delta
- cls.board = options.board
- cls.private_key = options.private_key
- cls.clean = options.clean
- if options.quick_test:
- cls.verify_suite = 'build_RootFilesystemSize'
- else:
- cls.verify_suite = 'suite_Smoke'
-
- # Sanity checks.
- if not cls.base_image_path:
- parser.error('Need path to base image for vm.')
- elif not os.path.exists(cls.base_image_path):
- Die('%s does not exist' % cls.base_image_path)
-
- if not cls.target_image_path:
- parser.error('Need path to target image to update with.')
- elif not os.path.exists(cls.target_image_path):
- Die('%s does not exist' % cls.target_image_path)
-
- def PrepareBase(self, image_path):
- """Prepares target with base_image_path."""
- pass
-
- def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Implementation of an actual update.
-
- See PerformUpdate for description of args. Subclasses must override this
- method with the correct update procedure for the class.
- """
- pass
-
- def _UpdateUsingPayload(self, update_path, stateful_change='old',
- proxy_port=None):
- """Updates target with the pre-generated update stored in update_path.
-
- Subclasses must override this method with the correct update procedure for
- the class.
-
- Args:
- update_path: Path to the image to update with. This directory should
- contain both update.gz, and stateful.image.gz
- proxy_port: Port to have the client connect to. For use with
- CrosTestProxy.
- """
- pass
-
- def VerifyImage(self, percent_required_to_pass):
- """Verifies the image with tests.
-
- Verifies that the test images passes the percent required. Subclasses must
- override this method with the correct update procedure for the class.
-
- Args:
- percent_required_to_pass: percentage required to pass. This should be
- fall between 0-100.
-
- Returns:
- Returns the percent that passed.
- """
- pass
-
- # -------- Tests ---------
-
- def testUpdateKeepStateful(self):
- """Tests if we can update normally.
-
- This test checks that we can update by updating the stateful partition
- rather than wiping it.
- """
- # Just make sure some tests pass on original image. Some old images
- # don't pass many tests.
- self.PrepareBase(self.base_image_path)
- # TODO(sosa): move to 100% once we start testing using the autotest paired
- # with the dev channel.
- percent_passed = self.VerifyImage(10)
-
- # Update to - all tests should pass on new image.
- self.PerformUpdate(self.target_image_path, self.base_image_path)
- self.VerifyImage(100)
-
- # Update from - same percentage should pass that originally passed.
- self.PerformUpdate(self.base_image_path, self.target_image_path)
- self.VerifyImage(percent_passed)
-
- def testUpdateWipeStateful(self):
- """Tests if we can update after cleaning the stateful partition.
-
- This test checks that we can update successfully after wiping the
- stateful partition.
- """
- # Just make sure some tests pass on original image. Some old images
- # don't pass many tests.
- self.PrepareBase(self.base_image_path)
- # TODO(sosa): move to 100% once we start testing using the autotest paired
- # with the dev channel.
- percent_passed = self.VerifyImage(10)
-
- # Update to - all tests should pass on new image.
- self.PerformUpdate(self.target_image_path, self.base_image_path, 'clean')
- self.VerifyImage(100)
-
- # Update from - same percentage should pass that originally passed.
- self.PerformUpdate(self.base_image_path, self.target_image_path, 'clean')
- self.VerifyImage(percent_passed)
-
- # TODO(sosa): Get test to work with verbose.
- def NotestPartialUpdate(self):
- """Tests what happens if we attempt to update with a truncated payload."""
- # Preload with the version we are trying to test.
- self.PrepareBase(self.target_image_path)
-
- # Image can be updated at:
- # ~chrome-eng/chromeos/localmirror/autest-images
- url = 'http://gsdview.appspot.com/chromeos-localmirror/' \
- 'autest-images/truncated_image.gz'
- payload = os.path.join(self.download_folder, 'truncated_image.gz')
-
- # Read from the URL and write to the local file
- urllib.urlretrieve(url, payload)
-
- expected_msg = 'download_hash_data == update_check_response_hash failed'
- self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
-
- # TODO(sosa): Get test to work with verbose.
- def NotestCorruptedUpdate(self):
- """Tests what happens if we attempt to update with a corrupted payload."""
- # Preload with the version we are trying to test.
- self.PrepareBase(self.target_image_path)
-
- # Image can be updated at:
- # ~chrome-eng/chromeos/localmirror/autest-images
- url = 'http://gsdview.appspot.com/chromeos-localmirror/' \
- 'autest-images/corrupted_image.gz'
- payload = os.path.join(self.download_folder, 'corrupted.gz')
-
- # Read from the URL and write to the local file
- urllib.urlretrieve(url, payload)
-
- # This update is expected to fail...
- expected_msg = 'zlib inflate() error:-3'
- self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
-
- def testInterruptedUpdate(self):
- """Tests what happens if we interrupt payload delivery 3 times."""
-
- class InterruptionFilter(cros_test_proxy.Filter):
- """This filter causes the proxy to interrupt the download 3 times
-
- It does this by closing the first three connections to transfer
- 2M total in the outbound connection after they transfer the
- 2M.
- """
- def __init__(self):
- """Defines variable shared across all connections"""
- self.close_count = 0
-
- def setup(self):
- """Called once at the start of each connection."""
- self.data_size = 0
-
- def OutBound(self, data):
- """Called once per packet for outgoing data.
-
- The first three connections transferring more than 2M
- outbound will be closed.
- """
- if self.close_count < 3:
- if self.data_size > (2 * 1024 * 1024):
- self.close_count += 1
- return None
-
- self.data_size += len(data)
- return data
-
- self.AttemptUpdateWithFilter(InterruptionFilter(), proxy_port=8082)
-
- def testDelayedUpdate(self):
- """Tests what happens if some data is delayed during update delivery"""
-
- class DelayedFilter(cros_test_proxy.Filter):
- """Causes intermittent delays in data transmission.
-
- It does this by inserting 3 20 second delays when transmitting
- data after 2M has been sent.
- """
- def setup(self):
- """Called once at the start of each connection."""
- self.data_size = 0
- self.delay_count = 0
-
- def OutBound(self, data):
- """Called once per packet for outgoing data.
-
- The first three packets after we reach 2M transferred
- are delayed by 20 seconds.
- """
- if self.delay_count < 3:
- if self.data_size > (2 * 1024 * 1024):
- self.delay_count += 1
- time.sleep(20)
-
- self.data_size += len(data)
- return data
-
- self.AttemptUpdateWithFilter(DelayedFilter(), proxy_port=8083)
-
- def SimpleTest(self):
- """A simple update that updates once from a base image to a target.
-
- We explicitly don't use test prefix so that isn't run by default. Can be
- run using test_prefix option.
- """
- self.PrepareBase(self.base_image_path)
- self.PerformUpdate(self.target_image_path, self.base_image_path)
- self.VerifyImage(100)
-
-
-class RealAUTest(unittest.TestCase, AUTest):
- """Test harness for updating real images."""
-
- def setUp(self):
- AUTest.setUp(self)
-
- @classmethod
- def ProcessOptions(cls, parser, options):
- """Processes non-vm-specific options."""
- AUTest.ProcessOptions(parser, options)
- cls.remote = options.remote
-
- if not cls.remote:
- parser.error('We require a remote address for real tests.')
-
- def PrepareBase(self, image_path):
- """Auto-update to base image to prepare for test."""
- self._PrepareRealBase(image_path)
-
- def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Updates a remote image using image_to_live.sh."""
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- cmd = ['%s/image_to_live.sh' % self.crosutils,
- '--remote=%s' % self.remote,
- stateful_change_flag,
- '--verify',
- ]
- self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port,
- private_key_path)
- self.RunUpdateCmd(cmd)
-
- def _UpdateUsingPayload(self, update_path, stateful_change='old',
- proxy_port=None):
- """Updates a remote image using image_to_live.sh."""
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- cmd = ['%s/image_to_live.sh' % self.crosutils,
- '--payload=%s' % update_path,
- '--remote=%s' % self.remote,
- stateful_change_flag,
- '--verify',
- ]
- if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
- self.RunUpdateCmd(cmd)
-
- def VerifyImage(self, percent_required_to_pass):
- """Verifies an image using run_remote_tests.sh with verification suite."""
- output = RunCommand([
- '%s/run_remote_tests.sh' % self.crosutils,
- '--remote=%s' % self.remote,
- self.verify_suite,
- ], error_ok=True, enter_chroot=False, redirect_stdout=True)
- return self.AssertEnoughTestsPassed(self, output, percent_required_to_pass)
-
-
-class VirtualAUTest(unittest.TestCase, AUTest):
- """Test harness for updating virtual machines."""
-
- # Class variables used to acquire individual VM variables per test.
- _vm_lock = threading.Lock()
- _next_port = 9222
-
- def _KillExistingVM(self, pid_file):
- if os.path.exists(pid_file):
- Warning('Existing %s found. Deleting and killing process' %
- pid_file)
- RunCommand(['./cros_stop_vm', '--kvm_pid=%s' % pid_file],
- cwd=self.crosutilsbin)
-
- assert not os.path.exists(pid_file)
-
- def _AcquireUniquePortAndPidFile(self):
- """Acquires unique ssh port and pid file for VM."""
- with VirtualAUTest._vm_lock:
- self._ssh_port = VirtualAUTest._next_port
- self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
- VirtualAUTest._next_port += 1
-
- def setUp(self):
- """Unit test overriden method. Is called before every test."""
- AUTest.setUp(self)
- self._AcquireUniquePortAndPidFile()
- self._KillExistingVM(self._kvm_pid_file)
-
- def tearDown(self):
- self._KillExistingVM(self._kvm_pid_file)
-
- @classmethod
- def ProcessOptions(cls, parser, options):
- """Processes vm-specific options."""
- AUTest.ProcessOptions(parser, options)
-
- # Communicate flags to tests.
- cls.graphics_flag = ''
- if options.no_graphics: cls.graphics_flag = '--no_graphics'
- if not cls.board: parser.error('Need board to convert base image to vm.')
-
- def PrepareBase(self, image_path):
- """Creates an update-able VM based on base image."""
- self._PrepareVMBase(image_path)
-
- def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port='', private_key_path=None):
- """Updates VM image with image_path."""
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- if src_image_path and self._first_update:
- src_image_path = self.vm_image_path
- self._first_update = False
-
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--vm_image_path=%s' % self.vm_image_path,
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- ]
-
- self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port,
- private_key_path)
- self.RunUpdateCmd(cmd)
-
- def _UpdateUsingPayload(self, update_path, stateful_change='old',
- proxy_port=None):
- """Updates a vm image using cros_run_vm_update."""
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--payload=%s' % update_path,
- '--vm_image_path=%s' % self.vm_image_path,
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- ]
- if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
- self.RunUpdateCmd(cmd)
-
- def VerifyImage(self, percent_required_to_pass):
- """Runs vm smoke suite to verify image."""
- # image_to_live already verifies lsb-release matching. This is just
- # for additional steps.
-
- commandWithArgs = ['%s/cros_run_vm_test' % self.crosutilsbin,
- '--image_path=%s' % self.vm_image_path,
- '--snapshot',
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- self.verify_suite,
- ]
-
- if self.graphics_flag:
- commandWithArgs.append(self.graphics_flag)
-
- output = RunCommand(commandWithArgs, error_ok=True, enter_chroot=False,
- redirect_stdout=True)
- return self.AssertEnoughTestsPassed(self, output, percent_required_to_pass)
-
-
-class PregenerateAUDeltas(unittest.TestCase, AUTest):
- """Magical class that emulates an AUTest to store deltas we will generate.
-
- This class emulates an AUTest such that when it runs as a TestCase it runs
- through the exact up
- """
- delta_list = {}
-
- def setUp(self):
- AUTest.setUp(self)
-
- def tearDown(self):
- pass
-
- @classmethod
- def ProcessOptions(cls, parser, options):
- AUTest.ProcessOptions(parser, options)
- cls.au_type = options.type
-
- def PrepareBase(self, image_path):
- if self.au_type == 'vm':
- self._PrepareVMBase(image_path)
- else:
- self._PrepareRealBase(image_path)
-
- def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- if self.au_type == 'vm' and src_image_path and self._first_update:
- src_image_path = self.vm_image_path
- self._first_update = False
-
- # Generate a value that combines delta with private key path.
- val = src_image_path
- if private_key_path: val = '%s+%s' % (val, private_key_path)
- if not self.delta_list.has_key(image_path):
- self.delta_list[image_path] = set([val])
- else:
- self.delta_list[image_path].add(val)
-
- def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg):
- pass
-
- def VerifyImage(self, percent_required_to_pass):
- pass
-
-
-class ParallelJob(threading.Thread):
- """Small wrapper for threading. Thread that releases a semaphores on exit."""
-
- def __init__(self, starting_semaphore, ending_semaphore, target, args):
- """Initializes an instance of a job.
-
- Args:
- starting_semaphore: Semaphore used by caller to wait on such that
- there isn't more than a certain number of threads running. Should
- be initialized to a value for the number of threads wanting to be run
- at a time.
- ending_semaphore: Semaphore is released every time a job ends. Should be
- initialized to 0 before starting first job. Should be acquired once for
- each job. Threading.Thread.join() has a bug where if the run function
- terminates too quickly join() will hang forever.
- target: The func to run.
- args: Args to pass to the fun.
- """
- threading.Thread.__init__(self, target=target, args=args)
- self._target = target
- self._args = args
- self._starting_semaphore = starting_semaphore
- self._ending_semaphore = ending_semaphore
- self._output = None
- self._completed = False
-
- def run(self):
- """Thread override. Runs the method specified and sets output."""
- try:
- self._output = self._target(*self._args)
- finally:
- # Our own clean up.
- self._Cleanup()
- self._completed = True
- # From threading.py to avoid a refcycle.
- del self._target, self._args
-
- def GetOutput(self):
- """Returns the output of the method run."""
- assert self._completed, 'GetOutput called before thread was run.'
- return self._output
-
- def _Cleanup(self):
- """Releases semaphores for a waiting caller."""
- Info('Completed job %s' % self)
- self._starting_semaphore.release()
- self._ending_semaphore.release()
-
- def __str__(self):
- return '%s(%s)' % (self._target, self._args)
-
-
-class DevServerWrapper(threading.Thread):
- """A Simple wrapper around a devserver instance."""
-
- def __init__(self):
- self.proc = None
- threading.Thread.__init__(self)
-
- def run(self):
- # Kill previous running instance of devserver if it exists.
- RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True,
- print_cmd=False)
- RunCommand(['sudo',
- 'start_devserver',
- '--archive_dir=./static',
- '--client_prefix=ChromeOSUpdateEngine',
- '--production',
- ], enter_chroot=True, print_cmd=False)
-
- def Stop(self):
- """Kills the devserver instance."""
- RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True,
- print_cmd=False)
-
- @classmethod
- def GetDevServerURL(cls, port, sub_dir):
- """Returns the dev server url for a given port and sub directory."""
- ip_addr = GetIPAddress()
- if not port: port = 8080
- url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
- 'port': str(port),
- 'dir': sub_dir}
- return url
-
-
-def _GenerateUpdateId(target, src, key):
- """Returns a simple representation id of target and src paths."""
- update_id = target
- if src: update_id = '->'.join([update_id, src])
- if key: update_id = '+'.join([update_id, key])
- return update_id
-
-
-def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args,
- print_status):
-
- """Runs set number of specified jobs in parallel.
-
- Args:
- number_of_simultaneous_jobs: Max number of threads to be run in parallel.
- jobs: Array of methods to run.
- jobs_args: Array of args associated with method calls.
- print_status: True if you'd like this to print out .'s as it runs jobs.
- Returns:
- Returns an array of results corresponding to each thread.
- """
- def _TwoTupleize(x, y):
- return (x, y)
-
- threads = []
- job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
- join_semaphore = threading.Semaphore(0)
- assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
-
- # Create the parallel jobs.
- for job, args in map(_TwoTupleize, jobs, jobs_args):
- thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
- args=args)
- threads.append(thread)
-
- # Cache sudo access.
- RunCommand(['sudo', 'echo', 'Starting test harness'],
- print_cmd=False, redirect_stdout=True, redirect_stderr=True)
-
- # We use a semaphore to ensure we don't run more jobs that required.
- # After each thread finishes, it releases (increments semaphore).
- # Acquire blocks of num jobs reached and continues when a thread finishes.
- for next_thread in threads:
- job_start_semaphore.acquire(blocking=True)
- Info('Starting job %s' % next_thread)
- next_thread.start()
-
- # Wait on the rest of the threads to finish.
- Info('Waiting for threads to complete.')
- for thread in threads:
- while not join_semaphore.acquire(blocking=False):
- time.sleep(5)
- if print_status:
- print >> sys.stderr, '.',
-
- return [thread.GetOutput() for thread in threads]
-
-
-def _PrepareTestSuite(parser, options, test_class):
- """Returns a prepared test suite given by the options and test class."""
- test_class.ProcessOptions(parser, options)
- test_loader = unittest.TestLoader()
- test_loader.testMethodPrefix = options.test_prefix
- return test_loader.loadTestsFromTestCase(test_class)
-
-
-def _PregenerateUpdates(parser, options):
- """Determines all deltas that will be generated and generates them.
-
- This method effectively pre-generates the dev server cache for all tests.
-
- Args:
- parser: parser from main.
- options: options from parsed parser.
- Returns:
- Dictionary of Update Identifiers->Relative cache locations.
- Raises:
- UpdateException if we fail to generate an update.
- """
- def _GenerateVMUpdate(target, src, private_key_path):
- """Generates an update using the devserver."""
- command = ['./enter_chroot.sh',
- '--nogit_config',
- '--',
- 'sudo',
- 'start_devserver',
- '--pregenerate_update',
- '--exit',
- ]
- # Add actual args to command.
- command.append('--image=%s' % ReinterpretPathForChroot(target))
- if src: command.append('--src_image=%s' % ReinterpretPathForChroot(src))
- if options.type == 'vm': command.append('--for_vm')
- if private_key_path:
- command.append('--private_key=%s' %
- ReinterpretPathForChroot(private_key_path))
-
- return RunCommandCaptureOutput(command, combine_stdout_stderr=True,
- print_cmd=True)
-
- # Get the list of deltas by mocking out update method in test class.
- test_suite = _PrepareTestSuite(parser, options, PregenerateAUDeltas)
- test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)
- if not test_result.wasSuccessful():
- raise UpdateException(1, 'Error finding updates to generate.')
-
- Info('The following delta updates are required.')
- update_ids = []
- jobs = []
- args = []
- for target, srcs in PregenerateAUDeltas.delta_list.items():
- for src_key in srcs:
- (src, _ , key) = src_key.partition('+')
- # TODO(sosa): Add private key as part of caching name once devserver can
- # handle it its own cache.
- update_id = _GenerateUpdateId(target=target, src=src, key=key)
- print >> sys.stderr, 'AU: %s' % update_id
- update_ids.append(update_id)
- jobs.append(_GenerateVMUpdate)
- args.append((target, src, key))
-
- raw_results = _RunParallelJobs(options.jobs, jobs, args, print_status=True)
- results = []
-
- # Looking for this line in the output.
- key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
- for result in raw_results:
- (return_code, output, _) = result
- if return_code != 0:
- Warning(output)
- raise UpdateException(return_code, 'Failed to generate all updates.')
- else:
- for line in output.splitlines():
- match = key_line_re.search(line)
- if match:
- # Convert blah/blah/update.gz -> update/blah/blah.
- path_to_update_gz = match.group(1).rstrip()
- (path_to_update_dir, _, _) = path_to_update_gz.rpartition(
- '/update.gz')
- results.append('/'.join(['update', path_to_update_dir]))
- break
-
- # Make sure all generation of updates returned cached locations.
- if len(raw_results) != len(results):
- raise UpdateException(1, 'Insufficient number cache directories returned.')
-
- # Build the dictionary from our id's and returned cache paths.
- cache_dictionary = {}
- for index, id in enumerate(update_ids):
- cache_dictionary[id] = results[index]
-
- return cache_dictionary
-
-
-def _RunTestsInParallel(parser, options, test_class):
- """Runs the tests given by the options and test_class in parallel."""
- threads = []
- args = []
- test_suite = _PrepareTestSuite(parser, options, test_class)
- for test in test_suite:
- test_name = test.id()
- test_case = unittest.TestLoader().loadTestsFromName(test_name)
- threads.append(unittest.TextTestRunner().run)
- args.append(test_case)
-
- results = _RunParallelJobs(options.jobs, threads, args, print_status=False)
- for test_result in results:
- if not test_result.wasSuccessful():
- Die('Test harness was not successful')
-
-
-def InsertPublicKeyIntoImage(image_path, key_path):
- """Inserts public key into image @ static update_engine location."""
- from_dir = os.path.dirname(image_path)
- image = os.path.basename(image_path)
- crosutils_dir = os.path.abspath(__file__).rsplit('/', 2)[0]
- target_key_path = 'usr/share/update_engine/update-payload-key.pub.pem'
-
- # Temporary directories for this function.
- rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp')
- stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp')
-
- Info('Copying %s into %s' % (key_path, image_path))
- try:
- RunCommand(['./mount_gpt_image.sh',
- '--from=%s' % from_dir,
- '--image=%s' % image,
- '--rootfs_mountpt=%s' % rootfs_dir,
- '--stateful_mountpt=%s' % stateful_dir,
- ], print_cmd=False, redirect_stdout=True,
- redirect_stderr=True, cwd=crosutils_dir)
- path = os.path.join(rootfs_dir, target_key_path)
- dir_path = os.path.dirname(path)
- RunCommand(['sudo', 'mkdir', '--parents', dir_path], print_cmd=False)
- RunCommand(['sudo', 'cp', '--force', '-p', key_path, path],
- print_cmd=False)
- finally:
- # Unmount best effort regardless.
- RunCommand(['./mount_gpt_image.sh',
- '--unmount',
- '--rootfs_mountpt=%s' % rootfs_dir,
- '--stateful_mountpt=%s' % stateful_dir,
- ], print_cmd=False, redirect_stdout=True, redirect_stderr=True,
- cwd=crosutils_dir)
- # Clean up our directories.
- os.rmdir(rootfs_dir)
- os.rmdir(stateful_dir)
-
- RunCommand(['bin/cros_make_image_bootable',
- ReinterpretPathForChroot(from_dir),
- image],
- print_cmd=False, redirect_stdout=True, redirect_stderr=True,
- enter_chroot=True, cwd=crosutils_dir)
-
-
-def CleanPreviousWork(options):
- """Cleans up previous work from the devserver cache and local image cache."""
- Info('Cleaning up previous work.')
- # Wipe devserver cache.
- RunCommandCaptureOutput(
- ['sudo', 'start_devserver', '--clear_cache', '--exit', ],
- enter_chroot=True, print_cmd=False, combine_stdout_stderr=True)
-
- # Clean previous vm images if they exist.
- if options.type == 'vm':
- target_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
- options.target_image)
- base_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
- options.base_image)
- if os.path.exists(target_vm_image_path): os.remove(target_vm_image_path)
- if os.path.exists(base_vm_image_path): os.remove(base_vm_image_path)
-
-
-def main():
- parser = optparse.OptionParser()
- parser.add_option('-b', '--base_image',
- help='path to the base image.')
- parser.add_option('-r', '--board',
- help='board for the images.')
- parser.add_option('--clean', default=False, dest='clean', action='store_true',
- help='Clean all previous state')
- parser.add_option('--no_delta', action='store_false', default=True,
- dest='delta',
- help='Disable using delta updates.')
- parser.add_option('--no_graphics', action='store_true',
- help='Disable graphics for the vm test.')
- parser.add_option('-j', '--jobs', default=8, type=int,
- help='Number of simultaneous jobs')
- parser.add_option('--public_key', default=None,
- help='Public key to use on images and updates.')
- parser.add_option('--private_key', default=None,
- help='Private key to use on images and updates.')
- parser.add_option('-q', '--quick_test', default=False, action='store_true',
- help='Use a basic test to verify image.')
- parser.add_option('-m', '--remote',
- help='Remote address for real test.')
- parser.add_option('-t', '--target_image',
- help='path to the target image.')
- parser.add_option('--test_prefix', default='test',
- help='Only runs tests with specific prefix i.e. '
- 'testFullUpdateWipeStateful.')
- parser.add_option('-p', '--type', default='vm',
- help='type of test to run: [vm, real]. Default: vm.')
- parser.add_option('--verbose', default=True, action='store_true',
- help='Print out rather than capture output as much as '
- 'possible.')
- (options, leftover_args) = parser.parse_args()
-
- if leftover_args: parser.error('Found unsupported flags: %s' % leftover_args)
-
- assert options.target_image and os.path.exists(options.target_image), \
- 'Target image path does not exist'
- if not options.base_image:
- Info('Base image not specified. Using target image as base image.')
- options.base_image = options.target_image
-
- # Sanity checks on keys and insert them onto the image. The caches must be
- # cleaned so we know that the vm images and payloads match the possibly new
- # key.
- if options.private_key or options.public_key:
- error_msg = ('Could not find %s key. Both private and public keys must be '
- 'specified if either is specified.')
- assert options.private_key and os.path.exists(options.private_key), \
- error_msg % 'private'
- assert options.public_key and os.path.exists(options.public_key), \
- error_msg % 'public'
- InsertPublicKeyIntoImage(options.target_image, options.public_key)
- InsertPublicKeyIntoImage(options.base_image, options.public_key)
- options.clean = True
-
- # Clean up previous work if requested.
- if options.clean: CleanPreviousWork(options)
-
- # Figure out the test_class.
- if options.type == 'vm': test_class = VirtualAUTest
- elif options.type == 'real': test_class = RealAUTest
- else: parser.error('Could not parse harness type %s.' % options.type)
-
- # Generate cache of updates to use during test harness.
- global dev_server_cache
- dev_server_cache = _PregenerateUpdates(parser, options)
- my_server = DevServerWrapper()
- my_server.start()
- try:
- if options.type == 'vm':
- _RunTestsInParallel(parser, options, test_class)
- else:
- # TODO(sosa) - Take in a machine pool for a real test.
- # Can't run in parallel with only one remote device.
- test_suite = _PrepareTestSuite(parser, options, test_class)
- test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- if not test_result.wasSuccessful(): Die('Test harness failed.')
- finally:
- my_server.Stop()
-
-
-if __name__ == '__main__':
- main()
« bin/au_test_harness/vm_au_worker.py ('K') | « bin/cros_au_test_harness ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698