OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_unix -*- | |
2 | |
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """Various asynchronous TCP/IP classes. | |
8 | |
9 End users shouldn't use this module directly - use the reactor APIs instead. | |
10 | |
11 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
12 """ | |
13 | |
14 # System imports | |
15 import os, stat, socket | |
16 from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED | |
17 | |
18 from zope.interface import implements, implementsOnly, implementedBy | |
19 | |
20 if not hasattr(socket, 'AF_UNIX'): | |
21 raise ImportError("UNIX sockets not supported on this platform") | |
22 | |
23 # Twisted imports | |
24 from twisted.internet import base, tcp, udp, error, interfaces, protocol, addres
s | |
25 from twisted.internet.error import CannotListenError | |
26 from twisted.python import lockfile, log, reflect, failure | |
27 | |
28 | |
29 class Server(tcp.Server): | |
30 def __init__(self, sock, protocol, client, server, sessionno): | |
31 tcp.Server.__init__(self, sock, protocol, (client, None), server, sessio
nno) | |
32 | |
33 def getHost(self): | |
34 return address.UNIXAddress(self.socket.getsockname()) | |
35 | |
36 def getPeer(self): | |
37 return address.UNIXAddress(self.hostname) | |
38 | |
39 | |
40 class Port(tcp.Port): | |
41 addressFamily = socket.AF_UNIX | |
42 socketType = socket.SOCK_STREAM | |
43 | |
44 transport = Server | |
45 lockFile = None | |
46 | |
47 def __init__(self, fileName, factory, backlog=50, mode=0666, reactor=None, w
antPID = 0): | |
48 tcp.Port.__init__(self, fileName, factory, backlog, reactor=reactor) | |
49 self.mode = mode | |
50 self.wantPID = wantPID | |
51 | |
52 def __repr__(self): | |
53 factoryName = reflect.qual(self.factory.__class__) | |
54 if hasattr(self, 'socket'): | |
55 return '<%s on %r>' % (factoryName, self.port) | |
56 else: | |
57 return '<%s (not listening)>' % (factoryName,) | |
58 | |
59 def _buildAddr(self, name): | |
60 return address.UNIXAddress(name) | |
61 | |
62 def startListening(self): | |
63 """Create and bind my socket, and begin listening on it. | |
64 | |
65 This is called on unserialization, and must be called after creating a | |
66 server to begin listening on the specified port. | |
67 """ | |
68 log.msg("%s starting on %r" % (self.factory.__class__, repr(self.port))) | |
69 if self.wantPID: | |
70 self.lockFile = lockfile.FilesystemLock(self.port + ".lock") | |
71 if not self.lockFile.lock(): | |
72 raise CannotListenError, (None, self.port, "Cannot acquire lock"
) | |
73 else: | |
74 if not self.lockFile.clean: | |
75 try: | |
76 # This is a best-attempt at cleaning up | |
77 # left-over unix sockets on the filesystem. | |
78 # If it fails, there's not much else we can | |
79 # do. The bind() below will fail with an | |
80 # exception that actually propegates. | |
81 if stat.S_ISSOCK(os.stat(self.port).st_mode): | |
82 os.remove(self.port) | |
83 except: | |
84 pass | |
85 | |
86 self.factory.doStart() | |
87 try: | |
88 skt = self.createInternetSocket() | |
89 skt.bind(self.port) | |
90 except socket.error, le: | |
91 raise CannotListenError, (None, self.port, le) | |
92 else: | |
93 # Make the socket readable and writable to the world. | |
94 try: | |
95 os.chmod(self.port, self.mode) | |
96 except: # probably not a visible filesystem name | |
97 pass | |
98 skt.listen(self.backlog) | |
99 self.connected = True | |
100 self.socket = skt | |
101 self.fileno = self.socket.fileno | |
102 self.numberAccepts = 100 | |
103 self.startReading() | |
104 | |
105 def connectionLost(self, reason): | |
106 os.unlink(self.port) | |
107 if self.lockFile is not None: | |
108 self.lockFile.unlock() | |
109 tcp.Port.connectionLost(self, reason) | |
110 | |
111 def getHost(self): | |
112 """Returns a UNIXAddress. | |
113 | |
114 This indicates the server's address. | |
115 """ | |
116 return address.UNIXAddress(self.socket.getsockname()) | |
117 | |
118 | |
119 class Client(tcp.BaseClient): | |
120 """A client for Unix sockets.""" | |
121 addressFamily = socket.AF_UNIX | |
122 socketType = socket.SOCK_STREAM | |
123 | |
124 def __init__(self, filename, connector, reactor=None, checkPID = 0): | |
125 self.connector = connector | |
126 self.realAddress = self.addr = filename | |
127 if checkPID and not lockfile.isLocked(filename + ".lock"): | |
128 self._finishInit(None, None, error.BadFileError(filename), reactor) | |
129 self._finishInit(self.doConnect, self.createInternetSocket(), | |
130 None, reactor) | |
131 | |
132 def getPeer(self): | |
133 return address.UNIXAddress(self.addr) | |
134 | |
135 def getHost(self): | |
136 return address.UNIXAddress(None) | |
137 | |
138 | |
139 class Connector(base.BaseConnector): | |
140 def __init__(self, address, factory, timeout, reactor, checkPID): | |
141 base.BaseConnector.__init__(self, factory, timeout, reactor) | |
142 self.address = address | |
143 self.checkPID = checkPID | |
144 | |
145 def _makeTransport(self): | |
146 return Client(self.address, self, self.reactor, self.checkPID) | |
147 | |
148 def getDestination(self): | |
149 return address.UNIXAddress(self.address) | |
150 | |
151 | |
152 class DatagramPort(udp.Port): | |
153 """Datagram UNIX port, listening for packets.""" | |
154 | |
155 implements(interfaces.IUNIXDatagramTransport) | |
156 | |
157 addressFamily = socket.AF_UNIX | |
158 | |
159 def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, reactor=None)
: | |
160 """Initialize with address to listen on. | |
161 """ | |
162 udp.Port.__init__(self, addr, proto, maxPacketSize=maxPacketSize, reacto
r=reactor) | |
163 self.mode = mode | |
164 | |
165 | |
166 def __repr__(self): | |
167 protocolName = reflect.qual(self.protocol.__class__,) | |
168 if hasattr(self, 'socket'): | |
169 return '<%s on %r>' % (protocolName, self.port) | |
170 else: | |
171 return '<%s (not listening)>' % (protocolName,) | |
172 | |
173 | |
174 def _bindSocket(self): | |
175 log.msg("%s starting on %s"%(self.protocol.__class__, repr(self.port))) | |
176 try: | |
177 skt = self.createInternetSocket() # XXX: haha misnamed method | |
178 if self.port: | |
179 skt.bind(self.port) | |
180 except socket.error, le: | |
181 raise error.CannotListenError, (None, self.port, le) | |
182 if self.port: | |
183 try: | |
184 os.chmod(self.port, self.mode) | |
185 except: # probably not a visible filesystem name | |
186 pass | |
187 self.connected = 1 | |
188 self.socket = skt | |
189 self.fileno = self.socket.fileno | |
190 | |
191 def write(self, datagram, address): | |
192 """Write a datagram.""" | |
193 try: | |
194 return self.socket.sendto(datagram, address) | |
195 except socket.error, se: | |
196 no = se.args[0] | |
197 if no == EINTR: | |
198 return self.write(datagram, address) | |
199 elif no == EMSGSIZE: | |
200 raise error.MessageLengthError, "message too long" | |
201 elif no == EAGAIN: | |
202 # oh, well, drop the data. The only difference from UDP | |
203 # is that UDP won't ever notice. | |
204 # TODO: add TCP-like buffering | |
205 pass | |
206 else: | |
207 raise | |
208 | |
209 def connectionLost(self, reason=None): | |
210 """Cleans up my socket. | |
211 """ | |
212 log.msg('(Port %s Closed)' % repr(self.port)) | |
213 base.BasePort.connectionLost(self, reason) | |
214 if hasattr(self, "protocol"): | |
215 # we won't have attribute in ConnectedPort, in cases | |
216 # where there was an error in connection process | |
217 self.protocol.doStop() | |
218 self.connected = 0 | |
219 self.socket.close() | |
220 del self.socket | |
221 del self.fileno | |
222 if hasattr(self, "d"): | |
223 self.d.callback(None) | |
224 del self.d | |
225 | |
226 def setLogStr(self): | |
227 self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)" | |
228 | |
229 def getHost(self): | |
230 return address.UNIXAddress(self.socket.getsockname()) | |
231 | |
232 | |
233 class ConnectedDatagramPort(DatagramPort): | |
234 """A connected datagram UNIX socket.""" | |
235 | |
236 implementsOnly(interfaces.IUNIXDatagramConnectedTransport, | |
237 *(implementedBy(base.BasePort))) | |
238 | |
239 def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=N
one, reactor=None): | |
240 assert isinstance(proto, protocol.ConnectedDatagramProtocol) | |
241 DatagramPort.__init__(self, bindAddress, proto, maxPacketSize, mode, rea
ctor) | |
242 self.remoteaddr = addr | |
243 | |
244 def startListening(self): | |
245 try: | |
246 self._bindSocket() | |
247 self.socket.connect(self.remoteaddr) | |
248 self._connectToProtocol() | |
249 except: | |
250 self.connectionFailed(failure.Failure()) | |
251 | |
252 def connectionFailed(self, reason): | |
253 self.loseConnection() | |
254 self.protocol.connectionFailed(reason) | |
255 del self.protocol | |
256 | |
257 def doRead(self): | |
258 """Called when my socket is ready for reading.""" | |
259 read = 0 | |
260 while read < self.maxThroughput: | |
261 try: | |
262 data, addr = self.socket.recvfrom(self.maxPacketSize) | |
263 read += len(data) | |
264 self.protocol.datagramReceived(data) | |
265 except socket.error, se: | |
266 no = se.args[0] | |
267 if no in (EAGAIN, EINTR, EWOULDBLOCK): | |
268 return | |
269 if no == ECONNREFUSED: | |
270 self.protocol.connectionRefused() | |
271 else: | |
272 raise | |
273 except: | |
274 log.deferr() | |
275 | |
276 def write(self, data): | |
277 """Write a datagram.""" | |
278 try: | |
279 return self.socket.send(data) | |
280 except socket.error, se: | |
281 no = se.args[0] | |
282 if no == EINTR: | |
283 return self.write(data) | |
284 elif no == EMSGSIZE: | |
285 raise error.MessageLengthError, "message too long" | |
286 elif no == ECONNREFUSED: | |
287 self.protocol.connectionRefused() | |
288 elif no == EAGAIN: | |
289 # oh, well, drop the data. The only difference from UDP | |
290 # is that UDP won't ever notice. | |
291 # TODO: add TCP-like buffering | |
292 pass | |
293 else: | |
294 raise | |
295 | |
296 def getPeer(self): | |
297 return address.UNIXAddress(self.remoteaddr) | |
OLD | NEW |