OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_application,twisted.test.test_cooperator
-*- | |
2 | |
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 """ | |
7 Reactor-based Services | |
8 | |
9 Here are services to run clients, servers and periodic services using | |
10 the reactor. | |
11 | |
12 This module (dynamically) defines various Service subclasses that let | |
13 you represent clients and servers in a Service hierarchy. | |
14 | |
15 They are as follows:: | |
16 | |
17 TCPServer, TCPClient, | |
18 UNIXServer, UNIXClient, | |
19 SSLServer, SSLClient, | |
20 UDPServer, UDPClient, | |
21 UNIXDatagramServer, UNIXDatagramClient, | |
22 MulticastServer | |
23 | |
24 These classes take arbitrary arguments in their constructors and pass | |
25 them straight on to their respective reactor.listenXXX or | |
26 reactor.connectXXX calls. | |
27 | |
28 For example, the following service starts a web server on port 8080: | |
29 C{TCPServer(8080, server.Site(r))}. See the documentation for the | |
30 reactor.listen/connect* methods for more information. | |
31 | |
32 Maintainer: U{Moshe Zadka<mailto:moshez@twistedmatrix.com>} | |
33 """ | |
34 | |
35 from twisted.python import log | |
36 from twisted.application import service | |
37 from twisted.internet import task | |
38 | |
39 | |
40 class _VolatileDataService(service.Service): | |
41 | |
42 volatile = [] | |
43 | |
44 def __getstate__(self): | |
45 d = service.Service.__getstate__(self) | |
46 for attr in self.volatile: | |
47 if attr in d: | |
48 del d[attr] | |
49 return d | |
50 | |
51 | |
52 | |
53 class _AbstractServer(_VolatileDataService): | |
54 """ | |
55 @cvar volatile: list of attribute to remove from pickling. | |
56 @type volatile: C{list} | |
57 | |
58 @ivar method: the type of method to call on the reactor, one of B{TCP}, | |
59 B{UDP}, B{SSL} or B{UNIX}. | |
60 @type method: C{str} | |
61 | |
62 @ivar reactor: the current running reactor. | |
63 @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP}, | |
64 C{IReactorSSL} or C{IReactorUnix}. | |
65 | |
66 @ivar _port: instance of port set when the service is started. | |
67 @type _port: a provider of C{IListeningPort}. | |
68 """ | |
69 | |
70 volatile = ['_port'] | |
71 method = None | |
72 reactor = None | |
73 | |
74 _port = None | |
75 | |
76 def __init__(self, *args, **kwargs): | |
77 self.args = args | |
78 if 'reactor' in kwargs: | |
79 self.reactor = kwargs.pop("reactor") | |
80 self.kwargs = kwargs | |
81 | |
82 | |
83 def privilegedStartService(self): | |
84 service.Service.privilegedStartService(self) | |
85 self._port = self._getPort() | |
86 | |
87 | |
88 def startService(self): | |
89 service.Service.startService(self) | |
90 if self._port is None: | |
91 self._port = self._getPort() | |
92 | |
93 | |
94 def stopService(self): | |
95 service.Service.stopService(self) | |
96 # TODO: if startup failed, should shutdown skip stopListening? | |
97 # _port won't exist | |
98 if self._port is not None: | |
99 d = self._port.stopListening() | |
100 del self._port | |
101 return d | |
102 | |
103 | |
104 def _getPort(self): | |
105 """ | |
106 Wrapper around the appropriate listen method of the reactor. | |
107 | |
108 @return: the port object returned by the listen method. | |
109 @rtype: an object providing L{IListeningPort}. | |
110 """ | |
111 if self.reactor is None: | |
112 from twisted.internet import reactor | |
113 else: | |
114 reactor = self.reactor | |
115 return getattr(reactor, 'listen%s' % (self.method,))( | |
116 *self.args, **self.kwargs) | |
117 | |
118 | |
119 | |
120 class _AbstractClient(_VolatileDataService): | |
121 """ | |
122 @cvar volatile: list of attribute to remove from pickling. | |
123 @type volatile: C{list} | |
124 | |
125 @ivar method: the type of method to call on the reactor, one of B{TCP}, | |
126 B{UDP}, B{SSL} or B{UNIX}. | |
127 @type method: C{str} | |
128 | |
129 @ivar reactor: the current running reactor. | |
130 @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP}, | |
131 C{IReactorSSL} or C{IReactorUnix}. | |
132 | |
133 @ivar _connection: instance of connection set when the service is started. | |
134 @type _connection: a provider of C{IConnector}. | |
135 """ | |
136 volatile = ['_connection'] | |
137 method = None | |
138 reactor = None | |
139 | |
140 _connection = None | |
141 | |
142 def __init__(self, *args, **kwargs): | |
143 self.args = args | |
144 if 'reactor' in kwargs: | |
145 self.reactor = kwargs.pop("reactor") | |
146 self.kwargs = kwargs | |
147 | |
148 | |
149 def startService(self): | |
150 service.Service.startService(self) | |
151 self._connection = self._getConnection() | |
152 | |
153 | |
154 def stopService(self): | |
155 service.Service.stopService(self) | |
156 if self._connection is not None: | |
157 self._connection.disconnect() | |
158 del self._connection | |
159 | |
160 | |
161 def _getConnection(self): | |
162 """ | |
163 Wrapper around the appropriate connect method of the reactor. | |
164 | |
165 @return: the port object returned by the connect method. | |
166 @rtype: an object providing L{IConnector}. | |
167 """ | |
168 if self.reactor is None: | |
169 from twisted.internet import reactor | |
170 else: | |
171 reactor = self.reactor | |
172 return getattr(reactor, 'connect%s' % (self.method,))( | |
173 *self.args, **self.kwargs) | |
174 | |
175 | |
176 | |
177 _doc={ | |
178 'Client': | |
179 """Connect to %(tran)s | |
180 | |
181 Call reactor.connect%(method)s when the service starts, with the | |
182 arguments given to the constructor. | |
183 """, | |
184 'Server': | |
185 """Serve %(tran)s clients | |
186 | |
187 Call reactor.listen%(method)s when the service starts, with the | |
188 arguments given to the constructor. When the service stops, | |
189 stop listening. See twisted.internet.interfaces for documentation | |
190 on arguments to the reactor method. | |
191 """, | |
192 } | |
193 | |
194 import new | |
195 for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram Multicast'.split(): | |
196 for side in 'Server Client'.split(): | |
197 if tran == "Multicast" and side == "Client": | |
198 continue | |
199 base = globals()['_Abstract'+side] | |
200 method = {'Generic': 'With'}.get(tran, tran) | |
201 doc = _doc[side]%vars() | |
202 klass = new.classobj(tran+side, (base,), | |
203 {'method': method, '__doc__': doc}) | |
204 globals()[tran+side] = klass | |
205 | |
206 | |
207 class TimerService(_VolatileDataService): | |
208 | |
209 """Service to periodically call a function | |
210 | |
211 Every C{step} seconds call the given function with the given arguments. | |
212 The service starts the calls when it starts, and cancels them | |
213 when it stops. | |
214 """ | |
215 | |
216 volatile = ['_loop'] | |
217 | |
218 def __init__(self, step, callable, *args, **kwargs): | |
219 self.step = step | |
220 self.call = (callable, args, kwargs) | |
221 | |
222 def startService(self): | |
223 service.Service.startService(self) | |
224 callable, args, kwargs = self.call | |
225 # we have to make a new LoopingCall each time we're started, because | |
226 # an active LoopingCall remains active when serialized. If | |
227 # LoopingCall were a _VolatileDataService, we wouldn't need to do | |
228 # this. | |
229 self._loop = task.LoopingCall(callable, *args, **kwargs) | |
230 self._loop.start(self.step, now=True).addErrback(self._failed) | |
231 | |
232 def _failed(self, why): | |
233 # make a note that the LoopingCall is no longer looping, so we don't | |
234 # try to shut it down a second time in stopService. I think this | |
235 # should be in LoopingCall. -warner | |
236 self._loop.running = False | |
237 log.err(why) | |
238 | |
239 def stopService(self): | |
240 if self._loop.running: | |
241 self._loop.stop() | |
242 return service.Service.stopService(self) | |
243 | |
244 | |
245 | |
246 class CooperatorService(service.Service): | |
247 """ | |
248 Simple L{service.IService} which starts and stops a L{twisted.internet.task.
Cooperator}. | |
249 """ | |
250 def __init__(self): | |
251 self.coop = task.Cooperator(started=False) | |
252 | |
253 | |
254 def coiterate(self, iterator): | |
255 return self.coop.coiterate(iterator) | |
256 | |
257 | |
258 def startService(self): | |
259 self.coop.start() | |
260 | |
261 | |
262 def stopService(self): | |
263 self.coop.stop() | |
264 | |
265 | |
266 | |
267 __all__ = (['TimerService', 'CooperatorService'] + | |
268 [tran+side | |
269 for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram Multicast'.split() | |
270 for side in 'Server Client'.split()]) | |
OLD | NEW |