Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 #!/usr/bin/env python | |
| 2 # Copyright (c) 2013 The Chromium Authors. All rights reserved. | |
| 3 # Use of this source code is governed by a BSD-style license that can be | |
| 4 # found in the LICENSE file. | |
| 5 | |
| 6 """Unit tests for classes in annotator.py.""" | |
| 7 | |
| 8 import cStringIO | |
| 9 import json | |
| 10 import os | |
| 11 import sys | |
| 12 import tempfile | |
| 13 import unittest | |
| 14 | |
| 15 import test_env # pylint: disable=W0611 | |
| 16 | |
| 17 from common import annotator | |
| 18 from common import chromium_utils | |
| 19 | |
| 20 | |
| 21 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| 22 | |
| 23 | |
| 24 class FilterCapture(chromium_utils.RunCommandFilter): | |
| 25 """Captures the text and places it into an array.""" | |
| 26 def __init__(self): | |
| 27 chromium_utils.RunCommandFilter.__init__(self) | |
| 28 self.text = [] | |
| 29 | |
| 30 def FilterLine(self, line): | |
| 31 self.text.append(line.rstrip()) | |
| 32 | |
| 33 def FilterDone(self, text): | |
| 34 self.text.append(text) | |
| 35 | |
| 36 | |
| 37 class TestAnnotationStreams(unittest.TestCase): | |
| 38 def setUp(self): | |
| 39 self.buf = cStringIO.StringIO() | |
| 40 | |
| 41 def _getLines(self): | |
| 42 return self.buf.getvalue().rstrip().split('\n') | |
| 43 | |
| 44 def testBasicUsage(self): | |
| 45 stream = annotator.StructuredAnnotationStream(stream=self.buf) | |
| 46 with stream.step('one') as _: | |
| 47 pass | |
| 48 with stream.step('two') as _: | |
| 49 pass | |
| 50 | |
| 51 result = [ | |
| 52 '@@@SEED_STEP one@@@', | |
| 53 '@@@STEP_CURSOR one@@@', | |
| 54 '@@@STEP_STARTED@@@', | |
| 55 '@@@STEP_CLOSED@@@', | |
| 56 '@@@SEED_STEP two@@@', | |
| 57 '@@@STEP_CURSOR two@@@', | |
| 58 '@@@STEP_STARTED@@@', | |
| 59 '@@@STEP_CLOSED@@@', | |
| 60 ] | |
| 61 | |
| 62 self.assertEquals(result, self._getLines()) | |
| 63 | |
| 64 def testStepAnnotations(self): | |
| 65 stream = annotator.StructuredAnnotationStream(stream=self.buf) | |
| 66 with stream.step('one') as s: | |
| 67 s.step_warnings() | |
| 68 s.step_failure() | |
| 69 s.step_exception() | |
| 70 s.step_clear() | |
| 71 s.step_summary_clear() | |
| 72 s.step_text('hello') | |
| 73 s.step_summary_text('hello!') | |
| 74 s.step_log_line('mylog', 'test') | |
| 75 s.step_log_end('mylog') | |
| 76 s.step_log_line('myperflog', 'perf data') | |
| 77 s.step_log_end_perf('myperflog', 'dashboardname') | |
| 78 s.write_log_lines('full_log', ['line one', 'line two']) | |
| 79 s.write_log_lines('full_perf_log', ['perf line one', 'perf line two'], | |
| 80 perf='full_perf') | |
|
iannucci
2013/03/02 01:49:14
Should there be a test that tries to write to logs
Mike Stip (use stip instead)
2013/03/02 02:13:45
The master already throws up an exception when thi
| |
| 81 | |
| 82 result = [ | |
| 83 '@@@SEED_STEP one@@@', | |
| 84 '@@@STEP_CURSOR one@@@', | |
| 85 '@@@STEP_STARTED@@@', | |
| 86 '@@@STEP_WARNINGS@@@', | |
| 87 '@@@STEP_FAILURE@@@', | |
| 88 '@@@STEP_EXCEPTION@@@', | |
| 89 '@@@STEP_CLEAR@@@', | |
| 90 '@@@STEP_SUMMARY_CLEAR@@@', | |
| 91 '@@@STEP_TEXT@hello@@@', | |
| 92 '@@@STEP_SUMMARY_TEXT@hello!@@@', | |
| 93 '@@@STEP_LOG_LINE@mylog@test@@@', | |
| 94 '@@@STEP_LOG_END@mylog@@@', | |
| 95 '@@@STEP_LOG_LINE@myperflog@perf data@@@', | |
| 96 '@@@STEP_LOG_END_PERF@myperflog@dashboardname@@@', | |
| 97 '@@@STEP_LOG_LINE@full_log@line one@@@', | |
| 98 '@@@STEP_LOG_LINE@full_log@line two@@@', | |
| 99 '@@@STEP_LOG_END@full_log@@@', | |
| 100 '@@@STEP_LOG_LINE@full_perf_log@perf line one@@@', | |
| 101 '@@@STEP_LOG_LINE@full_perf_log@perf line two@@@', | |
| 102 '@@@STEP_LOG_END_PERF@full_perf_log@full_perf@@@', | |
| 103 '@@@STEP_CLOSED@@@', | |
| 104 ] | |
| 105 | |
| 106 self.assertEquals(result, self._getLines()) | |
| 107 | |
| 108 def testSeedStep(self): | |
| 109 steps = ['one', 'two'] | |
| 110 stream = annotator.StructuredAnnotationStream(seed_steps=steps, | |
| 111 stream=self.buf) | |
| 112 with stream.step('one'): | |
| 113 pass | |
| 114 with stream.step('two'): | |
| 115 pass | |
| 116 | |
| 117 result = [ | |
| 118 '@@@SEED_STEP one@@@', | |
| 119 '@@@SEED_STEP two@@@', | |
| 120 '@@@STEP_CURSOR one@@@', | |
| 121 '@@@STEP_STARTED@@@', | |
| 122 '@@@STEP_CLOSED@@@', | |
| 123 '@@@STEP_CURSOR two@@@', | |
| 124 '@@@STEP_STARTED@@@', | |
| 125 '@@@STEP_CLOSED@@@' | |
| 126 ] | |
| 127 | |
| 128 self.assertEquals(result, self._getLines()) | |
| 129 | |
| 130 def testSeedStepSkip(self): | |
| 131 steps = ['one', 'two', 'three'] | |
| 132 stream = annotator.StructuredAnnotationStream(seed_steps=steps, | |
| 133 stream=self.buf) | |
| 134 with stream.step('one'): | |
| 135 pass | |
| 136 with stream.step('three'): | |
| 137 pass | |
| 138 with stream.step('two'): | |
| 139 pass | |
| 140 | |
| 141 result = [ | |
| 142 '@@@SEED_STEP one@@@', | |
| 143 '@@@SEED_STEP two@@@', | |
| 144 '@@@SEED_STEP three@@@', | |
| 145 '@@@STEP_CURSOR one@@@', | |
| 146 '@@@STEP_STARTED@@@', | |
| 147 '@@@STEP_CLOSED@@@', | |
| 148 '@@@STEP_CURSOR three@@@', | |
| 149 '@@@STEP_STARTED@@@', | |
| 150 '@@@STEP_CLOSED@@@', | |
| 151 '@@@SEED_STEP two@@@', | |
| 152 '@@@STEP_CURSOR two@@@', | |
| 153 '@@@STEP_STARTED@@@', | |
| 154 '@@@STEP_CLOSED@@@', | |
| 155 ] | |
| 156 | |
| 157 self.assertEquals(result, self._getLines()) | |
| 158 | |
| 159 def testException(self): | |
| 160 stream = annotator.StructuredAnnotationStream(stream=self.buf) | |
| 161 | |
| 162 with self.assertRaises(Exception): | |
| 163 with stream.step('one'): | |
| 164 raise Exception('oh no!') | |
| 165 | |
| 166 log_string = '@@@STEP_LOG_LINE@exception' | |
| 167 exception = any(line.startswith(log_string) for line in self._getLines()) | |
| 168 self.assertTrue(exception) | |
| 169 | |
| 170 def testNoNesting(self): | |
| 171 stream = annotator.StructuredAnnotationStream(stream=self.buf) | |
| 172 | |
| 173 with self.assertRaises(Exception): | |
| 174 with stream.step('one'): | |
| 175 with stream.step('two'): | |
| 176 pass | |
| 177 | |
| 178 def testProtectedStartStop(self): | |
| 179 stream = annotator.StructuredAnnotationStream(stream=self.buf) | |
| 180 | |
| 181 with self.assertRaises(AttributeError): | |
| 182 with stream.step('one') as s: | |
| 183 s.step_started() | |
| 184 with self.assertRaises(AttributeError): | |
| 185 with stream.step('two') as s: | |
| 186 s.step_closed() | |
| 187 | |
| 188 def testAdvanced(self): | |
| 189 step = annotator.AdvancedAnnotationStep(stream=self.buf) | |
| 190 stream = annotator.AdvancedAnnotationStream(stream=self.buf) | |
| 191 stream.seed_step('one') | |
| 192 stream.seed_step('two') | |
| 193 stream.step_cursor('one') | |
| 194 step.step_started() | |
| 195 stream.step_cursor('two') | |
| 196 step.step_started() | |
| 197 stream.step_cursor('one') | |
| 198 step.step_closed() | |
| 199 stream.step_cursor('two') | |
| 200 step.step_closed() | |
| 201 | |
| 202 result = [ | |
| 203 '@@@SEED_STEP one@@@', | |
| 204 '@@@SEED_STEP two@@@', | |
| 205 '@@@STEP_CURSOR one@@@', | |
| 206 '@@@STEP_STARTED@@@', | |
| 207 '@@@STEP_CURSOR two@@@', | |
| 208 '@@@STEP_STARTED@@@', | |
| 209 '@@@STEP_CURSOR one@@@', | |
| 210 '@@@STEP_CLOSED@@@', | |
| 211 '@@@STEP_CURSOR two@@@', | |
| 212 '@@@STEP_CLOSED@@@', | |
| 213 ] | |
| 214 | |
| 215 self.assertEquals(result, self._getLines()) | |
| 216 | |
| 217 | |
| 218 def _synthesizeCmd(args): | |
| 219 basecmd = [sys.executable, '-c'] | |
| 220 basecmd.extend(args) | |
| 221 return basecmd | |
| 222 | |
| 223 | |
| 224 class TestExecution(unittest.TestCase): | |
| 225 def setUp(self): | |
| 226 self.capture = FilterCapture() | |
| 227 self.tempfd, self.tempfn = tempfile.mkstemp() | |
| 228 self.temp = os.fdopen(self.tempfd, 'wb') | |
| 229 self.script = os.path.join(SCRIPT_DIR, os.pardir, 'annotator.py') | |
| 230 | |
| 231 def tearDown(self): | |
| 232 self.temp.close() | |
| 233 if os.path.exists(self.tempfn): | |
| 234 os.remove(self.tempfn) | |
| 235 | |
| 236 def _runAnnotator(self, cmdlist): | |
| 237 json.dump(cmdlist, self.temp) | |
| 238 self.temp.close() | |
| 239 cmd = [sys.executable, self.script, self.tempfn] | |
| 240 env = os.environ.copy() | |
| 241 env['PYTHONPATH'] = os.pathsep.join(sys.path) | |
| 242 return chromium_utils.RunCommand(cmd, filter_obj=self.capture, env=env, | |
| 243 print_cmd=False) | |
| 244 | |
| 245 def testSimpleExecution(self): | |
| 246 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])}, | |
| 247 {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}] | |
| 248 | |
| 249 ret = self._runAnnotator(cmdlist) | |
| 250 | |
| 251 self.assertEquals(ret, 0) | |
| 252 | |
| 253 result = [ | |
| 254 '@@@SEED_STEP one@@@', | |
| 255 '@@@SEED_STEP two@@@', | |
| 256 '@@@STEP_CURSOR one@@@', | |
| 257 '@@@STEP_STARTED@@@', | |
| 258 '', | |
| 259 '/usr/bin/python -c "print \'hello!\'"', | |
| 260 'hello!', | |
| 261 '@@@STEP_CLOSED@@@', | |
| 262 '@@@STEP_CURSOR two@@@', | |
| 263 '@@@STEP_STARTED@@@', | |
| 264 '', | |
| 265 '/usr/bin/python -c "print \'yo!\'"', | |
| 266 'yo!', | |
| 267 '@@@STEP_CLOSED@@@', | |
| 268 ] | |
| 269 self.assertEquals(result, self.capture.text) | |
| 270 | |
| 271 def testFailBuild(self): | |
| 272 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])}, | |
| 273 {'name': 'two', 'cmd': _synthesizeCmd(['error'])}] | |
| 274 | |
| 275 ret = self._runAnnotator(cmdlist) | |
| 276 | |
| 277 self.assertIn('@@@STEP_FAILURE@@@', self.capture.text) | |
| 278 self.assertEquals(ret, 1) | |
| 279 | |
| 280 def testStopBuild(self): | |
| 281 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])}, | |
| 282 {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}] | |
| 283 | |
| 284 ret = self._runAnnotator(cmdlist) | |
| 285 | |
| 286 self.assertNotIn('@@@STEP_CURSOR two@@@', self.capture.text) | |
| 287 self.assertEquals(ret, 1) | |
| 288 | |
| 289 def testException(self): | |
| 290 cmdlist = [{'name': 'one', 'cmd': ['doesn\'t exist']}] | |
| 291 | |
| 292 ret = self._runAnnotator(cmdlist) | |
| 293 | |
| 294 self.assertIn('@@@STEP_EXCEPTION@@@', self.capture.text) | |
| 295 self.assertEquals(ret, 1) | |
| 296 | |
| 297 def testAlwaysRun(self): | |
| 298 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])}, | |
| 299 {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\'']), | |
| 300 'always_run': True}] | |
| 301 | |
| 302 ret = self._runAnnotator(cmdlist) | |
| 303 self.assertIn('@@@STEP_CURSOR two@@@', self.capture.text) | |
| 304 self.assertIn('yo!', self.capture.text) | |
| 305 self.assertEquals(ret, 1) | |
| 306 | |
| 307 def testAlwaysRunNoDupes(self): | |
| 308 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'yo!\'']), | |
| 309 'always_run': True}, | |
| 310 {'name': 'two', 'cmd': _synthesizeCmd(['error'])}, | |
| 311 {'name': 'three', 'cmd': _synthesizeCmd(['print \'hello!\'']), | |
| 312 'always_run': True}] | |
| 313 | |
| 314 ret = self._runAnnotator(cmdlist) | |
| 315 self.assertEquals(self.capture.text.count('yo!'), 1) | |
| 316 self.assertIn('hello!', self.capture.text) | |
| 317 self.assertEquals(ret, 1) | |
| 318 | |
| 319 if __name__ == '__main__': | |
| 320 unittest.main() | |
| OLD | NEW |