OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_tcp -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Various asynchronous TCP/IP classes. | |
7 | |
8 End users shouldn't use this module directly - use the reactor APIs instead. | |
9 | |
10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
11 """ | |
12 | |
13 | |
14 # System Imports | |
15 import os | |
16 import types | |
17 import socket | |
18 import sys | |
19 import operator | |
20 import warnings | |
21 | |
22 try: | |
23 import fcntl | |
24 except ImportError: | |
25 fcntl = None | |
26 from zope.interface import implements, classImplements | |
27 | |
28 try: | |
29 from OpenSSL import SSL | |
30 except ImportError: | |
31 SSL = None | |
32 | |
33 from twisted.python.runtime import platformType | |
34 | |
35 | |
36 if platformType == 'win32': | |
37 # no such thing as WSAEPERM or error code 10001 according to winsock.h or MS
DN | |
38 EPERM = object() | |
39 from errno import WSAEINVAL as EINVAL | |
40 from errno import WSAEWOULDBLOCK as EWOULDBLOCK | |
41 from errno import WSAEINPROGRESS as EINPROGRESS | |
42 from errno import WSAEALREADY as EALREADY | |
43 from errno import WSAECONNRESET as ECONNRESET | |
44 from errno import WSAEISCONN as EISCONN | |
45 from errno import WSAENOTCONN as ENOTCONN | |
46 from errno import WSAEINTR as EINTR | |
47 from errno import WSAENOBUFS as ENOBUFS | |
48 from errno import WSAEMFILE as EMFILE | |
49 # No such thing as WSAENFILE, either. | |
50 ENFILE = object() | |
51 # Nor ENOMEM | |
52 ENOMEM = object() | |
53 EAGAIN = EWOULDBLOCK | |
54 from errno import WSAECONNRESET as ECONNABORTED | |
55 | |
56 from twisted.python.win32 import formatError as strerror | |
57 else: | |
58 from errno import EPERM | |
59 from errno import EINVAL | |
60 from errno import EWOULDBLOCK | |
61 from errno import EINPROGRESS | |
62 from errno import EALREADY | |
63 from errno import ECONNRESET | |
64 from errno import EISCONN | |
65 from errno import ENOTCONN | |
66 from errno import EINTR | |
67 from errno import ENOBUFS | |
68 from errno import EMFILE | |
69 from errno import ENFILE | |
70 from errno import ENOMEM | |
71 from errno import EAGAIN | |
72 from errno import ECONNABORTED | |
73 | |
74 from os import strerror | |
75 | |
76 from errno import errorcode | |
77 | |
78 # Twisted Imports | |
79 from twisted.internet import defer, base, address | |
80 from twisted.python import log, failure, reflect | |
81 from twisted.python.util import unsignedID | |
82 from twisted.internet.error import CannotListenError | |
83 from twisted.internet import abstract, main, interfaces, error | |
84 | |
85 | |
86 | |
87 class _SocketCloser: | |
88 _socketShutdownMethod = 'shutdown' | |
89 | |
90 def _closeSocket(self): | |
91 # socket.close() doesn't *really* close if there's another reference | |
92 # to it in the TCP/IP stack, e.g. if it was was inherited by a | |
93 # subprocess. And we really do want to close the connection. So we | |
94 # use shutdown() instead, and then close() in order to release the | |
95 # filedescriptor. | |
96 skt = self.socket | |
97 try: | |
98 getattr(skt, self._socketShutdownMethod)(2) | |
99 except socket.error: | |
100 pass | |
101 try: | |
102 skt.close() | |
103 except socket.error: | |
104 pass | |
105 | |
106 class _TLSMixin: | |
107 _socketShutdownMethod = 'sock_shutdown' | |
108 | |
109 writeBlockedOnRead = 0 | |
110 readBlockedOnWrite = 0 | |
111 _userWantRead = _userWantWrite = True | |
112 | |
113 def getPeerCertificate(self): | |
114 return self.socket.get_peer_certificate() | |
115 | |
116 def doRead(self): | |
117 if self.writeBlockedOnRead: | |
118 self.writeBlockedOnRead = 0 | |
119 self._resetReadWrite() | |
120 try: | |
121 return Connection.doRead(self) | |
122 except SSL.ZeroReturnError: | |
123 return main.CONNECTION_DONE | |
124 except SSL.WantReadError: | |
125 return | |
126 except SSL.WantWriteError: | |
127 self.readBlockedOnWrite = 1 | |
128 Connection.startWriting(self) | |
129 Connection.stopReading(self) | |
130 return | |
131 except SSL.SysCallError, (retval, desc): | |
132 if ((retval == -1 and desc == 'Unexpected EOF') | |
133 or retval > 0): | |
134 return main.CONNECTION_LOST | |
135 log.err() | |
136 return main.CONNECTION_LOST | |
137 except SSL.Error, e: | |
138 return e | |
139 | |
140 def doWrite(self): | |
141 # Retry disconnecting | |
142 if self.disconnected: | |
143 return self._postLoseConnection() | |
144 if self._writeDisconnected: | |
145 return self._closeWriteConnection() | |
146 | |
147 if self.readBlockedOnWrite: | |
148 self.readBlockedOnWrite = 0 | |
149 self._resetReadWrite() | |
150 return Connection.doWrite(self) | |
151 | |
152 def writeSomeData(self, data): | |
153 try: | |
154 return Connection.writeSomeData(self, data) | |
155 except SSL.WantWriteError: | |
156 return 0 | |
157 except SSL.WantReadError: | |
158 self.writeBlockedOnRead = 1 | |
159 Connection.stopWriting(self) | |
160 Connection.startReading(self) | |
161 return 0 | |
162 except SSL.ZeroReturnError: | |
163 return main.CONNECTION_LOST | |
164 except SSL.SysCallError, e: | |
165 if e[0] == -1 and data == "": | |
166 # errors when writing empty strings are expected | |
167 # and can be ignored | |
168 return 0 | |
169 else: | |
170 return main.CONNECTION_LOST | |
171 except SSL.Error, e: | |
172 return e | |
173 | |
174 def _postLoseConnection(self): | |
175 """Gets called after loseConnection(), after buffered data is sent. | |
176 | |
177 We try to send an SSL shutdown alert, but if it doesn't work, retry | |
178 when the socket is writable. | |
179 """ | |
180 self.disconnected=1 | |
181 if hasattr(self.socket, 'set_shutdown'): | |
182 self.socket.set_shutdown(SSL.RECEIVED_SHUTDOWN) | |
183 return self._sendCloseAlert() | |
184 | |
185 _first=False | |
186 def _sendCloseAlert(self): | |
187 # Okay, *THIS* is a bit complicated. | |
188 | |
189 # Basically, the issue is, OpenSSL seems to not actually return | |
190 # errors from SSL_shutdown. Therefore, the only way to | |
191 # determine if the close notification has been sent is by | |
192 # SSL_shutdown returning "done". However, it will not claim it's | |
193 # done until it's both sent *and* received a shutdown notification. | |
194 | |
195 # I don't actually want to wait for a received shutdown | |
196 # notification, though, so, I have to set RECEIVED_SHUTDOWN | |
197 # before calling shutdown. Then, it'll return True once it's | |
198 # *SENT* the shutdown. | |
199 | |
200 # However, RECEIVED_SHUTDOWN can't be left set, because then | |
201 # reads will fail, breaking half close. | |
202 | |
203 # Also, since shutdown doesn't report errors, an empty write call is | |
204 # done first, to try to detect if the connection has gone away. | |
205 # (*NOT* an SSL_write call, because that fails once you've called | |
206 # shutdown) | |
207 try: | |
208 os.write(self.socket.fileno(), '') | |
209 except OSError, se: | |
210 if se.args[0] in (EINTR, EWOULDBLOCK, ENOBUFS): | |
211 return 0 | |
212 # Write error, socket gone | |
213 return main.CONNECTION_LOST | |
214 | |
215 try: | |
216 if hasattr(self.socket, 'set_shutdown'): | |
217 laststate = self.socket.get_shutdown() | |
218 self.socket.set_shutdown(laststate | SSL.RECEIVED_SHUTDOWN) | |
219 done = self.socket.shutdown() | |
220 if not (laststate & SSL.RECEIVED_SHUTDOWN): | |
221 self.socket.set_shutdown(SSL.SENT_SHUTDOWN) | |
222 else: | |
223 #warnings.warn("SSL connection shutdown possibly unreliable, " | |
224 # "please upgrade to ver 0.XX", category=UserWarnin
g) | |
225 self.socket.shutdown() | |
226 done = True | |
227 except SSL.Error, e: | |
228 return e | |
229 | |
230 if done: | |
231 self.stopWriting() | |
232 # Note that this is tested for by identity below. | |
233 return main.CONNECTION_DONE | |
234 else: | |
235 self.startWriting() | |
236 return None | |
237 | |
238 def _closeWriteConnection(self): | |
239 result = self._sendCloseAlert() | |
240 | |
241 if result is main.CONNECTION_DONE: | |
242 return Connection._closeWriteConnection(self) | |
243 | |
244 return result | |
245 | |
246 def startReading(self): | |
247 self._userWantRead = True | |
248 if not self.readBlockedOnWrite: | |
249 return Connection.startReading(self) | |
250 | |
251 def stopReading(self): | |
252 self._userWantRead = False | |
253 if not self.writeBlockedOnRead: | |
254 return Connection.stopReading(self) | |
255 | |
256 def startWriting(self): | |
257 self._userWantWrite = True | |
258 if not self.writeBlockedOnRead: | |
259 return Connection.startWriting(self) | |
260 | |
261 def stopWriting(self): | |
262 self._userWantWrite = False | |
263 if not self.readBlockedOnWrite: | |
264 return Connection.stopWriting(self) | |
265 | |
266 def _resetReadWrite(self): | |
267 # After changing readBlockedOnWrite or writeBlockedOnRead, | |
268 # call this to reset the state to what the user requested. | |
269 if self._userWantWrite: | |
270 self.startWriting() | |
271 else: | |
272 self.stopWriting() | |
273 | |
274 if self._userWantRead: | |
275 self.startReading() | |
276 else: | |
277 self.stopReading() | |
278 | |
279 def _getTLSClass(klass, _existing={}): | |
280 if klass not in _existing: | |
281 class TLSConnection(_TLSMixin, klass): | |
282 implements(interfaces.ISSLTransport) | |
283 _existing[klass] = TLSConnection | |
284 return _existing[klass] | |
285 | |
286 class Connection(abstract.FileDescriptor, _SocketCloser): | |
287 """ | |
288 Superclass of all socket-based FileDescriptors. | |
289 | |
290 This is an abstract superclass of all objects which represent a TCP/IP | |
291 connection based socket. | |
292 | |
293 @ivar logstr: prefix used when logging events related to this connection. | |
294 @type logstr: C{str} | |
295 """ | |
296 | |
297 implements(interfaces.ITCPTransport, interfaces.ISystemHandle) | |
298 | |
299 TLS = 0 | |
300 | |
301 def __init__(self, skt, protocol, reactor=None): | |
302 abstract.FileDescriptor.__init__(self, reactor=reactor) | |
303 self.socket = skt | |
304 self.socket.setblocking(0) | |
305 self.fileno = skt.fileno | |
306 self.protocol = protocol | |
307 | |
308 if SSL: | |
309 | |
310 def startTLS(self, ctx): | |
311 assert not self.TLS | |
312 error=False | |
313 if self.dataBuffer or self._tempDataBuffer: | |
314 self.dataBuffer += "".join(self._tempDataBuffer) | |
315 self._tempDataBuffer = [] | |
316 self._tempDataLen = 0 | |
317 written = self.writeSomeData(buffer(self.dataBuffer, self.offset
)) | |
318 offset = self.offset | |
319 dataLen = len(self.dataBuffer) | |
320 self.offset = 0 | |
321 self.dataBuffer = "" | |
322 if isinstance(written, Exception) or (offset + written != dataLe
n): | |
323 error=True | |
324 | |
325 | |
326 self.stopReading() | |
327 self.stopWriting() | |
328 self._startTLS() | |
329 self.socket = SSL.Connection(ctx.getContext(), self.socket) | |
330 self.fileno = self.socket.fileno | |
331 self.startReading() | |
332 if error: | |
333 warnings.warn("startTLS with unwritten buffered data currently d
oesn't work right. See issue #686. Closing connection.", category=RuntimeWarning
, stacklevel=2) | |
334 self.loseConnection() | |
335 return | |
336 | |
337 def _startTLS(self): | |
338 self.TLS = 1 | |
339 self.__class__ = _getTLSClass(self.__class__) | |
340 | |
341 def getHandle(self): | |
342 """Return the socket for this connection.""" | |
343 return self.socket | |
344 | |
345 def doRead(self): | |
346 """Calls self.protocol.dataReceived with all available data. | |
347 | |
348 This reads up to self.bufferSize bytes of data from its socket, then | |
349 calls self.dataReceived(data) to process it. If the connection is not | |
350 lost through an error in the physical recv(), this function will return | |
351 the result of the dataReceived call. | |
352 """ | |
353 try: | |
354 data = self.socket.recv(self.bufferSize) | |
355 except socket.error, se: | |
356 if se.args[0] == EWOULDBLOCK: | |
357 return | |
358 else: | |
359 return main.CONNECTION_LOST | |
360 if not data: | |
361 return main.CONNECTION_DONE | |
362 return self.protocol.dataReceived(data) | |
363 | |
364 def writeSomeData(self, data): | |
365 """Connection.writeSomeData(data) -> #of bytes written | CONNECTION_LOST | |
366 This writes as much data as possible to the socket and returns either | |
367 the number of bytes read (which is positive) or a connection error code | |
368 (which is negative) | |
369 """ | |
370 try: | |
371 # Limit length of buffer to try to send, because some OSes are too | |
372 # stupid to do so themselves (ahem windows) | |
373 return self.socket.send(buffer(data, 0, self.SEND_LIMIT)) | |
374 except socket.error, se: | |
375 if se.args[0] == EINTR: | |
376 return self.writeSomeData(data) | |
377 elif se.args[0] in (EWOULDBLOCK, ENOBUFS): | |
378 return 0 | |
379 else: | |
380 return main.CONNECTION_LOST | |
381 | |
382 def _closeWriteConnection(self): | |
383 try: | |
384 getattr(self.socket, self._socketShutdownMethod)(1) | |
385 except socket.error: | |
386 pass | |
387 p = interfaces.IHalfCloseableProtocol(self.protocol, None) | |
388 if p: | |
389 try: | |
390 p.writeConnectionLost() | |
391 except: | |
392 f = failure.Failure() | |
393 log.err() | |
394 self.connectionLost(f) | |
395 | |
396 def readConnectionLost(self, reason): | |
397 p = interfaces.IHalfCloseableProtocol(self.protocol, None) | |
398 if p: | |
399 try: | |
400 p.readConnectionLost() | |
401 except: | |
402 log.err() | |
403 self.connectionLost(failure.Failure()) | |
404 else: | |
405 self.connectionLost(reason) | |
406 | |
407 def connectionLost(self, reason): | |
408 """See abstract.FileDescriptor.connectionLost(). | |
409 """ | |
410 abstract.FileDescriptor.connectionLost(self, reason) | |
411 self._closeSocket() | |
412 protocol = self.protocol | |
413 del self.protocol | |
414 del self.socket | |
415 del self.fileno | |
416 protocol.connectionLost(reason) | |
417 | |
418 logstr = "Uninitialized" | |
419 | |
420 def logPrefix(self): | |
421 """Return the prefix to log with when I own the logging thread. | |
422 """ | |
423 return self.logstr | |
424 | |
425 def getTcpNoDelay(self): | |
426 return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.
TCP_NODELAY)) | |
427 | |
428 def setTcpNoDelay(self, enabled): | |
429 self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled) | |
430 | |
431 def getTcpKeepAlive(self): | |
432 return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET, | |
433 socket.SO_KEEPALIVE)) | |
434 | |
435 def setTcpKeepAlive(self, enabled): | |
436 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled) | |
437 | |
438 if SSL: | |
439 classImplements(Connection, interfaces.ITLSTransport) | |
440 | |
441 class BaseClient(Connection): | |
442 """A base class for client TCP (and similiar) sockets. | |
443 """ | |
444 addressFamily = socket.AF_INET | |
445 socketType = socket.SOCK_STREAM | |
446 | |
447 def _finishInit(self, whenDone, skt, error, reactor): | |
448 """Called by base classes to continue to next stage of initialization.""
" | |
449 if whenDone: | |
450 Connection.__init__(self, skt, None, reactor) | |
451 self.doWrite = self.doConnect | |
452 self.doRead = self.doConnect | |
453 reactor.callLater(0, whenDone) | |
454 else: | |
455 reactor.callLater(0, self.failIfNotConnected, error) | |
456 | |
457 def startTLS(self, ctx, client=1): | |
458 holder = Connection.startTLS(self, ctx) | |
459 if client: | |
460 self.socket.set_connect_state() | |
461 else: | |
462 self.socket.set_accept_state() | |
463 return holder | |
464 | |
465 def stopConnecting(self): | |
466 """Stop attempt to connect.""" | |
467 self.failIfNotConnected(error.UserError()) | |
468 | |
469 def failIfNotConnected(self, err): | |
470 """ | |
471 Generic method called when the attemps to connect failed. It basically | |
472 cleans everything it can: call connectionFailed, stop read and write, | |
473 delete socket related members. | |
474 """ | |
475 if (self.connected or self.disconnected or | |
476 not hasattr(self, "connector")): | |
477 return | |
478 | |
479 self.connector.connectionFailed(failure.Failure(err)) | |
480 if hasattr(self, "reactor"): | |
481 # this doesn't happen if we failed in __init__ | |
482 self.stopReading() | |
483 self.stopWriting() | |
484 del self.connector | |
485 | |
486 try: | |
487 self._closeSocket() | |
488 except AttributeError: | |
489 pass | |
490 else: | |
491 del self.socket, self.fileno | |
492 | |
493 def createInternetSocket(self): | |
494 """(internal) Create a non-blocking socket using | |
495 self.addressFamily, self.socketType. | |
496 """ | |
497 s = socket.socket(self.addressFamily, self.socketType) | |
498 s.setblocking(0) | |
499 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): | |
500 old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD) | |
501 fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) | |
502 return s | |
503 | |
504 def resolveAddress(self): | |
505 if abstract.isIPAddress(self.addr[0]): | |
506 self._setRealAddress(self.addr[0]) | |
507 else: | |
508 d = self.reactor.resolve(self.addr[0]) | |
509 d.addCallbacks(self._setRealAddress, self.failIfNotConnected) | |
510 | |
511 def _setRealAddress(self, address): | |
512 self.realAddress = (address, self.addr[1]) | |
513 self.doConnect() | |
514 | |
515 def doConnect(self): | |
516 """I connect the socket. | |
517 | |
518 Then, call the protocol's makeConnection, and start waiting for data. | |
519 """ | |
520 if not hasattr(self, "connector"): | |
521 # this happens when connection failed but doConnect | |
522 # was scheduled via a callLater in self._finishInit | |
523 return | |
524 | |
525 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) | |
526 if err: | |
527 self.failIfNotConnected(error.getConnectError((err, strerror(err)))) | |
528 return | |
529 | |
530 | |
531 # doConnect gets called twice. The first time we actually need to | |
532 # start the connection attempt. The second time we don't really | |
533 # want to (SO_ERROR above will have taken care of any errors, and if | |
534 # it reported none, the mere fact that doConnect was called again is | |
535 # sufficient to indicate that the connection has succeeded), but it | |
536 # is not /particularly/ detrimental to do so. This should get | |
537 # cleaned up some day, though. | |
538 try: | |
539 connectResult = self.socket.connect_ex(self.realAddress) | |
540 except socket.error, se: | |
541 connectResult = se.args[0] | |
542 if connectResult: | |
543 if connectResult == EISCONN: | |
544 pass | |
545 # on Windows EINVAL means sometimes that we should keep trying: | |
546 # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/w
insock/winsock/connect_2.asp | |
547 elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or | |
548 (connectResult == EINVAL and platformType == "win32")): | |
549 self.startReading() | |
550 self.startWriting() | |
551 return | |
552 else: | |
553 self.failIfNotConnected(error.getConnectError((connectResult, st
rerror(connectResult)))) | |
554 return | |
555 | |
556 # If I have reached this point without raising or returning, that means | |
557 # that the socket is connected. | |
558 del self.doWrite | |
559 del self.doRead | |
560 # we first stop and then start, to reset any references to the old doRea
d | |
561 self.stopReading() | |
562 self.stopWriting() | |
563 self._connectDone() | |
564 | |
565 def _connectDone(self): | |
566 self.protocol = self.connector.buildProtocol(self.getPeer()) | |
567 self.connected = 1 | |
568 self.logstr = self.protocol.__class__.__name__ + ",client" | |
569 self.startReading() | |
570 self.protocol.makeConnection(self) | |
571 | |
572 def connectionLost(self, reason): | |
573 if not self.connected: | |
574 self.failIfNotConnected(error.ConnectError(string=reason)) | |
575 else: | |
576 Connection.connectionLost(self, reason) | |
577 self.connector.connectionLost(reason) | |
578 | |
579 | |
580 class Client(BaseClient): | |
581 """A TCP client.""" | |
582 | |
583 def __init__(self, host, port, bindAddress, connector, reactor=None): | |
584 # BaseClient.__init__ is invoked later | |
585 self.connector = connector | |
586 self.addr = (host, port) | |
587 | |
588 whenDone = self.resolveAddress | |
589 err = None | |
590 skt = None | |
591 | |
592 try: | |
593 skt = self.createInternetSocket() | |
594 except socket.error, se: | |
595 err = error.ConnectBindError(se[0], se[1]) | |
596 whenDone = None | |
597 if whenDone and bindAddress is not None: | |
598 try: | |
599 skt.bind(bindAddress) | |
600 except socket.error, se: | |
601 err = error.ConnectBindError(se[0], se[1]) | |
602 whenDone = None | |
603 self._finishInit(whenDone, skt, err, reactor) | |
604 | |
605 def getHost(self): | |
606 """Returns an IPv4Address. | |
607 | |
608 This indicates the address from which I am connecting. | |
609 """ | |
610 return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',
))) | |
611 | |
612 def getPeer(self): | |
613 """Returns an IPv4Address. | |
614 | |
615 This indicates the address that I am connected to. | |
616 """ | |
617 return address.IPv4Address('TCP', *(self.addr + ('INET',))) | |
618 | |
619 def __repr__(self): | |
620 s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self)) | |
621 return s | |
622 | |
623 | |
624 class Server(Connection): | |
625 """ | |
626 Serverside socket-stream connection class. | |
627 | |
628 This is a serverside network connection transport; a socket which came from | |
629 an accept() on a server. | |
630 """ | |
631 | |
632 def __init__(self, sock, protocol, client, server, sessionno): | |
633 """ | |
634 Server(sock, protocol, client, server, sessionno) | |
635 | |
636 Initialize it with a socket, a protocol, a descriptor for my peer (a | |
637 tuple of host, port describing the other end of the connection), an | |
638 instance of Port, and a session number. | |
639 """ | |
640 Connection.__init__(self, sock, protocol) | |
641 self.server = server | |
642 self.client = client | |
643 self.sessionno = sessionno | |
644 self.hostname = client[0] | |
645 self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__, | |
646 sessionno, | |
647 self.hostname) | |
648 self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__, | |
649 self.sessionno, | |
650 self.server._realPortNumber) | |
651 self.startReading() | |
652 self.connected = 1 | |
653 | |
654 def __repr__(self): | |
655 """A string representation of this connection. | |
656 """ | |
657 return self.repstr | |
658 | |
659 def startTLS(self, ctx, server=1): | |
660 holder = Connection.startTLS(self, ctx) | |
661 if server: | |
662 self.socket.set_accept_state() | |
663 else: | |
664 self.socket.set_connect_state() | |
665 return holder | |
666 | |
667 def getHost(self): | |
668 """Returns an IPv4Address. | |
669 | |
670 This indicates the server's address. | |
671 """ | |
672 return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',
))) | |
673 | |
674 def getPeer(self): | |
675 """Returns an IPv4Address. | |
676 | |
677 This indicates the client's address. | |
678 """ | |
679 return address.IPv4Address('TCP', *(self.client + ('INET',))) | |
680 | |
681 class Port(base.BasePort, _SocketCloser): | |
682 """I am a TCP server port, listening for connections. | |
683 | |
684 When a connection is accepted, I will call my factory's buildProtocol with | |
685 the incoming connection as an argument, according to the specification | |
686 described in twisted.internet.interfaces.IProtocolFactory. | |
687 | |
688 If you wish to change the sort of transport that will be used, my | |
689 `transport' attribute will be called with the signature expected for | |
690 Server.__init__, so it can be replaced. | |
691 """ | |
692 | |
693 implements(interfaces.IListeningPort) | |
694 | |
695 addressFamily = socket.AF_INET | |
696 socketType = socket.SOCK_STREAM | |
697 | |
698 transport = Server | |
699 sessionno = 0 | |
700 interface = '' | |
701 backlog = 50 | |
702 | |
703 # Actual port number being listened on, only set to a non-None | |
704 # value when we are actually listening. | |
705 _realPortNumber = None | |
706 | |
707 def __init__(self, port, factory, backlog=50, interface='', reactor=None): | |
708 """Initialize with a numeric port to listen on. | |
709 """ | |
710 base.BasePort.__init__(self, reactor=reactor) | |
711 self.port = port | |
712 self.factory = factory | |
713 self.backlog = backlog | |
714 self.interface = interface | |
715 | |
716 def __repr__(self): | |
717 if self._realPortNumber is not None: | |
718 return "<%s of %s on %s>" % (self.__class__, self.factory.__class__, | |
719 self._realPortNumber) | |
720 else: | |
721 return "<%s of %s (not listening)>" % (self.__class__, self.factory.
__class__) | |
722 | |
723 def createInternetSocket(self): | |
724 s = base.BasePort.createInternetSocket(self) | |
725 if platformType == "posix" and sys.platform != "cygwin": | |
726 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
727 return s | |
728 | |
729 def startListening(self): | |
730 """Create and bind my socket, and begin listening on it. | |
731 | |
732 This is called on unserialization, and must be called after creating a | |
733 server to begin listening on the specified port. | |
734 """ | |
735 try: | |
736 skt = self.createInternetSocket() | |
737 skt.bind((self.interface, self.port)) | |
738 except socket.error, le: | |
739 raise CannotListenError, (self.interface, self.port, le) | |
740 | |
741 # Make sure that if we listened on port 0, we update that to | |
742 # reflect what the OS actually assigned us. | |
743 self._realPortNumber = skt.getsockname()[1] | |
744 | |
745 log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNum
ber)) | |
746 | |
747 # The order of the next 6 lines is kind of bizarre. If no one | |
748 # can explain it, perhaps we should re-arrange them. | |
749 self.factory.doStart() | |
750 skt.listen(self.backlog) | |
751 self.connected = 1 | |
752 self.socket = skt | |
753 self.fileno = self.socket.fileno | |
754 self.numberAccepts = 100 | |
755 | |
756 self.startReading() | |
757 | |
758 def _buildAddr(self, (host, port)): | |
759 return address._ServerFactoryIPv4Address('TCP', host, port) | |
760 | |
761 def doRead(self): | |
762 """Called when my socket is ready for reading. | |
763 | |
764 This accepts a connection and calls self.protocol() to handle the | |
765 wire-level protocol. | |
766 """ | |
767 try: | |
768 if platformType == "posix": | |
769 numAccepts = self.numberAccepts | |
770 else: | |
771 # win32 event loop breaks if we do more than one accept() | |
772 # in an iteration of the event loop. | |
773 numAccepts = 1 | |
774 for i in range(numAccepts): | |
775 # we need this so we can deal with a factory's buildProtocol | |
776 # calling our loseConnection | |
777 if self.disconnecting: | |
778 return | |
779 try: | |
780 skt, addr = self.socket.accept() | |
781 except socket.error, e: | |
782 if e.args[0] in (EWOULDBLOCK, EAGAIN): | |
783 self.numberAccepts = i | |
784 break | |
785 elif e.args[0] == EPERM: | |
786 # Netfilter on Linux may have rejected the | |
787 # connection, but we get told to try to accept() | |
788 # anyway. | |
789 continue | |
790 elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABO
RTED): | |
791 | |
792 # Linux gives EMFILE when a process is not allowed | |
793 # to allocate any more file descriptors. *BSD and | |
794 # Win32 give (WSA)ENOBUFS. Linux can also give | |
795 # ENFILE if the system is out of inodes, or ENOMEM | |
796 # if there is insufficient memory to allocate a new | |
797 # dentry. ECONNABORTED is documented as possible on | |
798 # both Linux and Windows, but it is not clear | |
799 # whether there are actually any circumstances under | |
800 # which it can happen (one might expect it to be | |
801 # possible if a client sends a FIN or RST after the | |
802 # server sends a SYN|ACK but before application code | |
803 # calls accept(2), however at least on Linux this | |
804 # _seems_ to be short-circuited by syncookies. | |
805 | |
806 log.msg("Could not accept new connection (%s)" % ( | |
807 errorcode[e.args[0]],)) | |
808 break | |
809 raise | |
810 | |
811 protocol = self.factory.buildProtocol(self._buildAddr(addr)) | |
812 if protocol is None: | |
813 skt.close() | |
814 continue | |
815 s = self.sessionno | |
816 self.sessionno = s+1 | |
817 transport = self.transport(skt, protocol, addr, self, s) | |
818 transport = self._preMakeConnection(transport) | |
819 protocol.makeConnection(transport) | |
820 else: | |
821 self.numberAccepts = self.numberAccepts+20 | |
822 except: | |
823 # Note that in TLS mode, this will possibly catch SSL.Errors | |
824 # raised by self.socket.accept() | |
825 # | |
826 # There is no "except SSL.Error:" above because SSL may be | |
827 # None if there is no SSL support. In any case, all the | |
828 # "except SSL.Error:" suite would probably do is log.deferr() | |
829 # and return, so handling it here works just as well. | |
830 log.deferr() | |
831 | |
832 def _preMakeConnection(self, transport): | |
833 return transport | |
834 | |
835 def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)): | |
836 """Stop accepting connections on this port. | |
837 | |
838 This will shut down my socket and call self.connectionLost(). | |
839 It returns a deferred which will fire successfully when the | |
840 port is actually closed. | |
841 """ | |
842 self.disconnecting = 1 | |
843 self.stopReading() | |
844 if self.connected: | |
845 self.deferred = defer.Deferred() | |
846 self.reactor.callLater(0, self.connectionLost, connDone) | |
847 return self.deferred | |
848 | |
849 stopListening = loseConnection | |
850 | |
851 def connectionLost(self, reason): | |
852 """Cleans up my socket. | |
853 """ | |
854 log.msg('(Port %s Closed)' % self._realPortNumber) | |
855 self._realPortNumber = None | |
856 base.BasePort.connectionLost(self, reason) | |
857 self.connected = 0 | |
858 self._closeSocket() | |
859 del self.socket | |
860 del self.fileno | |
861 self.factory.doStop() | |
862 if hasattr(self, "deferred"): | |
863 self.deferred.callback(None) | |
864 del self.deferred | |
865 | |
866 def logPrefix(self): | |
867 """Returns the name of my class, to prefix log entries with. | |
868 """ | |
869 return reflect.qual(self.factory.__class__) | |
870 | |
871 def getHost(self): | |
872 """Returns an IPv4Address. | |
873 | |
874 This indicates the server's address. | |
875 """ | |
876 return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',
))) | |
877 | |
878 class Connector(base.BaseConnector): | |
879 def __init__(self, host, port, factory, timeout, bindAddress, reactor=None): | |
880 self.host = host | |
881 if isinstance(port, types.StringTypes): | |
882 try: | |
883 port = socket.getservbyname(port, 'tcp') | |
884 except socket.error, e: | |
885 raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port)
) | |
886 self.port = port | |
887 self.bindAddress = bindAddress | |
888 base.BaseConnector.__init__(self, factory, timeout, reactor) | |
889 | |
890 def _makeTransport(self): | |
891 return Client(self.host, self.port, self.bindAddress, self, self.reactor
) | |
892 | |
893 def getDestination(self): | |
894 return address.IPv4Address('TCP', self.host, self.port, 'INET') | |
OLD | NEW |