Index: saft_utility.py |
diff --git a/saft_utility.py b/saft_utility.py |
index ccdd9c297077d1959121f4b3ffcf1f2d63585271..88e01ecf8f67d42848d99e7f0581c460f23063e0 100755 |
--- a/saft_utility.py |
+++ b/saft_utility.py |
@@ -20,6 +20,7 @@ import chromeos_interface |
import flashrom_handler |
import kernel_handler |
import saft_flashrom_util |
+import tpm_handler |
# |
# We need to know the names of two files: |
@@ -73,6 +74,7 @@ cd "${mount_point}%s" |
X_PORT=1 |
X :"${X_PORT}" -config saft.xorg.conf vt02 > /dev/null 2>&1 & |
xpid="$!" |
+initctl stop update-engine # Prevent messing with cgpt attributes |
export DISPLAY=":${X_PORT}.0" |
./%s --next_step |
kill "${xpid}" |
@@ -161,6 +163,7 @@ class FirmwareTest(object): |
self.kern_handler = None |
self.step_failed = False |
self.window = None |
+ self.tpm_handler = None |
def _verify_fw_id(self, compare_to_file): |
'''Verify if the current firmware ID matches the contents a file. |
@@ -273,7 +276,8 @@ class FirmwareTest(object): |
self.chros_if.init(os.path.join(state_root, STATE_SUBDIR), LOG_FILE) |
- def init(self, progname, chros_if, kern_handler=None, state_sequence=None): |
+ def init(self, progname, chros_if, |
+ kern_handler=None, state_sequence=None, tpm_handler=None): |
'''Initialize the Firmware self test instance. |
progname - a string, name of this program as it was invoked. |
@@ -297,12 +301,30 @@ class FirmwareTest(object): |
if kern_handler: |
self.kern_handler = kern_handler |
self.kern_handler.init(self.chros_if) |
+ if tpm_handler: |
+ self.tpm_handler = tpm_handler |
+ self.tpm_handler.init(self.chros_if) |
+ |
+ if self.kern_handler and self.tpm_handler: |
+ if not self.chros_if.is_removable_device(self.base_partition): |
+ # On each non USB flash rooted start confirm that kernel |
+ # versions and TPM setting match. |
+ vers_a = self.kern_handler.get_version('a') |
+ vers_b = self.kern_handler.get_version('b') |
+ if not self.tpm_handler.kernel_version_good(vers_a, vers_b): |
+ raise FwError('TPM kernel version mismatch (%d %d %s)' % ( |
+ vers_a, vers_b, self.base_partition)) |
def set_try_fw_b(self): |
'''Request running firmware B on the next restart.''' |
self.chros_if.log('Requesting restart with FW B') |
self.chros_if.run_shell_command('reboot_mode --try_firmware_b=1') |
+ def request_recovery_boot(self): |
+ '''Request running in recovery mode on the restart.''' |
+ self.chros_if.log('Requesting restart in recovery mode') |
+ self.chros_if.run_shell_command('reboot_mode --recovery=1') |
+ |
@allow_multiple_section_input |
def restore_firmware(self, section): |
'''Restore the requested firmware section (previously corrupted).''' |
@@ -363,12 +385,71 @@ class FirmwareTest(object): |
self.step_failed = True |
break |
+ def move_kernel_backward(self, section): |
+ '''Decrement kernel version for the requested section.''' |
+ new_version = self.kern_handler.get_version(section) - 1 |
+ self.chros_if.log( |
+ 'setting section %s version to %d' % (section, new_version)) |
+ self.kern_handler.set_version(section, new_version) |
+ |
+ def jump_kernels_forward(self): |
+ '''Add two to both kernels' versions. |
+ |
+ This compensates for the previous decrement and increases the version |
+ number by one as compared to the original state (at the start of the |
+ test). |
+ ''' |
+ for section in ('a', 'b'): |
+ new_version = self.kern_handler.get_version(section) + 2 |
+ self.chros_if.log( |
+ 'setting section %s version to %d' % (section, new_version)) |
+ self.kern_handler.set_version(section, new_version) |
+ |
+ def terminate_tpm_tests(self): |
+ '''Restore TPM and kernel states. |
+ |
+ Move kernel versions to where they were before SAFT started, set the |
+ TPM kernel version to the biggest of the two kernels and prevent |
+ future TPM write accesses by restoring the upstart file. |
+ ''' |
+ new_tpm_version = 0 |
+ for section in ('a', 'b'): |
+ new_version = self.kern_handler.get_version(section) - 1 |
+ self.chros_if.log( |
+ 'setting section %s version to %d' % (section, new_version)) |
+ self.kern_handler.set_version(section, new_version) |
+ new_tpm_version = max(new_version, new_tpm_version) |
+ self.tpm_handler.set_kernel_version(new_tpm_version) |
+ self.tpm_handler.disable_write_access() |
+ |
def revert_firmware(self): |
'''Restore firmware to the image backed up when SAFT started.''' |
self.chros_if.log('restoring original firmware image') |
self.chros_if.run_shell_command( |
'flashrom -w %s' % self.chros_if.state_dir_file(FW_BACKUP_FILE)) |
+ def new_fw_image(self, image_file = None): |
+ FLASHROM_HANDLER.new_image(image_file) |
+ FLASHROM_HANDLER.verify_image() |
+ if self.tpm_handler: |
+ vers_a = FLASHROM_HANDLER.get_section_version('a') |
+ vers_b = FLASHROM_HANDLER.get_section_version('b') |
+ if not self.tpm_handler.fw_version_good(vers_a, vers_b): |
+ raise FwError('TPM firmware version mismatch') |
+ |
+ def prepare_tpm_tests(self): |
+ '''Prepare TPM for testing. |
+ |
+ Enable write access on the next reboot, and roll back the kernel we |
+ are running now (the other kernel is expected to be used on the next |
+ restart). |
+ ''' |
+ mount_point = self.chros_if.run_shell_command_get_output( |
+ 'df %s' % self.mydir)[-1].split()[-1] |
+ cfg_file = os.path.join(mount_point, 'etc/init/tcsd.conf') |
+ self.tpm_handler.enable_write_access(cfg_file) |
+ self.move_kernel_backward('a') |
+ |
def init_fw_test(self, opt_dictionary, chros_if): |
'''Prepare firmware test context. |
@@ -381,21 +462,20 @@ class FirmwareTest(object): |
''' |
chros_if.init_environment() |
chros_if.log('Automated firmware test log generated on %s' % ( |
- datetime.datetime.strftime(datetime.datetime.now(), |
- '%b %d %Y'))) |
+ datetime.datetime.strftime( |
+ datetime.datetime.now(), '%b %d %Y'))) |
chros_if.log('Original boot state %s' % chros_if.boot_state_vector()) |
self.chros_if = chros_if |
fw_image = opt_dictionary['image_file'] |
- FLASHROM_HANDLER.new_image() |
- FLASHROM_HANDLER.verify_image() |
+ self.new_fw_image() |
FLASHROM_HANDLER.dump_whole( |
self.chros_if.state_dir_file(FW_BACKUP_FILE)) |
- FLASHROM_HANDLER.new_image(fw_image) |
- FLASHROM_HANDLER.verify_image() |
+ self.new_fw_image(fw_image) |
self._handle_saft_script(True) |
shutil.copyfile(self.chros_if.acpi_file('FWID'), |
self.chros_if.state_dir_file(FWID_BACKUP_FILE)) |
shutil.copyfile(fw_image, self.chros_if.state_dir_file(FW_COPY_FILE)) |
+ |
self._set_step(0) |
def next_step(self): |
@@ -427,7 +507,7 @@ class FirmwareTest(object): |
expected_vector = test_state_tuple[0] |
action = test_state_tuple[1] |
boot_vector = self.chros_if.boot_state_vector() |
- self.chros_if.log('Rebooted into state %s on step %d' % ( |
+ self.chros_if.log('\nRebooted into state %s on step %d' % ( |
boot_vector, this_step)) |
conf_log_file = RetriveSaftConfDefinion(CONF_LOG_VAR_NAME) |
@@ -509,16 +589,22 @@ FST = FirmwareTest() |
# function. |
TEST_STATE_SEQUENCE = ( |
- ('1:1:1:0:3', FST.set_try_fw_b), |
+ ('1:1:1:0:3', FST.set_try_fw_b), # Step 0 |
('1:2:1:0:3', None), |
('1:1:1:0:3', FST.corrupt_firmware, 'a'), |
('1:2:1:0:3', FST.restore_firmware, 'a'), |
('1:1:1:0:3', FST.corrupt_firmware, ('a', 'b')), |
- ('5:0:1:1:3', FST.restore_firmware, ('a', 'b')), |
+ ('5:0:1:1:3', FST.restore_firmware, ('a', 'b')), # Step 5 |
('1:1:1:0:3', FST.corrupt_kernel, 'a'), |
('1:1:1:0:5', FST.corrupt_kernel, 'b'), |
('6:0:1:1:3', FST.restore_kernel, ('a', 'b')), |
- ('1:1:1:0:3', FST.cgpt_test_start), |
+ ('1:1:1:0:3', FST.request_recovery_boot), |
+ ('8:0:1:1:3', FST.prepare_tpm_tests), # Step 10 |
+ ('1:1:1:0:5', FST.move_kernel_backward, 'b'), |
+ ('6:0:1:1:3', FST.jump_kernels_forward), |
+ ('1:1:1:0:3', FST.request_recovery_boot), |
+ ('8:0:1:1:3', FST.terminate_tpm_tests), |
+ ('1:1:1:0:3', FST.cgpt_test_start), # Step 15 |
('1:1:1:0:3', FST.cgpt_test_1), |
('1:1:1:0:3', FST.revert_firmware), |
('1:1:1:0:3', None), |
@@ -602,7 +688,8 @@ def main(argv): |
opt_dictionary[name.lstrip('-')] = value |
FST.init(argv[0], CHROS_IF, |
- kernel_handler.KernelHandler(), TEST_STATE_SEQUENCE) |
+ kernel_handler.KernelHandler(), TEST_STATE_SEQUENCE, |
+ tpm_handler.TpmHandler()) |
FLASHROM_HANDLER.init(saft_flashrom_util, CHROS_IF, |
opt_dictionary.get('pub_key')) |