OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 import sys, copy, os, pickle, warnings | |
5 from StringIO import StringIO | |
6 | |
7 | |
8 from twisted.trial import unittest, util | |
9 from twisted.application import service, internet, app | |
10 from twisted.persisted import sob | |
11 from twisted.python import usage | |
12 from twisted.python.util import sibpath | |
13 from twisted.internet import interfaces, defer | |
14 from twisted.protocols import wire, basic | |
15 from twisted.internet import protocol, reactor | |
16 from twisted.internet.utils import getProcessOutputAndValue | |
17 from twisted.application import reactors | |
18 | |
19 try: | |
20 from twisted.web import microdom | |
21 gotMicrodom = True | |
22 except ImportError: | |
23 warnings.warn("Not testing xml persistence as twisted.web.microdom " | |
24 "not available") | |
25 gotMicrodom = False | |
26 | |
27 | |
28 oldAppSuppressions = [util.suppress(message='twisted.internet.app is deprecated'
, | |
29 category=DeprecationWarning)] | |
30 | |
31 class Dummy: | |
32 processName=None | |
33 | |
34 class TestService(unittest.TestCase): | |
35 | |
36 def testName(self): | |
37 s = service.Service() | |
38 s.setName("hello") | |
39 self.failUnlessEqual(s.name, "hello") | |
40 | |
41 def testParent(self): | |
42 s = service.Service() | |
43 p = service.MultiService() | |
44 s.setServiceParent(p) | |
45 self.failUnlessEqual(list(p), [s]) | |
46 self.failUnlessEqual(s.parent, p) | |
47 | |
48 def testApplicationAsParent(self): | |
49 s = service.Service() | |
50 p = service.Application("") | |
51 s.setServiceParent(p) | |
52 self.failUnlessEqual(list(service.IServiceCollection(p)), [s]) | |
53 self.failUnlessEqual(s.parent, service.IServiceCollection(p)) | |
54 | |
55 def testNamedChild(self): | |
56 s = service.Service() | |
57 p = service.MultiService() | |
58 s.setName("hello") | |
59 s.setServiceParent(p) | |
60 self.failUnlessEqual(list(p), [s]) | |
61 self.failUnlessEqual(s.parent, p) | |
62 self.failUnlessEqual(p.getServiceNamed("hello"), s) | |
63 | |
64 def testDoublyNamedChild(self): | |
65 s = service.Service() | |
66 p = service.MultiService() | |
67 s.setName("hello") | |
68 s.setServiceParent(p) | |
69 self.failUnlessRaises(RuntimeError, s.setName, "lala") | |
70 | |
71 def testDuplicateNamedChild(self): | |
72 s = service.Service() | |
73 p = service.MultiService() | |
74 s.setName("hello") | |
75 s.setServiceParent(p) | |
76 s = service.Service() | |
77 s.setName("hello") | |
78 self.failUnlessRaises(RuntimeError, s.setServiceParent, p) | |
79 | |
80 def testDisowning(self): | |
81 s = service.Service() | |
82 p = service.MultiService() | |
83 s.setServiceParent(p) | |
84 self.failUnlessEqual(list(p), [s]) | |
85 self.failUnlessEqual(s.parent, p) | |
86 s.disownServiceParent() | |
87 self.failUnlessEqual(list(p), []) | |
88 self.failUnlessEqual(s.parent, None) | |
89 | |
90 def testRunning(self): | |
91 s = service.Service() | |
92 self.assert_(not s.running) | |
93 s.startService() | |
94 self.assert_(s.running) | |
95 s.stopService() | |
96 self.assert_(not s.running) | |
97 | |
98 def testRunningChildren1(self): | |
99 s = service.Service() | |
100 p = service.MultiService() | |
101 s.setServiceParent(p) | |
102 self.assert_(not s.running) | |
103 self.assert_(not p.running) | |
104 p.startService() | |
105 self.assert_(s.running) | |
106 self.assert_(p.running) | |
107 p.stopService() | |
108 self.assert_(not s.running) | |
109 self.assert_(not p.running) | |
110 | |
111 def testRunningChildren2(self): | |
112 s = service.Service() | |
113 def checkRunning(): | |
114 self.assert_(s.running) | |
115 t = service.Service() | |
116 t.stopService = checkRunning | |
117 t.startService = checkRunning | |
118 p = service.MultiService() | |
119 s.setServiceParent(p) | |
120 t.setServiceParent(p) | |
121 p.startService() | |
122 p.stopService() | |
123 | |
124 def testAddingIntoRunning(self): | |
125 p = service.MultiService() | |
126 p.startService() | |
127 s = service.Service() | |
128 self.assert_(not s.running) | |
129 s.setServiceParent(p) | |
130 self.assert_(s.running) | |
131 s.disownServiceParent() | |
132 self.assert_(not s.running) | |
133 | |
134 def testPrivileged(self): | |
135 s = service.Service() | |
136 def pss(): | |
137 s.privilegedStarted = 1 | |
138 s.privilegedStartService = pss | |
139 s1 = service.Service() | |
140 p = service.MultiService() | |
141 s.setServiceParent(p) | |
142 s1.setServiceParent(p) | |
143 p.privilegedStartService() | |
144 self.assert_(s.privilegedStarted) | |
145 | |
146 def testCopying(self): | |
147 s = service.Service() | |
148 s.startService() | |
149 s1 = copy.copy(s) | |
150 self.assert_(not s1.running) | |
151 self.assert_(s.running) | |
152 | |
153 | |
154 if hasattr(os, "getuid"): | |
155 curuid = os.getuid() | |
156 curgid = os.getgid() | |
157 else: | |
158 curuid = curgid = 0 | |
159 | |
160 | |
161 class TestProcess(unittest.TestCase): | |
162 | |
163 def testID(self): | |
164 p = service.Process(5, 6) | |
165 self.assertEqual(p.uid, 5) | |
166 self.assertEqual(p.gid, 6) | |
167 | |
168 def testDefaults(self): | |
169 p = service.Process(5) | |
170 self.assertEqual(p.uid, 5) | |
171 self.assertEqual(p.gid, None) | |
172 p = service.Process(gid=5) | |
173 self.assertEqual(p.uid, None) | |
174 self.assertEqual(p.gid, 5) | |
175 p = service.Process() | |
176 self.assertEqual(p.uid, None) | |
177 self.assertEqual(p.gid, None) | |
178 | |
179 def testProcessName(self): | |
180 p = service.Process() | |
181 self.assertEqual(p.processName, None) | |
182 p.processName = 'hello' | |
183 self.assertEqual(p.processName, 'hello') | |
184 | |
185 | |
186 class TestInterfaces(unittest.TestCase): | |
187 | |
188 def testService(self): | |
189 self.assert_(service.IService.providedBy(service.Service())) | |
190 | |
191 def testMultiService(self): | |
192 self.assert_(service.IService.providedBy(service.MultiService())) | |
193 self.assert_(service.IServiceCollection.providedBy(service.MultiService(
))) | |
194 | |
195 def testProcess(self): | |
196 self.assert_(service.IProcess.providedBy(service.Process())) | |
197 | |
198 | |
199 class TestApplication(unittest.TestCase): | |
200 | |
201 def testConstructor(self): | |
202 service.Application("hello") | |
203 service.Application("hello", 5) | |
204 service.Application("hello", 5, 6) | |
205 | |
206 def testProcessComponent(self): | |
207 a = service.Application("hello") | |
208 self.assertEqual(service.IProcess(a).uid, None) | |
209 self.assertEqual(service.IProcess(a).gid, None) | |
210 a = service.Application("hello", 5) | |
211 self.assertEqual(service.IProcess(a).uid, 5) | |
212 self.assertEqual(service.IProcess(a).gid, None) | |
213 a = service.Application("hello", 5, 6) | |
214 self.assertEqual(service.IProcess(a).uid, 5) | |
215 self.assertEqual(service.IProcess(a).gid, 6) | |
216 | |
217 def testServiceComponent(self): | |
218 a = service.Application("hello") | |
219 self.assert_(service.IService(a) is service.IServiceCollection(a)) | |
220 self.assertEqual(service.IService(a).name, "hello") | |
221 self.assertEqual(service.IService(a).parent, None) | |
222 | |
223 def testPersistableComponent(self): | |
224 a = service.Application("hello") | |
225 p = sob.IPersistable(a) | |
226 self.assertEqual(p.style, 'pickle') | |
227 self.assertEqual(p.name, 'hello') | |
228 self.assert_(p.original is a) | |
229 | |
230 class TestLoading(unittest.TestCase): | |
231 | |
232 def test_simpleStoreAndLoad(self): | |
233 a = service.Application("hello") | |
234 p = sob.IPersistable(a) | |
235 for style in 'xml source pickle'.split(): | |
236 if style == 'xml' and not gotMicrodom: | |
237 continue | |
238 p.setStyle(style) | |
239 p.save() | |
240 a1 = service.loadApplication("hello.ta"+style[0], style) | |
241 self.assertEqual(service.IService(a1).name, "hello") | |
242 open("hello.tac", 'w').writelines([ | |
243 "from twisted.application import service\n", | |
244 "application = service.Application('hello')\n", | |
245 ]) | |
246 a1 = service.loadApplication("hello.tac", 'python') | |
247 self.assertEqual(service.IService(a1).name, "hello") | |
248 | |
249 | |
250 | |
251 class TestAppSupport(unittest.TestCase): | |
252 | |
253 def testPassphrase(self): | |
254 self.assertEqual(app.getPassphrase(0), None) | |
255 | |
256 def testLoadApplication(self): | |
257 """ | |
258 Test loading an application file in different dump format. | |
259 """ | |
260 a = service.Application("hello") | |
261 baseconfig = {'file': None, 'xml': None, 'source': None, 'python':None} | |
262 for style in 'source xml pickle'.split(): | |
263 if style == 'xml' and not gotMicrodom: | |
264 continue | |
265 config = baseconfig.copy() | |
266 config[{'pickle': 'file'}.get(style, style)] = 'helloapplication' | |
267 sob.IPersistable(a).setStyle(style) | |
268 sob.IPersistable(a).save(filename='helloapplication') | |
269 a1 = app.getApplication(config, None) | |
270 self.assertEqual(service.IService(a1).name, "hello") | |
271 config = baseconfig.copy() | |
272 config['python'] = 'helloapplication' | |
273 open("helloapplication", 'w').writelines([ | |
274 "from twisted.application import service\n", | |
275 "application = service.Application('hello')\n", | |
276 ]) | |
277 a1 = app.getApplication(config, None) | |
278 self.assertEqual(service.IService(a1).name, "hello") | |
279 | |
280 testLoadApplication.suppress = [ | |
281 util.suppress(message="twisted.persisted.marmalade is deprecated", | |
282 category=DeprecationWarning)] | |
283 | |
284 def test_convertStyle(self): | |
285 appl = service.Application("lala") | |
286 for instyle in 'xml source pickle'.split(): | |
287 if instyle == 'xml' and not gotMicrodom: | |
288 continue | |
289 for outstyle in 'xml source pickle'.split(): | |
290 if outstyle == 'xml' and not gotMicrodom: | |
291 continue | |
292 sob.IPersistable(appl).setStyle(instyle) | |
293 sob.IPersistable(appl).save(filename="converttest") | |
294 app.convertStyle("converttest", instyle, None, | |
295 "converttest.out", outstyle, 0) | |
296 appl2 = service.loadApplication("converttest.out", outstyle) | |
297 self.assertEqual(service.IService(appl2).name, "lala") | |
298 | |
299 def test_getLogFile(self): | |
300 """ | |
301 Test L{app.getLogFile}, veryfying the LogFile instance it returns. | |
302 """ | |
303 os.mkdir("logfiledir") | |
304 l = app.getLogFile(os.path.join("logfiledir", "lala")) | |
305 self.assertEqual(l.path, | |
306 os.path.abspath(os.path.join("logfiledir", "lala"))) | |
307 self.assertEqual(l.name, "lala") | |
308 self.assertEqual(l.directory, os.path.abspath("logfiledir")) | |
309 | |
310 test_getLogFile.suppress = [ | |
311 util.suppress(message="app.getLogFile is deprecated. Use " | |
312 "twisted.python.logfile.LogFile.fromFullPath instead", | |
313 category=DeprecationWarning)] | |
314 | |
315 def test_startApplication(self): | |
316 appl = service.Application("lala") | |
317 app.startApplication(appl, 0) | |
318 self.assert_(service.IService(appl).running) | |
319 | |
320 | |
321 class Foo(basic.LineReceiver): | |
322 def connectionMade(self): | |
323 self.transport.write('lalala\r\n') | |
324 def lineReceived(self, line): | |
325 self.factory.line = line | |
326 self.transport.loseConnection() | |
327 def connectionLost(self, reason): | |
328 self.factory.d.callback(self.factory.line) | |
329 | |
330 | |
331 class DummyApp: | |
332 processName = None | |
333 def addService(self, service): | |
334 self.services[service.name] = service | |
335 def removeService(self, service): | |
336 del self.services[service.name] | |
337 | |
338 | |
339 class TimerTarget: | |
340 def __init__(self): | |
341 self.l = [] | |
342 def append(self, what): | |
343 self.l.append(what) | |
344 | |
345 class TestEcho(wire.Echo): | |
346 def connectionLost(self, reason): | |
347 self.d.callback(True) | |
348 | |
349 class TestInternet2(unittest.TestCase): | |
350 | |
351 def testTCP(self): | |
352 s = service.MultiService() | |
353 s.startService() | |
354 factory = protocol.ServerFactory() | |
355 factory.protocol = TestEcho | |
356 TestEcho.d = defer.Deferred() | |
357 t = internet.TCPServer(0, factory) | |
358 t.setServiceParent(s) | |
359 num = t._port.getHost().port | |
360 factory = protocol.ClientFactory() | |
361 factory.d = defer.Deferred() | |
362 factory.protocol = Foo | |
363 factory.line = None | |
364 internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s) | |
365 factory.d.addCallback(self.assertEqual, 'lalala') | |
366 factory.d.addCallback(lambda x : s.stopService()) | |
367 factory.d.addCallback(lambda x : TestEcho.d) | |
368 return factory.d | |
369 | |
370 | |
371 def test_UDP(self): | |
372 """ | |
373 Test L{internet.UDPServer} with a random port: starting the service | |
374 should give it valid port, and stopService should free it so that we | |
375 can start a server on the same port again. | |
376 """ | |
377 if not interfaces.IReactorUDP(reactor, None): | |
378 raise unittest.SkipTest("This reactor does not support UDP sockets") | |
379 p = protocol.DatagramProtocol() | |
380 t = internet.UDPServer(0, p) | |
381 t.startService() | |
382 num = t._port.getHost().port | |
383 self.assertNotEquals(num, 0) | |
384 def onStop(ignored): | |
385 t = internet.UDPServer(num, p) | |
386 t.startService() | |
387 return t.stopService() | |
388 return defer.maybeDeferred(t.stopService).addCallback(onStop) | |
389 | |
390 | |
391 def testPrivileged(self): | |
392 factory = protocol.ServerFactory() | |
393 factory.protocol = TestEcho | |
394 TestEcho.d = defer.Deferred() | |
395 t = internet.TCPServer(0, factory) | |
396 t.privileged = 1 | |
397 t.privilegedStartService() | |
398 num = t._port.getHost().port | |
399 factory = protocol.ClientFactory() | |
400 factory.d = defer.Deferred() | |
401 factory.protocol = Foo | |
402 factory.line = None | |
403 c = internet.TCPClient('127.0.0.1', num, factory) | |
404 c.startService() | |
405 factory.d.addCallback(self.assertEqual, 'lalala') | |
406 factory.d.addCallback(lambda x : c.stopService()) | |
407 factory.d.addCallback(lambda x : t.stopService()) | |
408 factory.d.addCallback(lambda x : TestEcho.d) | |
409 return factory.d | |
410 | |
411 def testConnectionGettingRefused(self): | |
412 factory = protocol.ServerFactory() | |
413 factory.protocol = wire.Echo | |
414 t = internet.TCPServer(0, factory) | |
415 t.startService() | |
416 num = t._port.getHost().port | |
417 t.stopService() | |
418 d = defer.Deferred() | |
419 factory = protocol.ClientFactory() | |
420 factory.clientConnectionFailed = lambda *args: d.callback(None) | |
421 c = internet.TCPClient('127.0.0.1', num, factory) | |
422 c.startService() | |
423 return d | |
424 | |
425 def testUNIX(self): | |
426 # FIXME: This test is far too dense. It needs comments. | |
427 # -- spiv, 2004-11-07 | |
428 if not interfaces.IReactorUNIX(reactor, None): | |
429 raise unittest.SkipTest, "This reactor does not support UNIX domain
sockets" | |
430 s = service.MultiService() | |
431 s.startService() | |
432 factory = protocol.ServerFactory() | |
433 factory.protocol = TestEcho | |
434 TestEcho.d = defer.Deferred() | |
435 t = internet.UNIXServer('echo.skt', factory) | |
436 t.setServiceParent(s) | |
437 factory = protocol.ClientFactory() | |
438 factory.protocol = Foo | |
439 factory.d = defer.Deferred() | |
440 factory.line = None | |
441 internet.UNIXClient('echo.skt', factory).setServiceParent(s) | |
442 factory.d.addCallback(self.assertEqual, 'lalala') | |
443 factory.d.addCallback(lambda x : s.stopService()) | |
444 factory.d.addCallback(lambda x : TestEcho.d) | |
445 factory.d.addCallback(self._cbTestUnix, factory, s) | |
446 return factory.d | |
447 | |
448 def _cbTestUnix(self, ignored, factory, s): | |
449 TestEcho.d = defer.Deferred() | |
450 factory.line = None | |
451 factory.d = defer.Deferred() | |
452 s.startService() | |
453 factory.d.addCallback(self.assertEqual, 'lalala') | |
454 factory.d.addCallback(lambda x : s.stopService()) | |
455 factory.d.addCallback(lambda x : TestEcho.d) | |
456 return factory.d | |
457 | |
458 def testVolatile(self): | |
459 if not interfaces.IReactorUNIX(reactor, None): | |
460 raise unittest.SkipTest, "This reactor does not support UNIX domain
sockets" | |
461 factory = protocol.ServerFactory() | |
462 factory.protocol = wire.Echo | |
463 t = internet.UNIXServer('echo.skt', factory) | |
464 t.startService() | |
465 self.failIfIdentical(t._port, None) | |
466 t1 = copy.copy(t) | |
467 self.assertIdentical(t1._port, None) | |
468 t.stopService() | |
469 self.assertIdentical(t._port, None) | |
470 self.failIf(t.running) | |
471 | |
472 factory = protocol.ClientFactory() | |
473 factory.protocol = wire.Echo | |
474 t = internet.UNIXClient('echo.skt', factory) | |
475 t.startService() | |
476 self.failIfIdentical(t._connection, None) | |
477 t1 = copy.copy(t) | |
478 self.assertIdentical(t1._connection, None) | |
479 t.stopService() | |
480 self.assertIdentical(t._connection, None) | |
481 self.failIf(t.running) | |
482 | |
483 def testStoppingServer(self): | |
484 if not interfaces.IReactorUNIX(reactor, None): | |
485 raise unittest.SkipTest, "This reactor does not support UNIX domain
sockets" | |
486 factory = protocol.ServerFactory() | |
487 factory.protocol = wire.Echo | |
488 t = internet.UNIXServer('echo.skt', factory) | |
489 t.startService() | |
490 t.stopService() | |
491 self.failIf(t.running) | |
492 factory = protocol.ClientFactory() | |
493 d = defer.Deferred() | |
494 factory.clientConnectionFailed = lambda *args: d.callback(None) | |
495 reactor.connectUNIX('echo.skt', factory) | |
496 return d | |
497 | |
498 def testPickledTimer(self): | |
499 target = TimerTarget() | |
500 t0 = internet.TimerService(1, target.append, "hello") | |
501 t0.startService() | |
502 s = pickle.dumps(t0) | |
503 t0.stopService() | |
504 | |
505 t = pickle.loads(s) | |
506 self.failIf(t.running) | |
507 | |
508 def testBrokenTimer(self): | |
509 d = defer.Deferred() | |
510 t = internet.TimerService(1, lambda: 1 / 0) | |
511 oldFailed = t._failed | |
512 def _failed(why): | |
513 oldFailed(why) | |
514 d.callback(None) | |
515 t._failed = _failed | |
516 t.startService() | |
517 d.addCallback(lambda x : t.stopService) | |
518 d.addCallback(lambda x : self.assertEqual( | |
519 [ZeroDivisionError], | |
520 [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError
)])) | |
521 return d | |
522 | |
523 def testEverythingThere(self): | |
524 trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split() | |
525 for tran in trans[:]: | |
526 if not getattr(interfaces, "IReactor"+tran)(reactor, None): | |
527 trans.remove(tran) | |
528 if interfaces.IReactorArbitrary(reactor, None) is not None: | |
529 trans.insert(0, "Generic") | |
530 for tran in trans: | |
531 for side in 'Server Client'.split(): | |
532 if tran == "Multicast" and side == "Client": | |
533 continue | |
534 self.assert_(hasattr(internet, tran+side)) | |
535 method = getattr(internet, tran+side).method | |
536 prefix = {'Server': 'listen', 'Client': 'connect'}[side] | |
537 self.assert_(hasattr(reactor, prefix+method) or | |
538 (prefix == "connect" and method == "UDP")) | |
539 o = getattr(internet, tran+side)() | |
540 self.assertEqual(service.IService(o), o) | |
541 | |
542 | |
543 def test_reactorParametrizationInServer(self): | |
544 """ | |
545 L{internet._AbstractServer} supports a C{reactor} keyword argument | |
546 that can be used to parametrize the reactor used to listen for | |
547 connections. | |
548 """ | |
549 listen = [] | |
550 class FakeReactor(object): | |
551 def listenTCP(self, portNumber, factory): | |
552 listen.append((portNumber, factory)) | |
553 reactor = FakeReactor() | |
554 | |
555 factory = object() | |
556 t = internet.TCPServer(1234, factory, reactor=reactor) | |
557 t.startService() | |
558 self.assertEquals(listen, [(1234, factory)]) | |
559 | |
560 | |
561 def test_reactorParametrizationInClient(self): | |
562 """ | |
563 L{internet._AbstractClient} supports a C{reactor} keyword arguments | |
564 that can be used to parametrize the reactor used to create new client | |
565 connections. | |
566 """ | |
567 connect = [] | |
568 class FakeReactor(object): | |
569 def connectTCP(self, ip, portNumber, factory): | |
570 connect.append((ip, portNumber, factory)) | |
571 reactor = FakeReactor() | |
572 | |
573 factory = object() | |
574 t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor) | |
575 t.startService() | |
576 self.assertEquals(connect, [('127.0.0.1', 1234, factory)]) | |
577 | |
578 | |
579 def test_reactorParametrizationInServerMultipleStart(self): | |
580 """ | |
581 Like L{test_reactorParametrizationInServer}, but stop and restart the | |
582 service and check that the given reactor is still used. | |
583 """ | |
584 listen = [] | |
585 class FakeReactor(object): | |
586 def listenTCP(self, portNumber, factory): | |
587 listen.append((portNumber, factory)) | |
588 reactor = FakeReactor() | |
589 | |
590 factory = object() | |
591 t = internet.TCPServer(1234, factory, reactor=reactor) | |
592 t.startService() | |
593 self.assertEquals(listen, [(1234, factory)]) | |
594 t.stopService() | |
595 t.startService() | |
596 self.assertEquals(listen, [(1234, factory), (1234, factory)]) | |
597 | |
598 | |
599 def test_reactorParametrizationInClientMultipleStart(self): | |
600 """ | |
601 Like L{test_reactorParametrizationInClient}, but stop and restart the | |
602 service and check that the given reactor is still used. | |
603 """ | |
604 connect = [] | |
605 class FakeReactor(object): | |
606 def connectTCP(self, ip, portNumber, factory): | |
607 connect.append((ip, portNumber, factory)) | |
608 reactor = FakeReactor() | |
609 | |
610 factory = object() | |
611 t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor) | |
612 t.startService() | |
613 self.assertEquals(connect, [('127.0.0.1', 1234, factory)]) | |
614 t.stopService() | |
615 t.startService() | |
616 self.assertEquals(connect, [('127.0.0.1', 1234, factory), | |
617 ('127.0.0.1', 1234, factory)]) | |
618 | |
619 | |
620 | |
621 class TestTimerBasic(unittest.TestCase): | |
622 | |
623 def testTimerRuns(self): | |
624 d = defer.Deferred() | |
625 self.t = internet.TimerService(1, d.callback, 'hello') | |
626 self.t.startService() | |
627 d.addCallback(self.assertEqual, 'hello') | |
628 d.addCallback(lambda x : self.t.stopService()) | |
629 d.addCallback(lambda x : self.failIf(self.t.running)) | |
630 return d | |
631 | |
632 def tearDown(self): | |
633 return self.t.stopService() | |
634 | |
635 def testTimerRestart(self): | |
636 # restart the same TimerService | |
637 d1 = defer.Deferred() | |
638 d2 = defer.Deferred() | |
639 work = [(d2, "bar"), (d1, "foo")] | |
640 def trigger(): | |
641 d, arg = work.pop() | |
642 d.callback(arg) | |
643 self.t = internet.TimerService(1, trigger) | |
644 self.t.startService() | |
645 def onFirstResult(result): | |
646 self.assertEqual(result, 'foo') | |
647 return self.t.stopService() | |
648 def onFirstStop(ignored): | |
649 self.failIf(self.t.running) | |
650 self.t.startService() | |
651 return d2 | |
652 def onSecondResult(result): | |
653 self.assertEqual(result, 'bar') | |
654 self.t.stopService() | |
655 d1.addCallback(onFirstResult) | |
656 d1.addCallback(onFirstStop) | |
657 d1.addCallback(onSecondResult) | |
658 return d1 | |
659 | |
660 def testTimerLoops(self): | |
661 l = [] | |
662 def trigger(data, number, d): | |
663 l.append(data) | |
664 if len(l) == number: | |
665 d.callback(l) | |
666 d = defer.Deferred() | |
667 self.t = internet.TimerService(0.01, trigger, "hello", 10, d) | |
668 self.t.startService() | |
669 d.addCallback(self.assertEqual, ['hello'] * 10) | |
670 d.addCallback(lambda x : self.t.stopService()) | |
671 return d | |
672 | |
673 | |
674 class FakeReactor(reactors.Reactor): | |
675 """ | |
676 A fake reactor with a hooked install method. | |
677 """ | |
678 | |
679 def __init__(self, install, *args, **kwargs): | |
680 """ | |
681 @param install: any callable that will be used as install method. | |
682 @type install: C{callable} | |
683 """ | |
684 reactors.Reactor.__init__(self, *args, **kwargs) | |
685 self.install = install | |
686 | |
687 | |
688 | |
689 class PluggableReactorTestCase(unittest.TestCase): | |
690 """ | |
691 Tests for the reactor discovery/inspection APIs. | |
692 """ | |
693 | |
694 def setUp(self): | |
695 """ | |
696 Override the L{reactors.getPlugins} function, normally bound to | |
697 L{twisted.plugin.getPlugins}, in order to control which | |
698 L{IReactorInstaller} plugins are seen as available. | |
699 | |
700 C{self.pluginResults} can be customized and will be used as the | |
701 result of calls to C{reactors.getPlugins}. | |
702 """ | |
703 self.pluginCalls = [] | |
704 self.pluginResults = [] | |
705 self.originalFunction = reactors.getPlugins | |
706 reactors.getPlugins = self._getPlugins | |
707 | |
708 | |
709 def tearDown(self): | |
710 """ | |
711 Restore the original L{reactors.getPlugins}. | |
712 """ | |
713 reactors.getPlugins = self.originalFunction | |
714 | |
715 | |
716 def _getPlugins(self, interface, package=None): | |
717 """ | |
718 Stand-in for the real getPlugins method which records its arguments | |
719 and returns a fixed result. | |
720 """ | |
721 self.pluginCalls.append((interface, package)) | |
722 return list(self.pluginResults) | |
723 | |
724 | |
725 def test_getPluginReactorTypes(self): | |
726 """ | |
727 Test that reactor plugins are returned from L{getReactorTypes} | |
728 """ | |
729 name = 'fakereactortest' | |
730 package = __name__ + '.fakereactor' | |
731 description = 'description' | |
732 self.pluginResults = [reactors.Reactor(name, package, description)] | |
733 reactorTypes = reactors.getReactorTypes() | |
734 | |
735 self.assertEqual( | |
736 self.pluginCalls, | |
737 [(reactors.IReactorInstaller, None)]) | |
738 | |
739 for r in reactorTypes: | |
740 if r.shortName == name: | |
741 self.assertEqual(r.description, description) | |
742 break | |
743 else: | |
744 self.fail("Reactor plugin not present in getReactorTypes() result") | |
745 | |
746 | |
747 def test_reactorInstallation(self): | |
748 """ | |
749 Test that L{reactors.Reactor.install} loads the correct module and | |
750 calls its install attribute. | |
751 """ | |
752 installed = [] | |
753 def install(): | |
754 installed.append(True) | |
755 installer = FakeReactor(install, | |
756 'fakereactortest', __name__, 'described') | |
757 installer.install() | |
758 self.assertEqual(installed, [True]) | |
759 | |
760 | |
761 def test_installReactor(self): | |
762 """ | |
763 Test that the L{reactors.installReactor} function correctly installs | |
764 the specified reactor. | |
765 """ | |
766 installed = [] | |
767 def install(): | |
768 installed.append(True) | |
769 name = 'fakereactortest' | |
770 package = __name__ | |
771 description = 'description' | |
772 self.pluginResults = [FakeReactor(install, name, package, description)] | |
773 reactors.installReactor(name) | |
774 self.assertEqual(installed, [True]) | |
775 | |
776 | |
777 def test_installNonExistentReactor(self): | |
778 """ | |
779 Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor} | |
780 when asked to install a reactor which it cannot find. | |
781 """ | |
782 self.pluginResults = [] | |
783 self.assertRaises( | |
784 reactors.NoSuchReactor, | |
785 reactors.installReactor, 'somereactor') | |
786 | |
787 | |
788 def test_installNotAvailableReactor(self): | |
789 """ | |
790 Test that L{reactors.installReactor} raises an exception when asked to | |
791 install a reactor which doesn't work in this environment. | |
792 """ | |
793 def install(): | |
794 raise ImportError("Missing foo bar") | |
795 name = 'fakereactortest' | |
796 package = __name__ | |
797 description = 'description' | |
798 self.pluginResults = [FakeReactor(install, name, package, description)] | |
799 self.assertRaises(ImportError, reactors.installReactor, name) | |
800 | |
801 | |
802 def test_reactorSelectionMixin(self): | |
803 """ | |
804 Test that the reactor selected is installed as soon as possible, ie | |
805 when the option is parsed. | |
806 """ | |
807 executed = [] | |
808 INSTALL_EVENT = 'reactor installed' | |
809 SUBCOMMAND_EVENT = 'subcommands loaded' | |
810 | |
811 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin): | |
812 def subCommands(self): | |
813 executed.append(SUBCOMMAND_EVENT) | |
814 return [('subcommand', None, lambda: self, 'test subcommand')] | |
815 subCommands = property(subCommands) | |
816 | |
817 def install(): | |
818 executed.append(INSTALL_EVENT) | |
819 self.pluginResults = [ | |
820 FakeReactor(install, 'fakereactortest', __name__, 'described') | |
821 ] | |
822 | |
823 options = ReactorSelectionOptions() | |
824 options.parseOptions(['--reactor', 'fakereactortest', 'subcommand']) | |
825 self.assertEqual(executed[0], INSTALL_EVENT) | |
826 self.assertEqual(executed.count(INSTALL_EVENT), 1) | |
827 | |
828 | |
829 def test_reactorSelectionMixinNonExistent(self): | |
830 """ | |
831 Test that the usage mixin exits when trying to use a non existent | |
832 reactor (the name not matching to any reactor), giving an error | |
833 message. | |
834 """ | |
835 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin): | |
836 pass | |
837 self.pluginResults = [] | |
838 | |
839 options = ReactorSelectionOptions() | |
840 options.messageOutput = StringIO() | |
841 e = self.assertRaises(usage.UsageError, options.parseOptions, | |
842 ['--reactor', 'fakereactortest', 'subcommand']) | |
843 self.assertIn("fakereactortest", e.args[0]) | |
844 self.assertIn("help-reactors", e.args[0]) | |
845 | |
846 | |
847 def test_reactorSelectionMixinNotAvailable(self): | |
848 """ | |
849 Test that the usage mixin exits when trying to use a reactor not | |
850 available (the reactor raises an error at installation), giving an | |
851 error message. | |
852 """ | |
853 class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin): | |
854 pass | |
855 message = "Missing foo bar" | |
856 def install(): | |
857 raise ImportError(message) | |
858 | |
859 name = 'fakereactortest' | |
860 package = __name__ | |
861 description = 'description' | |
862 self.pluginResults = [FakeReactor(install, name, package, description)] | |
863 | |
864 options = ReactorSelectionOptions() | |
865 options.messageOutput = StringIO() | |
866 e = self.assertRaises(usage.UsageError, options.parseOptions, | |
867 ['--reactor', 'fakereactortest', 'subcommand']) | |
868 self.assertIn(message, e.args[0]) | |
869 self.assertIn("help-reactors", e.args[0]) | |
870 | |
871 | |
872 def test_qtStub(self): | |
873 """ | |
874 Test that installing qtreactor when it's absent fails properly. | |
875 """ | |
876 scriptPath = sibpath(__file__, "app_qtstub.py") | |
877 def _checkOutput((output, err, code)): | |
878 self.failIf(output, output) | |
879 result = getProcessOutputAndValue( | |
880 sys.executable, | |
881 args=(sys.executable, scriptPath), | |
882 env=None) | |
883 result.addCallback(_checkOutput) | |
884 return result | |
885 | |
886 | |
887 | |
888 class ReportProfileTestCase(unittest.TestCase): | |
889 """ | |
890 Tests for L{app.reportProfile}. | |
891 """ | |
892 | |
893 def test_deprecation(self): | |
894 """ | |
895 Check that L{app.reportProfile} prints a warning and does nothing else. | |
896 """ | |
897 self.assertWarns(DeprecationWarning, | |
898 "reportProfile is deprecated and a no-op since Twisted 8.0.", | |
899 app.__file__, app.reportProfile, None, None) | |
OLD | NEW |