| Index: tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| diff --git a/tools/telemetry/telemetry/wpr/archive_info_unittest.py b/tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| deleted file mode 100644
|
| index b082aa9ea0365b6f7a6f344c28a37f083bb66bc0..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| +++ /dev/null
|
| @@ -1,224 +0,0 @@
|
| -# Copyright 2012 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -import json
|
| -import os
|
| -import shutil
|
| -import tempfile
|
| -import unittest
|
| -
|
| -from catapult_base import cloud_storage # pylint: disable=import-error
|
| -
|
| -from telemetry.page import page
|
| -from telemetry.testing import system_stub
|
| -from telemetry.wpr import archive_info
|
| -
|
| -
|
| -class MockPage(page.Page):
|
| - def __init__(self, url, name=None):
|
| - super(MockPage, self).__init__(url, None, name=name)
|
| -
|
| -
|
| -page1 = MockPage('http://www.foo.com/', 'Foo')
|
| -page2 = MockPage('http://www.bar.com/', 'Bar')
|
| -page3 = MockPage('http://www.baz.com/')
|
| -recording1 = 'data_001.wpr'
|
| -recording2 = 'data_002.wpr'
|
| -archive_info_contents = ("""
|
| -{
|
| -"archives": {
|
| - "%s": ["%s", "%s"],
|
| - "%s": ["%s"]
|
| -}
|
| -}
|
| -""" % (recording1, page1.display_name, page2.display_name, recording2,
|
| - page3.display_name))
|
| -
|
| -
|
| -class WprArchiveInfoTest(unittest.TestCase):
|
| - def setUp(self):
|
| - self.tmp_dir = tempfile.mkdtemp()
|
| - # Write the metadata.
|
| - self.story_set_archive_info_file = os.path.join(
|
| - self.tmp_dir, 'info.json')
|
| - with open(self.story_set_archive_info_file, 'w') as f:
|
| - f.write(archive_info_contents)
|
| -
|
| - # Write the existing .wpr files.
|
| - for i in [1, 2]:
|
| - with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
|
| - f.write(archive_info_contents)
|
| -
|
| - # Create the PageSetArchiveInfo object to be tested.
|
| - self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| - # Use cloud_storage system stub.
|
| - self.overrides = system_stub.Override(archive_info, ['cloud_storage'])
|
| -
|
| - def tearDown(self):
|
| - shutil.rmtree(self.tmp_dir)
|
| - self.overrides.Restore()
|
| -
|
| - def assertCorrectHashFile(self, file_path):
|
| - old_ch = cloud_storage.CalculateHash
|
| - cloud_storage.CalculateHash = self.overrides.cloud_storage.CalculateHash
|
| - try:
|
| - self.assertTrue(os.path.exists(file_path + '.sha1'))
|
| - with open(file_path + '.sha1', 'rb') as f:
|
| - self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
|
| - finally:
|
| - cloud_storage.CalculateHash = old_ch
|
| -
|
| - def testDownloadArchivesIfNeeded(self):
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - # Second hash doesn't match, need to fetch it.
|
| - cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash",
|
| - recording2: "dummyhash22"}})
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",})
|
| - self.archive_info.DownloadArchivesIfNeeded()
|
| - self.assertEquals(len(cloud_storage_stub.downloaded_files), 1)
|
| - self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2)
|
| -
|
| - def testReadingArchiveInfo(self):
|
| - self.assertIsNotNone(self.archive_info.WprFilePathForStory(page1))
|
| - self.assertEquals(recording1, os.path.basename(
|
| - self.archive_info.WprFilePathForStory(page1)))
|
| -
|
| - self.assertIsNotNone(self.archive_info.WprFilePathForStory(page2))
|
| - self.assertEquals(recording1, os.path.basename(
|
| - self.archive_info.WprFilePathForStory(page2)))
|
| -
|
| - self.assertIsNotNone(self.archive_info.WprFilePathForStory(page3))
|
| - self.assertEquals(recording2, os.path.basename(
|
| - self.archive_info.WprFilePathForStory(page3)))
|
| -
|
| - def testArchiveInfoFileGetsUpdated(self):
|
| - """Ensures that the archive info file is updated correctly."""
|
| -
|
| - expected_archive_file_contents = {
|
| - u'description': (u'Describes the Web Page Replay archives for a'
|
| - u' story set. Don\'t edit by hand! Use record_wpr for'
|
| - u' updating.'),
|
| - u'archives': {
|
| - u'data_003.wpr': [u'Bar', u'http://www.baz.com/'],
|
| - u'data_001.wpr': [u'Foo']
|
| - }
|
| - }
|
| -
|
| - new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - expected_archive_file_path = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| - hash_dictionary = {expected_archive_file_path:'filehash'}
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - self.archive_info.AddRecordedStories([page2, page3])
|
| -
|
| - with open(self.story_set_archive_info_file, 'r') as f:
|
| - archive_file_contents = json.load(f)
|
| - self.assertEquals(expected_archive_file_contents, archive_file_contents)
|
| -
|
| - def testModifications(self):
|
| - recording1_path = os.path.join(self.tmp_dir, recording1)
|
| - recording2_path = os.path.join(self.tmp_dir, recording2)
|
| -
|
| - new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| - new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
|
| - hash_dictionary = {new_recording1:'file_hash1',
|
| - new_recording2:'file_hash2'}
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| -
|
| - new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| -
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| -
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page1))
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page2))
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page3))
|
| -
|
| - self.archive_info.AddRecordedStories([page2])
|
| -
|
| - self.assertTrue(os.path.exists(new_recording1))
|
| - self.assertFalse(os.path.exists(new_temp_recording))
|
| -
|
| - self.assertTrue(os.path.exists(recording1_path))
|
| - self.assertTrue(os.path.exists(recording2_path))
|
| - self.assertCorrectHashFile(new_recording1)
|
| -
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| -
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - self.archive_info.AddRecordedStories([page3])
|
| -
|
| - self.assertTrue(os.path.exists(new_recording2))
|
| - self.assertCorrectHashFile(new_recording2)
|
| - self.assertFalse(os.path.exists(new_temp_recording))
|
| -
|
| - self.assertTrue(os.path.exists(recording1_path))
|
| - # recording2 is no longer needed, so it was deleted.
|
| - self.assertFalse(os.path.exists(recording2_path))
|
| -
|
| - def testCreatingNewArchiveInfo(self):
|
| - # Write only the page set without the corresponding metadata file.
|
| - story_set_contents = ("""
|
| - {
|
| - archive_data_file": "new_archive_info.json",
|
| - "pages": [
|
| - {
|
| - "url": "%s",
|
| - }
|
| - ]
|
| - }""" % page1.url)
|
| -
|
| - story_set_file = os.path.join(self.tmp_dir, 'new_story_set.json')
|
| - with open(story_set_file, 'w') as f:
|
| - f.write(story_set_contents)
|
| -
|
| - self.story_set_archive_info_file = os.path.join(self.tmp_dir,
|
| - 'new_archive_info.json')
|
| -
|
| - expected_archive_file_path = os.path.join(self.tmp_dir,
|
| - 'new_archive_info_000.wpr')
|
| - hash_dictionary = {expected_archive_file_path:'filehash'}
|
| - self.overrides.cloud_storage.SetCalculatedHashesForTesting(hash_dictionary)
|
| -
|
| - # Create the WprArchiveInfo object to be tested.
|
| - self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| -
|
| - # Add a recording for all the pages.
|
| - new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| -
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| -
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page1))
|
| -
|
| - self.archive_info.AddRecordedStories([page1])
|
| -
|
| - # Expected name for the recording (decided by WprArchiveInfo).
|
| - new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
|
| -
|
| - self.assertTrue(os.path.exists(new_recording))
|
| - self.assertFalse(os.path.exists(new_temp_recording))
|
| - self.assertCorrectHashFile(new_recording)
|
| -
|
| - # Check that the archive info was written correctly.
|
| - self.assertTrue(os.path.exists(self.story_set_archive_info_file))
|
| - read_archive_info = archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| - self.assertEquals(new_recording,
|
| - read_archive_info.WprFilePathForStory(page1))
|
|
|