Index: tools/telemetry/telemetry/wpr/archive_info_unittest.py |
diff --git a/tools/telemetry/telemetry/wpr/archive_info_unittest.py b/tools/telemetry/telemetry/wpr/archive_info_unittest.py |
index 9feaf7e71dd6a3c198bfcfc695a3f305362b02a1..29e61a8983ca008d89b33ce7e557bc509bfc2214 100644 |
--- a/tools/telemetry/telemetry/wpr/archive_info_unittest.py |
+++ b/tools/telemetry/telemetry/wpr/archive_info_unittest.py |
@@ -51,31 +51,35 @@ class WprArchiveInfoTest(unittest.TestCase): |
# Create the PageSetArchiveInfo object to be tested. |
self.archive_info = archive_info.WprArchiveInfo.FromFile( |
self.user_story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET) |
+ # Use cloud_storage system stub. |
+ self.overrides = system_stub.Override(archive_info, ['cloud_storage']) |
def tearDown(self): |
shutil.rmtree(self.tmp_dir) |
+ self.overrides.Restore() |
def assertCorrectHashFile(self, file_path): |
- self.assertTrue(os.path.exists(file_path + '.sha1')) |
- with open(file_path + '.sha1', 'rb') as f: |
- self.assertEquals(cloud_storage.CalculateHash(file_path), f.read()) |
- |
- def testDownloadArchivesIfNeeded(self): |
- overrides = system_stub.Override(archive_info, ['cloud_storage']) |
+ old_ch = cloud_storage.CalculateHash |
+ cloud_storage.CalculateHash = self.overrides.cloud_storage.CalculateHash |
try: |
- cloud_storage_stub = overrides.cloud_storage |
- # Second hash doesn't match, need to fetch it. |
- cloud_storage_stub.SetRemotePathsForTesting( |
- {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash", |
- recording2: "dummyhash22"}}) |
- cloud_storage_stub.SetCalculatedHashesForTesting( |
- {os.path.join(self.tmp_dir, recording1): "dummyhash", |
- os.path.join(self.tmp_dir, recording2): "dummyhash2",}) |
- self.archive_info.DownloadArchivesIfNeeded() |
- self.assertEquals(len(cloud_storage_stub.downloaded_files), 1) |
- self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2) |
+ self.assertTrue(os.path.exists(file_path + '.sha1')) |
+ with open(file_path + '.sha1', 'rb') as f: |
+ self.assertEquals(cloud_storage.CalculateHash(file_path), f.read()) |
finally: |
- overrides.Restore() |
+ cloud_storage.CalculateHash = old_ch |
+ |
+ def testDownloadArchivesIfNeeded(self): |
+ cloud_storage_stub = self.overrides.cloud_storage |
+ # Second hash doesn't match, need to fetch it. |
+ cloud_storage_stub.SetRemotePathsForTesting( |
+ {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash", |
+ recording2: "dummyhash22"}}) |
+ cloud_storage_stub.SetCalculatedHashesForTesting( |
+ {os.path.join(self.tmp_dir, recording1): "dummyhash", |
+ os.path.join(self.tmp_dir, recording2): "dummyhash2",}) |
+ self.archive_info.DownloadArchivesIfNeeded() |
+ self.assertEquals(len(cloud_storage_stub.downloaded_files), 1) |
+ self.assertEquals(cloud_storage_stub.downloaded_files[0], recording2) |
def testReadingArchiveInfo(self): |
self.assertIsNotNone(self.archive_info.WprFilePathForUserStory(page1)) |
@@ -104,6 +108,10 @@ class WprArchiveInfoTest(unittest.TestCase): |
} |
new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') |
+ expected_archive_file_path = os.path.join(self.tmp_dir, 'data_003.wpr') |
+ hash_dictionary = {expected_archive_file_path:'filehash'} |
+ cloud_storage_stub = self.overrides.cloud_storage |
+ cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary) |
with open(new_temp_recording, 'w') as f: |
f.write('wpr data') |
self.archive_info.AddNewTemporaryRecording(new_temp_recording) |
@@ -118,6 +126,12 @@ class WprArchiveInfoTest(unittest.TestCase): |
recording2_path = os.path.join(self.tmp_dir, recording2) |
new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr') |
+ new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr') |
+ hash_dictionary = {new_recording1:'file_hash1', |
+ new_recording2:'file_hash2'} |
+ cloud_storage_stub = self.overrides.cloud_storage |
+ cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary) |
+ |
new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') |
with open(new_temp_recording, 'w') as f: |
f.write('wpr data') |
@@ -140,7 +154,6 @@ class WprArchiveInfoTest(unittest.TestCase): |
self.assertTrue(os.path.exists(recording2_path)) |
self.assertCorrectHashFile(new_recording1) |
- new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr') |
with open(new_temp_recording, 'w') as f: |
f.write('wpr data') |
@@ -174,6 +187,11 @@ class WprArchiveInfoTest(unittest.TestCase): |
self.user_story_set_archive_info_file = os.path.join(self.tmp_dir, |
'new_archive_info.json') |
+ expected_archive_file_path = os.path.join(self.tmp_dir, |
+ 'new_archive_info_000.wpr') |
+ hash_dictionary = {expected_archive_file_path:'filehash'} |
+ self.overrides.cloud_storage.SetCalculatedHashesForTesting(hash_dictionary) |
+ |
# Create the WprArchiveInfo object to be tested. |
self.archive_info = archive_info.WprArchiveInfo.FromFile( |
self.user_story_set_archive_info_file, cloud_storage.PUBLIC_BUCKET) |