| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_udp -*- | |
| 2 | |
| 3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """ | |
| 8 Various asynchronous UDP classes. | |
| 9 | |
| 10 Please do not use this module directly. | |
| 11 | |
| 12 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 13 """ | |
| 14 | |
| 15 # System Imports | |
| 16 import os | |
| 17 import socket | |
| 18 import operator | |
| 19 import struct | |
| 20 import warnings | |
| 21 from zope.interface import implements | |
| 22 | |
| 23 from twisted.python.runtime import platformType | |
| 24 if platformType == 'win32': | |
| 25 from errno import WSAEWOULDBLOCK as EWOULDBLOCK | |
| 26 from errno import WSAEINTR as EINTR | |
| 27 from errno import WSAEMSGSIZE as EMSGSIZE | |
| 28 from errno import WSAECONNREFUSED as ECONNREFUSED | |
| 29 from errno import WSAECONNRESET | |
| 30 EAGAIN=EWOULDBLOCK | |
| 31 else: | |
| 32 from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN | |
| 33 | |
| 34 # Twisted Imports | |
| 35 from twisted.internet import protocol, base, defer, address | |
| 36 from twisted.persisted import styles | |
| 37 from twisted.python import log, reflect, failure | |
| 38 | |
| 39 # Sibling Imports | |
| 40 import abstract, error, interfaces | |
| 41 | |
| 42 | |
| 43 class Port(base.BasePort): | |
| 44 """UDP port, listening for packets.""" | |
| 45 | |
| 46 implements(interfaces.IUDPTransport, interfaces.ISystemHandle) | |
| 47 | |
| 48 addressFamily = socket.AF_INET | |
| 49 socketType = socket.SOCK_DGRAM | |
| 50 maxThroughput = 256 * 1024 # max bytes we read in one eventloop iteration | |
| 51 | |
| 52 # Actual port number being listened on, only set to a non-None | |
| 53 # value when we are actually listening. | |
| 54 _realPortNumber = None | |
| 55 | |
| 56 def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=No
ne): | |
| 57 """Initialize with a numeric port to listen on. | |
| 58 """ | |
| 59 base.BasePort.__init__(self, reactor) | |
| 60 self.port = port | |
| 61 self.protocol = proto | |
| 62 self.maxPacketSize = maxPacketSize | |
| 63 self.interface = interface | |
| 64 self.setLogStr() | |
| 65 self._connectedAddr = None | |
| 66 | |
| 67 def __repr__(self): | |
| 68 if self._realPortNumber is not None: | |
| 69 return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber
) | |
| 70 else: | |
| 71 return "<%s not connected>" % (self.protocol.__class__,) | |
| 72 | |
| 73 def getHandle(self): | |
| 74 """Return a socket object.""" | |
| 75 return self.socket | |
| 76 | |
| 77 def startListening(self): | |
| 78 """Create and bind my socket, and begin listening on it. | |
| 79 | |
| 80 This is called on unserialization, and must be called after creating a | |
| 81 server to begin listening on the specified port. | |
| 82 """ | |
| 83 self._bindSocket() | |
| 84 self._connectToProtocol() | |
| 85 | |
| 86 def _bindSocket(self): | |
| 87 try: | |
| 88 skt = self.createInternetSocket() | |
| 89 skt.bind((self.interface, self.port)) | |
| 90 except socket.error, le: | |
| 91 raise error.CannotListenError, (self.interface, self.port, le) | |
| 92 | |
| 93 # Make sure that if we listened on port 0, we update that to | |
| 94 # reflect what the OS actually assigned us. | |
| 95 self._realPortNumber = skt.getsockname()[1] | |
| 96 | |
| 97 log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumb
er)) | |
| 98 | |
| 99 self.connected = 1 | |
| 100 self.socket = skt | |
| 101 self.fileno = self.socket.fileno | |
| 102 | |
| 103 def _connectToProtocol(self): | |
| 104 self.protocol.makeConnection(self) | |
| 105 self.startReading() | |
| 106 | |
| 107 | |
| 108 def doRead(self): | |
| 109 """Called when my socket is ready for reading.""" | |
| 110 read = 0 | |
| 111 while read < self.maxThroughput: | |
| 112 try: | |
| 113 data, addr = self.socket.recvfrom(self.maxPacketSize) | |
| 114 except socket.error, se: | |
| 115 no = se.args[0] | |
| 116 if no in (EAGAIN, EINTR, EWOULDBLOCK): | |
| 117 return | |
| 118 if (no == ECONNREFUSED) or (platformType == "win32" and no == WS
AECONNRESET): | |
| 119 if self._connectedAddr: | |
| 120 self.protocol.connectionRefused() | |
| 121 else: | |
| 122 raise | |
| 123 else: | |
| 124 read += len(data) | |
| 125 try: | |
| 126 self.protocol.datagramReceived(data, addr) | |
| 127 except: | |
| 128 log.err() | |
| 129 | |
| 130 | |
| 131 def write(self, datagram, addr=None): | |
| 132 """Write a datagram. | |
| 133 | |
| 134 @param addr: should be a tuple (ip, port), can be None in connected mode
. | |
| 135 """ | |
| 136 if self._connectedAddr: | |
| 137 assert addr in (None, self._connectedAddr) | |
| 138 try: | |
| 139 return self.socket.send(datagram) | |
| 140 except socket.error, se: | |
| 141 no = se.args[0] | |
| 142 if no == EINTR: | |
| 143 return self.write(datagram) | |
| 144 elif no == EMSGSIZE: | |
| 145 raise error.MessageLengthError, "message too long" | |
| 146 elif no == ECONNREFUSED: | |
| 147 self.protocol.connectionRefused() | |
| 148 else: | |
| 149 raise | |
| 150 else: | |
| 151 assert addr != None | |
| 152 if not addr[0].replace(".", "").isdigit(): | |
| 153 warnings.warn("Please only pass IPs to write(), not hostnames",
DeprecationWarning, stacklevel=2) | |
| 154 try: | |
| 155 return self.socket.sendto(datagram, addr) | |
| 156 except socket.error, se: | |
| 157 no = se.args[0] | |
| 158 if no == EINTR: | |
| 159 return self.write(datagram, addr) | |
| 160 elif no == EMSGSIZE: | |
| 161 raise error.MessageLengthError, "message too long" | |
| 162 elif no == ECONNREFUSED: | |
| 163 # in non-connected UDP ECONNREFUSED is platform dependent, I
think | |
| 164 # and the info is not necessarily useful. Nevertheless maybe
we | |
| 165 # should call connectionRefused? XXX | |
| 166 return | |
| 167 else: | |
| 168 raise | |
| 169 | |
| 170 def writeSequence(self, seq, addr): | |
| 171 self.write("".join(seq), addr) | |
| 172 | |
| 173 def connect(self, host, port): | |
| 174 """'Connect' to remote server.""" | |
| 175 if self._connectedAddr: | |
| 176 raise RuntimeError, "already connected, reconnecting is not currentl
y supported (talk to itamar if you want this)" | |
| 177 if not abstract.isIPAddress(host): | |
| 178 raise ValueError, "please pass only IP addresses, not domain names" | |
| 179 self._connectedAddr = (host, port) | |
| 180 self.socket.connect((host, port)) | |
| 181 | |
| 182 def _loseConnection(self): | |
| 183 self.stopReading() | |
| 184 if self.connected: # actually means if we are *listening* | |
| 185 from twisted.internet import reactor | |
| 186 reactor.callLater(0, self.connectionLost) | |
| 187 | |
| 188 def stopListening(self): | |
| 189 if self.connected: | |
| 190 result = self.d = defer.Deferred() | |
| 191 else: | |
| 192 result = None | |
| 193 self._loseConnection() | |
| 194 return result | |
| 195 | |
| 196 def loseConnection(self): | |
| 197 warnings.warn("Please use stopListening() to disconnect port", Deprecati
onWarning, stacklevel=2) | |
| 198 self.stopListening() | |
| 199 | |
| 200 def connectionLost(self, reason=None): | |
| 201 """Cleans up my socket. | |
| 202 """ | |
| 203 log.msg('(Port %s Closed)' % self._realPortNumber) | |
| 204 self._realPortNumber = None | |
| 205 base.BasePort.connectionLost(self, reason) | |
| 206 if hasattr(self, "protocol"): | |
| 207 # we won't have attribute in ConnectedPort, in cases | |
| 208 # where there was an error in connection process | |
| 209 self.protocol.doStop() | |
| 210 self.connected = 0 | |
| 211 self.socket.close() | |
| 212 del self.socket | |
| 213 del self.fileno | |
| 214 if hasattr(self, "d"): | |
| 215 self.d.callback(None) | |
| 216 del self.d | |
| 217 | |
| 218 def setLogStr(self): | |
| 219 self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)" | |
| 220 | |
| 221 def logPrefix(self): | |
| 222 """Returns the name of my class, to prefix log entries with. | |
| 223 """ | |
| 224 return self.logstr | |
| 225 | |
| 226 def getHost(self): | |
| 227 """ | |
| 228 Returns an IPv4Address. | |
| 229 | |
| 230 This indicates the address from which I am connecting. | |
| 231 """ | |
| 232 return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_U
DP',))) | |
| 233 | |
| 234 | |
| 235 class ConnectedPort(Port): | |
| 236 """DEPRECATED. | |
| 237 | |
| 238 A connected UDP socket.""" | |
| 239 | |
| 240 implements(interfaces.IUDPConnectedTransport) | |
| 241 | |
| 242 def __init__(self, (remotehost, remoteport), port, proto, interface='', maxP
acketSize=8192, reactor=None): | |
| 243 Port.__init__(self, port, proto, interface, maxPacketSize, reactor) | |
| 244 self.remotehost = remotehost | |
| 245 self.remoteport = remoteport | |
| 246 | |
| 247 def startListening(self): | |
| 248 self._bindSocket() | |
| 249 if abstract.isIPAddress(self.remotehost): | |
| 250 self.setRealAddress(self.remotehost) | |
| 251 else: | |
| 252 self.realAddress = None | |
| 253 d = self.reactor.resolve(self.remotehost) | |
| 254 d.addCallback(self.setRealAddress).addErrback(self.connectionFailed) | |
| 255 | |
| 256 def setRealAddress(self, addr): | |
| 257 self.realAddress = addr | |
| 258 self.socket.connect((addr, self.remoteport)) | |
| 259 self._connectToProtocol() | |
| 260 | |
| 261 def connectionFailed(self, reason): | |
| 262 self._loseConnection() | |
| 263 self.protocol.connectionFailed(reason) | |
| 264 del self.protocol | |
| 265 | |
| 266 def doRead(self): | |
| 267 """Called when my socket is ready for reading.""" | |
| 268 read = 0 | |
| 269 while read < self.maxThroughput: | |
| 270 try: | |
| 271 data, addr = self.socket.recvfrom(self.maxPacketSize) | |
| 272 read += len(data) | |
| 273 self.protocol.datagramReceived(data) | |
| 274 except socket.error, se: | |
| 275 no = se.args[0] | |
| 276 if no in (EAGAIN, EINTR, EWOULDBLOCK): | |
| 277 return | |
| 278 if (no == ECONNREFUSED) or (platformType == "win32" and no == WS
AECONNRESET): | |
| 279 self.protocol.connectionRefused() | |
| 280 else: | |
| 281 raise | |
| 282 except: | |
| 283 log.deferr() | |
| 284 | |
| 285 def write(self, data): | |
| 286 """Write a datagram.""" | |
| 287 try: | |
| 288 return self.socket.send(data) | |
| 289 except socket.error, se: | |
| 290 no = se.args[0] | |
| 291 if no == EINTR: | |
| 292 return self.write(data) | |
| 293 elif no == EMSGSIZE: | |
| 294 raise error.MessageLengthError, "message too long" | |
| 295 elif no == ECONNREFUSED: | |
| 296 self.protocol.connectionRefused() | |
| 297 else: | |
| 298 raise | |
| 299 | |
| 300 def getPeer(self): | |
| 301 """ | |
| 302 Returns a tuple of ('INET_UDP', hostname, port), indicating | |
| 303 the remote address. | |
| 304 """ | |
| 305 return address.IPv4Address('UDP', self.remotehost, self.remoteport, 'INE
T_UDP') | |
| 306 | |
| 307 | |
| 308 class MulticastMixin: | |
| 309 """Implement multicast functionality.""" | |
| 310 | |
| 311 def getOutgoingInterface(self): | |
| 312 i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF) | |
| 313 return socket.inet_ntoa(struct.pack("@i", i)) | |
| 314 | |
| 315 def setOutgoingInterface(self, addr): | |
| 316 """Returns Deferred of success.""" | |
| 317 return self.reactor.resolve(addr).addCallback(self._setInterface) | |
| 318 | |
| 319 def _setInterface(self, addr): | |
| 320 i = socket.inet_aton(addr) | |
| 321 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i) | |
| 322 return 1 | |
| 323 | |
| 324 def getLoopbackMode(self): | |
| 325 return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOO
P) | |
| 326 | |
| 327 def setLoopbackMode(self, mode): | |
| 328 mode = struct.pack("b", operator.truth(mode)) | |
| 329 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode
) | |
| 330 | |
| 331 def getTTL(self): | |
| 332 return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL
) | |
| 333 | |
| 334 def setTTL(self, ttl): | |
| 335 ttl = struct.pack("B", ttl) | |
| 336 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) | |
| 337 | |
| 338 def joinGroup(self, addr, interface=""): | |
| 339 """Join a multicast group. Returns Deferred of success.""" | |
| 340 return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface
, 1) | |
| 341 | |
| 342 def _joinAddr1(self, addr, interface, join): | |
| 343 return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr
, join) | |
| 344 | |
| 345 def _joinAddr2(self, interface, addr, join): | |
| 346 addr = socket.inet_aton(addr) | |
| 347 interface = socket.inet_aton(interface) | |
| 348 if join: | |
| 349 cmd = socket.IP_ADD_MEMBERSHIP | |
| 350 else: | |
| 351 cmd = socket.IP_DROP_MEMBERSHIP | |
| 352 try: | |
| 353 self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface) | |
| 354 except socket.error, e: | |
| 355 return failure.Failure(error.MulticastJoinError(addr, interface, *e.
args)) | |
| 356 | |
| 357 def leaveGroup(self, addr, interface=""): | |
| 358 """Leave multicast group, return Deferred of success.""" | |
| 359 return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface
, 0) | |
| 360 | |
| 361 | |
| 362 class MulticastPort(MulticastMixin, Port): | |
| 363 """UDP Port that supports multicasting.""" | |
| 364 | |
| 365 implements(interfaces.IMulticastTransport) | |
| 366 | |
| 367 def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=No
ne, listenMultiple=False): | |
| 368 Port.__init__(self, port, proto, interface, maxPacketSize, reactor) | |
| 369 self.listenMultiple = listenMultiple | |
| 370 | |
| 371 def createInternetSocket(self): | |
| 372 skt = Port.createInternetSocket(self) | |
| 373 if self.listenMultiple: | |
| 374 skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| 375 if hasattr(socket, "SO_REUSEPORT"): | |
| 376 skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
| 377 return skt | |
| 378 | |
| 379 | |
| 380 class ConnectedMulticastPort(MulticastMixin, ConnectedPort): | |
| 381 """DEPRECATED. | |
| 382 | |
| 383 Connected UDP Port that supports multicasting.""" | |
| 384 | |
| 385 implements(interfaces.IMulticastTransport) | |
| OLD | NEW |