| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_internet -*- | |
| 2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 | |
| 6 """ | |
| 7 Very basic functionality for a Reactor implementation. | |
| 8 | |
| 9 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 10 """ | |
| 11 | |
| 12 import socket # needed only for sync-dns | |
| 13 from zope.interface import implements, classImplements | |
| 14 | |
| 15 import sys | |
| 16 import warnings | |
| 17 import operator | |
| 18 from heapq import heappush, heappop, heapify | |
| 19 | |
| 20 try: | |
| 21 import fcntl | |
| 22 except ImportError: | |
| 23 fcntl = None | |
| 24 import traceback | |
| 25 | |
| 26 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThre
ads | |
| 27 from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolv
er | |
| 28 from twisted.internet.interfaces import IConnector, IDelayedCall | |
| 29 from twisted.internet import main, error, abstract, defer, threads | |
| 30 from twisted.python import log, failure, reflect | |
| 31 from twisted.python.runtime import seconds as runtimeSeconds, platform, platform
Type | |
| 32 from twisted.internet.defer import Deferred, DeferredList | |
| 33 from twisted.persisted import styles | |
| 34 | |
| 35 # This import is for side-effects! Even if you don't see any code using it | |
| 36 # in this module, don't delete it. | |
| 37 from twisted.python import threadable | |
| 38 | |
| 39 | |
| 40 class DelayedCall(styles.Ephemeral): | |
| 41 | |
| 42 implements(IDelayedCall) | |
| 43 # enable .debug to record creator call stack, and it will be logged if | |
| 44 # an exception occurs while the function is being run | |
| 45 debug = False | |
| 46 _str = None | |
| 47 | |
| 48 def __init__(self, time, func, args, kw, cancel, reset, | |
| 49 seconds=runtimeSeconds): | |
| 50 """ | |
| 51 @param time: Seconds from the epoch at which to call C{func}. | |
| 52 @param func: The callable to call. | |
| 53 @param args: The positional arguments to pass to the callable. | |
| 54 @param kw: The keyword arguments to pass to the callable. | |
| 55 @param cancel: A callable which will be called with this | |
| 56 DelayedCall before cancellation. | |
| 57 @param reset: A callable which will be called with this | |
| 58 DelayedCall after changing this DelayedCall's scheduled | |
| 59 execution time. The callable should adjust any necessary | |
| 60 scheduling details to ensure this DelayedCall is invoked | |
| 61 at the new appropriate time. | |
| 62 @param seconds: If provided, a no-argument callable which will be | |
| 63 used to determine the current time any time that information is | |
| 64 needed. | |
| 65 """ | |
| 66 self.time, self.func, self.args, self.kw = time, func, args, kw | |
| 67 self.resetter = reset | |
| 68 self.canceller = cancel | |
| 69 self.seconds = seconds | |
| 70 self.cancelled = self.called = 0 | |
| 71 self.delayed_time = 0 | |
| 72 if self.debug: | |
| 73 self.creator = traceback.format_stack()[:-2] | |
| 74 | |
| 75 def getTime(self): | |
| 76 """Return the time at which this call will fire | |
| 77 | |
| 78 @rtype: C{float} | |
| 79 @return: The number of seconds after the epoch at which this call is | |
| 80 scheduled to be made. | |
| 81 """ | |
| 82 return self.time + self.delayed_time | |
| 83 | |
| 84 def cancel(self): | |
| 85 """Unschedule this call | |
| 86 | |
| 87 @raise AlreadyCancelled: Raised if this call has already been | |
| 88 unscheduled. | |
| 89 | |
| 90 @raise AlreadyCalled: Raised if this call has already been made. | |
| 91 """ | |
| 92 if self.cancelled: | |
| 93 raise error.AlreadyCancelled | |
| 94 elif self.called: | |
| 95 raise error.AlreadyCalled | |
| 96 else: | |
| 97 self.canceller(self) | |
| 98 self.cancelled = 1 | |
| 99 if self.debug: | |
| 100 self._str = str(self) | |
| 101 del self.func, self.args, self.kw | |
| 102 | |
| 103 def reset(self, secondsFromNow): | |
| 104 """Reschedule this call for a different time | |
| 105 | |
| 106 @type secondsFromNow: C{float} | |
| 107 @param secondsFromNow: The number of seconds from the time of the | |
| 108 C{reset} call at which this call will be scheduled. | |
| 109 | |
| 110 @raise AlreadyCancelled: Raised if this call has been cancelled. | |
| 111 @raise AlreadyCalled: Raised if this call has already been made. | |
| 112 """ | |
| 113 if self.cancelled: | |
| 114 raise error.AlreadyCancelled | |
| 115 elif self.called: | |
| 116 raise error.AlreadyCalled | |
| 117 else: | |
| 118 newTime = self.seconds() + secondsFromNow | |
| 119 if newTime < self.time: | |
| 120 self.delayed_time = 0 | |
| 121 self.time = newTime | |
| 122 self.resetter(self) | |
| 123 else: | |
| 124 self.delayed_time = newTime - self.time | |
| 125 | |
| 126 def delay(self, secondsLater): | |
| 127 """Reschedule this call for a later time | |
| 128 | |
| 129 @type secondsLater: C{float} | |
| 130 @param secondsLater: The number of seconds after the originally | |
| 131 scheduled time for which to reschedule this call. | |
| 132 | |
| 133 @raise AlreadyCancelled: Raised if this call has been cancelled. | |
| 134 @raise AlreadyCalled: Raised if this call has already been made. | |
| 135 """ | |
| 136 if self.cancelled: | |
| 137 raise error.AlreadyCancelled | |
| 138 elif self.called: | |
| 139 raise error.AlreadyCalled | |
| 140 else: | |
| 141 self.delayed_time += secondsLater | |
| 142 if self.delayed_time < 0: | |
| 143 self.activate_delay() | |
| 144 self.resetter(self) | |
| 145 | |
| 146 def activate_delay(self): | |
| 147 self.time += self.delayed_time | |
| 148 self.delayed_time = 0 | |
| 149 | |
| 150 def active(self): | |
| 151 """Determine whether this call is still pending | |
| 152 | |
| 153 @rtype: C{bool} | |
| 154 @return: True if this call has not yet been made or cancelled, | |
| 155 False otherwise. | |
| 156 """ | |
| 157 return not (self.cancelled or self.called) | |
| 158 | |
| 159 def __le__(self, other): | |
| 160 return self.time <= other.time | |
| 161 | |
| 162 def __str__(self): | |
| 163 if self._str is not None: | |
| 164 return self._str | |
| 165 if hasattr(self, 'func'): | |
| 166 if hasattr(self.func, 'func_name'): | |
| 167 func = self.func.func_name | |
| 168 if hasattr(self.func, 'im_class'): | |
| 169 func = self.func.im_class.__name__ + '.' + func | |
| 170 else: | |
| 171 func = reflect.safe_repr(self.func) | |
| 172 else: | |
| 173 func = None | |
| 174 | |
| 175 now = self.seconds() | |
| 176 L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % ( | |
| 177 id(self), self.time - now, self.called, self.cancelled)] | |
| 178 if func is not None: | |
| 179 L.extend((" ", func, "(")) | |
| 180 if self.args: | |
| 181 L.append(", ".join([reflect.safe_repr(e) for e in self.args])) | |
| 182 if self.kw: | |
| 183 L.append(", ") | |
| 184 if self.kw: | |
| 185 L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k,
v) in self.kw.iteritems()])) | |
| 186 L.append(")") | |
| 187 | |
| 188 if self.debug: | |
| 189 L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.cre
ator))) | |
| 190 L.append('>') | |
| 191 | |
| 192 return "".join(L) | |
| 193 | |
| 194 | |
| 195 class ThreadedResolver: | |
| 196 implements(IResolverSimple) | |
| 197 | |
| 198 def __init__(self, reactor): | |
| 199 self.reactor = reactor | |
| 200 self._runningQueries = {} | |
| 201 | |
| 202 def _fail(self, name, err): | |
| 203 err = error.DNSLookupError("address %r not found: %s" % (name, err)) | |
| 204 return failure.Failure(err) | |
| 205 | |
| 206 def _cleanup(self, name, lookupDeferred): | |
| 207 userDeferred, cancelCall = self._runningQueries[lookupDeferred] | |
| 208 del self._runningQueries[lookupDeferred] | |
| 209 userDeferred.errback(self._fail(name, "timeout error")) | |
| 210 | |
| 211 def _checkTimeout(self, result, name, lookupDeferred): | |
| 212 try: | |
| 213 userDeferred, cancelCall = self._runningQueries[lookupDeferred] | |
| 214 except KeyError: | |
| 215 pass | |
| 216 else: | |
| 217 del self._runningQueries[lookupDeferred] | |
| 218 cancelCall.cancel() | |
| 219 | |
| 220 if isinstance(result, failure.Failure): | |
| 221 userDeferred.errback(self._fail(name, result.getErrorMessage())) | |
| 222 else: | |
| 223 userDeferred.callback(result) | |
| 224 | |
| 225 def getHostByName(self, name, timeout = (1, 3, 11, 45)): | |
| 226 if timeout: | |
| 227 timeoutDelay = reduce(operator.add, timeout) | |
| 228 else: | |
| 229 timeoutDelay = 60 | |
| 230 userDeferred = defer.Deferred() | |
| 231 lookupDeferred = threads.deferToThread(socket.gethostbyname, name) | |
| 232 cancelCall = self.reactor.callLater( | |
| 233 timeoutDelay, self._cleanup, name, lookupDeferred) | |
| 234 self._runningQueries[lookupDeferred] = (userDeferred, cancelCall) | |
| 235 lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred) | |
| 236 return userDeferred | |
| 237 | |
| 238 class BlockingResolver: | |
| 239 implements(IResolverSimple) | |
| 240 | |
| 241 def getHostByName(self, name, timeout = (1, 3, 11, 45)): | |
| 242 try: | |
| 243 address = socket.gethostbyname(name) | |
| 244 except socket.error: | |
| 245 msg = "address %r not found" % (name,) | |
| 246 err = error.DNSLookupError(msg) | |
| 247 return defer.fail(err) | |
| 248 else: | |
| 249 return defer.succeed(address) | |
| 250 | |
| 251 | |
| 252 class _ThreePhaseEvent(object): | |
| 253 """ | |
| 254 Collection of callables (with arguments) which can be invoked as a group in | |
| 255 a particular order. | |
| 256 | |
| 257 This provides the underlying implementation for the reactor's system event | |
| 258 triggers. An instance of this class tracks triggers for all phases of a | |
| 259 single type of event. | |
| 260 | |
| 261 @ivar before: A list of the before-phase triggers containing three-tuples | |
| 262 of a callable, a tuple of positional arguments, and a dict of keyword | |
| 263 arguments | |
| 264 | |
| 265 @ivar finishedBefore: A list of the before-phase triggers which have | |
| 266 already been executed. This is only populated in the C{'BEFORE'} state. | |
| 267 | |
| 268 @ivar during: A list of the during-phase triggers containing three-tuples | |
| 269 of a callable, a tuple of positional arguments, and a dict of keyword | |
| 270 arguments | |
| 271 | |
| 272 @ivar after: A list of the after-phase triggers containing three-tuples | |
| 273 of a callable, a tuple of positional arguments, and a dict of keyword | |
| 274 arguments | |
| 275 | |
| 276 @ivar state: A string indicating what is currently going on with this | |
| 277 object. One of C{'BASE'} (for when nothing in particular is happening; | |
| 278 this is the initial value), C{'BEFORE'} (when the before-phase triggers | |
| 279 are in the process of being executed). | |
| 280 """ | |
| 281 def __init__(self): | |
| 282 self.before = [] | |
| 283 self.during = [] | |
| 284 self.after = [] | |
| 285 self.state = 'BASE' | |
| 286 | |
| 287 | |
| 288 def addTrigger(self, phase, callable, *args, **kwargs): | |
| 289 """ | |
| 290 Add a trigger to the indicate phase. | |
| 291 | |
| 292 @param phase: One of C{'before'}, C{'during'}, or C{'after'}. | |
| 293 | |
| 294 @param callable: An object to be called when this event is triggered. | |
| 295 @param *args: Positional arguments to pass to C{callable}. | |
| 296 @param **kwargs: Keyword arguments to pass to C{callable}. | |
| 297 | |
| 298 @return: An opaque handle which may be passed to L{removeTrigger} to | |
| 299 reverse the effects of calling this method. | |
| 300 """ | |
| 301 if phase not in ('before', 'during', 'after'): | |
| 302 raise KeyError("invalid phase") | |
| 303 getattr(self, phase).append((callable, args, kwargs)) | |
| 304 return phase, callable, args, kwargs | |
| 305 | |
| 306 | |
| 307 def removeTrigger(self, handle): | |
| 308 """ | |
| 309 Remove a previously added trigger callable. | |
| 310 | |
| 311 @param handle: An object previously returned by L{addTrigger}. The | |
| 312 trigger added by that call will be removed. | |
| 313 | |
| 314 @raise ValueError: If the trigger associated with C{handle} has already | |
| 315 been removed or if C{handle} is not a valid handle. | |
| 316 """ | |
| 317 return getattr(self, 'removeTrigger_' + self.state)(handle) | |
| 318 | |
| 319 | |
| 320 def removeTrigger_BASE(self, handle): | |
| 321 """ | |
| 322 Just try to remove the trigger. | |
| 323 | |
| 324 @see removeTrigger | |
| 325 """ | |
| 326 try: | |
| 327 phase, callable, args, kwargs = handle | |
| 328 except (TypeError, ValueError), e: | |
| 329 raise ValueError("invalid trigger handle") | |
| 330 else: | |
| 331 if phase not in ('before', 'during', 'after'): | |
| 332 raise KeyError("invalid phase") | |
| 333 getattr(self, phase).remove((callable, args, kwargs)) | |
| 334 | |
| 335 | |
| 336 def removeTrigger_BEFORE(self, handle): | |
| 337 """ | |
| 338 Remove the trigger if it has yet to be executed, otherwise emit a | |
| 339 warning that in the future an exception will be raised when removing an | |
| 340 already-executed trigger. | |
| 341 | |
| 342 @see removeTrigger | |
| 343 """ | |
| 344 phase, callable, args, kwargs = handle | |
| 345 if phase != 'before': | |
| 346 return self.removeTrigger_BASE(handle) | |
| 347 if (callable, args, kwargs) in self.finishedBefore: | |
| 348 warnings.warn( | |
| 349 "Removing already-fired system event triggers will raise an " | |
| 350 "exception in a future version of Twisted.", | |
| 351 category=DeprecationWarning, | |
| 352 stacklevel=3) | |
| 353 else: | |
| 354 self.removeTrigger_BASE(handle) | |
| 355 | |
| 356 | |
| 357 def fireEvent(self): | |
| 358 """ | |
| 359 Call the triggers added to this event. | |
| 360 """ | |
| 361 self.state = 'BEFORE' | |
| 362 self.finishedBefore = [] | |
| 363 beforeResults = [] | |
| 364 while self.before: | |
| 365 callable, args, kwargs = self.before.pop(0) | |
| 366 self.finishedBefore.append((callable, args, kwargs)) | |
| 367 try: | |
| 368 result = callable(*args, **kwargs) | |
| 369 except: | |
| 370 log.err() | |
| 371 else: | |
| 372 if isinstance(result, Deferred): | |
| 373 beforeResults.append(result) | |
| 374 DeferredList(beforeResults).addCallback(self._continueFiring) | |
| 375 | |
| 376 | |
| 377 def _continueFiring(self, ignored): | |
| 378 """ | |
| 379 Call the during and after phase triggers for this event. | |
| 380 """ | |
| 381 self.state = 'BASE' | |
| 382 self.finishedBefore = [] | |
| 383 for phase in self.during, self.after: | |
| 384 while phase: | |
| 385 callable, args, kwargs = phase.pop(0) | |
| 386 try: | |
| 387 callable(*args, **kwargs) | |
| 388 except: | |
| 389 log.err() | |
| 390 | |
| 391 | |
| 392 | |
| 393 class ReactorBase(object): | |
| 394 """ | |
| 395 Default base class for Reactors. | |
| 396 | |
| 397 @type _stopped: C{bool} | |
| 398 @ivar _stopped: A flag which is true between paired calls to C{reactor.run} | |
| 399 and C{reactor.stop}. | |
| 400 """ | |
| 401 | |
| 402 implements(IReactorCore, IReactorTime, IReactorPluggableResolver) | |
| 403 | |
| 404 _stopped = True | |
| 405 installed = False | |
| 406 usingThreads = False | |
| 407 resolver = BlockingResolver() | |
| 408 | |
| 409 __name__ = "twisted.internet.reactor" | |
| 410 | |
| 411 def __init__(self): | |
| 412 self.threadCallQueue = [] | |
| 413 self._eventTriggers = {} | |
| 414 self._pendingTimedCalls = [] | |
| 415 self._newTimedCalls = [] | |
| 416 self._cancellations = 0 | |
| 417 self.running = False | |
| 418 self.waker = None | |
| 419 | |
| 420 self.addSystemEventTrigger('during', 'shutdown', self.crash) | |
| 421 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) | |
| 422 | |
| 423 if platform.supportsThreads(): | |
| 424 self._initThreads() | |
| 425 | |
| 426 # override in subclasses | |
| 427 | |
| 428 _lock = None | |
| 429 | |
| 430 def installWaker(self): | |
| 431 raise NotImplementedError() | |
| 432 | |
| 433 def installResolver(self, resolver): | |
| 434 assert IResolverSimple.providedBy(resolver) | |
| 435 oldResolver = self.resolver | |
| 436 self.resolver = resolver | |
| 437 return oldResolver | |
| 438 | |
| 439 def wakeUp(self): | |
| 440 """Wake up the event loop.""" | |
| 441 if not threadable.isInIOThread(): | |
| 442 if self.waker: | |
| 443 self.waker.wakeUp() | |
| 444 # if the waker isn't installed, the reactor isn't running, and | |
| 445 # therefore doesn't need to be woken up | |
| 446 | |
| 447 def doIteration(self, delay): | |
| 448 """Do one iteration over the readers and writers we know about.""" | |
| 449 raise NotImplementedError | |
| 450 | |
| 451 def addReader(self, reader): | |
| 452 raise NotImplementedError | |
| 453 | |
| 454 def addWriter(self, writer): | |
| 455 raise NotImplementedError | |
| 456 | |
| 457 def removeReader(self, reader): | |
| 458 raise NotImplementedError | |
| 459 | |
| 460 def removeWriter(self, writer): | |
| 461 raise NotImplementedError | |
| 462 | |
| 463 def removeAll(self): | |
| 464 raise NotImplementedError | |
| 465 | |
| 466 | |
| 467 def getReaders(self): | |
| 468 raise NotImplementedError() | |
| 469 | |
| 470 | |
| 471 def getWriters(self): | |
| 472 raise NotImplementedError() | |
| 473 | |
| 474 | |
| 475 def resolve(self, name, timeout = (1, 3, 11, 45)): | |
| 476 """Return a Deferred that will resolve a hostname. | |
| 477 """ | |
| 478 if not name: | |
| 479 # XXX - This is *less than* '::', and will screw up IPv6 servers | |
| 480 return defer.succeed('0.0.0.0') | |
| 481 if abstract.isIPAddress(name): | |
| 482 return defer.succeed(name) | |
| 483 return self.resolver.getHostByName(name, timeout) | |
| 484 | |
| 485 # Installation. | |
| 486 | |
| 487 # IReactorCore | |
| 488 | |
| 489 def stop(self): | |
| 490 """ | |
| 491 See twisted.internet.interfaces.IReactorCore.stop. | |
| 492 """ | |
| 493 if self._stopped: | |
| 494 raise error.ReactorNotRunning( | |
| 495 "Can't stop reactor that isn't running.") | |
| 496 self._stopped = True | |
| 497 self.callLater(0, self.fireSystemEvent, "shutdown") | |
| 498 | |
| 499 def crash(self): | |
| 500 """ | |
| 501 See twisted.internet.interfaces.IReactorCore.crash. | |
| 502 """ | |
| 503 self.running = False | |
| 504 | |
| 505 def sigInt(self, *args): | |
| 506 """Handle a SIGINT interrupt. | |
| 507 """ | |
| 508 log.msg("Received SIGINT, shutting down.") | |
| 509 self.callFromThread(self.stop) | |
| 510 | |
| 511 def sigBreak(self, *args): | |
| 512 """Handle a SIGBREAK interrupt. | |
| 513 """ | |
| 514 log.msg("Received SIGBREAK, shutting down.") | |
| 515 self.callFromThread(self.stop) | |
| 516 | |
| 517 def sigTerm(self, *args): | |
| 518 """Handle a SIGTERM interrupt. | |
| 519 """ | |
| 520 log.msg("Received SIGTERM, shutting down.") | |
| 521 self.callFromThread(self.stop) | |
| 522 | |
| 523 def disconnectAll(self): | |
| 524 """Disconnect every reader, and writer in the system. | |
| 525 """ | |
| 526 selectables = self.removeAll() | |
| 527 for reader in selectables: | |
| 528 log.callWithLogger(reader, | |
| 529 reader.connectionLost, | |
| 530 failure.Failure(main.CONNECTION_LOST)) | |
| 531 | |
| 532 | |
| 533 def iterate(self, delay=0): | |
| 534 """See twisted.internet.interfaces.IReactorCore.iterate. | |
| 535 """ | |
| 536 self.runUntilCurrent() | |
| 537 self.doIteration(delay) | |
| 538 | |
| 539 | |
| 540 def fireSystemEvent(self, eventType): | |
| 541 """See twisted.internet.interfaces.IReactorCore.fireSystemEvent. | |
| 542 """ | |
| 543 event = self._eventTriggers.get(eventType) | |
| 544 if event is not None: | |
| 545 event.fireEvent() | |
| 546 | |
| 547 | |
| 548 def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw): | |
| 549 """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger. | |
| 550 """ | |
| 551 assert callable(_f), "%s is not callable" % _f | |
| 552 if _eventType not in self._eventTriggers: | |
| 553 self._eventTriggers[_eventType] = _ThreePhaseEvent() | |
| 554 return (_eventType, self._eventTriggers[_eventType].addTrigger( | |
| 555 _phase, _f, *args, **kw)) | |
| 556 | |
| 557 | |
| 558 def removeSystemEventTrigger(self, triggerID): | |
| 559 """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger
. | |
| 560 """ | |
| 561 eventType, handle = triggerID | |
| 562 self._eventTriggers[eventType].removeTrigger(handle) | |
| 563 | |
| 564 | |
| 565 def callWhenRunning(self, _callable, *args, **kw): | |
| 566 """See twisted.internet.interfaces.IReactorCore.callWhenRunning. | |
| 567 """ | |
| 568 if self.running: | |
| 569 _callable(*args, **kw) | |
| 570 else: | |
| 571 return self.addSystemEventTrigger('after', 'startup', | |
| 572 _callable, *args, **kw) | |
| 573 | |
| 574 def startRunning(self): | |
| 575 """ | |
| 576 Method called when reactor starts: do some initialization and fire | |
| 577 startup events. | |
| 578 | |
| 579 Don't call this directly, call reactor.run() instead: it should take | |
| 580 care of calling this. | |
| 581 """ | |
| 582 if self.running: | |
| 583 warnings.warn( | |
| 584 "Reactor already running! This behavior is deprecated " | |
| 585 "since Twisted 8.0", | |
| 586 category=DeprecationWarning, stacklevel=3) | |
| 587 self.running = True | |
| 588 self._stopped = False | |
| 589 threadable.registerAsIOThread() | |
| 590 self.fireSystemEvent('startup') | |
| 591 | |
| 592 # IReactorTime | |
| 593 | |
| 594 seconds = staticmethod(runtimeSeconds) | |
| 595 | |
| 596 def callLater(self, _seconds, _f, *args, **kw): | |
| 597 """See twisted.internet.interfaces.IReactorTime.callLater. | |
| 598 """ | |
| 599 assert callable(_f), "%s is not callable" % _f | |
| 600 assert sys.maxint >= _seconds >= 0, \ | |
| 601 "%s is not greater than or equal to 0 seconds" % (_seconds,) | |
| 602 tple = DelayedCall(self.seconds() + _seconds, _f, args, kw, | |
| 603 self._cancelCallLater, | |
| 604 self._moveCallLaterSooner, | |
| 605 seconds=self.seconds) | |
| 606 self._newTimedCalls.append(tple) | |
| 607 return tple | |
| 608 | |
| 609 def _moveCallLaterSooner(self, tple): | |
| 610 # Linear time find: slow. | |
| 611 heap = self._pendingTimedCalls | |
| 612 try: | |
| 613 pos = heap.index(tple) | |
| 614 | |
| 615 # Move elt up the heap until it rests at the right place. | |
| 616 elt = heap[pos] | |
| 617 while pos != 0: | |
| 618 parent = (pos-1) // 2 | |
| 619 if heap[parent] <= elt: | |
| 620 break | |
| 621 # move parent down | |
| 622 heap[pos] = heap[parent] | |
| 623 pos = parent | |
| 624 heap[pos] = elt | |
| 625 except ValueError: | |
| 626 # element was not found in heap - oh well... | |
| 627 pass | |
| 628 | |
| 629 def _cancelCallLater(self, tple): | |
| 630 self._cancellations+=1 | |
| 631 | |
| 632 def cancelCallLater(self, callID): | |
| 633 """See twisted.internet.interfaces.IReactorTime.cancelCallLater. | |
| 634 """ | |
| 635 # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we | |
| 636 # we can't get rid of it for a long time. | |
| 637 warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callI
D.cancel() instead") | |
| 638 callID.cancel() | |
| 639 | |
| 640 def getDelayedCalls(self): | |
| 641 """Return all the outstanding delayed calls in the system. | |
| 642 They are returned in no particular order. | |
| 643 This method is not efficient -- it is really only meant for | |
| 644 test cases.""" | |
| 645 return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if no
t x.cancelled] | |
| 646 | |
| 647 def _insertNewDelayedCalls(self): | |
| 648 for call in self._newTimedCalls: | |
| 649 if call.cancelled: | |
| 650 self._cancellations-=1 | |
| 651 else: | |
| 652 call.activate_delay() | |
| 653 heappush(self._pendingTimedCalls, call) | |
| 654 self._newTimedCalls = [] | |
| 655 | |
| 656 def timeout(self): | |
| 657 # insert new delayed calls to make sure to include them in timeout value | |
| 658 self._insertNewDelayedCalls() | |
| 659 | |
| 660 if not self._pendingTimedCalls: | |
| 661 return None | |
| 662 | |
| 663 return max(0, self._pendingTimedCalls[0].time - self.seconds()) | |
| 664 | |
| 665 | |
| 666 def runUntilCurrent(self): | |
| 667 """Run all pending timed calls. | |
| 668 """ | |
| 669 if self.threadCallQueue: | |
| 670 # Keep track of how many calls we actually make, as we're | |
| 671 # making them, in case another call is added to the queue | |
| 672 # while we're in this loop. | |
| 673 count = 0 | |
| 674 total = len(self.threadCallQueue) | |
| 675 for (f, a, kw) in self.threadCallQueue: | |
| 676 try: | |
| 677 f(*a, **kw) | |
| 678 except: | |
| 679 log.err() | |
| 680 count += 1 | |
| 681 if count == total: | |
| 682 break | |
| 683 del self.threadCallQueue[:count] | |
| 684 if self.threadCallQueue: | |
| 685 if self.waker: | |
| 686 self.waker.wakeUp() | |
| 687 | |
| 688 # insert new delayed calls now | |
| 689 self._insertNewDelayedCalls() | |
| 690 | |
| 691 now = self.seconds() | |
| 692 while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= no
w): | |
| 693 call = heappop(self._pendingTimedCalls) | |
| 694 if call.cancelled: | |
| 695 self._cancellations-=1 | |
| 696 continue | |
| 697 | |
| 698 if call.delayed_time > 0: | |
| 699 call.activate_delay() | |
| 700 heappush(self._pendingTimedCalls, call) | |
| 701 continue | |
| 702 | |
| 703 try: | |
| 704 call.called = 1 | |
| 705 call.func(*call.args, **call.kw) | |
| 706 except: | |
| 707 log.deferr() | |
| 708 if hasattr(call, "creator"): | |
| 709 e = "\n" | |
| 710 e += " C: previous exception occurred in " + \ | |
| 711 "a DelayedCall created here:\n" | |
| 712 e += " C:" | |
| 713 e += "".join(call.creator).rstrip().replace("\n","\n C:") | |
| 714 e += "\n" | |
| 715 log.msg(e) | |
| 716 | |
| 717 | |
| 718 if (self._cancellations > 50 and | |
| 719 self._cancellations > len(self._pendingTimedCalls) >> 1): | |
| 720 self._cancellations = 0 | |
| 721 self._pendingTimedCalls = [x for x in self._pendingTimedCalls | |
| 722 if not x.cancelled] | |
| 723 heapify(self._pendingTimedCalls) | |
| 724 | |
| 725 # IReactorProcess | |
| 726 | |
| 727 def _checkProcessArgs(self, args, env): | |
| 728 """ | |
| 729 Check for valid arguments and environment to spawnProcess. | |
| 730 | |
| 731 @return: A two element tuple giving values to use when creating the | |
| 732 process. The first element of the tuple is a C{list} of C{str} | |
| 733 giving the values for argv of the child process. The second element | |
| 734 of the tuple is either C{None} if C{env} was C{None} or a C{dict} | |
| 735 mapping C{str} environment keys to C{str} environment values. | |
| 736 """ | |
| 737 # Any unicode string which Python would successfully implicitly | |
| 738 # encode to a byte string would have worked before these explicit | |
| 739 # checks were added. Anything which would have failed with a | |
| 740 # UnicodeEncodeError during that implicit encoding step would have | |
| 741 # raised an exception in the child process and that would have been | |
| 742 # a pain in the butt to debug. | |
| 743 # | |
| 744 # So, we will explicitly attempt the same encoding which Python | |
| 745 # would implicitly do later. If it fails, we will report an error | |
| 746 # without ever spawning a child process. If it succeeds, we'll save | |
| 747 # the result so that Python doesn't need to do it implicitly later. | |
| 748 # | |
| 749 # For any unicode which we can actually encode, we'll also issue a | |
| 750 # deprecation warning, because no one should be passing unicode here | |
| 751 # anyway. | |
| 752 # | |
| 753 # -exarkun | |
| 754 defaultEncoding = sys.getdefaultencoding() | |
| 755 | |
| 756 # Common check function | |
| 757 def argChecker(arg): | |
| 758 """ | |
| 759 Return either a str or None. If the given value is not | |
| 760 allowable for some reason, None is returned. Otherwise, a | |
| 761 possibly different object which should be used in place of arg | |
| 762 is returned. This forces unicode encoding to happen now, rather | |
| 763 than implicitly later. | |
| 764 """ | |
| 765 if isinstance(arg, unicode): | |
| 766 try: | |
| 767 arg = arg.encode(defaultEncoding) | |
| 768 except UnicodeEncodeError: | |
| 769 return None | |
| 770 warnings.warn( | |
| 771 "Argument strings and environment keys/values passed to " | |
| 772 "reactor.spawnProcess should be str, not unicode.", | |
| 773 category=DeprecationWarning, | |
| 774 stacklevel=4) | |
| 775 if isinstance(arg, str) and '\0' not in arg: | |
| 776 return arg | |
| 777 return None | |
| 778 | |
| 779 # Make a few tests to check input validity | |
| 780 if not isinstance(args, (tuple, list)): | |
| 781 raise TypeError("Arguments must be a tuple or list") | |
| 782 | |
| 783 outputArgs = [] | |
| 784 for arg in args: | |
| 785 arg = argChecker(arg) | |
| 786 if arg is None: | |
| 787 raise TypeError("Arguments contain a non-string value") | |
| 788 else: | |
| 789 outputArgs.append(arg) | |
| 790 | |
| 791 outputEnv = None | |
| 792 if env is not None: | |
| 793 outputEnv = {} | |
| 794 for key, val in env.iteritems(): | |
| 795 key = argChecker(key) | |
| 796 if key is None: | |
| 797 raise TypeError("Environment contains a non-string key") | |
| 798 val = argChecker(val) | |
| 799 if val is None: | |
| 800 raise TypeError("Environment contains a non-string value") | |
| 801 outputEnv[key] = val | |
| 802 return outputArgs, outputEnv | |
| 803 | |
| 804 # IReactorThreads | |
| 805 if platform.supportsThreads(): | |
| 806 threadpool = None | |
| 807 # ID of the trigger stopping the threadpool | |
| 808 threadpoolShutdownID = None | |
| 809 | |
| 810 def _initThreads(self): | |
| 811 self.usingThreads = True | |
| 812 self.resolver = ThreadedResolver(self) | |
| 813 self.installWaker() | |
| 814 | |
| 815 def callFromThread(self, f, *args, **kw): | |
| 816 """ | |
| 817 See L{twisted.internet.interfaces.IReactorThreads.callFromThread}. | |
| 818 """ | |
| 819 assert callable(f), "%s is not callable" % (f,) | |
| 820 # lists are thread-safe in CPython, but not in Jython | |
| 821 # this is probably a bug in Jython, but until fixed this code | |
| 822 # won't work in Jython. | |
| 823 self.threadCallQueue.append((f, args, kw)) | |
| 824 self.wakeUp() | |
| 825 | |
| 826 def _initThreadPool(self): | |
| 827 """ | |
| 828 Create the threadpool accessible with callFromThread. | |
| 829 """ | |
| 830 from twisted.python import threadpool | |
| 831 self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.rea
ctor') | |
| 832 self.callWhenRunning(self.threadpool.start) | |
| 833 self.threadpoolShutdownID = self.addSystemEventTrigger( | |
| 834 'during', 'shutdown', self._stopThreadPool) | |
| 835 | |
| 836 def _stopThreadPool(self): | |
| 837 """ | |
| 838 Stop the reactor threadpool. | |
| 839 """ | |
| 840 self.threadpoolShutdownID = None | |
| 841 self.threadpool.stop() | |
| 842 self.threadpool = None | |
| 843 | |
| 844 def callInThread(self, _callable, *args, **kwargs): | |
| 845 """ | |
| 846 See L{twisted.internet.interfaces.IReactorThreads.callInThread}. | |
| 847 """ | |
| 848 if self.threadpool is None: | |
| 849 self._initThreadPool() | |
| 850 self.threadpool.callInThread(_callable, *args, **kwargs) | |
| 851 | |
| 852 def suggestThreadPoolSize(self, size): | |
| 853 """ | |
| 854 See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolS
ize}. | |
| 855 """ | |
| 856 if size == 0 and self.threadpool is None: | |
| 857 return | |
| 858 if self.threadpool is None: | |
| 859 self._initThreadPool() | |
| 860 self.threadpool.adjustPoolsize(maxthreads=size) | |
| 861 else: | |
| 862 # This is for signal handlers. | |
| 863 def callFromThread(self, f, *args, **kw): | |
| 864 assert callable(f), "%s is not callable" % (f,) | |
| 865 # See comment in the other callFromThread implementation. | |
| 866 self.threadCallQueue.append((f, args, kw)) | |
| 867 | |
| 868 if platform.supportsThreads(): | |
| 869 classImplements(ReactorBase, IReactorThreads) | |
| 870 | |
| 871 | |
| 872 class BaseConnector(styles.Ephemeral): | |
| 873 """Basic implementation of connector. | |
| 874 | |
| 875 State can be: "connecting", "connected", "disconnected" | |
| 876 """ | |
| 877 | |
| 878 implements(IConnector) | |
| 879 | |
| 880 timeoutID = None | |
| 881 factoryStarted = 0 | |
| 882 | |
| 883 def __init__(self, factory, timeout, reactor): | |
| 884 self.state = "disconnected" | |
| 885 self.reactor = reactor | |
| 886 self.factory = factory | |
| 887 self.timeout = timeout | |
| 888 | |
| 889 def disconnect(self): | |
| 890 """Disconnect whatever our state is.""" | |
| 891 if self.state == 'connecting': | |
| 892 self.stopConnecting() | |
| 893 elif self.state == 'connected': | |
| 894 self.transport.loseConnection() | |
| 895 | |
| 896 def connect(self): | |
| 897 """Start connection to remote server.""" | |
| 898 if self.state != "disconnected": | |
| 899 raise RuntimeError, "can't connect in this state" | |
| 900 | |
| 901 self.state = "connecting" | |
| 902 if not self.factoryStarted: | |
| 903 self.factory.doStart() | |
| 904 self.factoryStarted = 1 | |
| 905 self.transport = transport = self._makeTransport() | |
| 906 if self.timeout is not None: | |
| 907 self.timeoutID = self.reactor.callLater(self.timeout, transport.fail
IfNotConnected, error.TimeoutError()) | |
| 908 self.factory.startedConnecting(self) | |
| 909 | |
| 910 def stopConnecting(self): | |
| 911 """Stop attempting to connect.""" | |
| 912 if self.state != "connecting": | |
| 913 raise error.NotConnectingError, "we're not trying to connect" | |
| 914 | |
| 915 self.state = "disconnected" | |
| 916 self.transport.failIfNotConnected(error.UserError()) | |
| 917 del self.transport | |
| 918 | |
| 919 def cancelTimeout(self): | |
| 920 if self.timeoutID is not None: | |
| 921 try: | |
| 922 self.timeoutID.cancel() | |
| 923 except ValueError: | |
| 924 pass | |
| 925 del self.timeoutID | |
| 926 | |
| 927 def buildProtocol(self, addr): | |
| 928 self.state = "connected" | |
| 929 self.cancelTimeout() | |
| 930 return self.factory.buildProtocol(addr) | |
| 931 | |
| 932 def connectionFailed(self, reason): | |
| 933 self.cancelTimeout() | |
| 934 self.transport = None | |
| 935 self.state = "disconnected" | |
| 936 self.factory.clientConnectionFailed(self, reason) | |
| 937 if self.state == "disconnected": | |
| 938 # factory hasn't called our connect() method | |
| 939 self.factory.doStop() | |
| 940 self.factoryStarted = 0 | |
| 941 | |
| 942 def connectionLost(self, reason): | |
| 943 self.state = "disconnected" | |
| 944 self.factory.clientConnectionLost(self, reason) | |
| 945 if self.state == "disconnected": | |
| 946 # factory hasn't called our connect() method | |
| 947 self.factory.doStop() | |
| 948 self.factoryStarted = 0 | |
| 949 | |
| 950 def getDestination(self): | |
| 951 raise NotImplementedError, "implement in subclasses" | |
| 952 | |
| 953 | |
| 954 class BasePort(abstract.FileDescriptor): | |
| 955 """Basic implementation of a ListeningPort. | |
| 956 | |
| 957 Note: This does not actually implement IListeningPort. | |
| 958 """ | |
| 959 | |
| 960 addressFamily = None | |
| 961 socketType = None | |
| 962 | |
| 963 def createInternetSocket(self): | |
| 964 s = socket.socket(self.addressFamily, self.socketType) | |
| 965 s.setblocking(0) | |
| 966 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): | |
| 967 old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD) | |
| 968 fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) | |
| 969 return s | |
| 970 | |
| 971 | |
| 972 def doWrite(self): | |
| 973 """Raises a RuntimeError""" | |
| 974 raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class
__) | |
| 975 | |
| 976 | |
| 977 | |
| 978 class _SignalReactorMixin: | |
| 979 """ | |
| 980 Private mixin to manage signals: it installs signal handlers at start time, | |
| 981 and define run method. | |
| 982 | |
| 983 It can only be used mixed in with L{ReactorBase}, and has to be defined | |
| 984 first in the inheritance (so that method resolution order finds | |
| 985 startRunning first). | |
| 986 """ | |
| 987 | |
| 988 def _handleSignals(self): | |
| 989 """ | |
| 990 Install the signal handlers for the Twisted event loop. | |
| 991 """ | |
| 992 try: | |
| 993 import signal | |
| 994 except ImportError: | |
| 995 log.msg("Warning: signal module unavailable -- " | |
| 996 "not installing signal handlers.") | |
| 997 return | |
| 998 | |
| 999 if signal.getsignal(signal.SIGINT) == signal.default_int_handler: | |
| 1000 # only handle if there isn't already a handler, e.g. for Pdb. | |
| 1001 signal.signal(signal.SIGINT, self.sigInt) | |
| 1002 signal.signal(signal.SIGTERM, self.sigTerm) | |
| 1003 | |
| 1004 # Catch Ctrl-Break in windows | |
| 1005 if hasattr(signal, "SIGBREAK"): | |
| 1006 signal.signal(signal.SIGBREAK, self.sigBreak) | |
| 1007 | |
| 1008 if platformType == 'posix': | |
| 1009 signal.signal(signal.SIGCHLD, self._handleSigchld) | |
| 1010 | |
| 1011 | |
| 1012 def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThre
ads()): | |
| 1013 """ | |
| 1014 Reap all processes on SIGCHLD. | |
| 1015 | |
| 1016 This gets called on SIGCHLD. We do no processing inside a signal | |
| 1017 handler, as the calls we make here could occur between any two | |
| 1018 python bytecode instructions. Deferring processing to the next | |
| 1019 eventloop round prevents us from violating the state constraints | |
| 1020 of arbitrary classes. | |
| 1021 """ | |
| 1022 from twisted.internet.process import reapAllProcesses | |
| 1023 if _threadSupport: | |
| 1024 self.callFromThread(reapAllProcesses) | |
| 1025 else: | |
| 1026 self.callLater(0, reapAllProcesses) | |
| 1027 | |
| 1028 | |
| 1029 def startRunning(self, installSignalHandlers=True): | |
| 1030 """ | |
| 1031 Forward call to ReactorBase, arrange for signal handlers to be | |
| 1032 installed if asked. | |
| 1033 """ | |
| 1034 if installSignalHandlers: | |
| 1035 # Make sure this happens before after-startup events, since the | |
| 1036 # expectation of after-startup is that the reactor is fully | |
| 1037 # initialized. Don't do it right away for historical reasons | |
| 1038 # (perhaps some before-startup triggers don't want there to be a | |
| 1039 # custom SIGCHLD handler so that they can run child processes with | |
| 1040 # some blocking api). | |
| 1041 self.addSystemEventTrigger( | |
| 1042 'during', 'startup', self._handleSignals) | |
| 1043 ReactorBase.startRunning(self) | |
| 1044 | |
| 1045 | |
| 1046 def run(self, installSignalHandlers=True): | |
| 1047 self.startRunning(installSignalHandlers=installSignalHandlers) | |
| 1048 self.mainLoop() | |
| 1049 | |
| 1050 | |
| 1051 def mainLoop(self): | |
| 1052 while self.running: | |
| 1053 try: | |
| 1054 while self.running: | |
| 1055 # Advance simulation time in delayed event | |
| 1056 # processors. | |
| 1057 self.runUntilCurrent() | |
| 1058 t2 = self.timeout() | |
| 1059 t = self.running and t2 | |
| 1060 self.doIteration(t) | |
| 1061 except: | |
| 1062 log.msg("Unexpected error in main loop.") | |
| 1063 log.err() | |
| 1064 else: | |
| 1065 log.msg('Main loop terminated.') | |
| 1066 | |
| 1067 | |
| 1068 | |
| 1069 __all__ = [] | |
| OLD | NEW |