| Index: tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| diff --git a/tools/telemetry/telemetry/wpr/archive_info_unittest.py b/tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| index 9feaf7e71dd6a3c198bfcfc695a3f305362b02a1..29e61a8983ca008d89b33ce7e557bc509bfc2214 100644
|
| --- a/tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| +++ b/tools/telemetry/telemetry/wpr/archive_info_unittest.py
|
| @@ -51,31 +51,35 @@ class WprArchiveInfoTest(unittest.TestCase):
|
| # Create the PageSetArchiveInfo object to be tested.
|
| self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| self.user_story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
| + # Use cloud_storage system stub.
|
| + self.overrides = system_stub.Override(archive_info, ['cloud_storage'])
|
|
|
| def tearDown(self):
|
| shutil.rmtree(self.tmp_dir)
|
| + self.overrides.Restore()
|
|
|
| def assertCorrectHashFile(self, file_path):
|
| - self.assertTrue(os.path.exists(file_path + '.sha1'))
|
| - with open(file_path + '.sha1', 'rb') as f:
|
| - self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
|
| -
|
| - def testDownloadArchivesIfNeeded(self):
|
| - overrides = system_stub.Override(archive_info, ['cloud_storage'])
|
| + old_ch = cloud_storage.CalculateHash
|
| + cloud_storage.CalculateHash = self.overrides.cloud_storage.CalculateHash
|
| try:
|
| - cloud_storage_stub = overrides.cloud_storage
|
| - # Second hash doesn't match, need to fetch it.
|
| - cloud_storage_stub.SetRemotePathsForTesting(
|
| - {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash",
|
| - recording2: "dummyhash22"}})
|
| - cloud_storage_stub.SetCalculatedHashesForTesting(
|
| - {os.path.join(self.tmp_dir, recording1): "dummyhash",
|
| - os.path.join(self.tmp_dir, recording2): "dummyhash2",})
|
| - self.archive_info.DownloadArchivesIfNeeded()
|
| - self.assertEquals(len(cloud_storage_stub.downloaded_files), 1)
|
| - self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2)
|
| + self.assertTrue(os.path.exists(file_path + '.sha1'))
|
| + with open(file_path + '.sha1', 'rb') as f:
|
| + self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
|
| finally:
|
| - overrides.Restore()
|
| + cloud_storage.CalculateHash = old_ch
|
| +
|
| + def testDownloadArchivesIfNeeded(self):
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + # Second hash doesn't match, need to fetch it.
|
| + cloud_storage_stub.SetRemotePathsForTesting(
|
| + {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash",
|
| + recording2: "dummyhash22"}})
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(
|
| + {os.path.join(self.tmp_dir, recording1): "dummyhash",
|
| + os.path.join(self.tmp_dir, recording2): "dummyhash2",})
|
| + self.archive_info.DownloadArchivesIfNeeded()
|
| + self.assertEquals(len(cloud_storage_stub.downloaded_files), 1)
|
| + self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2)
|
|
|
| def testReadingArchiveInfo(self):
|
| self.assertIsNotNone(self.archive_info.WprFilePathForUserStory(page1))
|
| @@ -104,6 +108,10 @@ class WprArchiveInfoTest(unittest.TestCase):
|
| }
|
|
|
| new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| + expected_archive_file_path = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| + hash_dictionary = {expected_archive_file_path:'filehash'}
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| with open(new_temp_recording, 'w') as f:
|
| f.write('wpr data')
|
| self.archive_info.AddNewTemporaryRecording(new_temp_recording)
|
| @@ -118,6 +126,12 @@ class WprArchiveInfoTest(unittest.TestCase):
|
| recording2_path = os.path.join(self.tmp_dir, recording2)
|
|
|
| new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
|
| + new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
|
| + hash_dictionary = {new_recording1:'file_hash1',
|
| + new_recording2:'file_hash2'}
|
| + cloud_storage_stub = self.overrides.cloud_storage
|
| + cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
|
| +
|
| new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
|
| with open(new_temp_recording, 'w') as f:
|
| f.write('wpr data')
|
| @@ -140,7 +154,6 @@ class WprArchiveInfoTest(unittest.TestCase):
|
| self.assertTrue(os.path.exists(recording2_path))
|
| self.assertCorrectHashFile(new_recording1)
|
|
|
| - new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
|
| with open(new_temp_recording, 'w') as f:
|
| f.write('wpr data')
|
|
|
| @@ -174,6 +187,11 @@ class WprArchiveInfoTest(unittest.TestCase):
|
| self.user_story_set_archive_info_file = os.path.join(self.tmp_dir,
|
| 'new_archive_info.json')
|
|
|
| + expected_archive_file_path = os.path.join(self.tmp_dir,
|
| + 'new_archive_info_000.wpr')
|
| + hash_dictionary = {expected_archive_file_path:'filehash'}
|
| + self.overrides.cloud_storage.SetCalculatedHashesForTesting(hash_dictionary)
|
| +
|
| # Create the WprArchiveInfo object to be tested.
|
| self.archive_info = archive_info.WprArchiveInfo.FromFile(
|
| self.user_story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET)
|
|
|