Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Side by Side Diff: third_party/twisted_8_1/twisted/trial/test/test_util.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 import os
2
3 from zope.interface import implements
4
5 from twisted.internet.interfaces import IProcessTransport
6 from twisted.internet import defer
7 from twisted.internet.base import DelayedCall
8
9 from twisted.trial.unittest import TestCase
10 from twisted.trial import util
11 from twisted.trial.util import DirtyReactorAggregateError, _Janitor
12 from twisted.trial.test import packages
13
14
15
16 class TestMktemp(TestCase):
17 def test_name(self):
18 name = self.mktemp()
19 dirs = os.path.dirname(name).split(os.sep)[:-1]
20 self.failUnlessEqual(
21 dirs, ['twisted.trial.test.test_util', 'TestMktemp', 'test_name'])
22
23 def test_unique(self):
24 name = self.mktemp()
25 self.failIfEqual(name, self.mktemp())
26
27 def test_created(self):
28 name = self.mktemp()
29 dirname = os.path.dirname(name)
30 self.failUnless(os.path.exists(dirname))
31 self.failIf(os.path.exists(name))
32
33 def test_location(self):
34 path = os.path.abspath(self.mktemp())
35 self.failUnless(path.startswith(os.getcwd()))
36
37
38 class TestIntrospection(TestCase):
39 def test_containers(self):
40 import suppression
41 parents = util.getPythonContainers(
42 suppression.TestSuppression2.testSuppressModule)
43 expected = [suppression.TestSuppression2, suppression]
44 for a, b in zip(parents, expected):
45 self.failUnlessEqual(a, b)
46
47
48 class TestFindObject(packages.SysPathManglingTest):
49 def test_importPackage(self):
50 package1 = util.findObject('package')
51 import package as package2
52 self.failUnlessEqual(package1, (True, package2))
53
54 def test_importModule(self):
55 test_sample2 = util.findObject('goodpackage.test_sample')
56 from goodpackage import test_sample
57 self.failUnlessEqual((True, test_sample), test_sample2)
58
59 def test_importError(self):
60 self.failUnlessRaises(ZeroDivisionError,
61 util.findObject, 'package.test_bad_module')
62
63 def test_sophisticatedImportError(self):
64 self.failUnlessRaises(ImportError,
65 util.findObject, 'package2.test_module')
66
67 def test_importNonexistentPackage(self):
68 self.failUnlessEqual(util.findObject('doesntexist')[0], False)
69
70 def test_findNonexistentModule(self):
71 self.failUnlessEqual(util.findObject('package.doesntexist')[0], False)
72
73 def test_findNonexistentObject(self):
74 self.failUnlessEqual(util.findObject(
75 'goodpackage.test_sample.doesnt')[0], False)
76 self.failUnlessEqual(util.findObject(
77 'goodpackage.test_sample.AlphabetTest.doesntexist')[0], False)
78
79 def test_findObjectExist(self):
80 alpha1 = util.findObject('goodpackage.test_sample.AlphabetTest')
81 from goodpackage import test_sample
82 self.failUnlessEqual(alpha1, (True, test_sample.AlphabetTest))
83
84
85
86 class TestRunSequentially(TestCase):
87 """
88 Sometimes it is useful to be able to run an arbitrary list of callables,
89 one after the other.
90
91 When some of those callables can return Deferreds, things become complex.
92 """
93
94 def test_emptyList(self):
95 """
96 When asked to run an empty list of callables, runSequentially returns a
97 successful Deferred that fires an empty list.
98 """
99 d = util._runSequentially([])
100 d.addCallback(self.assertEqual, [])
101 return d
102
103
104 def test_singleSynchronousSuccess(self):
105 """
106 When given a callable that succeeds without returning a Deferred,
107 include the return value in the results list, tagged with a SUCCESS
108 flag.
109 """
110 d = util._runSequentially([lambda: None])
111 d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
112 return d
113
114
115 def test_singleSynchronousFailure(self):
116 """
117 When given a callable that raises an exception, include a Failure for
118 that exception in the results list, tagged with a FAILURE flag.
119 """
120 d = util._runSequentially([lambda: self.fail('foo')])
121 def check(results):
122 [(flag, fail)] = results
123 fail.trap(self.failureException)
124 self.assertEqual(fail.getErrorMessage(), 'foo')
125 self.assertEqual(flag, defer.FAILURE)
126 return d.addCallback(check)
127
128
129 def test_singleAsynchronousSuccess(self):
130 """
131 When given a callable that returns a successful Deferred, include the
132 result of the Deferred in the results list, tagged with a SUCCESS flag.
133 """
134 d = util._runSequentially([lambda: defer.succeed(None)])
135 d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
136 return d
137
138
139 def test_singleAsynchronousFailure(self):
140 """
141 When given a callable that returns a failing Deferred, include the
142 failure the results list, tagged with a FAILURE flag.
143 """
144 d = util._runSequentially([lambda: defer.fail(ValueError('foo'))])
145 def check(results):
146 [(flag, fail)] = results
147 fail.trap(ValueError)
148 self.assertEqual(fail.getErrorMessage(), 'foo')
149 self.assertEqual(flag, defer.FAILURE)
150 return d.addCallback(check)
151
152
153 def test_callablesCalledInOrder(self):
154 """
155 Check that the callables are called in the given order, one after the
156 other.
157 """
158 log = []
159 deferreds = []
160
161 def append(value):
162 d = defer.Deferred()
163 log.append(value)
164 deferreds.append(d)
165 return d
166
167 d = util._runSequentially([lambda: append('foo'),
168 lambda: append('bar')])
169
170 # runSequentially should wait until the Deferred has fired before
171 # running the second callable.
172 self.assertEqual(log, ['foo'])
173 deferreds[-1].callback(None)
174 self.assertEqual(log, ['foo', 'bar'])
175
176 # Because returning created Deferreds makes jml happy.
177 deferreds[-1].callback(None)
178 return d
179
180
181 def test_continuesAfterError(self):
182 """
183 If one of the callables raises an error, then runSequentially continues
184 to run the remaining callables.
185 """
186 d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'])
187 def check(results):
188 [(flag1, fail), (flag2, result)] = results
189 fail.trap(self.failureException)
190 self.assertEqual(flag1, defer.FAILURE)
191 self.assertEqual(fail.getErrorMessage(), 'foo')
192 self.assertEqual(flag2, defer.SUCCESS)
193 self.assertEqual(result, 'bar')
194 return d.addCallback(check)
195
196
197 def test_stopOnFirstError(self):
198 """
199 If the C{stopOnFirstError} option is passed to C{runSequentially}, then
200 no further callables are called after the first exception is raised.
201 """
202 d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
203 stopOnFirstError=True)
204 def check(results):
205 [(flag1, fail)] = results
206 fail.trap(self.failureException)
207 self.assertEqual(flag1, defer.FAILURE)
208 self.assertEqual(fail.getErrorMessage(), 'foo')
209 return d.addCallback(check)
210
211
212 def test_stripFlags(self):
213 """
214 If the C{stripFlags} option is passed to C{runSequentially} then the
215 SUCCESS / FAILURE flags are stripped from the output. Instead, the
216 Deferred fires a flat list of results containing only the results and
217 failures.
218 """
219 d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
220 stripFlags=True)
221 def check(results):
222 [fail, result] = results
223 fail.trap(self.failureException)
224 self.assertEqual(fail.getErrorMessage(), 'foo')
225 self.assertEqual(result, 'bar')
226 return d.addCallback(check)
227 test_stripFlags.todo = "YAGNI"
228
229
230
231 class DirtyReactorAggregateErrorTest(TestCase):
232 """
233 Tests for the L{DirtyReactorAggregateError}.
234 """
235
236 def test_formatDelayedCall(self):
237 """
238 Delayed calls are formatted nicely.
239 """
240 error = DirtyReactorAggregateError(["Foo", "bar"])
241 self.assertEquals(str(error),
242 """\
243 Reactor was unclean.
244 DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
245 Foo
246 bar""")
247
248
249 def test_formatSelectables(self):
250 """
251 Selectables are formatted nicely.
252 """
253 error = DirtyReactorAggregateError([], ["selectable 1", "selectable 2"])
254 self.assertEquals(str(error),
255 """\
256 Reactor was unclean.
257 Selectables:
258 selectable 1
259 selectable 2""")
260
261
262 def test_formatDelayedCallsAndSelectables(self):
263 """
264 Both delayed calls and selectables can appear in the same error.
265 """
266 error = DirtyReactorAggregateError(["bleck", "Boozo"],
267 ["Sel1", "Sel2"])
268 self.assertEquals(str(error),
269 """\
270 Reactor was unclean.
271 DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
272 bleck
273 Boozo
274 Selectables:
275 Sel1
276 Sel2""")
277
278
279
280 class StubReactor(object):
281 """
282 A reactor stub which contains enough functionality to be used with the
283 L{_Janitor}.
284
285 @ivar iterations: A list of the arguments passed to L{iterate}.
286 @ivar removeAllCalled: Number of times that L{removeAll} was called.
287 @ivar selectables: The value that will be returned from L{removeAll}.
288 @ivar delayedCalls: The value to return from L{getDelayedCalls}.
289 """
290
291 def __init__(self, delayedCalls, selectables=None):
292 """
293 @param delayedCalls: See L{StubReactor.delayedCalls}.
294 @param selectables: See L{StubReactor.selectables}.
295 """
296 self.delayedCalls = delayedCalls
297 self.iterations = []
298 self.removeAllCalled = 0
299 if not selectables:
300 selectables = []
301 self.selectables = selectables
302
303
304 def iterate(self, timeout=None):
305 """
306 Increment C{self.iterations}.
307 """
308 self.iterations.append(timeout)
309
310
311 def getDelayedCalls(self):
312 """
313 Return C{self.delayedCalls}.
314 """
315 return self.delayedCalls
316
317
318 def removeAll(self):
319 """
320 Increment C{self.removeAllCalled} and return C{self.selectables}.
321 """
322 self.removeAllCalled += 1
323 return self.selectables
324
325
326
327 class StubErrorReporter(object):
328 """
329 A subset of L{twisted.trial.itrial.IReporter} which records L{addError}
330 calls.
331
332 @ivar errors: List of two-tuples of (test, error) which were passed to
333 L{addError}.
334 """
335
336 def __init__(self):
337 self.errors = []
338
339
340 def addError(self, test, error):
341 """
342 Record parameters in C{self.errors}.
343 """
344 self.errors.append((test, error))
345
346
347
348 class JanitorTests(TestCase):
349 """
350 Tests for L{_Janitor}!
351 """
352
353 def test_cleanPendingSpinsReactor(self):
354 """
355 During pending-call cleanup, the reactor will be spun twice with an
356 instant timeout. This is not a requirement, it is only a test for
357 current behavior. Hopefully Trial will eventually not do this kind of
358 reactor stuff.
359 """
360 reactor = StubReactor([])
361 jan = _Janitor(None, None, reactor=reactor)
362 jan._cleanPending()
363 self.assertEquals(reactor.iterations, [0, 0])
364
365
366 def test_cleanPendingCancelsCalls(self):
367 """
368 During pending-call cleanup, the janitor cancels pending timed calls.
369 """
370 def func():
371 return "Lulz"
372 cancelled = []
373 delayedCall = DelayedCall(300, func, (), {},
374 cancelled.append, lambda x: None)
375 reactor = StubReactor([delayedCall])
376 jan = _Janitor(None, None, reactor=reactor)
377 jan._cleanPending()
378 self.assertEquals(cancelled, [delayedCall])
379
380
381 def test_cleanPendingReturnsDelayedCallStrings(self):
382 """
383 The Janitor produces string representations of delayed calls from the
384 delayed call cleanup method. It gets the string representations
385 *before* cancelling the calls; this is important because cancelling the
386 call removes critical debugging information from the string
387 representation.
388 """
389 delayedCall = DelayedCall(300, lambda: None, (), {},
390 lambda x: None, lambda x: None,
391 seconds=lambda: 0)
392 delayedCallString = str(delayedCall)
393 reactor = StubReactor([delayedCall])
394 jan = _Janitor(None, None, reactor=reactor)
395 strings = jan._cleanPending()
396 self.assertEquals(strings, [delayedCallString])
397
398
399 def test_cleanReactorRemovesSelectables(self):
400 """
401 The Janitor will remove selectables during reactor cleanup.
402 """
403 reactor = StubReactor([])
404 jan = _Janitor(None, None, reactor=reactor)
405 jan._cleanReactor()
406 self.assertEquals(reactor.removeAllCalled, 1)
407
408
409 def test_cleanReactorKillsProcesses(self):
410 """
411 The Janitor will kill processes during reactor cleanup.
412 """
413 class StubProcessTransport(object):
414 """
415 A stub L{IProcessTransport} provider which records signals.
416 @ivar signals: The signals passed to L{signalProcess}.
417 """
418 implements(IProcessTransport)
419
420 def __init__(self):
421 self.signals = []
422
423 def signalProcess(self, signal):
424 """
425 Append C{signal} to C{self.signals}.
426 """
427 self.signals.append(signal)
428
429 pt = StubProcessTransport()
430 reactor = StubReactor([], [pt])
431 jan = _Janitor(None, None, reactor=reactor)
432 jan._cleanReactor()
433 self.assertEquals(pt.signals, ["KILL"])
434
435
436 def test_cleanReactorReturnsSelectableStrings(self):
437 """
438 The Janitor returns string representations of the selectables that it
439 cleaned up from the reactor cleanup method.
440 """
441 class Selectable(object):
442 """
443 A stub Selectable which only has an interesting string
444 representation.
445 """
446 def __repr__(self):
447 return "(SELECTABLE!)"
448
449 reactor = StubReactor([], [Selectable()])
450 jan = _Janitor(None, None, reactor=reactor)
451 self.assertEquals(jan._cleanReactor(), ["(SELECTABLE!)"])
452
453
454 def test_postCaseCleanupNoErrors(self):
455 """
456 The post-case cleanup method will return True and not call C{addError}
457 on the result if there are no pending calls.
458 """
459 reactor = StubReactor([])
460 test = object()
461 reporter = StubErrorReporter()
462 jan = _Janitor(test, reporter, reactor=reactor)
463 self.assertTrue(jan.postCaseCleanup())
464 self.assertEquals(reporter.errors, [])
465
466
467 def test_postCaseCleanupWithErrors(self):
468 """
469 The post-case cleanup method will return False and call C{addError} on
470 the result with a L{DirtyReactorAggregateError} Failure if there are
471 pending calls.
472 """
473 delayedCall = DelayedCall(300, lambda: None, (), {},
474 lambda x: None, lambda x: None,
475 seconds=lambda: 0)
476 delayedCallString = str(delayedCall)
477 reactor = StubReactor([delayedCall], [])
478 test = object()
479 reporter = StubErrorReporter()
480 jan = _Janitor(test, reporter, reactor=reactor)
481 self.assertFalse(jan.postCaseCleanup())
482 self.assertEquals(len(reporter.errors), 1)
483 self.assertEquals(reporter.errors[0][1].value.delayedCalls,
484 [delayedCallString])
485
486
487 def test_postClassCleanupNoErrors(self):
488 """
489 The post-class cleanup method will not call C{addError} on the result
490 if there are no pending calls or selectables.
491 """
492 reactor = StubReactor([])
493 test = object()
494 reporter = StubErrorReporter()
495 jan = _Janitor(test, reporter, reactor=reactor)
496 jan.postClassCleanup()
497 self.assertEquals(reporter.errors, [])
498
499
500 def test_postClassCleanupWithPendingCallErrors(self):
501 """
502 The post-class cleanup method call C{addError} on the result with a
503 L{DirtyReactorAggregateError} Failure if there are pending calls.
504 """
505 delayedCall = DelayedCall(300, lambda: None, (), {},
506 lambda x: None, lambda x: None,
507 seconds=lambda: 0)
508 delayedCallString = str(delayedCall)
509 reactor = StubReactor([delayedCall], [])
510 test = object()
511 reporter = StubErrorReporter()
512 jan = _Janitor(test, reporter, reactor=reactor)
513 jan.postClassCleanup()
514 self.assertEquals(len(reporter.errors), 1)
515 self.assertEquals(reporter.errors[0][1].value.delayedCalls,
516 [delayedCallString])
517
518
519 def test_postClassCleanupWithSelectableErrors(self):
520 """
521 The post-class cleanup method call C{addError} on the result with a
522 L{DirtyReactorAggregateError} Failure if there are selectables.
523 """
524 selectable = "SELECTABLE HERE"
525 reactor = StubReactor([], [selectable])
526 test = object()
527 reporter = StubErrorReporter()
528 jan = _Janitor(test, reporter, reactor=reactor)
529 jan.postClassCleanup()
530 self.assertEquals(len(reporter.errors), 1)
531 self.assertEquals(reporter.errors[0][1].value.selectables,
532 [repr(selectable)])
533
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/trial/test/test_tests.py ('k') | third_party/twisted_8_1/twisted/trial/test/weird.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698