Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(46)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_application.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 import sys, copy, os, pickle, warnings
5 from StringIO import StringIO
6
7
8 from twisted.trial import unittest, util
9 from twisted.application import service, internet, app
10 from twisted.persisted import sob
11 from twisted.python import usage
12 from twisted.python.util import sibpath
13 from twisted.internet import interfaces, defer
14 from twisted.protocols import wire, basic
15 from twisted.internet import protocol, reactor
16 from twisted.internet.utils import getProcessOutputAndValue
17 from twisted.application import reactors
18
19 try:
20 from twisted.web import microdom
21 gotMicrodom = True
22 except ImportError:
23 warnings.warn("Not testing xml persistence as twisted.web.microdom "
24 "not available")
25 gotMicrodom = False
26
27
28 oldAppSuppressions = [util.suppress(message='twisted.internet.app is deprecated' ,
29 category=DeprecationWarning)]
30
31 class Dummy:
32 processName=None
33
34 class TestService(unittest.TestCase):
35
36 def testName(self):
37 s = service.Service()
38 s.setName("hello")
39 self.failUnlessEqual(s.name, "hello")
40
41 def testParent(self):
42 s = service.Service()
43 p = service.MultiService()
44 s.setServiceParent(p)
45 self.failUnlessEqual(list(p), [s])
46 self.failUnlessEqual(s.parent, p)
47
48 def testApplicationAsParent(self):
49 s = service.Service()
50 p = service.Application("")
51 s.setServiceParent(p)
52 self.failUnlessEqual(list(service.IServiceCollection(p)), [s])
53 self.failUnlessEqual(s.parent, service.IServiceCollection(p))
54
55 def testNamedChild(self):
56 s = service.Service()
57 p = service.MultiService()
58 s.setName("hello")
59 s.setServiceParent(p)
60 self.failUnlessEqual(list(p), [s])
61 self.failUnlessEqual(s.parent, p)
62 self.failUnlessEqual(p.getServiceNamed("hello"), s)
63
64 def testDoublyNamedChild(self):
65 s = service.Service()
66 p = service.MultiService()
67 s.setName("hello")
68 s.setServiceParent(p)
69 self.failUnlessRaises(RuntimeError, s.setName, "lala")
70
71 def testDuplicateNamedChild(self):
72 s = service.Service()
73 p = service.MultiService()
74 s.setName("hello")
75 s.setServiceParent(p)
76 s = service.Service()
77 s.setName("hello")
78 self.failUnlessRaises(RuntimeError, s.setServiceParent, p)
79
80 def testDisowning(self):
81 s = service.Service()
82 p = service.MultiService()
83 s.setServiceParent(p)
84 self.failUnlessEqual(list(p), [s])
85 self.failUnlessEqual(s.parent, p)
86 s.disownServiceParent()
87 self.failUnlessEqual(list(p), [])
88 self.failUnlessEqual(s.parent, None)
89
90 def testRunning(self):
91 s = service.Service()
92 self.assert_(not s.running)
93 s.startService()
94 self.assert_(s.running)
95 s.stopService()
96 self.assert_(not s.running)
97
98 def testRunningChildren1(self):
99 s = service.Service()
100 p = service.MultiService()
101 s.setServiceParent(p)
102 self.assert_(not s.running)
103 self.assert_(not p.running)
104 p.startService()
105 self.assert_(s.running)
106 self.assert_(p.running)
107 p.stopService()
108 self.assert_(not s.running)
109 self.assert_(not p.running)
110
111 def testRunningChildren2(self):
112 s = service.Service()
113 def checkRunning():
114 self.assert_(s.running)
115 t = service.Service()
116 t.stopService = checkRunning
117 t.startService = checkRunning
118 p = service.MultiService()
119 s.setServiceParent(p)
120 t.setServiceParent(p)
121 p.startService()
122 p.stopService()
123
124 def testAddingIntoRunning(self):
125 p = service.MultiService()
126 p.startService()
127 s = service.Service()
128 self.assert_(not s.running)
129 s.setServiceParent(p)
130 self.assert_(s.running)
131 s.disownServiceParent()
132 self.assert_(not s.running)
133
134 def testPrivileged(self):
135 s = service.Service()
136 def pss():
137 s.privilegedStarted = 1
138 s.privilegedStartService = pss
139 s1 = service.Service()
140 p = service.MultiService()
141 s.setServiceParent(p)
142 s1.setServiceParent(p)
143 p.privilegedStartService()
144 self.assert_(s.privilegedStarted)
145
146 def testCopying(self):
147 s = service.Service()
148 s.startService()
149 s1 = copy.copy(s)
150 self.assert_(not s1.running)
151 self.assert_(s.running)
152
153
154 if hasattr(os, "getuid"):
155 curuid = os.getuid()
156 curgid = os.getgid()
157 else:
158 curuid = curgid = 0
159
160
161 class TestProcess(unittest.TestCase):
162
163 def testID(self):
164 p = service.Process(5, 6)
165 self.assertEqual(p.uid, 5)
166 self.assertEqual(p.gid, 6)
167
168 def testDefaults(self):
169 p = service.Process(5)
170 self.assertEqual(p.uid, 5)
171 self.assertEqual(p.gid, None)
172 p = service.Process(gid=5)
173 self.assertEqual(p.uid, None)
174 self.assertEqual(p.gid, 5)
175 p = service.Process()
176 self.assertEqual(p.uid, None)
177 self.assertEqual(p.gid, None)
178
179 def testProcessName(self):
180 p = service.Process()
181 self.assertEqual(p.processName, None)
182 p.processName = 'hello'
183 self.assertEqual(p.processName, 'hello')
184
185
186 class TestInterfaces(unittest.TestCase):
187
188 def testService(self):
189 self.assert_(service.IService.providedBy(service.Service()))
190
191 def testMultiService(self):
192 self.assert_(service.IService.providedBy(service.MultiService()))
193 self.assert_(service.IServiceCollection.providedBy(service.MultiService( )))
194
195 def testProcess(self):
196 self.assert_(service.IProcess.providedBy(service.Process()))
197
198
199 class TestApplication(unittest.TestCase):
200
201 def testConstructor(self):
202 service.Application("hello")
203 service.Application("hello", 5)
204 service.Application("hello", 5, 6)
205
206 def testProcessComponent(self):
207 a = service.Application("hello")
208 self.assertEqual(service.IProcess(a).uid, None)
209 self.assertEqual(service.IProcess(a).gid, None)
210 a = service.Application("hello", 5)
211 self.assertEqual(service.IProcess(a).uid, 5)
212 self.assertEqual(service.IProcess(a).gid, None)
213 a = service.Application("hello", 5, 6)
214 self.assertEqual(service.IProcess(a).uid, 5)
215 self.assertEqual(service.IProcess(a).gid, 6)
216
217 def testServiceComponent(self):
218 a = service.Application("hello")
219 self.assert_(service.IService(a) is service.IServiceCollection(a))
220 self.assertEqual(service.IService(a).name, "hello")
221 self.assertEqual(service.IService(a).parent, None)
222
223 def testPersistableComponent(self):
224 a = service.Application("hello")
225 p = sob.IPersistable(a)
226 self.assertEqual(p.style, 'pickle')
227 self.assertEqual(p.name, 'hello')
228 self.assert_(p.original is a)
229
230 class TestLoading(unittest.TestCase):
231
232 def test_simpleStoreAndLoad(self):
233 a = service.Application("hello")
234 p = sob.IPersistable(a)
235 for style in 'xml source pickle'.split():
236 if style == 'xml' and not gotMicrodom:
237 continue
238 p.setStyle(style)
239 p.save()
240 a1 = service.loadApplication("hello.ta"+style[0], style)
241 self.assertEqual(service.IService(a1).name, "hello")
242 open("hello.tac", 'w').writelines([
243 "from twisted.application import service\n",
244 "application = service.Application('hello')\n",
245 ])
246 a1 = service.loadApplication("hello.tac", 'python')
247 self.assertEqual(service.IService(a1).name, "hello")
248
249
250
251 class TestAppSupport(unittest.TestCase):
252
253 def testPassphrase(self):
254 self.assertEqual(app.getPassphrase(0), None)
255
256 def testLoadApplication(self):
257 """
258 Test loading an application file in different dump format.
259 """
260 a = service.Application("hello")
261 baseconfig = {'file': None, 'xml': None, 'source': None, 'python':None}
262 for style in 'source xml pickle'.split():
263 if style == 'xml' and not gotMicrodom:
264 continue
265 config = baseconfig.copy()
266 config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
267 sob.IPersistable(a).setStyle(style)
268 sob.IPersistable(a).save(filename='helloapplication')
269 a1 = app.getApplication(config, None)
270 self.assertEqual(service.IService(a1).name, "hello")
271 config = baseconfig.copy()
272 config['python'] = 'helloapplication'
273 open("helloapplication", 'w').writelines([
274 "from twisted.application import service\n",
275 "application = service.Application('hello')\n",
276 ])
277 a1 = app.getApplication(config, None)
278 self.assertEqual(service.IService(a1).name, "hello")
279
280 testLoadApplication.suppress = [
281 util.suppress(message="twisted.persisted.marmalade is deprecated",
282 category=DeprecationWarning)]
283
284 def test_convertStyle(self):
285 appl = service.Application("lala")
286 for instyle in 'xml source pickle'.split():
287 if instyle == 'xml' and not gotMicrodom:
288 continue
289 for outstyle in 'xml source pickle'.split():
290 if outstyle == 'xml' and not gotMicrodom:
291 continue
292 sob.IPersistable(appl).setStyle(instyle)
293 sob.IPersistable(appl).save(filename="converttest")
294 app.convertStyle("converttest", instyle, None,
295 "converttest.out", outstyle, 0)
296 appl2 = service.loadApplication("converttest.out", outstyle)
297 self.assertEqual(service.IService(appl2).name, "lala")
298
299 def test_getLogFile(self):
300 """
301 Test L{app.getLogFile}, veryfying the LogFile instance it returns.
302 """
303 os.mkdir("logfiledir")
304 l = app.getLogFile(os.path.join("logfiledir", "lala"))
305 self.assertEqual(l.path,
306 os.path.abspath(os.path.join("logfiledir", "lala")))
307 self.assertEqual(l.name, "lala")
308 self.assertEqual(l.directory, os.path.abspath("logfiledir"))
309
310 test_getLogFile.suppress = [
311 util.suppress(message="app.getLogFile is deprecated. Use "
312 "twisted.python.logfile.LogFile.fromFullPath instead",
313 category=DeprecationWarning)]
314
315 def test_startApplication(self):
316 appl = service.Application("lala")
317 app.startApplication(appl, 0)
318 self.assert_(service.IService(appl).running)
319
320
321 class Foo(basic.LineReceiver):
322 def connectionMade(self):
323 self.transport.write('lalala\r\n')
324 def lineReceived(self, line):
325 self.factory.line = line
326 self.transport.loseConnection()
327 def connectionLost(self, reason):
328 self.factory.d.callback(self.factory.line)
329
330
331 class DummyApp:
332 processName = None
333 def addService(self, service):
334 self.services[service.name] = service
335 def removeService(self, service):
336 del self.services[service.name]
337
338
339 class TimerTarget:
340 def __init__(self):
341 self.l = []
342 def append(self, what):
343 self.l.append(what)
344
345 class TestEcho(wire.Echo):
346 def connectionLost(self, reason):
347 self.d.callback(True)
348
349 class TestInternet2(unittest.TestCase):
350
351 def testTCP(self):
352 s = service.MultiService()
353 s.startService()
354 factory = protocol.ServerFactory()
355 factory.protocol = TestEcho
356 TestEcho.d = defer.Deferred()
357 t = internet.TCPServer(0, factory)
358 t.setServiceParent(s)
359 num = t._port.getHost().port
360 factory = protocol.ClientFactory()
361 factory.d = defer.Deferred()
362 factory.protocol = Foo
363 factory.line = None
364 internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
365 factory.d.addCallback(self.assertEqual, 'lalala')
366 factory.d.addCallback(lambda x : s.stopService())
367 factory.d.addCallback(lambda x : TestEcho.d)
368 return factory.d
369
370
371 def test_UDP(self):
372 """
373 Test L{internet.UDPServer} with a random port: starting the service
374 should give it valid port, and stopService should free it so that we
375 can start a server on the same port again.
376 """
377 if not interfaces.IReactorUDP(reactor, None):
378 raise unittest.SkipTest("This reactor does not support UDP sockets")
379 p = protocol.DatagramProtocol()
380 t = internet.UDPServer(0, p)
381 t.startService()
382 num = t._port.getHost().port
383 self.assertNotEquals(num, 0)
384 def onStop(ignored):
385 t = internet.UDPServer(num, p)
386 t.startService()
387 return t.stopService()
388 return defer.maybeDeferred(t.stopService).addCallback(onStop)
389
390
391 def testPrivileged(self):
392 factory = protocol.ServerFactory()
393 factory.protocol = TestEcho
394 TestEcho.d = defer.Deferred()
395 t = internet.TCPServer(0, factory)
396 t.privileged = 1
397 t.privilegedStartService()
398 num = t._port.getHost().port
399 factory = protocol.ClientFactory()
400 factory.d = defer.Deferred()
401 factory.protocol = Foo
402 factory.line = None
403 c = internet.TCPClient('127.0.0.1', num, factory)
404 c.startService()
405 factory.d.addCallback(self.assertEqual, 'lalala')
406 factory.d.addCallback(lambda x : c.stopService())
407 factory.d.addCallback(lambda x : t.stopService())
408 factory.d.addCallback(lambda x : TestEcho.d)
409 return factory.d
410
411 def testConnectionGettingRefused(self):
412 factory = protocol.ServerFactory()
413 factory.protocol = wire.Echo
414 t = internet.TCPServer(0, factory)
415 t.startService()
416 num = t._port.getHost().port
417 t.stopService()
418 d = defer.Deferred()
419 factory = protocol.ClientFactory()
420 factory.clientConnectionFailed = lambda *args: d.callback(None)
421 c = internet.TCPClient('127.0.0.1', num, factory)
422 c.startService()
423 return d
424
425 def testUNIX(self):
426 # FIXME: This test is far too dense. It needs comments.
427 # -- spiv, 2004-11-07
428 if not interfaces.IReactorUNIX(reactor, None):
429 raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
430 s = service.MultiService()
431 s.startService()
432 factory = protocol.ServerFactory()
433 factory.protocol = TestEcho
434 TestEcho.d = defer.Deferred()
435 t = internet.UNIXServer('echo.skt', factory)
436 t.setServiceParent(s)
437 factory = protocol.ClientFactory()
438 factory.protocol = Foo
439 factory.d = defer.Deferred()
440 factory.line = None
441 internet.UNIXClient('echo.skt', factory).setServiceParent(s)
442 factory.d.addCallback(self.assertEqual, 'lalala')
443 factory.d.addCallback(lambda x : s.stopService())
444 factory.d.addCallback(lambda x : TestEcho.d)
445 factory.d.addCallback(self._cbTestUnix, factory, s)
446 return factory.d
447
448 def _cbTestUnix(self, ignored, factory, s):
449 TestEcho.d = defer.Deferred()
450 factory.line = None
451 factory.d = defer.Deferred()
452 s.startService()
453 factory.d.addCallback(self.assertEqual, 'lalala')
454 factory.d.addCallback(lambda x : s.stopService())
455 factory.d.addCallback(lambda x : TestEcho.d)
456 return factory.d
457
458 def testVolatile(self):
459 if not interfaces.IReactorUNIX(reactor, None):
460 raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
461 factory = protocol.ServerFactory()
462 factory.protocol = wire.Echo
463 t = internet.UNIXServer('echo.skt', factory)
464 t.startService()
465 self.failIfIdentical(t._port, None)
466 t1 = copy.copy(t)
467 self.assertIdentical(t1._port, None)
468 t.stopService()
469 self.assertIdentical(t._port, None)
470 self.failIf(t.running)
471
472 factory = protocol.ClientFactory()
473 factory.protocol = wire.Echo
474 t = internet.UNIXClient('echo.skt', factory)
475 t.startService()
476 self.failIfIdentical(t._connection, None)
477 t1 = copy.copy(t)
478 self.assertIdentical(t1._connection, None)
479 t.stopService()
480 self.assertIdentical(t._connection, None)
481 self.failIf(t.running)
482
483 def testStoppingServer(self):
484 if not interfaces.IReactorUNIX(reactor, None):
485 raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
486 factory = protocol.ServerFactory()
487 factory.protocol = wire.Echo
488 t = internet.UNIXServer('echo.skt', factory)
489 t.startService()
490 t.stopService()
491 self.failIf(t.running)
492 factory = protocol.ClientFactory()
493 d = defer.Deferred()
494 factory.clientConnectionFailed = lambda *args: d.callback(None)
495 reactor.connectUNIX('echo.skt', factory)
496 return d
497
498 def testPickledTimer(self):
499 target = TimerTarget()
500 t0 = internet.TimerService(1, target.append, "hello")
501 t0.startService()
502 s = pickle.dumps(t0)
503 t0.stopService()
504
505 t = pickle.loads(s)
506 self.failIf(t.running)
507
508 def testBrokenTimer(self):
509 d = defer.Deferred()
510 t = internet.TimerService(1, lambda: 1 / 0)
511 oldFailed = t._failed
512 def _failed(why):
513 oldFailed(why)
514 d.callback(None)
515 t._failed = _failed
516 t.startService()
517 d.addCallback(lambda x : t.stopService)
518 d.addCallback(lambda x : self.assertEqual(
519 [ZeroDivisionError],
520 [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError )]))
521 return d
522
523 def testEverythingThere(self):
524 trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
525 for tran in trans[:]:
526 if not getattr(interfaces, "IReactor"+tran)(reactor, None):
527 trans.remove(tran)
528 if interfaces.IReactorArbitrary(reactor, None) is not None:
529 trans.insert(0, "Generic")
530 for tran in trans:
531 for side in 'Server Client'.split():
532 if tran == "Multicast" and side == "Client":
533 continue
534 self.assert_(hasattr(internet, tran+side))
535 method = getattr(internet, tran+side).method
536 prefix = {'Server': 'listen', 'Client': 'connect'}[side]
537 self.assert_(hasattr(reactor, prefix+method) or
538 (prefix == "connect" and method == "UDP"))
539 o = getattr(internet, tran+side)()
540 self.assertEqual(service.IService(o), o)
541
542
543 def test_reactorParametrizationInServer(self):
544 """
545 L{internet._AbstractServer} supports a C{reactor} keyword argument
546 that can be used to parametrize the reactor used to listen for
547 connections.
548 """
549 listen = []
550 class FakeReactor(object):
551 def listenTCP(self, portNumber, factory):
552 listen.append((portNumber, factory))
553 reactor = FakeReactor()
554
555 factory = object()
556 t = internet.TCPServer(1234, factory, reactor=reactor)
557 t.startService()
558 self.assertEquals(listen, [(1234, factory)])
559
560
561 def test_reactorParametrizationInClient(self):
562 """
563 L{internet._AbstractClient} supports a C{reactor} keyword arguments
564 that can be used to parametrize the reactor used to create new client
565 connections.
566 """
567 connect = []
568 class FakeReactor(object):
569 def connectTCP(self, ip, portNumber, factory):
570 connect.append((ip, portNumber, factory))
571 reactor = FakeReactor()
572
573 factory = object()
574 t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
575 t.startService()
576 self.assertEquals(connect, [('127.0.0.1', 1234, factory)])
577
578
579 def test_reactorParametrizationInServerMultipleStart(self):
580 """
581 Like L{test_reactorParametrizationInServer}, but stop and restart the
582 service and check that the given reactor is still used.
583 """
584 listen = []
585 class FakeReactor(object):
586 def listenTCP(self, portNumber, factory):
587 listen.append((portNumber, factory))
588 reactor = FakeReactor()
589
590 factory = object()
591 t = internet.TCPServer(1234, factory, reactor=reactor)
592 t.startService()
593 self.assertEquals(listen, [(1234, factory)])
594 t.stopService()
595 t.startService()
596 self.assertEquals(listen, [(1234, factory), (1234, factory)])
597
598
599 def test_reactorParametrizationInClientMultipleStart(self):
600 """
601 Like L{test_reactorParametrizationInClient}, but stop and restart the
602 service and check that the given reactor is still used.
603 """
604 connect = []
605 class FakeReactor(object):
606 def connectTCP(self, ip, portNumber, factory):
607 connect.append((ip, portNumber, factory))
608 reactor = FakeReactor()
609
610 factory = object()
611 t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
612 t.startService()
613 self.assertEquals(connect, [('127.0.0.1', 1234, factory)])
614 t.stopService()
615 t.startService()
616 self.assertEquals(connect, [('127.0.0.1', 1234, factory),
617 ('127.0.0.1', 1234, factory)])
618
619
620
621 class TestTimerBasic(unittest.TestCase):
622
623 def testTimerRuns(self):
624 d = defer.Deferred()
625 self.t = internet.TimerService(1, d.callback, 'hello')
626 self.t.startService()
627 d.addCallback(self.assertEqual, 'hello')
628 d.addCallback(lambda x : self.t.stopService())
629 d.addCallback(lambda x : self.failIf(self.t.running))
630 return d
631
632 def tearDown(self):
633 return self.t.stopService()
634
635 def testTimerRestart(self):
636 # restart the same TimerService
637 d1 = defer.Deferred()
638 d2 = defer.Deferred()
639 work = [(d2, "bar"), (d1, "foo")]
640 def trigger():
641 d, arg = work.pop()
642 d.callback(arg)
643 self.t = internet.TimerService(1, trigger)
644 self.t.startService()
645 def onFirstResult(result):
646 self.assertEqual(result, 'foo')
647 return self.t.stopService()
648 def onFirstStop(ignored):
649 self.failIf(self.t.running)
650 self.t.startService()
651 return d2
652 def onSecondResult(result):
653 self.assertEqual(result, 'bar')
654 self.t.stopService()
655 d1.addCallback(onFirstResult)
656 d1.addCallback(onFirstStop)
657 d1.addCallback(onSecondResult)
658 return d1
659
660 def testTimerLoops(self):
661 l = []
662 def trigger(data, number, d):
663 l.append(data)
664 if len(l) == number:
665 d.callback(l)
666 d = defer.Deferred()
667 self.t = internet.TimerService(0.01, trigger, "hello", 10, d)
668 self.t.startService()
669 d.addCallback(self.assertEqual, ['hello'] * 10)
670 d.addCallback(lambda x : self.t.stopService())
671 return d
672
673
674 class FakeReactor(reactors.Reactor):
675 """
676 A fake reactor with a hooked install method.
677 """
678
679 def __init__(self, install, *args, **kwargs):
680 """
681 @param install: any callable that will be used as install method.
682 @type install: C{callable}
683 """
684 reactors.Reactor.__init__(self, *args, **kwargs)
685 self.install = install
686
687
688
689 class PluggableReactorTestCase(unittest.TestCase):
690 """
691 Tests for the reactor discovery/inspection APIs.
692 """
693
694 def setUp(self):
695 """
696 Override the L{reactors.getPlugins} function, normally bound to
697 L{twisted.plugin.getPlugins}, in order to control which
698 L{IReactorInstaller} plugins are seen as available.
699
700 C{self.pluginResults} can be customized and will be used as the
701 result of calls to C{reactors.getPlugins}.
702 """
703 self.pluginCalls = []
704 self.pluginResults = []
705 self.originalFunction = reactors.getPlugins
706 reactors.getPlugins = self._getPlugins
707
708
709 def tearDown(self):
710 """
711 Restore the original L{reactors.getPlugins}.
712 """
713 reactors.getPlugins = self.originalFunction
714
715
716 def _getPlugins(self, interface, package=None):
717 """
718 Stand-in for the real getPlugins method which records its arguments
719 and returns a fixed result.
720 """
721 self.pluginCalls.append((interface, package))
722 return list(self.pluginResults)
723
724
725 def test_getPluginReactorTypes(self):
726 """
727 Test that reactor plugins are returned from L{getReactorTypes}
728 """
729 name = 'fakereactortest'
730 package = __name__ + '.fakereactor'
731 description = 'description'
732 self.pluginResults = [reactors.Reactor(name, package, description)]
733 reactorTypes = reactors.getReactorTypes()
734
735 self.assertEqual(
736 self.pluginCalls,
737 [(reactors.IReactorInstaller, None)])
738
739 for r in reactorTypes:
740 if r.shortName == name:
741 self.assertEqual(r.description, description)
742 break
743 else:
744 self.fail("Reactor plugin not present in getReactorTypes() result")
745
746
747 def test_reactorInstallation(self):
748 """
749 Test that L{reactors.Reactor.install} loads the correct module and
750 calls its install attribute.
751 """
752 installed = []
753 def install():
754 installed.append(True)
755 installer = FakeReactor(install,
756 'fakereactortest', __name__, 'described')
757 installer.install()
758 self.assertEqual(installed, [True])
759
760
761 def test_installReactor(self):
762 """
763 Test that the L{reactors.installReactor} function correctly installs
764 the specified reactor.
765 """
766 installed = []
767 def install():
768 installed.append(True)
769 name = 'fakereactortest'
770 package = __name__
771 description = 'description'
772 self.pluginResults = [FakeReactor(install, name, package, description)]
773 reactors.installReactor(name)
774 self.assertEqual(installed, [True])
775
776
777 def test_installNonExistentReactor(self):
778 """
779 Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor}
780 when asked to install a reactor which it cannot find.
781 """
782 self.pluginResults = []
783 self.assertRaises(
784 reactors.NoSuchReactor,
785 reactors.installReactor, 'somereactor')
786
787
788 def test_installNotAvailableReactor(self):
789 """
790 Test that L{reactors.installReactor} raises an exception when asked to
791 install a reactor which doesn't work in this environment.
792 """
793 def install():
794 raise ImportError("Missing foo bar")
795 name = 'fakereactortest'
796 package = __name__
797 description = 'description'
798 self.pluginResults = [FakeReactor(install, name, package, description)]
799 self.assertRaises(ImportError, reactors.installReactor, name)
800
801
802 def test_reactorSelectionMixin(self):
803 """
804 Test that the reactor selected is installed as soon as possible, ie
805 when the option is parsed.
806 """
807 executed = []
808 INSTALL_EVENT = 'reactor installed'
809 SUBCOMMAND_EVENT = 'subcommands loaded'
810
811 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
812 def subCommands(self):
813 executed.append(SUBCOMMAND_EVENT)
814 return [('subcommand', None, lambda: self, 'test subcommand')]
815 subCommands = property(subCommands)
816
817 def install():
818 executed.append(INSTALL_EVENT)
819 self.pluginResults = [
820 FakeReactor(install, 'fakereactortest', __name__, 'described')
821 ]
822
823 options = ReactorSelectionOptions()
824 options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
825 self.assertEqual(executed[0], INSTALL_EVENT)
826 self.assertEqual(executed.count(INSTALL_EVENT), 1)
827
828
829 def test_reactorSelectionMixinNonExistent(self):
830 """
831 Test that the usage mixin exits when trying to use a non existent
832 reactor (the name not matching to any reactor), giving an error
833 message.
834 """
835 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
836 pass
837 self.pluginResults = []
838
839 options = ReactorSelectionOptions()
840 options.messageOutput = StringIO()
841 e = self.assertRaises(usage.UsageError, options.parseOptions,
842 ['--reactor', 'fakereactortest', 'subcommand'])
843 self.assertIn("fakereactortest", e.args[0])
844 self.assertIn("help-reactors", e.args[0])
845
846
847 def test_reactorSelectionMixinNotAvailable(self):
848 """
849 Test that the usage mixin exits when trying to use a reactor not
850 available (the reactor raises an error at installation), giving an
851 error message.
852 """
853 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
854 pass
855 message = "Missing foo bar"
856 def install():
857 raise ImportError(message)
858
859 name = 'fakereactortest'
860 package = __name__
861 description = 'description'
862 self.pluginResults = [FakeReactor(install, name, package, description)]
863
864 options = ReactorSelectionOptions()
865 options.messageOutput = StringIO()
866 e = self.assertRaises(usage.UsageError, options.parseOptions,
867 ['--reactor', 'fakereactortest', 'subcommand'])
868 self.assertIn(message, e.args[0])
869 self.assertIn("help-reactors", e.args[0])
870
871
872 def test_qtStub(self):
873 """
874 Test that installing qtreactor when it's absent fails properly.
875 """
876 scriptPath = sibpath(__file__, "app_qtstub.py")
877 def _checkOutput((output, err, code)):
878 self.failIf(output, output)
879 result = getProcessOutputAndValue(
880 sys.executable,
881 args=(sys.executable, scriptPath),
882 env=None)
883 result.addCallback(_checkOutput)
884 return result
885
886
887
888 class ReportProfileTestCase(unittest.TestCase):
889 """
890 Tests for L{app.reportProfile}.
891 """
892
893 def test_deprecation(self):
894 """
895 Check that L{app.reportProfile} prints a warning and does nothing else.
896 """
897 self.assertWarns(DeprecationWarning,
898 "reportProfile is deprecated and a no-op since Twisted 8.0.",
899 app.__file__, app.reportProfile, None, None)
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_amp.py ('k') | third_party/twisted_8_1/twisted/test/test_assertions.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698