| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_tcp -*- | |
| 2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 """ | |
| 6 Various asynchronous TCP/IP classes. | |
| 7 | |
| 8 End users shouldn't use this module directly - use the reactor APIs instead. | |
| 9 | |
| 10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 11 """ | |
| 12 | |
| 13 | |
| 14 # System Imports | |
| 15 import os | |
| 16 import types | |
| 17 import socket | |
| 18 import sys | |
| 19 import operator | |
| 20 import warnings | |
| 21 | |
| 22 try: | |
| 23 import fcntl | |
| 24 except ImportError: | |
| 25 fcntl = None | |
| 26 from zope.interface import implements, classImplements | |
| 27 | |
| 28 try: | |
| 29 from OpenSSL import SSL | |
| 30 except ImportError: | |
| 31 SSL = None | |
| 32 | |
| 33 from twisted.python.runtime import platformType | |
| 34 | |
| 35 | |
| 36 if platformType == 'win32': | |
| 37 # no such thing as WSAEPERM or error code 10001 according to winsock.h or MS
DN | |
| 38 EPERM = object() | |
| 39 from errno import WSAEINVAL as EINVAL | |
| 40 from errno import WSAEWOULDBLOCK as EWOULDBLOCK | |
| 41 from errno import WSAEINPROGRESS as EINPROGRESS | |
| 42 from errno import WSAEALREADY as EALREADY | |
| 43 from errno import WSAECONNRESET as ECONNRESET | |
| 44 from errno import WSAEISCONN as EISCONN | |
| 45 from errno import WSAENOTCONN as ENOTCONN | |
| 46 from errno import WSAEINTR as EINTR | |
| 47 from errno import WSAENOBUFS as ENOBUFS | |
| 48 from errno import WSAEMFILE as EMFILE | |
| 49 # No such thing as WSAENFILE, either. | |
| 50 ENFILE = object() | |
| 51 # Nor ENOMEM | |
| 52 ENOMEM = object() | |
| 53 EAGAIN = EWOULDBLOCK | |
| 54 from errno import WSAECONNRESET as ECONNABORTED | |
| 55 | |
| 56 from twisted.python.win32 import formatError as strerror | |
| 57 else: | |
| 58 from errno import EPERM | |
| 59 from errno import EINVAL | |
| 60 from errno import EWOULDBLOCK | |
| 61 from errno import EINPROGRESS | |
| 62 from errno import EALREADY | |
| 63 from errno import ECONNRESET | |
| 64 from errno import EISCONN | |
| 65 from errno import ENOTCONN | |
| 66 from errno import EINTR | |
| 67 from errno import ENOBUFS | |
| 68 from errno import EMFILE | |
| 69 from errno import ENFILE | |
| 70 from errno import ENOMEM | |
| 71 from errno import EAGAIN | |
| 72 from errno import ECONNABORTED | |
| 73 | |
| 74 from os import strerror | |
| 75 | |
| 76 from errno import errorcode | |
| 77 | |
| 78 # Twisted Imports | |
| 79 from twisted.internet import defer, base, address | |
| 80 from twisted.python import log, failure, reflect | |
| 81 from twisted.python.util import unsignedID | |
| 82 from twisted.internet.error import CannotListenError | |
| 83 from twisted.internet import abstract, main, interfaces, error | |
| 84 | |
| 85 | |
| 86 | |
| 87 class _SocketCloser: | |
| 88 _socketShutdownMethod = 'shutdown' | |
| 89 | |
| 90 def _closeSocket(self): | |
| 91 # socket.close() doesn't *really* close if there's another reference | |
| 92 # to it in the TCP/IP stack, e.g. if it was was inherited by a | |
| 93 # subprocess. And we really do want to close the connection. So we | |
| 94 # use shutdown() instead, and then close() in order to release the | |
| 95 # filedescriptor. | |
| 96 skt = self.socket | |
| 97 try: | |
| 98 getattr(skt, self._socketShutdownMethod)(2) | |
| 99 except socket.error: | |
| 100 pass | |
| 101 try: | |
| 102 skt.close() | |
| 103 except socket.error: | |
| 104 pass | |
| 105 | |
| 106 class _TLSMixin: | |
| 107 _socketShutdownMethod = 'sock_shutdown' | |
| 108 | |
| 109 writeBlockedOnRead = 0 | |
| 110 readBlockedOnWrite = 0 | |
| 111 _userWantRead = _userWantWrite = True | |
| 112 | |
| 113 def getPeerCertificate(self): | |
| 114 return self.socket.get_peer_certificate() | |
| 115 | |
| 116 def doRead(self): | |
| 117 if self.writeBlockedOnRead: | |
| 118 self.writeBlockedOnRead = 0 | |
| 119 self._resetReadWrite() | |
| 120 try: | |
| 121 return Connection.doRead(self) | |
| 122 except SSL.ZeroReturnError: | |
| 123 return main.CONNECTION_DONE | |
| 124 except SSL.WantReadError: | |
| 125 return | |
| 126 except SSL.WantWriteError: | |
| 127 self.readBlockedOnWrite = 1 | |
| 128 Connection.startWriting(self) | |
| 129 Connection.stopReading(self) | |
| 130 return | |
| 131 except SSL.SysCallError, (retval, desc): | |
| 132 if ((retval == -1 and desc == 'Unexpected EOF') | |
| 133 or retval > 0): | |
| 134 return main.CONNECTION_LOST | |
| 135 log.err() | |
| 136 return main.CONNECTION_LOST | |
| 137 except SSL.Error, e: | |
| 138 return e | |
| 139 | |
| 140 def doWrite(self): | |
| 141 # Retry disconnecting | |
| 142 if self.disconnected: | |
| 143 return self._postLoseConnection() | |
| 144 if self._writeDisconnected: | |
| 145 return self._closeWriteConnection() | |
| 146 | |
| 147 if self.readBlockedOnWrite: | |
| 148 self.readBlockedOnWrite = 0 | |
| 149 self._resetReadWrite() | |
| 150 return Connection.doWrite(self) | |
| 151 | |
| 152 def writeSomeData(self, data): | |
| 153 try: | |
| 154 return Connection.writeSomeData(self, data) | |
| 155 except SSL.WantWriteError: | |
| 156 return 0 | |
| 157 except SSL.WantReadError: | |
| 158 self.writeBlockedOnRead = 1 | |
| 159 Connection.stopWriting(self) | |
| 160 Connection.startReading(self) | |
| 161 return 0 | |
| 162 except SSL.ZeroReturnError: | |
| 163 return main.CONNECTION_LOST | |
| 164 except SSL.SysCallError, e: | |
| 165 if e[0] == -1 and data == "": | |
| 166 # errors when writing empty strings are expected | |
| 167 # and can be ignored | |
| 168 return 0 | |
| 169 else: | |
| 170 return main.CONNECTION_LOST | |
| 171 except SSL.Error, e: | |
| 172 return e | |
| 173 | |
| 174 def _postLoseConnection(self): | |
| 175 """Gets called after loseConnection(), after buffered data is sent. | |
| 176 | |
| 177 We try to send an SSL shutdown alert, but if it doesn't work, retry | |
| 178 when the socket is writable. | |
| 179 """ | |
| 180 self.disconnected=1 | |
| 181 if hasattr(self.socket, 'set_shutdown'): | |
| 182 self.socket.set_shutdown(SSL.RECEIVED_SHUTDOWN) | |
| 183 return self._sendCloseAlert() | |
| 184 | |
| 185 _first=False | |
| 186 def _sendCloseAlert(self): | |
| 187 # Okay, *THIS* is a bit complicated. | |
| 188 | |
| 189 # Basically, the issue is, OpenSSL seems to not actually return | |
| 190 # errors from SSL_shutdown. Therefore, the only way to | |
| 191 # determine if the close notification has been sent is by | |
| 192 # SSL_shutdown returning "done". However, it will not claim it's | |
| 193 # done until it's both sent *and* received a shutdown notification. | |
| 194 | |
| 195 # I don't actually want to wait for a received shutdown | |
| 196 # notification, though, so, I have to set RECEIVED_SHUTDOWN | |
| 197 # before calling shutdown. Then, it'll return True once it's | |
| 198 # *SENT* the shutdown. | |
| 199 | |
| 200 # However, RECEIVED_SHUTDOWN can't be left set, because then | |
| 201 # reads will fail, breaking half close. | |
| 202 | |
| 203 # Also, since shutdown doesn't report errors, an empty write call is | |
| 204 # done first, to try to detect if the connection has gone away. | |
| 205 # (*NOT* an SSL_write call, because that fails once you've called | |
| 206 # shutdown) | |
| 207 try: | |
| 208 os.write(self.socket.fileno(), '') | |
| 209 except OSError, se: | |
| 210 if se.args[0] in (EINTR, EWOULDBLOCK, ENOBUFS): | |
| 211 return 0 | |
| 212 # Write error, socket gone | |
| 213 return main.CONNECTION_LOST | |
| 214 | |
| 215 try: | |
| 216 if hasattr(self.socket, 'set_shutdown'): | |
| 217 laststate = self.socket.get_shutdown() | |
| 218 self.socket.set_shutdown(laststate | SSL.RECEIVED_SHUTDOWN) | |
| 219 done = self.socket.shutdown() | |
| 220 if not (laststate & SSL.RECEIVED_SHUTDOWN): | |
| 221 self.socket.set_shutdown(SSL.SENT_SHUTDOWN) | |
| 222 else: | |
| 223 #warnings.warn("SSL connection shutdown possibly unreliable, " | |
| 224 # "please upgrade to ver 0.XX", category=UserWarnin
g) | |
| 225 self.socket.shutdown() | |
| 226 done = True | |
| 227 except SSL.Error, e: | |
| 228 return e | |
| 229 | |
| 230 if done: | |
| 231 self.stopWriting() | |
| 232 # Note that this is tested for by identity below. | |
| 233 return main.CONNECTION_DONE | |
| 234 else: | |
| 235 self.startWriting() | |
| 236 return None | |
| 237 | |
| 238 def _closeWriteConnection(self): | |
| 239 result = self._sendCloseAlert() | |
| 240 | |
| 241 if result is main.CONNECTION_DONE: | |
| 242 return Connection._closeWriteConnection(self) | |
| 243 | |
| 244 return result | |
| 245 | |
| 246 def startReading(self): | |
| 247 self._userWantRead = True | |
| 248 if not self.readBlockedOnWrite: | |
| 249 return Connection.startReading(self) | |
| 250 | |
| 251 def stopReading(self): | |
| 252 self._userWantRead = False | |
| 253 if not self.writeBlockedOnRead: | |
| 254 return Connection.stopReading(self) | |
| 255 | |
| 256 def startWriting(self): | |
| 257 self._userWantWrite = True | |
| 258 if not self.writeBlockedOnRead: | |
| 259 return Connection.startWriting(self) | |
| 260 | |
| 261 def stopWriting(self): | |
| 262 self._userWantWrite = False | |
| 263 if not self.readBlockedOnWrite: | |
| 264 return Connection.stopWriting(self) | |
| 265 | |
| 266 def _resetReadWrite(self): | |
| 267 # After changing readBlockedOnWrite or writeBlockedOnRead, | |
| 268 # call this to reset the state to what the user requested. | |
| 269 if self._userWantWrite: | |
| 270 self.startWriting() | |
| 271 else: | |
| 272 self.stopWriting() | |
| 273 | |
| 274 if self._userWantRead: | |
| 275 self.startReading() | |
| 276 else: | |
| 277 self.stopReading() | |
| 278 | |
| 279 def _getTLSClass(klass, _existing={}): | |
| 280 if klass not in _existing: | |
| 281 class TLSConnection(_TLSMixin, klass): | |
| 282 implements(interfaces.ISSLTransport) | |
| 283 _existing[klass] = TLSConnection | |
| 284 return _existing[klass] | |
| 285 | |
| 286 class Connection(abstract.FileDescriptor, _SocketCloser): | |
| 287 """ | |
| 288 Superclass of all socket-based FileDescriptors. | |
| 289 | |
| 290 This is an abstract superclass of all objects which represent a TCP/IP | |
| 291 connection based socket. | |
| 292 | |
| 293 @ivar logstr: prefix used when logging events related to this connection. | |
| 294 @type logstr: C{str} | |
| 295 """ | |
| 296 | |
| 297 implements(interfaces.ITCPTransport, interfaces.ISystemHandle) | |
| 298 | |
| 299 TLS = 0 | |
| 300 | |
| 301 def __init__(self, skt, protocol, reactor=None): | |
| 302 abstract.FileDescriptor.__init__(self, reactor=reactor) | |
| 303 self.socket = skt | |
| 304 self.socket.setblocking(0) | |
| 305 self.fileno = skt.fileno | |
| 306 self.protocol = protocol | |
| 307 | |
| 308 if SSL: | |
| 309 | |
| 310 def startTLS(self, ctx): | |
| 311 assert not self.TLS | |
| 312 error=False | |
| 313 if self.dataBuffer or self._tempDataBuffer: | |
| 314 self.dataBuffer += "".join(self._tempDataBuffer) | |
| 315 self._tempDataBuffer = [] | |
| 316 self._tempDataLen = 0 | |
| 317 written = self.writeSomeData(buffer(self.dataBuffer, self.offset
)) | |
| 318 offset = self.offset | |
| 319 dataLen = len(self.dataBuffer) | |
| 320 self.offset = 0 | |
| 321 self.dataBuffer = "" | |
| 322 if isinstance(written, Exception) or (offset + written != dataLe
n): | |
| 323 error=True | |
| 324 | |
| 325 | |
| 326 self.stopReading() | |
| 327 self.stopWriting() | |
| 328 self._startTLS() | |
| 329 self.socket = SSL.Connection(ctx.getContext(), self.socket) | |
| 330 self.fileno = self.socket.fileno | |
| 331 self.startReading() | |
| 332 if error: | |
| 333 warnings.warn("startTLS with unwritten buffered data currently d
oesn't work right. See issue #686. Closing connection.", category=RuntimeWarning
, stacklevel=2) | |
| 334 self.loseConnection() | |
| 335 return | |
| 336 | |
| 337 def _startTLS(self): | |
| 338 self.TLS = 1 | |
| 339 self.__class__ = _getTLSClass(self.__class__) | |
| 340 | |
| 341 def getHandle(self): | |
| 342 """Return the socket for this connection.""" | |
| 343 return self.socket | |
| 344 | |
| 345 def doRead(self): | |
| 346 """Calls self.protocol.dataReceived with all available data. | |
| 347 | |
| 348 This reads up to self.bufferSize bytes of data from its socket, then | |
| 349 calls self.dataReceived(data) to process it. If the connection is not | |
| 350 lost through an error in the physical recv(), this function will return | |
| 351 the result of the dataReceived call. | |
| 352 """ | |
| 353 try: | |
| 354 data = self.socket.recv(self.bufferSize) | |
| 355 except socket.error, se: | |
| 356 if se.args[0] == EWOULDBLOCK: | |
| 357 return | |
| 358 else: | |
| 359 return main.CONNECTION_LOST | |
| 360 if not data: | |
| 361 return main.CONNECTION_DONE | |
| 362 return self.protocol.dataReceived(data) | |
| 363 | |
| 364 def writeSomeData(self, data): | |
| 365 """Connection.writeSomeData(data) -> #of bytes written | CONNECTION_LOST | |
| 366 This writes as much data as possible to the socket and returns either | |
| 367 the number of bytes read (which is positive) or a connection error code | |
| 368 (which is negative) | |
| 369 """ | |
| 370 try: | |
| 371 # Limit length of buffer to try to send, because some OSes are too | |
| 372 # stupid to do so themselves (ahem windows) | |
| 373 return self.socket.send(buffer(data, 0, self.SEND_LIMIT)) | |
| 374 except socket.error, se: | |
| 375 if se.args[0] == EINTR: | |
| 376 return self.writeSomeData(data) | |
| 377 elif se.args[0] in (EWOULDBLOCK, ENOBUFS): | |
| 378 return 0 | |
| 379 else: | |
| 380 return main.CONNECTION_LOST | |
| 381 | |
| 382 def _closeWriteConnection(self): | |
| 383 try: | |
| 384 getattr(self.socket, self._socketShutdownMethod)(1) | |
| 385 except socket.error: | |
| 386 pass | |
| 387 p = interfaces.IHalfCloseableProtocol(self.protocol, None) | |
| 388 if p: | |
| 389 try: | |
| 390 p.writeConnectionLost() | |
| 391 except: | |
| 392 f = failure.Failure() | |
| 393 log.err() | |
| 394 self.connectionLost(f) | |
| 395 | |
| 396 def readConnectionLost(self, reason): | |
| 397 p = interfaces.IHalfCloseableProtocol(self.protocol, None) | |
| 398 if p: | |
| 399 try: | |
| 400 p.readConnectionLost() | |
| 401 except: | |
| 402 log.err() | |
| 403 self.connectionLost(failure.Failure()) | |
| 404 else: | |
| 405 self.connectionLost(reason) | |
| 406 | |
| 407 def connectionLost(self, reason): | |
| 408 """See abstract.FileDescriptor.connectionLost(). | |
| 409 """ | |
| 410 abstract.FileDescriptor.connectionLost(self, reason) | |
| 411 self._closeSocket() | |
| 412 protocol = self.protocol | |
| 413 del self.protocol | |
| 414 del self.socket | |
| 415 del self.fileno | |
| 416 protocol.connectionLost(reason) | |
| 417 | |
| 418 logstr = "Uninitialized" | |
| 419 | |
| 420 def logPrefix(self): | |
| 421 """Return the prefix to log with when I own the logging thread. | |
| 422 """ | |
| 423 return self.logstr | |
| 424 | |
| 425 def getTcpNoDelay(self): | |
| 426 return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.
TCP_NODELAY)) | |
| 427 | |
| 428 def setTcpNoDelay(self, enabled): | |
| 429 self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled) | |
| 430 | |
| 431 def getTcpKeepAlive(self): | |
| 432 return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET, | |
| 433 socket.SO_KEEPALIVE)) | |
| 434 | |
| 435 def setTcpKeepAlive(self, enabled): | |
| 436 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled) | |
| 437 | |
| 438 if SSL: | |
| 439 classImplements(Connection, interfaces.ITLSTransport) | |
| 440 | |
| 441 class BaseClient(Connection): | |
| 442 """A base class for client TCP (and similiar) sockets. | |
| 443 """ | |
| 444 addressFamily = socket.AF_INET | |
| 445 socketType = socket.SOCK_STREAM | |
| 446 | |
| 447 def _finishInit(self, whenDone, skt, error, reactor): | |
| 448 """Called by base classes to continue to next stage of initialization.""
" | |
| 449 if whenDone: | |
| 450 Connection.__init__(self, skt, None, reactor) | |
| 451 self.doWrite = self.doConnect | |
| 452 self.doRead = self.doConnect | |
| 453 reactor.callLater(0, whenDone) | |
| 454 else: | |
| 455 reactor.callLater(0, self.failIfNotConnected, error) | |
| 456 | |
| 457 def startTLS(self, ctx, client=1): | |
| 458 holder = Connection.startTLS(self, ctx) | |
| 459 if client: | |
| 460 self.socket.set_connect_state() | |
| 461 else: | |
| 462 self.socket.set_accept_state() | |
| 463 return holder | |
| 464 | |
| 465 def stopConnecting(self): | |
| 466 """Stop attempt to connect.""" | |
| 467 self.failIfNotConnected(error.UserError()) | |
| 468 | |
| 469 def failIfNotConnected(self, err): | |
| 470 """ | |
| 471 Generic method called when the attemps to connect failed. It basically | |
| 472 cleans everything it can: call connectionFailed, stop read and write, | |
| 473 delete socket related members. | |
| 474 """ | |
| 475 if (self.connected or self.disconnected or | |
| 476 not hasattr(self, "connector")): | |
| 477 return | |
| 478 | |
| 479 self.connector.connectionFailed(failure.Failure(err)) | |
| 480 if hasattr(self, "reactor"): | |
| 481 # this doesn't happen if we failed in __init__ | |
| 482 self.stopReading() | |
| 483 self.stopWriting() | |
| 484 del self.connector | |
| 485 | |
| 486 try: | |
| 487 self._closeSocket() | |
| 488 except AttributeError: | |
| 489 pass | |
| 490 else: | |
| 491 del self.socket, self.fileno | |
| 492 | |
| 493 def createInternetSocket(self): | |
| 494 """(internal) Create a non-blocking socket using | |
| 495 self.addressFamily, self.socketType. | |
| 496 """ | |
| 497 s = socket.socket(self.addressFamily, self.socketType) | |
| 498 s.setblocking(0) | |
| 499 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): | |
| 500 old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD) | |
| 501 fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) | |
| 502 return s | |
| 503 | |
| 504 def resolveAddress(self): | |
| 505 if abstract.isIPAddress(self.addr[0]): | |
| 506 self._setRealAddress(self.addr[0]) | |
| 507 else: | |
| 508 d = self.reactor.resolve(self.addr[0]) | |
| 509 d.addCallbacks(self._setRealAddress, self.failIfNotConnected) | |
| 510 | |
| 511 def _setRealAddress(self, address): | |
| 512 self.realAddress = (address, self.addr[1]) | |
| 513 self.doConnect() | |
| 514 | |
| 515 def doConnect(self): | |
| 516 """I connect the socket. | |
| 517 | |
| 518 Then, call the protocol's makeConnection, and start waiting for data. | |
| 519 """ | |
| 520 if not hasattr(self, "connector"): | |
| 521 # this happens when connection failed but doConnect | |
| 522 # was scheduled via a callLater in self._finishInit | |
| 523 return | |
| 524 | |
| 525 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) | |
| 526 if err: | |
| 527 self.failIfNotConnected(error.getConnectError((err, strerror(err)))) | |
| 528 return | |
| 529 | |
| 530 | |
| 531 # doConnect gets called twice. The first time we actually need to | |
| 532 # start the connection attempt. The second time we don't really | |
| 533 # want to (SO_ERROR above will have taken care of any errors, and if | |
| 534 # it reported none, the mere fact that doConnect was called again is | |
| 535 # sufficient to indicate that the connection has succeeded), but it | |
| 536 # is not /particularly/ detrimental to do so. This should get | |
| 537 # cleaned up some day, though. | |
| 538 try: | |
| 539 connectResult = self.socket.connect_ex(self.realAddress) | |
| 540 except socket.error, se: | |
| 541 connectResult = se.args[0] | |
| 542 if connectResult: | |
| 543 if connectResult == EISCONN: | |
| 544 pass | |
| 545 # on Windows EINVAL means sometimes that we should keep trying: | |
| 546 # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/w
insock/winsock/connect_2.asp | |
| 547 elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or | |
| 548 (connectResult == EINVAL and platformType == "win32")): | |
| 549 self.startReading() | |
| 550 self.startWriting() | |
| 551 return | |
| 552 else: | |
| 553 self.failIfNotConnected(error.getConnectError((connectResult, st
rerror(connectResult)))) | |
| 554 return | |
| 555 | |
| 556 # If I have reached this point without raising or returning, that means | |
| 557 # that the socket is connected. | |
| 558 del self.doWrite | |
| 559 del self.doRead | |
| 560 # we first stop and then start, to reset any references to the old doRea
d | |
| 561 self.stopReading() | |
| 562 self.stopWriting() | |
| 563 self._connectDone() | |
| 564 | |
| 565 def _connectDone(self): | |
| 566 self.protocol = self.connector.buildProtocol(self.getPeer()) | |
| 567 self.connected = 1 | |
| 568 self.logstr = self.protocol.__class__.__name__ + ",client" | |
| 569 self.startReading() | |
| 570 self.protocol.makeConnection(self) | |
| 571 | |
| 572 def connectionLost(self, reason): | |
| 573 if not self.connected: | |
| 574 self.failIfNotConnected(error.ConnectError(string=reason)) | |
| 575 else: | |
| 576 Connection.connectionLost(self, reason) | |
| 577 self.connector.connectionLost(reason) | |
| 578 | |
| 579 | |
| 580 class Client(BaseClient): | |
| 581 """A TCP client.""" | |
| 582 | |
| 583 def __init__(self, host, port, bindAddress, connector, reactor=None): | |
| 584 # BaseClient.__init__ is invoked later | |
| 585 self.connector = connector | |
| 586 self.addr = (host, port) | |
| 587 | |
| 588 whenDone = self.resolveAddress | |
| 589 err = None | |
| 590 skt = None | |
| 591 | |
| 592 try: | |
| 593 skt = self.createInternetSocket() | |
| 594 except socket.error, se: | |
| 595 err = error.ConnectBindError(se[0], se[1]) | |
| 596 whenDone = None | |
| 597 if whenDone and bindAddress is not None: | |
| 598 try: | |
| 599 skt.bind(bindAddress) | |
| 600 except socket.error, se: | |
| 601 err = error.ConnectBindError(se[0], se[1]) | |
| 602 whenDone = None | |
| 603 self._finishInit(whenDone, skt, err, reactor) | |
| 604 | |
| 605 def getHost(self): | |
| 606 """Returns an IPv4Address. | |
| 607 | |
| 608 This indicates the address from which I am connecting. | |
| 609 """ | |
| 610 return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',
))) | |
| 611 | |
| 612 def getPeer(self): | |
| 613 """Returns an IPv4Address. | |
| 614 | |
| 615 This indicates the address that I am connected to. | |
| 616 """ | |
| 617 return address.IPv4Address('TCP', *(self.addr + ('INET',))) | |
| 618 | |
| 619 def __repr__(self): | |
| 620 s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self)) | |
| 621 return s | |
| 622 | |
| 623 | |
| 624 class Server(Connection): | |
| 625 """ | |
| 626 Serverside socket-stream connection class. | |
| 627 | |
| 628 This is a serverside network connection transport; a socket which came from | |
| 629 an accept() on a server. | |
| 630 """ | |
| 631 | |
| 632 def __init__(self, sock, protocol, client, server, sessionno): | |
| 633 """ | |
| 634 Server(sock, protocol, client, server, sessionno) | |
| 635 | |
| 636 Initialize it with a socket, a protocol, a descriptor for my peer (a | |
| 637 tuple of host, port describing the other end of the connection), an | |
| 638 instance of Port, and a session number. | |
| 639 """ | |
| 640 Connection.__init__(self, sock, protocol) | |
| 641 self.server = server | |
| 642 self.client = client | |
| 643 self.sessionno = sessionno | |
| 644 self.hostname = client[0] | |
| 645 self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__, | |
| 646 sessionno, | |
| 647 self.hostname) | |
| 648 self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__, | |
| 649 self.sessionno, | |
| 650 self.server._realPortNumber) | |
| 651 self.startReading() | |
| 652 self.connected = 1 | |
| 653 | |
| 654 def __repr__(self): | |
| 655 """A string representation of this connection. | |
| 656 """ | |
| 657 return self.repstr | |
| 658 | |
| 659 def startTLS(self, ctx, server=1): | |
| 660 holder = Connection.startTLS(self, ctx) | |
| 661 if server: | |
| 662 self.socket.set_accept_state() | |
| 663 else: | |
| 664 self.socket.set_connect_state() | |
| 665 return holder | |
| 666 | |
| 667 def getHost(self): | |
| 668 """Returns an IPv4Address. | |
| 669 | |
| 670 This indicates the server's address. | |
| 671 """ | |
| 672 return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',
))) | |
| 673 | |
| 674 def getPeer(self): | |
| 675 """Returns an IPv4Address. | |
| 676 | |
| 677 This indicates the client's address. | |
| 678 """ | |
| 679 return address.IPv4Address('TCP', *(self.client + ('INET',))) | |
| 680 | |
| 681 class Port(base.BasePort, _SocketCloser): | |
| 682 """I am a TCP server port, listening for connections. | |
| 683 | |
| 684 When a connection is accepted, I will call my factory's buildProtocol with | |
| 685 the incoming connection as an argument, according to the specification | |
| 686 described in twisted.internet.interfaces.IProtocolFactory. | |
| 687 | |
| 688 If you wish to change the sort of transport that will be used, my | |
| 689 `transport' attribute will be called with the signature expected for | |
| 690 Server.__init__, so it can be replaced. | |
| 691 """ | |
| 692 | |
| 693 implements(interfaces.IListeningPort) | |
| 694 | |
| 695 addressFamily = socket.AF_INET | |
| 696 socketType = socket.SOCK_STREAM | |
| 697 | |
| 698 transport = Server | |
| 699 sessionno = 0 | |
| 700 interface = '' | |
| 701 backlog = 50 | |
| 702 | |
| 703 # Actual port number being listened on, only set to a non-None | |
| 704 # value when we are actually listening. | |
| 705 _realPortNumber = None | |
| 706 | |
| 707 def __init__(self, port, factory, backlog=50, interface='', reactor=None): | |
| 708 """Initialize with a numeric port to listen on. | |
| 709 """ | |
| 710 base.BasePort.__init__(self, reactor=reactor) | |
| 711 self.port = port | |
| 712 self.factory = factory | |
| 713 self.backlog = backlog | |
| 714 self.interface = interface | |
| 715 | |
| 716 def __repr__(self): | |
| 717 if self._realPortNumber is not None: | |
| 718 return "<%s of %s on %s>" % (self.__class__, self.factory.__class__, | |
| 719 self._realPortNumber) | |
| 720 else: | |
| 721 return "<%s of %s (not listening)>" % (self.__class__, self.factory.
__class__) | |
| 722 | |
| 723 def createInternetSocket(self): | |
| 724 s = base.BasePort.createInternetSocket(self) | |
| 725 if platformType == "posix" and sys.platform != "cygwin": | |
| 726 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| 727 return s | |
| 728 | |
| 729 def startListening(self): | |
| 730 """Create and bind my socket, and begin listening on it. | |
| 731 | |
| 732 This is called on unserialization, and must be called after creating a | |
| 733 server to begin listening on the specified port. | |
| 734 """ | |
| 735 try: | |
| 736 skt = self.createInternetSocket() | |
| 737 skt.bind((self.interface, self.port)) | |
| 738 except socket.error, le: | |
| 739 raise CannotListenError, (self.interface, self.port, le) | |
| 740 | |
| 741 # Make sure that if we listened on port 0, we update that to | |
| 742 # reflect what the OS actually assigned us. | |
| 743 self._realPortNumber = skt.getsockname()[1] | |
| 744 | |
| 745 log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNum
ber)) | |
| 746 | |
| 747 # The order of the next 6 lines is kind of bizarre. If no one | |
| 748 # can explain it, perhaps we should re-arrange them. | |
| 749 self.factory.doStart() | |
| 750 skt.listen(self.backlog) | |
| 751 self.connected = 1 | |
| 752 self.socket = skt | |
| 753 self.fileno = self.socket.fileno | |
| 754 self.numberAccepts = 100 | |
| 755 | |
| 756 self.startReading() | |
| 757 | |
| 758 def _buildAddr(self, (host, port)): | |
| 759 return address._ServerFactoryIPv4Address('TCP', host, port) | |
| 760 | |
| 761 def doRead(self): | |
| 762 """Called when my socket is ready for reading. | |
| 763 | |
| 764 This accepts a connection and calls self.protocol() to handle the | |
| 765 wire-level protocol. | |
| 766 """ | |
| 767 try: | |
| 768 if platformType == "posix": | |
| 769 numAccepts = self.numberAccepts | |
| 770 else: | |
| 771 # win32 event loop breaks if we do more than one accept() | |
| 772 # in an iteration of the event loop. | |
| 773 numAccepts = 1 | |
| 774 for i in range(numAccepts): | |
| 775 # we need this so we can deal with a factory's buildProtocol | |
| 776 # calling our loseConnection | |
| 777 if self.disconnecting: | |
| 778 return | |
| 779 try: | |
| 780 skt, addr = self.socket.accept() | |
| 781 except socket.error, e: | |
| 782 if e.args[0] in (EWOULDBLOCK, EAGAIN): | |
| 783 self.numberAccepts = i | |
| 784 break | |
| 785 elif e.args[0] == EPERM: | |
| 786 # Netfilter on Linux may have rejected the | |
| 787 # connection, but we get told to try to accept() | |
| 788 # anyway. | |
| 789 continue | |
| 790 elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABO
RTED): | |
| 791 | |
| 792 # Linux gives EMFILE when a process is not allowed | |
| 793 # to allocate any more file descriptors. *BSD and | |
| 794 # Win32 give (WSA)ENOBUFS. Linux can also give | |
| 795 # ENFILE if the system is out of inodes, or ENOMEM | |
| 796 # if there is insufficient memory to allocate a new | |
| 797 # dentry. ECONNABORTED is documented as possible on | |
| 798 # both Linux and Windows, but it is not clear | |
| 799 # whether there are actually any circumstances under | |
| 800 # which it can happen (one might expect it to be | |
| 801 # possible if a client sends a FIN or RST after the | |
| 802 # server sends a SYN|ACK but before application code | |
| 803 # calls accept(2), however at least on Linux this | |
| 804 # _seems_ to be short-circuited by syncookies. | |
| 805 | |
| 806 log.msg("Could not accept new connection (%s)" % ( | |
| 807 errorcode[e.args[0]],)) | |
| 808 break | |
| 809 raise | |
| 810 | |
| 811 protocol = self.factory.buildProtocol(self._buildAddr(addr)) | |
| 812 if protocol is None: | |
| 813 skt.close() | |
| 814 continue | |
| 815 s = self.sessionno | |
| 816 self.sessionno = s+1 | |
| 817 transport = self.transport(skt, protocol, addr, self, s) | |
| 818 transport = self._preMakeConnection(transport) | |
| 819 protocol.makeConnection(transport) | |
| 820 else: | |
| 821 self.numberAccepts = self.numberAccepts+20 | |
| 822 except: | |
| 823 # Note that in TLS mode, this will possibly catch SSL.Errors | |
| 824 # raised by self.socket.accept() | |
| 825 # | |
| 826 # There is no "except SSL.Error:" above because SSL may be | |
| 827 # None if there is no SSL support. In any case, all the | |
| 828 # "except SSL.Error:" suite would probably do is log.deferr() | |
| 829 # and return, so handling it here works just as well. | |
| 830 log.deferr() | |
| 831 | |
| 832 def _preMakeConnection(self, transport): | |
| 833 return transport | |
| 834 | |
| 835 def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)): | |
| 836 """Stop accepting connections on this port. | |
| 837 | |
| 838 This will shut down my socket and call self.connectionLost(). | |
| 839 It returns a deferred which will fire successfully when the | |
| 840 port is actually closed. | |
| 841 """ | |
| 842 self.disconnecting = 1 | |
| 843 self.stopReading() | |
| 844 if self.connected: | |
| 845 self.deferred = defer.Deferred() | |
| 846 self.reactor.callLater(0, self.connectionLost, connDone) | |
| 847 return self.deferred | |
| 848 | |
| 849 stopListening = loseConnection | |
| 850 | |
| 851 def connectionLost(self, reason): | |
| 852 """Cleans up my socket. | |
| 853 """ | |
| 854 log.msg('(Port %s Closed)' % self._realPortNumber) | |
| 855 self._realPortNumber = None | |
| 856 base.BasePort.connectionLost(self, reason) | |
| 857 self.connected = 0 | |
| 858 self._closeSocket() | |
| 859 del self.socket | |
| 860 del self.fileno | |
| 861 self.factory.doStop() | |
| 862 if hasattr(self, "deferred"): | |
| 863 self.deferred.callback(None) | |
| 864 del self.deferred | |
| 865 | |
| 866 def logPrefix(self): | |
| 867 """Returns the name of my class, to prefix log entries with. | |
| 868 """ | |
| 869 return reflect.qual(self.factory.__class__) | |
| 870 | |
| 871 def getHost(self): | |
| 872 """Returns an IPv4Address. | |
| 873 | |
| 874 This indicates the server's address. | |
| 875 """ | |
| 876 return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',
))) | |
| 877 | |
| 878 class Connector(base.BaseConnector): | |
| 879 def __init__(self, host, port, factory, timeout, bindAddress, reactor=None): | |
| 880 self.host = host | |
| 881 if isinstance(port, types.StringTypes): | |
| 882 try: | |
| 883 port = socket.getservbyname(port, 'tcp') | |
| 884 except socket.error, e: | |
| 885 raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port)
) | |
| 886 self.port = port | |
| 887 self.bindAddress = bindAddress | |
| 888 base.BaseConnector.__init__(self, factory, timeout, reactor) | |
| 889 | |
| 890 def _makeTransport(self): | |
| 891 return Client(self.host, self.port, self.bindAddress, self, self.reactor
) | |
| 892 | |
| 893 def getDestination(self): | |
| 894 return address.IPv4Address('TCP', self.host, self.port, 'INET') | |
| OLD | NEW |