| Index: telemetry/telemetry/wpr/archive_info_unittest.py
|
| diff --git a/telemetry/telemetry/wpr/archive_info_unittest.py b/telemetry/telemetry/wpr/archive_info_unittest.py
|
| index 9ac7cf0532cf7b8ff081281442698111d3bd1890..1711eda92c7bf434877f90988d9f9c0b710b79af 100644
|
| --- a/telemetry/telemetry/wpr/archive_info_unittest.py
|
| +++ b/telemetry/telemetry/wpr/archive_info_unittest.py
|
| @@ -1,4 +1,4 @@
|
| -# Copyright 2012 The Chromium Authors. All rights reserved.
|
| +# Copyright 2017 The Chromium Authors. All rights reserved.
|
| # Use of this source code is governed by a BSD-style license that can be
|
| # found in the LICENSE file.
|
| import json
|
| @@ -12,240 +12,396 @@ from py_utils import cloud_storage # pylint: disable=import-error
|
| from telemetry.page import page
|
| from telemetry.testing import system_stub
|
| from telemetry.wpr import archive_info
|
| -from telemetry.wpr import archive_info2
|
|
|
|
|
| class MockPage(page.Page):
|
| - def __init__(self, url, name=None):
|
| + def __init__(self, url, name=None, platform_specific=False):
|
| super(MockPage, self).__init__(url, None, name=name)
|
| -
|
| + self._platform_specific = platform_specific
|
|
|
| page1 = MockPage('http://www.foo.com/', 'Foo')
|
| -page2 = MockPage('http://www.bar.com/', 'Bar')
|
| -page3 = MockPage('http://www.baz.com/')
|
| +page2 = MockPage('http://www.bar.com/', 'Bar', True)
|
| +page3 = MockPage('http://www.baz.com/', platform_specific=True)
|
| +pageNew1 = MockPage('http://www.new.com/', 'New')
|
| +pageNew2 = MockPage('http://www.newer.com/', 'Newer', True)
|
| recording1 = 'data_001.wpr'
|
| recording2 = 'data_002.wpr'
|
| -archive_info_contents = ("""
|
| -{
|
| -"archives": {
|
| - "%s": ["%s", "%s"],
|
| - "%s": ["%s"]
|
| +recording3 = 'data_003.wpr'
|
| +recording4 = 'data_004.wpr'
|
| +recording5 = 'data_005.wpr'
|
| +_DEFAULT_PLATFORM = archive_info._DEFAULT_PLATFORM
|
| +
|
| +default_archives_info_contents_dict = {
|
| + "platform_specific": True,
|
| + "archives": {
|
| + "Foo": {
|
| + _DEFAULT_PLATFORM: recording1
|
| + },
|
| + "Bar": {
|
| + _DEFAULT_PLATFORM: recording2
|
| + },
|
| + "http://www.baz.com/": {
|
| + _DEFAULT_PLATFORM: recording1,
|
| + "win": recording2,
|
| + "mac": recording3,
|
| + "linux": recording4,
|
| + "android": recording5
|
| + }
|
| + }
|
| }
|
| +
|
| +default_archive_info_contents = json.dumps(default_archives_info_contents_dict)
|
| +default_wpr_files = [
|
| + 'data_001.wpr', 'data_002.wpr', 'data_003.wpr', 'data_004.wpr',
|
| + 'data_005.wpr']
|
| +_BASE_ARCHIVE = {
|
| + u'platform_specific': True,
|
| + u'description': (u'Describes the Web Page Replay archives for a'
|
| + u' story set. Don\'t edit by hand! Use record_wpr for'
|
| + u' updating.'),
|
| + u'archives': {},
|
| }
|
| -""" % (recording1, page1.display_name, page2.display_name, recording2,
|
| - page3.display_name))
|
|
|
|
|
| class WprArchiveInfoTest(unittest.TestCase):
|
| def setUp(self):
|
| self.tmp_dir = tempfile.mkdtemp()
|
| - # Write the metadata.
|
| + # Set file for the metadata.
|
| self.story_set_archive_info_file = os.path.join(
|
| self.tmp_dir, 'info.json')
|
| - with open(self.story_set_archive_info_file, 'w') as f:
|
| - f.write(archive_info_contents)
|
| -
|
| - # Write the existing .wpr files.
|
| - for i in [1, 2]:
|
| - with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
|
| - f.write(archive_info_contents)
|
| -
|
| - # Create the PageSetArchiveInfo object to be tested.
|
| - self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| - # Use cloud_storage system stub.
|
| self.overrides = system_stub.Override(archive_info, ['cloud_storage'])
|
|
|
| def tearDown(self):
|
| shutil.rmtree(self.tmp_dir)
|
| self.overrides.Restore()
|
|
|
| - def assertCorrectHashFile(self, file_path):
|
| - old_ch = cloud_storage.CalculateHash
|
| - cloud_storage.CalculateHash = self.overrides.cloud_storage.CalculateHash
|
| - try:
|
| - self.assertTrue(os.path.exists(file_path + '.sha1'))
|
| - with open(file_path + '.sha1', 'rb') as f:
|
| - self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
|
| - finally:
|
| - cloud_storage.CalculateHash = old_ch
|
| -
|
| - def testDownloadArchivesIfNeeded(self):
|
| + def createArchiveInfo(
|
| + self, archive_data=default_archive_info_contents,
|
| + cloud_storage_bucket=cloud_storage.PUBLIC_BUCKET, wpr_files=None):
|
| +
|
| + # Cannot set lists as a default parameter, so doing it this way.
|
| + if wpr_files is None:
|
| + wpr_files = default_wpr_files
|
| +
|
| + with open(self.story_set_archive_info_file, 'w') as f:
|
| + f.write(archive_data)
|
| +
|
| + assert isinstance(wpr_files, list)
|
| + for wpr_file in wpr_files:
|
| + assert isinstance(wpr_file, basestring)
|
| + with open(os.path.join(self.tmp_dir, wpr_file), 'w') as f:
|
| + f.write(archive_data)
|
| + return archive_info.WprArchiveInfo.FromFile(
|
| + self.story_set_archive_info_file, cloud_storage_bucket)
|
| +
|
| + def testInitNotPlatformSpecific(self):
|
| + with open(self.story_set_archive_info_file, 'w') as f:
|
| + f.write('{}')
|
| + with self.assertRaises(AssertionError):
|
| + self.createArchiveInfo(archive_data='{}')
|
| +
|
| +
|
| + def testDownloadArchivesIfNeededAllNeeded(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| cloud_storage_stub = self.overrides.cloud_storage
|
| # Second hash doesn't match, need to fetch it.
|
| cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash",
|
| - recording2: "dummyhash22"}})
|
| + {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
|
| + recording2: "dummyhash2_old",
|
| + recording3: "dummyhash3_old",
|
| + recording4: "dummyhash4_old"}})
|
| cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",})
|
| - self.archive_info.DownloadArchivesIfNeeded()
|
| - self.assertEquals(len(cloud_storage_stub.downloaded_files), 1)
|
| - self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2)
|
| -
|
| - def testReadingArchiveInfo(self):
|
| - self.assertIsNotNone(self.archive_info.WprFilePathForStory(page1))
|
| - self.assertEquals(recording1, os.path.basename(
|
| - self.archive_info.WprFilePathForStory(page1)))
|
| + {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| + os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| + os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| + os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
|
|
| - self.assertIsNotNone(self.archive_info.WprFilePathForStory(page2))
|
| - self.assertEquals(recording1, os.path.basename(
|
| - self.archive_info.WprFilePathForStory(page2)))
|
| + test_archive_info.DownloadArchivesIfNeeded()
|
| + self.assertItemsEqual(cloud_storage_stub.downloaded_files,
|
| + [recording1, recording2, recording3, recording4])
|
|
|
| - self.assertIsNotNone(self.archive_info.WprFilePathForStory(page3))
|
| - self.assertEquals(recording2, os.path.basename(
|
| - self.archive_info.WprFilePathForStory(page3)))
|
|
|
| - def testArchiveInfoFileGetsUpdated(self):
|
| - """Ensures that the archive info file is updated correctly."""
|
| -
|
| - expected_archive_file_contents = {
|
| - u'description': (u'Describes the Web Page Replay archives for a'
|
| - u' story set. Don\'t edit by hand! Use record_wpr for'
|
| - u' updating.'),
|
| - u'archives': {
|
| - u'data_003.wpr': [u'Bar', u'http://www.baz.com/'],
|
| - u'data_001.wpr': [u'Foo']
|
| + def testDownloadArchivesIfNeededOneNeeded(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + # Second hash doesn't match, need to fetch it.
|
| + cloud_storage_stub.SetRemotePathsForTesting(
|
| + {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
|
| + recording2: "dummyhash2",
|
| + recording3: "dummyhash3",
|
| + recording4: "dummyhash4"}})
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(
|
| + {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| + os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| + os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| + os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| + test_archive_info.DownloadArchivesIfNeeded()
|
| + self.assertItemsEqual(cloud_storage_stub.downloaded_files, [recording1])
|
| +
|
| + def testDownloadArchivesIfNeededNonDefault(self):
|
| + data = {
|
| + 'platform_specific': True,
|
| + 'archives': {
|
| + 'http://www.baz.com/': {
|
| + _DEFAULT_PLATFORM: 'data_001.wpr',
|
| + 'win': 'data_002.wpr',
|
| + 'linux': 'data_004.wpr',
|
| + 'mac': 'data_003.wpr',
|
| + 'android': 'data_005.wpr'},
|
| + 'Foo': {_DEFAULT_PLATFORM: 'data_003.wpr'},
|
| + 'Bar': {_DEFAULT_PLATFORM: 'data_002.wpr'}
|
| }
|
| }
|
| + test_archive_info = self.createArchiveInfo(
|
| + archive_data=json.dumps(data, separators=(',', ': ')))
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + # Second hash doesn't match, need to fetch it.
|
| + cloud_storage_stub.SetRemotePathsForTesting(
|
| + {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
|
| + recording2: "dummyhash2",
|
| + recording3: "dummyhash3",
|
| + recording4: "dummyhash4_old"}})
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(
|
| + {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| + os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| + os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| + os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| + test_archive_info.DownloadArchivesIfNeeded(target_platforms=['linux'])
|
| + self.assertItemsEqual(cloud_storage_stub.downloaded_files,
|
| + [recording1, recording4])
|
| +
|
| + def testDownloadArchivesIfNeededNoBucket(self):
|
| + test_archive_info = self.createArchiveInfo(cloud_storage_bucket=None)
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + # Second hash doesn't match, need to fetch it.
|
| + cloud_storage_stub.SetRemotePathsForTesting(
|
| + {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1",
|
| + recording2: "dummyhash2",
|
| + recording3: "dummyhash3",
|
| + recording4: "dummyhash4_old"}})
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(
|
| + {os.path.join(self.tmp_dir, recording1): "dummyhash1",
|
| + os.path.join(self.tmp_dir, recording2): "dummyhash2",
|
| + os.path.join(self.tmp_dir, recording3): "dummyhash3",
|
| + os.path.join(self.tmp_dir, recording4): "dummyhash4",})
|
| + test_archive_info.DownloadArchivesIfNeeded()
|
| + self.assertItemsEqual(cloud_storage_stub.downloaded_files, [])
|
| +
|
| + def testWprFilePathForStoryDefault(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertEqual(
|
| + test_archive_info.WprFilePathForStory(page1),
|
| + os.path.join(self.tmp_dir, recording1))
|
| + self.assertEqual(
|
| + test_archive_info.WprFilePathForStory(page2),
|
| + os.path.join(self.tmp_dir, recording2))
|
| + self.assertEqual(
|
| + test_archive_info.WprFilePathForStory(page3),
|
| + os.path.join(self.tmp_dir, recording1))
|
| +
|
| + def testWprFilePathForStoryMac(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'mac'),
|
| + os.path.join(self.tmp_dir, recording1))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'mac'),
|
| + os.path.join(self.tmp_dir, recording2))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'mac'),
|
| + os.path.join(self.tmp_dir, recording3))
|
| +
|
| + def testWprFilePathForStoryWin(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'win'),
|
| + os.path.join(self.tmp_dir, recording1))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'win'),
|
| + os.path.join(self.tmp_dir, recording2))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'win'),
|
| + os.path.join(self.tmp_dir, recording2))
|
| +
|
| + def testWprFilePathForStoryAndroid(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'android'),
|
| + os.path.join(self.tmp_dir, recording1))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'android'),
|
| + os.path.join(self.tmp_dir, recording2))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'android'),
|
| + os.path.join(self.tmp_dir, recording5))
|
| +
|
| + def testWprFilePathForStoryLinux(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'linux'),
|
| + os.path.join(self.tmp_dir, recording1))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'linux'),
|
| + os.path.join(self.tmp_dir, recording2))
|
| + self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'linux'),
|
| + os.path.join(self.tmp_dir, recording4))
|
| +
|
| + def testWprFilePathForStoryBadStory(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertIsNone(test_archive_info.WprFilePathForStory(pageNew1))
|
| +
|
| +
|
| + def testAddRecordedStoriesNoStories(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + old_data = test_archive_info._data.copy()
|
| + test_archive_info.AddNewTemporaryRecording()
|
| + test_archive_info.AddRecordedStories(None)
|
| + self.assertDictEqual(old_data, test_archive_info._data)
|
| +
|
| + def assertWprFileDoesNotExist(self, file_name):
|
| + sha_file = file_name + '.sha1'
|
| + self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
|
| + self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, file_name)))
|
| +
|
| + def assertWprFileDoesExist(self, file_name):
|
| + sha_file = file_name + '.sha1'
|
| + self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
|
| + self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, file_name)))
|
| +
|
| + def testAddRecordedStoriesDefault(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertWprFileDoesNotExist('data_006.wpr')
|
|
|
| new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| - expected_archive_file_path = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| - hash_dictionary = {expected_archive_file_path:'filehash'}
|
| + expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
|
| + hash_dictionary = {expected_archive_file_path: 'filehash'}
|
| cloud_storage_stub = self.overrides.cloud_storage
|
| cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| +
|
| with open(new_temp_recording, 'w') as f:
|
| f.write('wpr data')
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - self.archive_info.AddRecordedStories([page2, page3])
|
| +
|
| + test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + test_archive_info.AddRecordedStories([page2, page3])
|
|
|
| with open(self.story_set_archive_info_file, 'r') as f:
|
| archive_file_contents = json.load(f)
|
| - self.assertEquals(expected_archive_file_contents, archive_file_contents)
|
|
|
| - # Nit: Ensure the saved json does not contian trailing spaces.
|
| + expected_archive_contents = _BASE_ARCHIVE.copy()
|
| + expected_archive_contents['archives'] = {
|
| + page1.display_name: {
|
| + _DEFAULT_PLATFORM: recording1
|
| + },
|
| + page2.display_name: {
|
| + _DEFAULT_PLATFORM: 'data_006.wpr'
|
| + },
|
| + page3.display_name: {
|
| + _DEFAULT_PLATFORM: u'data_006.wpr',
|
| + 'linux': recording4,
|
| + 'mac': recording3,
|
| + 'win': recording2,
|
| + 'android': recording5
|
| + }
|
| + }
|
| +
|
| + self.assertDictEqual(expected_archive_contents, archive_file_contents)
|
| + # Ensure the saved JSON does not contain trailing spaces.
|
| with open(self.story_set_archive_info_file, 'rU') as f:
|
| for line in f:
|
| self.assertFalse(line.rstrip('\n').endswith(' '))
|
| + self.assertWprFileDoesExist('data_006.wpr')
|
|
|
| - def testModifications(self):
|
| - recording1_path = os.path.join(self.tmp_dir, recording1)
|
| - recording2_path = os.path.join(self.tmp_dir, recording2)
|
| -
|
| - new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| - new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
|
| - hash_dictionary = {new_recording1:'file_hash1',
|
| - new_recording2:'file_hash2'}
|
| + def testAddRecordedStoriesNotDefault(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertWprFileDoesNotExist('data_006.wpr')
|
| + new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| + expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
|
| + hash_dictionary = {expected_archive_file_path: 'filehash'}
|
| cloud_storage_stub = self.overrides.cloud_storage
|
| cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
|
|
| - new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| with open(new_temp_recording, 'w') as f:
|
| f.write('wpr data')
|
| + test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + test_archive_info.AddRecordedStories([page2, page3],
|
| + target_platform='android')
|
|
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + with open(self.story_set_archive_info_file, 'r') as f:
|
| + archive_file_contents = json.load(f)
|
|
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page1))
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page2))
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page3))
|
| + expected_archive_contents = _BASE_ARCHIVE.copy()
|
| + expected_archive_contents['archives'] = {
|
| + page1.display_name: {
|
| + _DEFAULT_PLATFORM: recording1
|
| + },
|
| + page2.display_name: {
|
| + _DEFAULT_PLATFORM: recording2,
|
| + 'android': 'data_006.wpr'
|
| + },
|
| + page3.display_name: {
|
| + _DEFAULT_PLATFORM: recording1,
|
| + 'linux': recording4,
|
| + 'mac': recording3,
|
| + 'win': recording2,
|
| + 'android': 'data_006.wpr'
|
| + },
|
| + }
|
|
|
| - self.archive_info.AddRecordedStories([page2])
|
| + self.assertDictEqual(expected_archive_contents, archive_file_contents)
|
| + # Ensure the saved JSON does not contain trailing spaces.
|
| + with open(self.story_set_archive_info_file, 'rU') as f:
|
| + for line in f:
|
| + self.assertFalse(line.rstrip('\n').endswith(' '))
|
| + self.assertWprFileDoesExist('data_006.wpr')
|
|
|
| - self.assertTrue(os.path.exists(new_recording1))
|
| - self.assertFalse(os.path.exists(new_temp_recording))
|
|
|
| - self.assertTrue(os.path.exists(recording1_path))
|
| - self.assertTrue(os.path.exists(recording2_path))
|
| - self.assertCorrectHashFile(new_recording1)
|
|
|
| - with open(new_temp_recording, 'w') as f:
|
| - f.write('wpr data')
|
| -
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| - self.archive_info.AddRecordedStories([page3])
|
| -
|
| - self.assertTrue(os.path.exists(new_recording2))
|
| - self.assertCorrectHashFile(new_recording2)
|
| - self.assertFalse(os.path.exists(new_temp_recording))
|
| -
|
| - self.assertTrue(os.path.exists(recording1_path))
|
| - # recording2 is no longer needed, so it was deleted.
|
| - self.assertFalse(os.path.exists(recording2_path))
|
| -
|
| - def testCreatingNewArchiveInfo(self):
|
| - # Write only the page set without the corresponding metadata file.
|
| - story_set_contents = ("""
|
| - {
|
| - archive_data_file": "new_archive_info.json",
|
| - "pages": [
|
| - {
|
| - "url": "%s",
|
| - }
|
| - ]
|
| - }""" % page1.url)
|
| -
|
| - story_set_file = os.path.join(self.tmp_dir, 'new_story_set.json')
|
| - with open(story_set_file, 'w') as f:
|
| - f.write(story_set_contents)
|
| -
|
| - self.story_set_archive_info_file = os.path.join(self.tmp_dir,
|
| - 'new_archive_info.json')
|
| -
|
| - expected_archive_file_path = os.path.join(self.tmp_dir,
|
| - 'new_archive_info_000.wpr')
|
| - hash_dictionary = {expected_archive_file_path:'filehash'}
|
| - self.overrides.cloud_storage.SetCalculatedHashesForTesting(hash_dictionary)
|
| -
|
| - # Create the WprArchiveInfo object to be tested.
|
| - self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| -
|
| - # Add a recording for all the pages.
|
| + def testAddRecordedStoriesNewPage(self):
|
| + test_archive_info = self.createArchiveInfo()
|
| + self.assertWprFileDoesNotExist('data_006.wpr')
|
| + self.assertWprFileDoesNotExist('data_007.wpr')
|
| new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| + expected_archive_file_path1 = os.path.join(self.tmp_dir, 'data_006.wpr')
|
| + expected_archive_file_path2 = os.path.join(self.tmp_dir, 'data_007.wpr')
|
| + hash_dictionary = {
|
| + expected_archive_file_path1: 'filehash',
|
| + expected_archive_file_path2: 'filehash2'
|
| + }
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| +
|
| with open(new_temp_recording, 'w') as f:
|
| f.write('wpr data')
|
| + test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + test_archive_info.AddRecordedStories([pageNew1])
|
|
|
| - self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| -
|
| - self.assertEquals(new_temp_recording,
|
| - self.archive_info.WprFilePathForStory(page1))
|
| -
|
| - self.archive_info.AddRecordedStories([page1])
|
| + with open(new_temp_recording, 'w') as f:
|
| + f.write('wpr data2')
|
|
|
| - # Expected name for the recording (decided by WprArchiveInfo).
|
| - new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
|
| + test_archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| + test_archive_info.AddRecordedStories([pageNew2], target_platform='android')
|
|
|
| - self.assertTrue(os.path.exists(new_recording))
|
| - self.assertFalse(os.path.exists(new_temp_recording))
|
| - self.assertCorrectHashFile(new_recording)
|
| + with open(self.story_set_archive_info_file, 'r') as f:
|
| + archive_file_contents = json.load(f)
|
|
|
| - # Check that the archive info was written correctly.
|
| - self.assertTrue(os.path.exists(self.story_set_archive_info_file))
|
| - read_archive_info = archive_info.WprArchiveInfo.FromFile(
|
| - self.story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| - self.assertEquals(new_recording,
|
| - read_archive_info.WprFilePathForStory(page1))
|
|
|
| - def testFromFileMultiplatform(self):
|
| - multiplatform_archive_info_contents = ("""
|
| - {
|
| - "platform_specific": true,
|
| - "archives": {
|
| - "%s": ["%s", "%s"],
|
| - "%s": ["%s"]
|
| - }
|
| + expected_archive_contents = _BASE_ARCHIVE.copy()
|
| + expected_archive_contents['archives'] = {
|
| + page1.display_name: {
|
| + _DEFAULT_PLATFORM: recording1
|
| + },
|
| + page2.display_name: {
|
| + _DEFAULT_PLATFORM: recording2,
|
| + },
|
| + page3.display_name: {
|
| + _DEFAULT_PLATFORM: recording1,
|
| + 'linux': recording4,
|
| + 'mac': recording3,
|
| + 'win': recording2,
|
| + 'android': recording5
|
| + },
|
| + pageNew1.display_name: {
|
| + _DEFAULT_PLATFORM: 'data_006.wpr'
|
| + },
|
| + pageNew2.display_name: {
|
| + _DEFAULT_PLATFORM: 'data_007.wpr',
|
| + 'android': 'data_007.wpr'
|
| }
|
| - """ % (recording1, page1.display_name, page2.display_name, recording2,
|
| - page3.display_name))
|
| -
|
| - archive_info_file = os.path.join(
|
| - self.tmp_dir, 'info2.json')
|
| - with open(archive_info_file, 'w') as f:
|
| - f.write(multiplatform_archive_info_contents)
|
| + }
|
|
|
| - archive = archive_info.WprArchiveInfo.FromFile(
|
| - archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| - self.assertTrue(isinstance(archive, archive_info2.WprArchiveInfo))
|
| + self.assertDictEqual(expected_archive_contents, archive_file_contents)
|
| + # Ensure the saved JSON does not contain trailing spaces.
|
| + with open(self.story_set_archive_info_file, 'rU') as f:
|
| + for line in f:
|
| + self.assertFalse(line.rstrip('\n').endswith(' '))
|
| + self.assertWprFileDoesExist('data_006.wpr')
|
| + self.assertWprFileDoesExist('data_007.wpr')
|
|
|