| Index: telemetry/telemetry/wpr/archive_info_unittest.py
|
| diff --git a/telemetry/telemetry/wpr/archive_info_unittest.py b/telemetry/telemetry/wpr/archive_info_unittest.py
|
| index 1711eda92c7bf434877f90988d9f9c0b710b79af..9ac7cf0532cf7b8ff081281442698111d3bd1890 100644
|
| --- a/telemetry/telemetry/wpr/archive_info_unittest.py
|
| +++ b/telemetry/telemetry/wpr/archive_info_unittest.py
|
| @@ -1,4 +1,4 @@
|
| -# Copyright 2017 The Chromium Authors. All rights reserved.
|
| +# Copyright 2012 The Chromium Authors. All rights reserved.
|
| # Use of this source code is governed by a BSD-style license that can be
|
| # found in the LICENSE file.
|
| import json
|
| @@ -12,396 +12,240 @@
|
| from telemetry.page import page
|
| from telemetry.testing import system_stub
|
| from telemetry.wpr import archive_info
|
| +from telemetry.wpr import archive_info2
|
|
|
|
|
| class MockPage(page.Page):
|
| - def __init__(self, url, name=None, platform_specific=False):
|
| + def __init__(self, url, name=None):
|
| super(MockPage, self).__init__(url, None, name=name)
|
| - self._platform_specific = platform_specific
|
| +
|
|
|
| page1 = MockPage('http://www.foo.com/', 'Foo')
|
| -page2 = MockPage('http://www.bar.com/', 'Bar', True)
|
| -page3 = MockPage('http://www.baz.com/', platform_specific=True)
|
| -pageNew1 = MockPage('http://www.new.com/', 'New')
|
| -pageNew2 = MockPage('http://www.newer.com/', 'Newer', True)
|
| +page2 = MockPage('http://www.bar.com/', 'Bar')
|
| +page3 = MockPage('http://www.baz.com/')
|
| recording1 = 'data_001.wpr'
|
| recording2 = 'data_002.wpr'
|
| -recording3 = 'data_003.wpr'
|
| -recording4 = 'data_004.wpr'
|
| -recording5 = 'data_005.wpr'
|
| -_DEFAULT_PLATFORM = archive_info._DEFAULT_PLATFORM
|
| -
|
| -default_archives_info_contents_dict = {
|
| - "platform_specific": True,
|
| - "archives": {
|
| - "Foo": {
|
| - _DEFAULT_PLATFORM: recording1
|
| - },
|
| - "Bar": {
|
| - _DEFAULT_PLATFORM: recording2
|
| - },
|
| - "http://www.baz.com/": {
|
| - _DEFAULT_PLATFORM: recording1,
|
| - "win": recording2,
|
| - "mac": recording3,
|
| - "linux": recording4,
|
| - "android": recording5
|
| - }
|
| - }
|
| +archive_info_contents = ("""
|
| +{
|
| +"archives": {
|
| + "%s": ["%s", "%s"],
|
| + "%s": ["%s"]
|
| }
|
| -
|
| -default_archive_info_contents = json.dumps(default_archives_info_contents_dict)
|
| -default_wpr_files = [
|
| - 'data_001.wpr', 'data_002.wpr', 'data_003.wpr', 'data_004.wpr',
|
| - 'data_005.wpr']
|
| -_BASE_ARCHIVE = {
|
| - u'platform_specific': True,
|
| +}
|
| +""" % (recording1, page1.display_name, page2.display_name, recording2,
|
| + page3.display_name))
|
| +
|
| +
|
| +class WprArchiveInfoTest(unittest.TestCase):
|
| + def setUp(self):
|
| + self.tmp_dir = tempfile.mkdtemp()
|
| + # Write the metadata.
|
| + self.story_set_archive_info_file = os.path.join(
|
| + self.tmp_dir, 'info.json')
|
| + with open(self.story_set_archive_info_file, 'w') as f:
|
| + f.write(archive_info_contents)
|
| +
|
| + # Write the existing .wpr files.
|
| + for i in [1, 2]:
|
| + with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
|
| + f.write(archive_info_contents)
|
| +
|
| + # Create the PageSetArchiveInfo object to be tested.
|
| + self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| + self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| + # Use cloud_storage system stub.
|
| + self.overrides = system_stub.Override(archive_info, ['cloud_storage'])
|
| +
|
| + def tearDown(self):
|
| + shutil.rmtree(self.tmp_dir)
|
| + self.overrides.Restore()
|
| +
|
| + def assertCorrectHashFile(self, file_path):
|
| + old_ch = cloud_storage.CalculateHash
|
| + cloud_storage.CalculateHash = self.overrides.cloud_storage.CalculateHash
|
| + try:
|
| + self.assertTrue(os.path.exists(file_path + '.sha1'))
|
| + with open(file_path + '.sha1', 'rb') as f:
|
| + self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
|
| + finally:
|
| + cloud_storage.CalculateHash = old_ch
|
| +
|
| + def testDownloadArchivesIfNeeded(self):
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + # Second hash doesn't match, need to fetch it.
|
| + cloud_storage_stub.SetRemotePathsForTesting(
|
| + {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash",
|
| + recording2: "dummyhash22"}})
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(
|
| + {os.path.join(self.tmp_dir, recording1): "dummyhash",
|
| + os.path.join(self.tmp_dir, recording2): "dummyhash2",})
|
| + self.archive_info.DownloadArchivesIfNeeded()
|
| + self.assertEquals(len(cloud_storage_stub.downloaded_files), 1)
|
| + self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2)
|
| +
|
| + def testReadingArchiveInfo(self):
|
| + self.assertIsNotNone(self.archive_info.WprFilePathForStory(page1))
|
| + self.assertEquals(recording1, os.path.basename(
|
| + self.archive_info.WprFilePathForStory(page1)))
|
| +
|
| + self.assertIsNotNone(self.archive_info.WprFilePathForStory(page2))
|
| + self.assertEquals(recording1, os.path.basename(
|
| + self.archive_info.WprFilePathForStory(page2)))
|
| +
|
| + self.assertIsNotNone(self.archive_info.WprFilePathForStory(page3))
|
| + self.assertEquals(recording2, os.path.basename(
|
| + self.archive_info.WprFilePathForStory(page3)))
|
| +
|
| + def testArchiveInfoFileGetsUpdated(self):
|
| + """Ensures that the archive info file is updated correctly."""
|
| +
|
| + expected_archive_file_contents = {
|
| u'description': (u'Describes the Web Page Replay archives for a'
|
| u' story set. Don\'t edit by hand! Use record_wpr for'
|
| u' updating.'),
|
| - u'archives': {},
|
| -}
|
| -
|
| -
|
| -class WprArchiveInfoTest(unittest.TestCase):
|
| - def setUp(self):
|
| - self.tmp_dir = tempfile.mkdtemp()
|
| - # Set file for the metadata.
|
| - self.story_set_archive_info_file = os.path.join(
|
| - self.tmp_dir, 'info.json')
|
| - self.overrides = system_stub.Override(archive_info, ['cloud_storage'])
|
| -
|
| - def tearDown(self):
|
| - shutil.rmtree(self.tmp_dir)
|
| - self.overrides.Restore()
|
| -
|
| - def createArchiveInfo(
|
| - self, archive_data=default_archive_info_contents,
|
| - cloud_storage_bucket=cloud_storage.PUBLIC_BUCKET, wpr_files=None):
|
| -
|
| - # Cannot set lists as a default parameter, so doing it this way.
|
| - if wpr_files is None:
|
| - wpr_files = default_wpr_files
|
| -
|
| - with open(self.story_set_archive_info_file, 'w') as f:
|
| - f.write(archive_data)
|
| -
|
| - assert isinstance(wpr_files, list)
|
| - for wpr_file in wpr_files:
|
| - assert isinstance(wpr_file, basestring)
|
| - with open(os.path.join(self.tmp_dir, wpr_file), 'w') as f:
|
| - f.write(archive_data)
|
| - return archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage_bucket)
|
| -
|
| - def testInitNotPlatformSpecific(self):
|
| - with open(self.story_set_archive_info_file, 'w') as f:
|
| - f.write('{}')
|
| - with self.assertRaises(AssertionError):
|
| - self.createArchiveInfo(archive_data='{}')
|
| -
|
| -
|
| - def testDownloadArchivesIfNeededAllNeeded(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - # Second hash doesn't match, need to fetch it.
|
| - cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
|
| - recording2: "dummyhash2_old",
|
| - recording3: "dummyhash3_old",
|
| - recording4: "dummyhash4_old"}})
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| - os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| - os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| -
|
| - test_archive_info.DownloadArchivesIfNeeded()
|
| - self.assertItemsEqual(cloud_storage_stub.downloaded_files,
|
| - [recording1, recording2, recording3, recording4])
|
| -
|
| -
|
| - def testDownloadArchivesIfNeededOneNeeded(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - # Second hash doesn't match, need to fetch it.
|
| - cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
|
| - recording2: "dummyhash2",
|
| - recording3: "dummyhash3",
|
| - recording4: "dummyhash4"}})
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| - os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| - os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| - test_archive_info.DownloadArchivesIfNeeded()
|
| - self.assertItemsEqual(cloud_storage_stub.downloaded_files, [recording1])
|
| -
|
| - def testDownloadArchivesIfNeededNonDefault(self):
|
| - data = {
|
| - 'platform_specific': True,
|
| - 'archives': {
|
| - 'http://www.baz.com/': {
|
| - _DEFAULT_PLATFORM: 'data_001.wpr',
|
| - 'win': 'data_002.wpr',
|
| - 'linux': 'data_004.wpr',
|
| - 'mac': 'data_003.wpr',
|
| - 'android': 'data_005.wpr'},
|
| - 'Foo': {_DEFAULT_PLATFORM: 'data_003.wpr'},
|
| - 'Bar': {_DEFAULT_PLATFORM: 'data_002.wpr'}
|
| + u'archives': {
|
| + u'data_003.wpr': [u'Bar', u'http://www.baz.com/'],
|
| + u'data_001.wpr': [u'Foo']
|
| }
|
| }
|
| - test_archive_info = self.createArchiveInfo(
|
| - archive_data=json.dumps(data, separators=(',', ': ')))
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - # Second hash doesn't match, need to fetch it.
|
| - cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
|
| - recording2: "dummyhash2",
|
| - recording3: "dummyhash3",
|
| - recording4: "dummyhash4_old"}})
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| - os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| - os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| - test_archive_info.DownloadArchivesIfNeeded(target_platforms=['linux'])
|
| - self.assertItemsEqual(cloud_storage_stub.downloaded_files,
|
| - [recording1, recording4])
|
| -
|
| - def testDownloadArchivesIfNeededNoBucket(self):
|
| - test_archive_info = self.createArchiveInfo(cloud_storage_bucket=None)
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - # Second hash doesn't match, need to fetch it.
|
| - cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1",
|
| - recording2: "dummyhash2",
|
| - recording3: "dummyhash3",
|
| - recording4: "dummyhash4_old"}})
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| - os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| - os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| - test_archive_info.DownloadArchivesIfNeeded()
|
| - self.assertItemsEqual(cloud_storage_stub.downloaded_files, [])
|
| -
|
| - def testWprFilePathForStoryDefault(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertEqual(
|
| - test_archive_info.WprFilePathForStory(page1),
|
| - os.path.join(self.tmp_dir, recording1))
|
| - self.assertEqual(
|
| - test_archive_info.WprFilePathForStory(page2),
|
| - os.path.join(self.tmp_dir, recording2))
|
| - self.assertEqual(
|
| - test_archive_info.WprFilePathForStory(page3),
|
| - os.path.join(self.tmp_dir, recording1))
|
| -
|
| - def testWprFilePathForStoryMac(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'mac'),
|
| - os.path.join(self.tmp_dir, recording1))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'mac'),
|
| - os.path.join(self.tmp_dir, recording2))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'mac'),
|
| - os.path.join(self.tmp_dir, recording3))
|
| -
|
| - def testWprFilePathForStoryWin(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'win'),
|
| - os.path.join(self.tmp_dir, recording1))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'win'),
|
| - os.path.join(self.tmp_dir, recording2))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'win'),
|
| - os.path.join(self.tmp_dir, recording2))
|
| -
|
| - def testWprFilePathForStoryAndroid(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'android'),
|
| - os.path.join(self.tmp_dir, recording1))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'android'),
|
| - os.path.join(self.tmp_dir, recording2))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'android'),
|
| - os.path.join(self.tmp_dir, recording5))
|
| -
|
| - def testWprFilePathForStoryLinux(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'linux'),
|
| - os.path.join(self.tmp_dir, recording1))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'linux'),
|
| - os.path.join(self.tmp_dir, recording2))
|
| - self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'linux'),
|
| - os.path.join(self.tmp_dir, recording4))
|
| -
|
| - def testWprFilePathForStoryBadStory(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertIsNone(test_archive_info.WprFilePathForStory(pageNew1))
|
| -
|
| -
|
| - def testAddRecordedStoriesNoStories(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - old_data = test_archive_info._data.copy()
|
| - test_archive_info.AddNewTemporaryRecording()
|
| - test_archive_info.AddRecordedStories(None)
|
| - self.assertDictEqual(old_data, test_archive_info._data)
|
| -
|
| - def assertWprFileDoesNotExist(self, file_name):
|
| - sha_file = file_name + '.sha1'
|
| - self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
|
| - self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, file_name)))
|
| -
|
| - def assertWprFileDoesExist(self, file_name):
|
| - sha_file = file_name + '.sha1'
|
| - self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
|
| - self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, file_name)))
|
| -
|
| - def testAddRecordedStoriesDefault(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertWprFileDoesNotExist('data_006.wpr')
|
|
|
| new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
|
| - hash_dictionary = {expected_archive_file_path: 'filehash'}
|
| + expected_archive_file_path = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| + hash_dictionary = {expected_archive_file_path:'filehash'}
|
| cloud_storage_stub = self.overrides.cloud_storage
|
| cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| -
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| -
|
| - test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - test_archive_info.AddRecordedStories([page2, page3])
|
| + with open(new_temp_recording, 'w') as f:
|
| + f.write('wpr data')
|
| + self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + self.archive_info.AddRecordedStories([page2, page3])
|
|
|
| with open(self.story_set_archive_info_file, 'r') as f:
|
| archive_file_contents = json.load(f)
|
| -
|
| - expected_archive_contents = _BASE_ARCHIVE.copy()
|
| - expected_archive_contents['archives'] = {
|
| - page1.display_name: {
|
| - _DEFAULT_PLATFORM: recording1
|
| - },
|
| - page2.display_name: {
|
| - _DEFAULT_PLATFORM: 'data_006.wpr'
|
| - },
|
| - page3.display_name: {
|
| - _DEFAULT_PLATFORM: u'data_006.wpr',
|
| - 'linux': recording4,
|
| - 'mac': recording3,
|
| - 'win': recording2,
|
| - 'android': recording5
|
| - }
|
| - }
|
| -
|
| - self.assertDictEqual(expected_archive_contents, archive_file_contents)
|
| - # Ensure the saved JSON does not contain trailing spaces.
|
| + self.assertEquals(expected_archive_file_contents, archive_file_contents)
|
| +
|
| + # Nit: Ensure the saved json does not contian trailing spaces.
|
| with open(self.story_set_archive_info_file, 'rU') as f:
|
| for line in f:
|
| self.assertFalse(line.rstrip('\n').endswith(' '))
|
| - self.assertWprFileDoesExist('data_006.wpr')
|
| -
|
| - def testAddRecordedStoriesNotDefault(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertWprFileDoesNotExist('data_006.wpr')
|
| - new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
|
| - hash_dictionary = {expected_archive_file_path: 'filehash'}
|
| +
|
| + def testModifications(self):
|
| + recording1_path = os.path.join(self.tmp_dir, recording1)
|
| + recording2_path = os.path.join(self.tmp_dir, recording2)
|
| +
|
| + new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| + new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
|
| + hash_dictionary = {new_recording1:'file_hash1',
|
| + new_recording2:'file_hash2'}
|
| cloud_storage_stub = self.overrides.cloud_storage
|
| cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
|
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| - test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - test_archive_info.AddRecordedStories([page2, page3],
|
| - target_platform='android')
|
| -
|
| - with open(self.story_set_archive_info_file, 'r') as f:
|
| - archive_file_contents = json.load(f)
|
| -
|
| - expected_archive_contents = _BASE_ARCHIVE.copy()
|
| - expected_archive_contents['archives'] = {
|
| - page1.display_name: {
|
| - _DEFAULT_PLATFORM: recording1
|
| - },
|
| - page2.display_name: {
|
| - _DEFAULT_PLATFORM: recording2,
|
| - 'android': 'data_006.wpr'
|
| - },
|
| - page3.display_name: {
|
| - _DEFAULT_PLATFORM: recording1,
|
| - 'linux': recording4,
|
| - 'mac': recording3,
|
| - 'win': recording2,
|
| - 'android': 'data_006.wpr'
|
| - },
|
| - }
|
| -
|
| - self.assertDictEqual(expected_archive_contents, archive_file_contents)
|
| - # Ensure the saved JSON does not contain trailing spaces.
|
| - with open(self.story_set_archive_info_file, 'rU') as f:
|
| - for line in f:
|
| - self.assertFalse(line.rstrip('\n').endswith(' '))
|
| - self.assertWprFileDoesExist('data_006.wpr')
|
| -
|
| -
|
| -
|
| - def testAddRecordedStoriesNewPage(self):
|
| - test_archive_info = self.createArchiveInfo()
|
| - self.assertWprFileDoesNotExist('data_006.wpr')
|
| - self.assertWprFileDoesNotExist('data_007.wpr')
|
| new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - expected_archive_file_path1 = os.path.join(self.tmp_dir, 'data_006.wpr')
|
| - expected_archive_file_path2 = os.path.join(self.tmp_dir, 'data_007.wpr')
|
| - hash_dictionary = {
|
| - expected_archive_file_path1: 'filehash',
|
| - expected_archive_file_path2: 'filehash2'
|
| - }
|
| - cloud_storage_stub = self.overrides.cloud_storage
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| -
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| - test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - test_archive_info.AddRecordedStories([pageNew1])
|
| -
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data2')
|
| -
|
| - test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - test_archive_info.AddRecordedStories([pageNew2], target_platform='android')
|
| -
|
| - with open(self.story_set_archive_info_file, 'r') as f:
|
| - archive_file_contents = json.load(f)
|
| -
|
| -
|
| - expected_archive_contents = _BASE_ARCHIVE.copy()
|
| - expected_archive_contents['archives'] = {
|
| - page1.display_name: {
|
| - _DEFAULT_PLATFORM: recording1
|
| - },
|
| - page2.display_name: {
|
| - _DEFAULT_PLATFORM: recording2,
|
| - },
|
| - page3.display_name: {
|
| - _DEFAULT_PLATFORM: recording1,
|
| - 'linux': recording4,
|
| - 'mac': recording3,
|
| - 'win': recording2,
|
| - 'android': recording5
|
| - },
|
| - pageNew1.display_name: {
|
| - _DEFAULT_PLATFORM: 'data_006.wpr'
|
| - },
|
| - pageNew2.display_name: {
|
| - _DEFAULT_PLATFORM: 'data_007.wpr',
|
| - 'android': 'data_007.wpr'
|
| + with open(new_temp_recording, 'w') as f:
|
| + f.write('wpr data')
|
| +
|
| + self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| +
|
| + self.assertEquals(new_temp_recording,
|
| + self.archive_info.WprFilePathForStory(page1))
|
| + self.assertEquals(new_temp_recording,
|
| + self.archive_info.WprFilePathForStory(page2))
|
| + self.assertEquals(new_temp_recording,
|
| + self.archive_info.WprFilePathForStory(page3))
|
| +
|
| + self.archive_info.AddRecordedStories([page2])
|
| +
|
| + self.assertTrue(os.path.exists(new_recording1))
|
| + self.assertFalse(os.path.exists(new_temp_recording))
|
| +
|
| + self.assertTrue(os.path.exists(recording1_path))
|
| + self.assertTrue(os.path.exists(recording2_path))
|
| + self.assertCorrectHashFile(new_recording1)
|
| +
|
| + with open(new_temp_recording, 'w') as f:
|
| + f.write('wpr data')
|
| +
|
| + self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + self.archive_info.AddRecordedStories([page3])
|
| +
|
| + self.assertTrue(os.path.exists(new_recording2))
|
| + self.assertCorrectHashFile(new_recording2)
|
| + self.assertFalse(os.path.exists(new_temp_recording))
|
| +
|
| + self.assertTrue(os.path.exists(recording1_path))
|
| + # recording2 is no longer needed, so it was deleted.
|
| + self.assertFalse(os.path.exists(recording2_path))
|
| +
|
| + def testCreatingNewArchiveInfo(self):
|
| + # Write only the page set without the corresponding metadata file.
|
| + story_set_contents = ("""
|
| + {
|
| + archive_data_file": "new_archive_info.json",
|
| + "pages": [
|
| + {
|
| + "url": "%s",
|
| + }
|
| + ]
|
| + }""" % page1.url)
|
| +
|
| + story_set_file = os.path.join(self.tmp_dir, 'new_story_set.json')
|
| + with open(story_set_file, 'w') as f:
|
| + f.write(story_set_contents)
|
| +
|
| + self.story_set_archive_info_file = os.path.join(self.tmp_dir,
|
| + 'new_archive_info.json')
|
| +
|
| + expected_archive_file_path = os.path.join(self.tmp_dir,
|
| + 'new_archive_info_000.wpr')
|
| + hash_dictionary = {expected_archive_file_path:'filehash'}
|
| + self.overrides.cloud_storage.SetCalculatedHashesForTesting(hash_dictionary)
|
| +
|
| + # Create the WprArchiveInfo object to be tested.
|
| + self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| + self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| +
|
| + # Add a recording for all the pages.
|
| + new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| + with open(new_temp_recording, 'w') as f:
|
| + f.write('wpr data')
|
| +
|
| + self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| +
|
| + self.assertEquals(new_temp_recording,
|
| + self.archive_info.WprFilePathForStory(page1))
|
| +
|
| + self.archive_info.AddRecordedStories([page1])
|
| +
|
| + # Expected name for the recording (decided by WprArchiveInfo).
|
| + new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
|
| +
|
| + self.assertTrue(os.path.exists(new_recording))
|
| + self.assertFalse(os.path.exists(new_temp_recording))
|
| + self.assertCorrectHashFile(new_recording)
|
| +
|
| + # Check that the archive info was written correctly.
|
| + self.assertTrue(os.path.exists(self.story_set_archive_info_file))
|
| + read_archive_info = archive_info.WprArchiveInfo.FromFile(
|
| + self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| + self.assertEquals(new_recording,
|
| + read_archive_info.WprFilePathForStory(page1))
|
| +
|
| + def testFromFileMultiplatform(self):
|
| + multiplatform_archive_info_contents = ("""
|
| + {
|
| + "platform_specific": true,
|
| + "archives": {
|
| + "%s": ["%s", "%s"],
|
| + "%s": ["%s"]
|
| + }
|
| }
|
| - }
|
| -
|
| - self.assertDictEqual(expected_archive_contents, archive_file_contents)
|
| - # Ensure the saved JSON does not contain trailing spaces.
|
| - with open(self.story_set_archive_info_file, 'rU') as f:
|
| - for line in f:
|
| - self.assertFalse(line.rstrip('\n').endswith(' '))
|
| - self.assertWprFileDoesExist('data_006.wpr')
|
| - self.assertWprFileDoesExist('data_007.wpr')
|
| + """ % (recording1, page1.display_name, page2.display_name, recording2,
|
| + page3.display_name))
|
| +
|
| + archive_info_file = os.path.join(
|
| + self.tmp_dir, 'info2.json')
|
| + with open(archive_info_file, 'w') as f:
|
| + f.write(multiplatform_archive_info_contents)
|
| +
|
| + archive = archive_info.WprArchiveInfo.FromFile(
|
| + archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| + self.assertTrue(isinstance(archive, archive_info2.WprArchiveInfo))
|
|
|