| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_unix -*- | |
| 2 | |
| 3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """Various asynchronous TCP/IP classes. | |
| 8 | |
| 9 End users shouldn't use this module directly - use the reactor APIs instead. | |
| 10 | |
| 11 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 12 """ | |
| 13 | |
| 14 # System imports | |
| 15 import os, stat, socket | |
| 16 from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED | |
| 17 | |
| 18 from zope.interface import implements, implementsOnly, implementedBy | |
| 19 | |
| 20 if not hasattr(socket, 'AF_UNIX'): | |
| 21 raise ImportError("UNIX sockets not supported on this platform") | |
| 22 | |
| 23 # Twisted imports | |
| 24 from twisted.internet import base, tcp, udp, error, interfaces, protocol, addres
s | |
| 25 from twisted.internet.error import CannotListenError | |
| 26 from twisted.python import lockfile, log, reflect, failure | |
| 27 | |
| 28 | |
| 29 class Server(tcp.Server): | |
| 30 def __init__(self, sock, protocol, client, server, sessionno): | |
| 31 tcp.Server.__init__(self, sock, protocol, (client, None), server, sessio
nno) | |
| 32 | |
| 33 def getHost(self): | |
| 34 return address.UNIXAddress(self.socket.getsockname()) | |
| 35 | |
| 36 def getPeer(self): | |
| 37 return address.UNIXAddress(self.hostname) | |
| 38 | |
| 39 | |
| 40 class Port(tcp.Port): | |
| 41 addressFamily = socket.AF_UNIX | |
| 42 socketType = socket.SOCK_STREAM | |
| 43 | |
| 44 transport = Server | |
| 45 lockFile = None | |
| 46 | |
| 47 def __init__(self, fileName, factory, backlog=50, mode=0666, reactor=None, w
antPID = 0): | |
| 48 tcp.Port.__init__(self, fileName, factory, backlog, reactor=reactor) | |
| 49 self.mode = mode | |
| 50 self.wantPID = wantPID | |
| 51 | |
| 52 def __repr__(self): | |
| 53 factoryName = reflect.qual(self.factory.__class__) | |
| 54 if hasattr(self, 'socket'): | |
| 55 return '<%s on %r>' % (factoryName, self.port) | |
| 56 else: | |
| 57 return '<%s (not listening)>' % (factoryName,) | |
| 58 | |
| 59 def _buildAddr(self, name): | |
| 60 return address.UNIXAddress(name) | |
| 61 | |
| 62 def startListening(self): | |
| 63 """Create and bind my socket, and begin listening on it. | |
| 64 | |
| 65 This is called on unserialization, and must be called after creating a | |
| 66 server to begin listening on the specified port. | |
| 67 """ | |
| 68 log.msg("%s starting on %r" % (self.factory.__class__, repr(self.port))) | |
| 69 if self.wantPID: | |
| 70 self.lockFile = lockfile.FilesystemLock(self.port + ".lock") | |
| 71 if not self.lockFile.lock(): | |
| 72 raise CannotListenError, (None, self.port, "Cannot acquire lock"
) | |
| 73 else: | |
| 74 if not self.lockFile.clean: | |
| 75 try: | |
| 76 # This is a best-attempt at cleaning up | |
| 77 # left-over unix sockets on the filesystem. | |
| 78 # If it fails, there's not much else we can | |
| 79 # do. The bind() below will fail with an | |
| 80 # exception that actually propegates. | |
| 81 if stat.S_ISSOCK(os.stat(self.port).st_mode): | |
| 82 os.remove(self.port) | |
| 83 except: | |
| 84 pass | |
| 85 | |
| 86 self.factory.doStart() | |
| 87 try: | |
| 88 skt = self.createInternetSocket() | |
| 89 skt.bind(self.port) | |
| 90 except socket.error, le: | |
| 91 raise CannotListenError, (None, self.port, le) | |
| 92 else: | |
| 93 # Make the socket readable and writable to the world. | |
| 94 try: | |
| 95 os.chmod(self.port, self.mode) | |
| 96 except: # probably not a visible filesystem name | |
| 97 pass | |
| 98 skt.listen(self.backlog) | |
| 99 self.connected = True | |
| 100 self.socket = skt | |
| 101 self.fileno = self.socket.fileno | |
| 102 self.numberAccepts = 100 | |
| 103 self.startReading() | |
| 104 | |
| 105 def connectionLost(self, reason): | |
| 106 os.unlink(self.port) | |
| 107 if self.lockFile is not None: | |
| 108 self.lockFile.unlock() | |
| 109 tcp.Port.connectionLost(self, reason) | |
| 110 | |
| 111 def getHost(self): | |
| 112 """Returns a UNIXAddress. | |
| 113 | |
| 114 This indicates the server's address. | |
| 115 """ | |
| 116 return address.UNIXAddress(self.socket.getsockname()) | |
| 117 | |
| 118 | |
| 119 class Client(tcp.BaseClient): | |
| 120 """A client for Unix sockets.""" | |
| 121 addressFamily = socket.AF_UNIX | |
| 122 socketType = socket.SOCK_STREAM | |
| 123 | |
| 124 def __init__(self, filename, connector, reactor=None, checkPID = 0): | |
| 125 self.connector = connector | |
| 126 self.realAddress = self.addr = filename | |
| 127 if checkPID and not lockfile.isLocked(filename + ".lock"): | |
| 128 self._finishInit(None, None, error.BadFileError(filename), reactor) | |
| 129 self._finishInit(self.doConnect, self.createInternetSocket(), | |
| 130 None, reactor) | |
| 131 | |
| 132 def getPeer(self): | |
| 133 return address.UNIXAddress(self.addr) | |
| 134 | |
| 135 def getHost(self): | |
| 136 return address.UNIXAddress(None) | |
| 137 | |
| 138 | |
| 139 class Connector(base.BaseConnector): | |
| 140 def __init__(self, address, factory, timeout, reactor, checkPID): | |
| 141 base.BaseConnector.__init__(self, factory, timeout, reactor) | |
| 142 self.address = address | |
| 143 self.checkPID = checkPID | |
| 144 | |
| 145 def _makeTransport(self): | |
| 146 return Client(self.address, self, self.reactor, self.checkPID) | |
| 147 | |
| 148 def getDestination(self): | |
| 149 return address.UNIXAddress(self.address) | |
| 150 | |
| 151 | |
| 152 class DatagramPort(udp.Port): | |
| 153 """Datagram UNIX port, listening for packets.""" | |
| 154 | |
| 155 implements(interfaces.IUNIXDatagramTransport) | |
| 156 | |
| 157 addressFamily = socket.AF_UNIX | |
| 158 | |
| 159 def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, reactor=None)
: | |
| 160 """Initialize with address to listen on. | |
| 161 """ | |
| 162 udp.Port.__init__(self, addr, proto, maxPacketSize=maxPacketSize, reacto
r=reactor) | |
| 163 self.mode = mode | |
| 164 | |
| 165 | |
| 166 def __repr__(self): | |
| 167 protocolName = reflect.qual(self.protocol.__class__,) | |
| 168 if hasattr(self, 'socket'): | |
| 169 return '<%s on %r>' % (protocolName, self.port) | |
| 170 else: | |
| 171 return '<%s (not listening)>' % (protocolName,) | |
| 172 | |
| 173 | |
| 174 def _bindSocket(self): | |
| 175 log.msg("%s starting on %s"%(self.protocol.__class__, repr(self.port))) | |
| 176 try: | |
| 177 skt = self.createInternetSocket() # XXX: haha misnamed method | |
| 178 if self.port: | |
| 179 skt.bind(self.port) | |
| 180 except socket.error, le: | |
| 181 raise error.CannotListenError, (None, self.port, le) | |
| 182 if self.port: | |
| 183 try: | |
| 184 os.chmod(self.port, self.mode) | |
| 185 except: # probably not a visible filesystem name | |
| 186 pass | |
| 187 self.connected = 1 | |
| 188 self.socket = skt | |
| 189 self.fileno = self.socket.fileno | |
| 190 | |
| 191 def write(self, datagram, address): | |
| 192 """Write a datagram.""" | |
| 193 try: | |
| 194 return self.socket.sendto(datagram, address) | |
| 195 except socket.error, se: | |
| 196 no = se.args[0] | |
| 197 if no == EINTR: | |
| 198 return self.write(datagram, address) | |
| 199 elif no == EMSGSIZE: | |
| 200 raise error.MessageLengthError, "message too long" | |
| 201 elif no == EAGAIN: | |
| 202 # oh, well, drop the data. The only difference from UDP | |
| 203 # is that UDP won't ever notice. | |
| 204 # TODO: add TCP-like buffering | |
| 205 pass | |
| 206 else: | |
| 207 raise | |
| 208 | |
| 209 def connectionLost(self, reason=None): | |
| 210 """Cleans up my socket. | |
| 211 """ | |
| 212 log.msg('(Port %s Closed)' % repr(self.port)) | |
| 213 base.BasePort.connectionLost(self, reason) | |
| 214 if hasattr(self, "protocol"): | |
| 215 # we won't have attribute in ConnectedPort, in cases | |
| 216 # where there was an error in connection process | |
| 217 self.protocol.doStop() | |
| 218 self.connected = 0 | |
| 219 self.socket.close() | |
| 220 del self.socket | |
| 221 del self.fileno | |
| 222 if hasattr(self, "d"): | |
| 223 self.d.callback(None) | |
| 224 del self.d | |
| 225 | |
| 226 def setLogStr(self): | |
| 227 self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)" | |
| 228 | |
| 229 def getHost(self): | |
| 230 return address.UNIXAddress(self.socket.getsockname()) | |
| 231 | |
| 232 | |
| 233 class ConnectedDatagramPort(DatagramPort): | |
| 234 """A connected datagram UNIX socket.""" | |
| 235 | |
| 236 implementsOnly(interfaces.IUNIXDatagramConnectedTransport, | |
| 237 *(implementedBy(base.BasePort))) | |
| 238 | |
| 239 def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=N
one, reactor=None): | |
| 240 assert isinstance(proto, protocol.ConnectedDatagramProtocol) | |
| 241 DatagramPort.__init__(self, bindAddress, proto, maxPacketSize, mode, rea
ctor) | |
| 242 self.remoteaddr = addr | |
| 243 | |
| 244 def startListening(self): | |
| 245 try: | |
| 246 self._bindSocket() | |
| 247 self.socket.connect(self.remoteaddr) | |
| 248 self._connectToProtocol() | |
| 249 except: | |
| 250 self.connectionFailed(failure.Failure()) | |
| 251 | |
| 252 def connectionFailed(self, reason): | |
| 253 self.loseConnection() | |
| 254 self.protocol.connectionFailed(reason) | |
| 255 del self.protocol | |
| 256 | |
| 257 def doRead(self): | |
| 258 """Called when my socket is ready for reading.""" | |
| 259 read = 0 | |
| 260 while read < self.maxThroughput: | |
| 261 try: | |
| 262 data, addr = self.socket.recvfrom(self.maxPacketSize) | |
| 263 read += len(data) | |
| 264 self.protocol.datagramReceived(data) | |
| 265 except socket.error, se: | |
| 266 no = se.args[0] | |
| 267 if no in (EAGAIN, EINTR, EWOULDBLOCK): | |
| 268 return | |
| 269 if no == ECONNREFUSED: | |
| 270 self.protocol.connectionRefused() | |
| 271 else: | |
| 272 raise | |
| 273 except: | |
| 274 log.deferr() | |
| 275 | |
| 276 def write(self, data): | |
| 277 """Write a datagram.""" | |
| 278 try: | |
| 279 return self.socket.send(data) | |
| 280 except socket.error, se: | |
| 281 no = se.args[0] | |
| 282 if no == EINTR: | |
| 283 return self.write(data) | |
| 284 elif no == EMSGSIZE: | |
| 285 raise error.MessageLengthError, "message too long" | |
| 286 elif no == ECONNREFUSED: | |
| 287 self.protocol.connectionRefused() | |
| 288 elif no == EAGAIN: | |
| 289 # oh, well, drop the data. The only difference from UDP | |
| 290 # is that UDP won't ever notice. | |
| 291 # TODO: add TCP-like buffering | |
| 292 pass | |
| 293 else: | |
| 294 raise | |
| 295 | |
| 296 def getPeer(self): | |
| 297 return address.UNIXAddress(self.remoteaddr) | |
| OLD | NEW |