| OLD | NEW |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import os | 5 import os |
| 6 import shutil | 6 import shutil |
| 7 import tempfile | 7 import tempfile |
| 8 import unittest | 8 import unittest |
| 9 | 9 |
| 10 from telemetry import story | 10 from telemetry import story |
| 11 from telemetry import page as page_module | 11 from telemetry import page as page_module |
| 12 from telemetry.testing import system_stub | 12 from telemetry.testing import system_stub |
| 13 from telemetry.timeline import trace_data | 13 from telemetry.timeline import trace_data |
| 14 from telemetry.value import trace | 14 from telemetry.value import trace |
| 15 from tracing_build import html2trace |
| 15 | 16 |
| 16 | 17 |
| 17 class TestBase(unittest.TestCase): | 18 class TestBase(unittest.TestCase): |
| 18 | 19 |
| 19 def setUp(self): | 20 def setUp(self): |
| 20 story_set = story.StorySet(base_dir=os.path.dirname(__file__)) | 21 story_set = story.StorySet(base_dir=os.path.dirname(__file__)) |
| 21 story_set.AddStory( | 22 story_set.AddStory( |
| 22 page_module.Page('http://www.bar.com/', story_set, story_set.base_dir)) | 23 page_module.Page('http://www.bar.com/', story_set, story_set.base_dir)) |
| 23 story_set.AddStory( | 24 story_set.AddStory( |
| 24 page_module.Page('http://www.baz.com/', story_set, story_set.base_dir)) | 25 page_module.Page('http://www.baz.com/', story_set, story_set.base_dir)) |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 97 # pylint: enable=no-member | 98 # pylint: enable=no-member |
| 98 bucket = trace.cloud_storage.PUBLIC_BUCKET | 99 bucket = trace.cloud_storage.PUBLIC_BUCKET |
| 99 cloud_url = v.UploadToCloud(bucket) | 100 cloud_url = v.UploadToCloud(bucket) |
| 100 d = v.AsDict() | 101 d = v.AsDict() |
| 101 self.assertEqual(d['cloud_url'], cloud_url) | 102 self.assertEqual(d['cloud_url'], cloud_url) |
| 102 finally: | 103 finally: |
| 103 if os.path.exists(test_temp_file.name): | 104 if os.path.exists(test_temp_file.name): |
| 104 test_temp_file.close() | 105 test_temp_file.close() |
| 105 os.remove(test_temp_file.name) | 106 os.remove(test_temp_file.name) |
| 106 | 107 |
| 108 def testFindTraceParts(self): |
| 109 raw_data = { |
| 110 'powerTraceAsString': 'Battor Data', |
| 111 'traceEvents': 'Chrome Data', |
| 112 'tabIds': 'Tab Data', |
| 113 } |
| 114 data = trace_data.TraceData(raw_data) |
| 115 v = trace.TraceValue(None, data) |
| 116 tempdir = tempfile.mkdtemp() |
| 117 temp_path = os.path.join(tempdir, 'test.json') |
| 118 battor_seen = False |
| 119 chrome_seen = False |
| 120 tabs_seen = False |
| 121 try: |
| 122 trace_files = html2trace.CopyTraceDataFromHTMLFilePath(v.filename, |
| 123 temp_path) |
| 124 for f in trace_files: |
| 125 with open(f, 'r') as trace_file: |
| 126 d = trace_file.read() |
| 127 if d == raw_data['powerTraceAsString']: |
| 128 self.assertFalse(battor_seen) |
| 129 battor_seen = True |
| 130 elif d == raw_data['traceEvents']: |
| 131 self.assertFalse(chrome_seen) |
| 132 chrome_seen = True |
| 133 elif d == raw_data['tabIds']: |
| 134 self.assertFalse(tabs_seen) |
| 135 tabs_seen = True |
| 136 self.assertTrue(battor_seen) |
| 137 self.assertTrue(chrome_seen) |
| 138 self.assertTrue(tabs_seen) |
| 139 finally: |
| 140 shutil.rmtree(tempdir) |
| 141 os.remove(v.filename) |
| 142 |
| 107 | 143 |
| 108 def _IsEmptyDir(path): | 144 def _IsEmptyDir(path): |
| 109 return os.path.exists(path) and not os.listdir(path) | 145 return os.path.exists(path) and not os.listdir(path) |
| 110 | 146 |
| 111 | 147 |
| 112 class NoLeakedTempfilesTests(TestBase): | 148 class NoLeakedTempfilesTests(TestBase): |
| 113 | 149 |
| 114 def setUp(self): | 150 def setUp(self): |
| 115 super(NoLeakedTempfilesTests, self).setUp() | 151 super(NoLeakedTempfilesTests, self).setUp() |
| 116 self.temp_test_dir = tempfile.mkdtemp() | 152 self.temp_test_dir = tempfile.mkdtemp() |
| 117 self.actual_tempdir = trace.tempfile.tempdir | 153 self.actual_tempdir = trace.tempfile.tempdir |
| 118 trace.tempfile.tempdir = self.temp_test_dir | 154 trace.tempfile.tempdir = self.temp_test_dir |
| 119 | 155 |
| 120 def testNoLeakedTempFileOnImplicitCleanUp(self): | 156 def testNoLeakedTempFileOnImplicitCleanUp(self): |
| 121 with trace.TraceValue(None, trace_data.TraceData({'test': 1})): | 157 with trace.TraceValue(None, trace_data.TraceData({'test': 1})): |
| 122 pass | 158 pass |
| 123 self.assertTrue(_IsEmptyDir(self.temp_test_dir)) | 159 self.assertTrue(_IsEmptyDir(self.temp_test_dir)) |
| 124 | 160 |
| 125 def testNoLeakedTempFileWhenUploadingTrace(self): | 161 def testNoLeakedTempFileWhenUploadingTrace(self): |
| 126 v = trace.TraceValue(None, trace_data.TraceData({'test': 1})) | 162 v = trace.TraceValue(None, trace_data.TraceData({'test': 1})) |
| 127 v.CleanUp() | 163 v.CleanUp() |
| 128 self.assertTrue(_IsEmptyDir(self.temp_test_dir)) | 164 self.assertTrue(_IsEmptyDir(self.temp_test_dir)) |
| 129 | 165 |
| 130 def tearDown(self): | 166 def tearDown(self): |
| 131 super(NoLeakedTempfilesTests, self).tearDown() | 167 super(NoLeakedTempfilesTests, self).tearDown() |
| 132 shutil.rmtree(self.temp_test_dir) | 168 shutil.rmtree(self.temp_test_dir) |
| 133 trace.tempfile.tempdir = self.actual_tempdir | 169 trace.tempfile.tempdir = self.actual_tempdir |
| OLD | NEW |