OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_internet -*- | |
2 # | |
3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """ | |
8 Posix reactor base class | |
9 | |
10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
11 """ | |
12 | |
13 import warnings | |
14 import socket | |
15 import errno | |
16 import os | |
17 | |
18 from zope.interface import implements, classImplements | |
19 | |
20 from twisted.internet.interfaces import IReactorUNIX, IReactorUNIXDatagram | |
21 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, I
ReactorArbitrary | |
22 from twisted.internet.interfaces import IReactorProcess, IReactorMulticast | |
23 from twisted.internet.interfaces import IHalfCloseableDescriptor | |
24 from twisted.internet import error | |
25 from twisted.internet import tcp, udp | |
26 | |
27 from twisted.python import log, failure, util | |
28 from twisted.persisted import styles | |
29 from twisted.python.runtime import platformType, platform | |
30 | |
31 from twisted.internet.base import ReactorBase, _SignalReactorMixin | |
32 | |
33 try: | |
34 from twisted.internet import ssl | |
35 sslEnabled = True | |
36 except ImportError: | |
37 sslEnabled = False | |
38 | |
39 try: | |
40 from twisted.internet import unix | |
41 unixEnabled = True | |
42 except ImportError: | |
43 unixEnabled = False | |
44 | |
45 processEnabled = False | |
46 if platformType == 'posix': | |
47 from twisted.internet import fdesc | |
48 import process | |
49 processEnabled = True | |
50 | |
51 if platform.isWindows(): | |
52 try: | |
53 import win32process | |
54 processEnabled = True | |
55 except ImportError: | |
56 win32process = None | |
57 | |
58 | |
59 class _Win32Waker(log.Logger, styles.Ephemeral): | |
60 """I am a workaround for the lack of pipes on win32. | |
61 | |
62 I am a pair of connected sockets which can wake up the main loop | |
63 from another thread. | |
64 """ | |
65 disconnected = 0 | |
66 | |
67 def __init__(self, reactor): | |
68 """Initialize. | |
69 """ | |
70 self.reactor = reactor | |
71 # Following select_trigger (from asyncore)'s example; | |
72 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
73 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
74 client.setsockopt(socket.IPPROTO_TCP, 1, 1) | |
75 server.bind(('127.0.0.1', 0)) | |
76 server.listen(1) | |
77 client.connect(server.getsockname()) | |
78 reader, clientaddr = server.accept() | |
79 client.setblocking(0) | |
80 reader.setblocking(0) | |
81 self.r = reader | |
82 self.w = client | |
83 self.fileno = self.r.fileno | |
84 | |
85 def wakeUp(self): | |
86 """Send a byte to my connection. | |
87 """ | |
88 try: | |
89 util.untilConcludes(self.w.send, 'x') | |
90 except socket.error, (err, msg): | |
91 if err != errno.WSAEWOULDBLOCK: | |
92 raise | |
93 | |
94 def doRead(self): | |
95 """Read some data from my connection. | |
96 """ | |
97 try: | |
98 self.r.recv(8192) | |
99 except socket.error: | |
100 pass | |
101 | |
102 def connectionLost(self, reason): | |
103 self.r.close() | |
104 self.w.close() | |
105 self.reactor.waker = None | |
106 | |
107 class _UnixWaker(log.Logger, styles.Ephemeral): | |
108 """This class provides a simple interface to wake up the event loop. | |
109 | |
110 This is used by threads or signals to wake up the event loop. | |
111 """ | |
112 disconnected = 0 | |
113 | |
114 i = None | |
115 o = None | |
116 | |
117 def __init__(self, reactor): | |
118 """Initialize. | |
119 """ | |
120 self.reactor = reactor | |
121 self.i, self.o = os.pipe() | |
122 fdesc.setNonBlocking(self.i) | |
123 fdesc.setNonBlocking(self.o) | |
124 self.fileno = lambda: self.i | |
125 | |
126 def doRead(self): | |
127 """Read some bytes from the pipe. | |
128 """ | |
129 fdesc.readFromFD(self.fileno(), lambda data: None) | |
130 | |
131 def wakeUp(self): | |
132 """Write one byte to the pipe, and flush it. | |
133 """ | |
134 # We don't use fdesc.writeToFD since we need to distinguish | |
135 # between EINTR (try again) and EAGAIN (do nothing). | |
136 if self.o is not None: | |
137 try: | |
138 util.untilConcludes(os.write, self.o, 'x') | |
139 except OSError, e: | |
140 if e.errno != errno.EAGAIN: | |
141 raise | |
142 | |
143 def connectionLost(self, reason): | |
144 """Close both ends of my pipe. | |
145 """ | |
146 if not hasattr(self, "o"): | |
147 return | |
148 for fd in self.i, self.o: | |
149 try: | |
150 os.close(fd) | |
151 except IOError: | |
152 pass | |
153 del self.i, self.o | |
154 self.reactor.waker = None | |
155 | |
156 | |
157 if platformType == 'posix': | |
158 _Waker = _UnixWaker | |
159 elif platformType == 'win32': | |
160 _Waker = _Win32Waker | |
161 | |
162 | |
163 class PosixReactorBase(_SignalReactorMixin, ReactorBase): | |
164 """ | |
165 A basis for reactors that use file descriptors. | |
166 """ | |
167 implements(IReactorArbitrary, IReactorTCP, IReactorUDP, IReactorMulticast) | |
168 | |
169 def __init__(self): | |
170 ReactorBase.__init__(self) | |
171 if self.usingThreads or platformType == "posix": | |
172 self.installWaker() | |
173 | |
174 | |
175 def _disconnectSelectable(self, selectable, why, isRead, faildict={ | |
176 error.ConnectionDone: failure.Failure(error.ConnectionDone()), | |
177 error.ConnectionLost: failure.Failure(error.ConnectionLost()) | |
178 }): | |
179 """ | |
180 Utility function for disconnecting a selectable. | |
181 | |
182 Supports half-close notification, isRead should be boolean indicating | |
183 whether error resulted from doRead(). | |
184 """ | |
185 self.removeReader(selectable) | |
186 f = faildict.get(why.__class__) | |
187 if f: | |
188 if (isRead and why.__class__ == error.ConnectionDone | |
189 and IHalfCloseableDescriptor.providedBy(selectable)): | |
190 selectable.readConnectionLost(f) | |
191 else: | |
192 self.removeWriter(selectable) | |
193 selectable.connectionLost(f) | |
194 else: | |
195 self.removeWriter(selectable) | |
196 selectable.connectionLost(failure.Failure(why)) | |
197 | |
198 def installWaker(self): | |
199 """ | |
200 Install a `waker' to allow threads and signals to wake up the IO thread. | |
201 | |
202 We use the self-pipe trick (http://cr.yp.to/docs/selfpipe.html) to wake | |
203 the reactor. On Windows we use a pair of sockets. | |
204 """ | |
205 if not self.waker: | |
206 self.waker = _Waker(self) | |
207 self.addReader(self.waker) | |
208 | |
209 | |
210 # IReactorProcess | |
211 | |
212 def spawnProcess(self, processProtocol, executable, args=(), | |
213 env={}, path=None, | |
214 uid=None, gid=None, usePTY=0, childFDs=None): | |
215 args, env = self._checkProcessArgs(args, env) | |
216 if platformType == 'posix': | |
217 if usePTY: | |
218 if childFDs is not None: | |
219 raise ValueError("Using childFDs is not supported with usePT
Y=True.") | |
220 return process.PTYProcess(self, executable, args, env, path, | |
221 processProtocol, uid, gid, usePTY) | |
222 else: | |
223 return process.Process(self, executable, args, env, path, | |
224 processProtocol, uid, gid, childFDs) | |
225 elif platformType == "win32": | |
226 if uid is not None or gid is not None: | |
227 raise ValueError("The uid and gid parameters are not supported o
n Windows.") | |
228 if usePTY: | |
229 raise ValueError("The usePTY parameter is not supported on Windo
ws.") | |
230 if childFDs: | |
231 raise ValueError("Customizing childFDs is not supported on Windo
ws.") | |
232 | |
233 if win32process: | |
234 from twisted.internet._dumbwin32proc import Process | |
235 return Process(self, processProtocol, executable, args, env, pat
h) | |
236 else: | |
237 raise NotImplementedError, "spawnProcess not available since pyw
in32 is not installed." | |
238 else: | |
239 raise NotImplementedError, "spawnProcess only available on Windows o
r POSIX." | |
240 | |
241 # IReactorUDP | |
242 | |
243 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): | |
244 """Connects a given L{DatagramProtocol} to the given numeric UDP port. | |
245 | |
246 @returns: object conforming to L{IListeningPort}. | |
247 """ | |
248 p = udp.Port(port, protocol, interface, maxPacketSize, self) | |
249 p.startListening() | |
250 return p | |
251 | |
252 def connectUDP(self, remotehost, remoteport, protocol, localport=0, | |
253 interface='', maxPacketSize=8192): | |
254 """DEPRECATED. | |
255 | |
256 Connects a L{ConnectedDatagramProtocol} instance to a UDP port. | |
257 """ | |
258 warnings.warn("use listenUDP and then transport.connect().", Deprecation
Warning, stacklevel=2) | |
259 p = udp.ConnectedPort((remotehost, remoteport), localport, protocol, int
erface, maxPacketSize, self) | |
260 p.startListening() | |
261 return p | |
262 | |
263 | |
264 # IReactorMulticast | |
265 | |
266 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
listenMultiple=False): | |
267 """Connects a given DatagramProtocol to the given numeric UDP port. | |
268 | |
269 EXPERIMENTAL. | |
270 | |
271 @returns: object conforming to IListeningPort. | |
272 """ | |
273 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, li
stenMultiple) | |
274 p.startListening() | |
275 return p | |
276 | |
277 | |
278 # IReactorUNIX | |
279 | |
280 def connectUNIX(self, address, factory, timeout=30, checkPID=0): | |
281 """@see: twisted.internet.interfaces.IReactorUNIX.connectUNIX | |
282 """ | |
283 assert unixEnabled, "UNIX support is not present" | |
284 c = unix.Connector(address, factory, timeout, self, checkPID) | |
285 c.connect() | |
286 return c | |
287 | |
288 def listenUNIX(self, address, factory, backlog=50, mode=0666, wantPID=0): | |
289 """@see: twisted.internet.interfaces.IReactorUNIX.listenUNIX | |
290 """ | |
291 assert unixEnabled, "UNIX support is not present" | |
292 p = unix.Port(address, factory, backlog, mode, self, wantPID) | |
293 p.startListening() | |
294 return p | |
295 | |
296 | |
297 # IReactorUNIXDatagram | |
298 | |
299 def listenUNIXDatagram(self, address, protocol, maxPacketSize=8192, mode=066
6): | |
300 """Connects a given L{DatagramProtocol} to the given path. | |
301 | |
302 EXPERIMENTAL. | |
303 | |
304 @returns: object conforming to L{IListeningPort}. | |
305 """ | |
306 assert unixEnabled, "UNIX support is not present" | |
307 p = unix.DatagramPort(address, protocol, maxPacketSize, mode, self) | |
308 p.startListening() | |
309 return p | |
310 | |
311 def connectUNIXDatagram(self, address, protocol, maxPacketSize=8192, mode=06
66, bindAddress=None): | |
312 """Connects a L{ConnectedDatagramProtocol} instance to a path. | |
313 | |
314 EXPERIMENTAL. | |
315 """ | |
316 assert unixEnabled, "UNIX support is not present" | |
317 p = unix.ConnectedDatagramPort(address, protocol, maxPacketSize, mode, b
indAddress, self) | |
318 p.startListening() | |
319 return p | |
320 | |
321 | |
322 # IReactorTCP | |
323 | |
324 def listenTCP(self, port, factory, backlog=50, interface=''): | |
325 """@see: twisted.internet.interfaces.IReactorTCP.listenTCP | |
326 """ | |
327 p = tcp.Port(port, factory, backlog, interface, self) | |
328 p.startListening() | |
329 return p | |
330 | |
331 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): | |
332 """@see: twisted.internet.interfaces.IReactorTCP.connectTCP | |
333 """ | |
334 c = tcp.Connector(host, port, factory, timeout, bindAddress, self) | |
335 c.connect() | |
336 return c | |
337 | |
338 # IReactorSSL (sometimes, not implemented) | |
339 | |
340 def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAd
dress=None): | |
341 """@see: twisted.internet.interfaces.IReactorSSL.connectSSL | |
342 """ | |
343 assert sslEnabled, "SSL support is not present" | |
344 c = ssl.Connector(host, port, factory, contextFactory, timeout, bindAddr
ess, self) | |
345 c.connect() | |
346 return c | |
347 | |
348 def listenSSL(self, port, factory, contextFactory, backlog=50, interface='')
: | |
349 """@see: twisted.internet.interfaces.IReactorSSL.listenSSL | |
350 """ | |
351 assert sslEnabled, "SSL support is not present" | |
352 p = ssl.Port(port, factory, contextFactory, backlog, interface, self) | |
353 p.startListening() | |
354 return p | |
355 | |
356 # IReactorArbitrary | |
357 def listenWith(self, portType, *args, **kw): | |
358 kw['reactor'] = self | |
359 p = portType(*args, **kw) | |
360 p.startListening() | |
361 return p | |
362 | |
363 def connectWith(self, connectorType, *args, **kw): | |
364 kw['reactor'] = self | |
365 c = connectorType(*args, **kw) | |
366 c.connect() | |
367 return c | |
368 | |
369 def _removeAll(self, readers, writers): | |
370 """ | |
371 Remove all readers and writers, and return list of Selectables. | |
372 | |
373 Meant for calling from subclasses, to implement removeAll, like:: | |
374 | |
375 def removeAll(self): | |
376 return self._removeAll(reads, writes) | |
377 | |
378 where C{reads} and C{writes} are iterables. | |
379 """ | |
380 readers = [reader for reader in readers if | |
381 reader is not self.waker] | |
382 | |
383 readers_dict = {} | |
384 for reader in readers: | |
385 readers_dict[reader] = 1 | |
386 | |
387 for reader in readers: | |
388 self.removeReader(reader) | |
389 self.removeWriter(reader) | |
390 | |
391 writers = [writer for writer in writers if | |
392 writer not in readers_dict] | |
393 for writer in writers: | |
394 self.removeWriter(writer) | |
395 | |
396 return readers+writers | |
397 | |
398 | |
399 if sslEnabled: | |
400 classImplements(PosixReactorBase, IReactorSSL) | |
401 if unixEnabled: | |
402 classImplements(PosixReactorBase, IReactorUNIX, IReactorUNIXDatagram) | |
403 if processEnabled: | |
404 classImplements(PosixReactorBase, IReactorProcess) | |
405 | |
406 __all__ = ["PosixReactorBase"] | |
OLD | NEW |