| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_internet -*- | |
| 2 # | |
| 3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """ | |
| 8 Posix reactor base class | |
| 9 | |
| 10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 11 """ | |
| 12 | |
| 13 import warnings | |
| 14 import socket | |
| 15 import errno | |
| 16 import os | |
| 17 | |
| 18 from zope.interface import implements, classImplements | |
| 19 | |
| 20 from twisted.internet.interfaces import IReactorUNIX, IReactorUNIXDatagram | |
| 21 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, I
ReactorArbitrary | |
| 22 from twisted.internet.interfaces import IReactorProcess, IReactorMulticast | |
| 23 from twisted.internet.interfaces import IHalfCloseableDescriptor | |
| 24 from twisted.internet import error | |
| 25 from twisted.internet import tcp, udp | |
| 26 | |
| 27 from twisted.python import log, failure, util | |
| 28 from twisted.persisted import styles | |
| 29 from twisted.python.runtime import platformType, platform | |
| 30 | |
| 31 from twisted.internet.base import ReactorBase, _SignalReactorMixin | |
| 32 | |
| 33 try: | |
| 34 from twisted.internet import ssl | |
| 35 sslEnabled = True | |
| 36 except ImportError: | |
| 37 sslEnabled = False | |
| 38 | |
| 39 try: | |
| 40 from twisted.internet import unix | |
| 41 unixEnabled = True | |
| 42 except ImportError: | |
| 43 unixEnabled = False | |
| 44 | |
| 45 processEnabled = False | |
| 46 if platformType == 'posix': | |
| 47 from twisted.internet import fdesc | |
| 48 import process | |
| 49 processEnabled = True | |
| 50 | |
| 51 if platform.isWindows(): | |
| 52 try: | |
| 53 import win32process | |
| 54 processEnabled = True | |
| 55 except ImportError: | |
| 56 win32process = None | |
| 57 | |
| 58 | |
| 59 class _Win32Waker(log.Logger, styles.Ephemeral): | |
| 60 """I am a workaround for the lack of pipes on win32. | |
| 61 | |
| 62 I am a pair of connected sockets which can wake up the main loop | |
| 63 from another thread. | |
| 64 """ | |
| 65 disconnected = 0 | |
| 66 | |
| 67 def __init__(self, reactor): | |
| 68 """Initialize. | |
| 69 """ | |
| 70 self.reactor = reactor | |
| 71 # Following select_trigger (from asyncore)'s example; | |
| 72 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| 73 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| 74 client.setsockopt(socket.IPPROTO_TCP, 1, 1) | |
| 75 server.bind(('127.0.0.1', 0)) | |
| 76 server.listen(1) | |
| 77 client.connect(server.getsockname()) | |
| 78 reader, clientaddr = server.accept() | |
| 79 client.setblocking(0) | |
| 80 reader.setblocking(0) | |
| 81 self.r = reader | |
| 82 self.w = client | |
| 83 self.fileno = self.r.fileno | |
| 84 | |
| 85 def wakeUp(self): | |
| 86 """Send a byte to my connection. | |
| 87 """ | |
| 88 try: | |
| 89 util.untilConcludes(self.w.send, 'x') | |
| 90 except socket.error, (err, msg): | |
| 91 if err != errno.WSAEWOULDBLOCK: | |
| 92 raise | |
| 93 | |
| 94 def doRead(self): | |
| 95 """Read some data from my connection. | |
| 96 """ | |
| 97 try: | |
| 98 self.r.recv(8192) | |
| 99 except socket.error: | |
| 100 pass | |
| 101 | |
| 102 def connectionLost(self, reason): | |
| 103 self.r.close() | |
| 104 self.w.close() | |
| 105 self.reactor.waker = None | |
| 106 | |
| 107 class _UnixWaker(log.Logger, styles.Ephemeral): | |
| 108 """This class provides a simple interface to wake up the event loop. | |
| 109 | |
| 110 This is used by threads or signals to wake up the event loop. | |
| 111 """ | |
| 112 disconnected = 0 | |
| 113 | |
| 114 i = None | |
| 115 o = None | |
| 116 | |
| 117 def __init__(self, reactor): | |
| 118 """Initialize. | |
| 119 """ | |
| 120 self.reactor = reactor | |
| 121 self.i, self.o = os.pipe() | |
| 122 fdesc.setNonBlocking(self.i) | |
| 123 fdesc.setNonBlocking(self.o) | |
| 124 self.fileno = lambda: self.i | |
| 125 | |
| 126 def doRead(self): | |
| 127 """Read some bytes from the pipe. | |
| 128 """ | |
| 129 fdesc.readFromFD(self.fileno(), lambda data: None) | |
| 130 | |
| 131 def wakeUp(self): | |
| 132 """Write one byte to the pipe, and flush it. | |
| 133 """ | |
| 134 # We don't use fdesc.writeToFD since we need to distinguish | |
| 135 # between EINTR (try again) and EAGAIN (do nothing). | |
| 136 if self.o is not None: | |
| 137 try: | |
| 138 util.untilConcludes(os.write, self.o, 'x') | |
| 139 except OSError, e: | |
| 140 if e.errno != errno.EAGAIN: | |
| 141 raise | |
| 142 | |
| 143 def connectionLost(self, reason): | |
| 144 """Close both ends of my pipe. | |
| 145 """ | |
| 146 if not hasattr(self, "o"): | |
| 147 return | |
| 148 for fd in self.i, self.o: | |
| 149 try: | |
| 150 os.close(fd) | |
| 151 except IOError: | |
| 152 pass | |
| 153 del self.i, self.o | |
| 154 self.reactor.waker = None | |
| 155 | |
| 156 | |
| 157 if platformType == 'posix': | |
| 158 _Waker = _UnixWaker | |
| 159 elif platformType == 'win32': | |
| 160 _Waker = _Win32Waker | |
| 161 | |
| 162 | |
| 163 class PosixReactorBase(_SignalReactorMixin, ReactorBase): | |
| 164 """ | |
| 165 A basis for reactors that use file descriptors. | |
| 166 """ | |
| 167 implements(IReactorArbitrary, IReactorTCP, IReactorUDP, IReactorMulticast) | |
| 168 | |
| 169 def __init__(self): | |
| 170 ReactorBase.__init__(self) | |
| 171 if self.usingThreads or platformType == "posix": | |
| 172 self.installWaker() | |
| 173 | |
| 174 | |
| 175 def _disconnectSelectable(self, selectable, why, isRead, faildict={ | |
| 176 error.ConnectionDone: failure.Failure(error.ConnectionDone()), | |
| 177 error.ConnectionLost: failure.Failure(error.ConnectionLost()) | |
| 178 }): | |
| 179 """ | |
| 180 Utility function for disconnecting a selectable. | |
| 181 | |
| 182 Supports half-close notification, isRead should be boolean indicating | |
| 183 whether error resulted from doRead(). | |
| 184 """ | |
| 185 self.removeReader(selectable) | |
| 186 f = faildict.get(why.__class__) | |
| 187 if f: | |
| 188 if (isRead and why.__class__ == error.ConnectionDone | |
| 189 and IHalfCloseableDescriptor.providedBy(selectable)): | |
| 190 selectable.readConnectionLost(f) | |
| 191 else: | |
| 192 self.removeWriter(selectable) | |
| 193 selectable.connectionLost(f) | |
| 194 else: | |
| 195 self.removeWriter(selectable) | |
| 196 selectable.connectionLost(failure.Failure(why)) | |
| 197 | |
| 198 def installWaker(self): | |
| 199 """ | |
| 200 Install a `waker' to allow threads and signals to wake up the IO thread. | |
| 201 | |
| 202 We use the self-pipe trick (http://cr.yp.to/docs/selfpipe.html) to wake | |
| 203 the reactor. On Windows we use a pair of sockets. | |
| 204 """ | |
| 205 if not self.waker: | |
| 206 self.waker = _Waker(self) | |
| 207 self.addReader(self.waker) | |
| 208 | |
| 209 | |
| 210 # IReactorProcess | |
| 211 | |
| 212 def spawnProcess(self, processProtocol, executable, args=(), | |
| 213 env={}, path=None, | |
| 214 uid=None, gid=None, usePTY=0, childFDs=None): | |
| 215 args, env = self._checkProcessArgs(args, env) | |
| 216 if platformType == 'posix': | |
| 217 if usePTY: | |
| 218 if childFDs is not None: | |
| 219 raise ValueError("Using childFDs is not supported with usePT
Y=True.") | |
| 220 return process.PTYProcess(self, executable, args, env, path, | |
| 221 processProtocol, uid, gid, usePTY) | |
| 222 else: | |
| 223 return process.Process(self, executable, args, env, path, | |
| 224 processProtocol, uid, gid, childFDs) | |
| 225 elif platformType == "win32": | |
| 226 if uid is not None or gid is not None: | |
| 227 raise ValueError("The uid and gid parameters are not supported o
n Windows.") | |
| 228 if usePTY: | |
| 229 raise ValueError("The usePTY parameter is not supported on Windo
ws.") | |
| 230 if childFDs: | |
| 231 raise ValueError("Customizing childFDs is not supported on Windo
ws.") | |
| 232 | |
| 233 if win32process: | |
| 234 from twisted.internet._dumbwin32proc import Process | |
| 235 return Process(self, processProtocol, executable, args, env, pat
h) | |
| 236 else: | |
| 237 raise NotImplementedError, "spawnProcess not available since pyw
in32 is not installed." | |
| 238 else: | |
| 239 raise NotImplementedError, "spawnProcess only available on Windows o
r POSIX." | |
| 240 | |
| 241 # IReactorUDP | |
| 242 | |
| 243 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): | |
| 244 """Connects a given L{DatagramProtocol} to the given numeric UDP port. | |
| 245 | |
| 246 @returns: object conforming to L{IListeningPort}. | |
| 247 """ | |
| 248 p = udp.Port(port, protocol, interface, maxPacketSize, self) | |
| 249 p.startListening() | |
| 250 return p | |
| 251 | |
| 252 def connectUDP(self, remotehost, remoteport, protocol, localport=0, | |
| 253 interface='', maxPacketSize=8192): | |
| 254 """DEPRECATED. | |
| 255 | |
| 256 Connects a L{ConnectedDatagramProtocol} instance to a UDP port. | |
| 257 """ | |
| 258 warnings.warn("use listenUDP and then transport.connect().", Deprecation
Warning, stacklevel=2) | |
| 259 p = udp.ConnectedPort((remotehost, remoteport), localport, protocol, int
erface, maxPacketSize, self) | |
| 260 p.startListening() | |
| 261 return p | |
| 262 | |
| 263 | |
| 264 # IReactorMulticast | |
| 265 | |
| 266 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
listenMultiple=False): | |
| 267 """Connects a given DatagramProtocol to the given numeric UDP port. | |
| 268 | |
| 269 EXPERIMENTAL. | |
| 270 | |
| 271 @returns: object conforming to IListeningPort. | |
| 272 """ | |
| 273 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, li
stenMultiple) | |
| 274 p.startListening() | |
| 275 return p | |
| 276 | |
| 277 | |
| 278 # IReactorUNIX | |
| 279 | |
| 280 def connectUNIX(self, address, factory, timeout=30, checkPID=0): | |
| 281 """@see: twisted.internet.interfaces.IReactorUNIX.connectUNIX | |
| 282 """ | |
| 283 assert unixEnabled, "UNIX support is not present" | |
| 284 c = unix.Connector(address, factory, timeout, self, checkPID) | |
| 285 c.connect() | |
| 286 return c | |
| 287 | |
| 288 def listenUNIX(self, address, factory, backlog=50, mode=0666, wantPID=0): | |
| 289 """@see: twisted.internet.interfaces.IReactorUNIX.listenUNIX | |
| 290 """ | |
| 291 assert unixEnabled, "UNIX support is not present" | |
| 292 p = unix.Port(address, factory, backlog, mode, self, wantPID) | |
| 293 p.startListening() | |
| 294 return p | |
| 295 | |
| 296 | |
| 297 # IReactorUNIXDatagram | |
| 298 | |
| 299 def listenUNIXDatagram(self, address, protocol, maxPacketSize=8192, mode=066
6): | |
| 300 """Connects a given L{DatagramProtocol} to the given path. | |
| 301 | |
| 302 EXPERIMENTAL. | |
| 303 | |
| 304 @returns: object conforming to L{IListeningPort}. | |
| 305 """ | |
| 306 assert unixEnabled, "UNIX support is not present" | |
| 307 p = unix.DatagramPort(address, protocol, maxPacketSize, mode, self) | |
| 308 p.startListening() | |
| 309 return p | |
| 310 | |
| 311 def connectUNIXDatagram(self, address, protocol, maxPacketSize=8192, mode=06
66, bindAddress=None): | |
| 312 """Connects a L{ConnectedDatagramProtocol} instance to a path. | |
| 313 | |
| 314 EXPERIMENTAL. | |
| 315 """ | |
| 316 assert unixEnabled, "UNIX support is not present" | |
| 317 p = unix.ConnectedDatagramPort(address, protocol, maxPacketSize, mode, b
indAddress, self) | |
| 318 p.startListening() | |
| 319 return p | |
| 320 | |
| 321 | |
| 322 # IReactorTCP | |
| 323 | |
| 324 def listenTCP(self, port, factory, backlog=50, interface=''): | |
| 325 """@see: twisted.internet.interfaces.IReactorTCP.listenTCP | |
| 326 """ | |
| 327 p = tcp.Port(port, factory, backlog, interface, self) | |
| 328 p.startListening() | |
| 329 return p | |
| 330 | |
| 331 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): | |
| 332 """@see: twisted.internet.interfaces.IReactorTCP.connectTCP | |
| 333 """ | |
| 334 c = tcp.Connector(host, port, factory, timeout, bindAddress, self) | |
| 335 c.connect() | |
| 336 return c | |
| 337 | |
| 338 # IReactorSSL (sometimes, not implemented) | |
| 339 | |
| 340 def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAd
dress=None): | |
| 341 """@see: twisted.internet.interfaces.IReactorSSL.connectSSL | |
| 342 """ | |
| 343 assert sslEnabled, "SSL support is not present" | |
| 344 c = ssl.Connector(host, port, factory, contextFactory, timeout, bindAddr
ess, self) | |
| 345 c.connect() | |
| 346 return c | |
| 347 | |
| 348 def listenSSL(self, port, factory, contextFactory, backlog=50, interface='')
: | |
| 349 """@see: twisted.internet.interfaces.IReactorSSL.listenSSL | |
| 350 """ | |
| 351 assert sslEnabled, "SSL support is not present" | |
| 352 p = ssl.Port(port, factory, contextFactory, backlog, interface, self) | |
| 353 p.startListening() | |
| 354 return p | |
| 355 | |
| 356 # IReactorArbitrary | |
| 357 def listenWith(self, portType, *args, **kw): | |
| 358 kw['reactor'] = self | |
| 359 p = portType(*args, **kw) | |
| 360 p.startListening() | |
| 361 return p | |
| 362 | |
| 363 def connectWith(self, connectorType, *args, **kw): | |
| 364 kw['reactor'] = self | |
| 365 c = connectorType(*args, **kw) | |
| 366 c.connect() | |
| 367 return c | |
| 368 | |
| 369 def _removeAll(self, readers, writers): | |
| 370 """ | |
| 371 Remove all readers and writers, and return list of Selectables. | |
| 372 | |
| 373 Meant for calling from subclasses, to implement removeAll, like:: | |
| 374 | |
| 375 def removeAll(self): | |
| 376 return self._removeAll(reads, writes) | |
| 377 | |
| 378 where C{reads} and C{writes} are iterables. | |
| 379 """ | |
| 380 readers = [reader for reader in readers if | |
| 381 reader is not self.waker] | |
| 382 | |
| 383 readers_dict = {} | |
| 384 for reader in readers: | |
| 385 readers_dict[reader] = 1 | |
| 386 | |
| 387 for reader in readers: | |
| 388 self.removeReader(reader) | |
| 389 self.removeWriter(reader) | |
| 390 | |
| 391 writers = [writer for writer in writers if | |
| 392 writer not in readers_dict] | |
| 393 for writer in writers: | |
| 394 self.removeWriter(writer) | |
| 395 | |
| 396 return readers+writers | |
| 397 | |
| 398 | |
| 399 if sslEnabled: | |
| 400 classImplements(PosixReactorBase, IReactorSSL) | |
| 401 if unixEnabled: | |
| 402 classImplements(PosixReactorBase, IReactorUNIX, IReactorUNIXDatagram) | |
| 403 if processEnabled: | |
| 404 classImplements(PosixReactorBase, IReactorProcess) | |
| 405 | |
| 406 __all__ = ["PosixReactorBase"] | |
| OLD | NEW |