| Index: third_party/twisted_8_1/twisted/internet/base.py
|
| diff --git a/third_party/twisted_8_1/twisted/internet/base.py b/third_party/twisted_8_1/twisted/internet/base.py
|
| deleted file mode 100644
|
| index 0d700f8fed259656312159c2f3f2267291ee1f8e..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/internet/base.py
|
| +++ /dev/null
|
| @@ -1,1069 +0,0 @@
|
| -# -*- test-case-name: twisted.test.test_internet -*-
|
| -# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -
|
| -"""
|
| -Very basic functionality for a Reactor implementation.
|
| -
|
| -Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
|
| -"""
|
| -
|
| -import socket # needed only for sync-dns
|
| -from zope.interface import implements, classImplements
|
| -
|
| -import sys
|
| -import warnings
|
| -import operator
|
| -from heapq import heappush, heappop, heapify
|
| -
|
| -try:
|
| - import fcntl
|
| -except ImportError:
|
| - fcntl = None
|
| -import traceback
|
| -
|
| -from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
|
| -from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
|
| -from twisted.internet.interfaces import IConnector, IDelayedCall
|
| -from twisted.internet import main, error, abstract, defer, threads
|
| -from twisted.python import log, failure, reflect
|
| -from twisted.python.runtime import seconds as runtimeSeconds, platform, platformType
|
| -from twisted.internet.defer import Deferred, DeferredList
|
| -from twisted.persisted import styles
|
| -
|
| -# This import is for side-effects! Even if you don't see any code using it
|
| -# in this module, don't delete it.
|
| -from twisted.python import threadable
|
| -
|
| -
|
| -class DelayedCall(styles.Ephemeral):
|
| -
|
| - implements(IDelayedCall)
|
| - # enable .debug to record creator call stack, and it will be logged if
|
| - # an exception occurs while the function is being run
|
| - debug = False
|
| - _str = None
|
| -
|
| - def __init__(self, time, func, args, kw, cancel, reset,
|
| - seconds=runtimeSeconds):
|
| - """
|
| - @param time: Seconds from the epoch at which to call C{func}.
|
| - @param func: The callable to call.
|
| - @param args: The positional arguments to pass to the callable.
|
| - @param kw: The keyword arguments to pass to the callable.
|
| - @param cancel: A callable which will be called with this
|
| - DelayedCall before cancellation.
|
| - @param reset: A callable which will be called with this
|
| - DelayedCall after changing this DelayedCall's scheduled
|
| - execution time. The callable should adjust any necessary
|
| - scheduling details to ensure this DelayedCall is invoked
|
| - at the new appropriate time.
|
| - @param seconds: If provided, a no-argument callable which will be
|
| - used to determine the current time any time that information is
|
| - needed.
|
| - """
|
| - self.time, self.func, self.args, self.kw = time, func, args, kw
|
| - self.resetter = reset
|
| - self.canceller = cancel
|
| - self.seconds = seconds
|
| - self.cancelled = self.called = 0
|
| - self.delayed_time = 0
|
| - if self.debug:
|
| - self.creator = traceback.format_stack()[:-2]
|
| -
|
| - def getTime(self):
|
| - """Return the time at which this call will fire
|
| -
|
| - @rtype: C{float}
|
| - @return: The number of seconds after the epoch at which this call is
|
| - scheduled to be made.
|
| - """
|
| - return self.time + self.delayed_time
|
| -
|
| - def cancel(self):
|
| - """Unschedule this call
|
| -
|
| - @raise AlreadyCancelled: Raised if this call has already been
|
| - unscheduled.
|
| -
|
| - @raise AlreadyCalled: Raised if this call has already been made.
|
| - """
|
| - if self.cancelled:
|
| - raise error.AlreadyCancelled
|
| - elif self.called:
|
| - raise error.AlreadyCalled
|
| - else:
|
| - self.canceller(self)
|
| - self.cancelled = 1
|
| - if self.debug:
|
| - self._str = str(self)
|
| - del self.func, self.args, self.kw
|
| -
|
| - def reset(self, secondsFromNow):
|
| - """Reschedule this call for a different time
|
| -
|
| - @type secondsFromNow: C{float}
|
| - @param secondsFromNow: The number of seconds from the time of the
|
| - C{reset} call at which this call will be scheduled.
|
| -
|
| - @raise AlreadyCancelled: Raised if this call has been cancelled.
|
| - @raise AlreadyCalled: Raised if this call has already been made.
|
| - """
|
| - if self.cancelled:
|
| - raise error.AlreadyCancelled
|
| - elif self.called:
|
| - raise error.AlreadyCalled
|
| - else:
|
| - newTime = self.seconds() + secondsFromNow
|
| - if newTime < self.time:
|
| - self.delayed_time = 0
|
| - self.time = newTime
|
| - self.resetter(self)
|
| - else:
|
| - self.delayed_time = newTime - self.time
|
| -
|
| - def delay(self, secondsLater):
|
| - """Reschedule this call for a later time
|
| -
|
| - @type secondsLater: C{float}
|
| - @param secondsLater: The number of seconds after the originally
|
| - scheduled time for which to reschedule this call.
|
| -
|
| - @raise AlreadyCancelled: Raised if this call has been cancelled.
|
| - @raise AlreadyCalled: Raised if this call has already been made.
|
| - """
|
| - if self.cancelled:
|
| - raise error.AlreadyCancelled
|
| - elif self.called:
|
| - raise error.AlreadyCalled
|
| - else:
|
| - self.delayed_time += secondsLater
|
| - if self.delayed_time < 0:
|
| - self.activate_delay()
|
| - self.resetter(self)
|
| -
|
| - def activate_delay(self):
|
| - self.time += self.delayed_time
|
| - self.delayed_time = 0
|
| -
|
| - def active(self):
|
| - """Determine whether this call is still pending
|
| -
|
| - @rtype: C{bool}
|
| - @return: True if this call has not yet been made or cancelled,
|
| - False otherwise.
|
| - """
|
| - return not (self.cancelled or self.called)
|
| -
|
| - def __le__(self, other):
|
| - return self.time <= other.time
|
| -
|
| - def __str__(self):
|
| - if self._str is not None:
|
| - return self._str
|
| - if hasattr(self, 'func'):
|
| - if hasattr(self.func, 'func_name'):
|
| - func = self.func.func_name
|
| - if hasattr(self.func, 'im_class'):
|
| - func = self.func.im_class.__name__ + '.' + func
|
| - else:
|
| - func = reflect.safe_repr(self.func)
|
| - else:
|
| - func = None
|
| -
|
| - now = self.seconds()
|
| - L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % (
|
| - id(self), self.time - now, self.called, self.cancelled)]
|
| - if func is not None:
|
| - L.extend((" ", func, "("))
|
| - if self.args:
|
| - L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
|
| - if self.kw:
|
| - L.append(", ")
|
| - if self.kw:
|
| - L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
|
| - L.append(")")
|
| -
|
| - if self.debug:
|
| - L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.creator)))
|
| - L.append('>')
|
| -
|
| - return "".join(L)
|
| -
|
| -
|
| -class ThreadedResolver:
|
| - implements(IResolverSimple)
|
| -
|
| - def __init__(self, reactor):
|
| - self.reactor = reactor
|
| - self._runningQueries = {}
|
| -
|
| - def _fail(self, name, err):
|
| - err = error.DNSLookupError("address %r not found: %s" % (name, err))
|
| - return failure.Failure(err)
|
| -
|
| - def _cleanup(self, name, lookupDeferred):
|
| - userDeferred, cancelCall = self._runningQueries[lookupDeferred]
|
| - del self._runningQueries[lookupDeferred]
|
| - userDeferred.errback(self._fail(name, "timeout error"))
|
| -
|
| - def _checkTimeout(self, result, name, lookupDeferred):
|
| - try:
|
| - userDeferred, cancelCall = self._runningQueries[lookupDeferred]
|
| - except KeyError:
|
| - pass
|
| - else:
|
| - del self._runningQueries[lookupDeferred]
|
| - cancelCall.cancel()
|
| -
|
| - if isinstance(result, failure.Failure):
|
| - userDeferred.errback(self._fail(name, result.getErrorMessage()))
|
| - else:
|
| - userDeferred.callback(result)
|
| -
|
| - def getHostByName(self, name, timeout = (1, 3, 11, 45)):
|
| - if timeout:
|
| - timeoutDelay = reduce(operator.add, timeout)
|
| - else:
|
| - timeoutDelay = 60
|
| - userDeferred = defer.Deferred()
|
| - lookupDeferred = threads.deferToThread(socket.gethostbyname, name)
|
| - cancelCall = self.reactor.callLater(
|
| - timeoutDelay, self._cleanup, name, lookupDeferred)
|
| - self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
|
| - lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
|
| - return userDeferred
|
| -
|
| -class BlockingResolver:
|
| - implements(IResolverSimple)
|
| -
|
| - def getHostByName(self, name, timeout = (1, 3, 11, 45)):
|
| - try:
|
| - address = socket.gethostbyname(name)
|
| - except socket.error:
|
| - msg = "address %r not found" % (name,)
|
| - err = error.DNSLookupError(msg)
|
| - return defer.fail(err)
|
| - else:
|
| - return defer.succeed(address)
|
| -
|
| -
|
| -class _ThreePhaseEvent(object):
|
| - """
|
| - Collection of callables (with arguments) which can be invoked as a group in
|
| - a particular order.
|
| -
|
| - This provides the underlying implementation for the reactor's system event
|
| - triggers. An instance of this class tracks triggers for all phases of a
|
| - single type of event.
|
| -
|
| - @ivar before: A list of the before-phase triggers containing three-tuples
|
| - of a callable, a tuple of positional arguments, and a dict of keyword
|
| - arguments
|
| -
|
| - @ivar finishedBefore: A list of the before-phase triggers which have
|
| - already been executed. This is only populated in the C{'BEFORE'} state.
|
| -
|
| - @ivar during: A list of the during-phase triggers containing three-tuples
|
| - of a callable, a tuple of positional arguments, and a dict of keyword
|
| - arguments
|
| -
|
| - @ivar after: A list of the after-phase triggers containing three-tuples
|
| - of a callable, a tuple of positional arguments, and a dict of keyword
|
| - arguments
|
| -
|
| - @ivar state: A string indicating what is currently going on with this
|
| - object. One of C{'BASE'} (for when nothing in particular is happening;
|
| - this is the initial value), C{'BEFORE'} (when the before-phase triggers
|
| - are in the process of being executed).
|
| - """
|
| - def __init__(self):
|
| - self.before = []
|
| - self.during = []
|
| - self.after = []
|
| - self.state = 'BASE'
|
| -
|
| -
|
| - def addTrigger(self, phase, callable, *args, **kwargs):
|
| - """
|
| - Add a trigger to the indicate phase.
|
| -
|
| - @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
|
| -
|
| - @param callable: An object to be called when this event is triggered.
|
| - @param *args: Positional arguments to pass to C{callable}.
|
| - @param **kwargs: Keyword arguments to pass to C{callable}.
|
| -
|
| - @return: An opaque handle which may be passed to L{removeTrigger} to
|
| - reverse the effects of calling this method.
|
| - """
|
| - if phase not in ('before', 'during', 'after'):
|
| - raise KeyError("invalid phase")
|
| - getattr(self, phase).append((callable, args, kwargs))
|
| - return phase, callable, args, kwargs
|
| -
|
| -
|
| - def removeTrigger(self, handle):
|
| - """
|
| - Remove a previously added trigger callable.
|
| -
|
| - @param handle: An object previously returned by L{addTrigger}. The
|
| - trigger added by that call will be removed.
|
| -
|
| - @raise ValueError: If the trigger associated with C{handle} has already
|
| - been removed or if C{handle} is not a valid handle.
|
| - """
|
| - return getattr(self, 'removeTrigger_' + self.state)(handle)
|
| -
|
| -
|
| - def removeTrigger_BASE(self, handle):
|
| - """
|
| - Just try to remove the trigger.
|
| -
|
| - @see removeTrigger
|
| - """
|
| - try:
|
| - phase, callable, args, kwargs = handle
|
| - except (TypeError, ValueError), e:
|
| - raise ValueError("invalid trigger handle")
|
| - else:
|
| - if phase not in ('before', 'during', 'after'):
|
| - raise KeyError("invalid phase")
|
| - getattr(self, phase).remove((callable, args, kwargs))
|
| -
|
| -
|
| - def removeTrigger_BEFORE(self, handle):
|
| - """
|
| - Remove the trigger if it has yet to be executed, otherwise emit a
|
| - warning that in the future an exception will be raised when removing an
|
| - already-executed trigger.
|
| -
|
| - @see removeTrigger
|
| - """
|
| - phase, callable, args, kwargs = handle
|
| - if phase != 'before':
|
| - return self.removeTrigger_BASE(handle)
|
| - if (callable, args, kwargs) in self.finishedBefore:
|
| - warnings.warn(
|
| - "Removing already-fired system event triggers will raise an "
|
| - "exception in a future version of Twisted.",
|
| - category=DeprecationWarning,
|
| - stacklevel=3)
|
| - else:
|
| - self.removeTrigger_BASE(handle)
|
| -
|
| -
|
| - def fireEvent(self):
|
| - """
|
| - Call the triggers added to this event.
|
| - """
|
| - self.state = 'BEFORE'
|
| - self.finishedBefore = []
|
| - beforeResults = []
|
| - while self.before:
|
| - callable, args, kwargs = self.before.pop(0)
|
| - self.finishedBefore.append((callable, args, kwargs))
|
| - try:
|
| - result = callable(*args, **kwargs)
|
| - except:
|
| - log.err()
|
| - else:
|
| - if isinstance(result, Deferred):
|
| - beforeResults.append(result)
|
| - DeferredList(beforeResults).addCallback(self._continueFiring)
|
| -
|
| -
|
| - def _continueFiring(self, ignored):
|
| - """
|
| - Call the during and after phase triggers for this event.
|
| - """
|
| - self.state = 'BASE'
|
| - self.finishedBefore = []
|
| - for phase in self.during, self.after:
|
| - while phase:
|
| - callable, args, kwargs = phase.pop(0)
|
| - try:
|
| - callable(*args, **kwargs)
|
| - except:
|
| - log.err()
|
| -
|
| -
|
| -
|
| -class ReactorBase(object):
|
| - """
|
| - Default base class for Reactors.
|
| -
|
| - @type _stopped: C{bool}
|
| - @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
|
| - and C{reactor.stop}.
|
| - """
|
| -
|
| - implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
|
| -
|
| - _stopped = True
|
| - installed = False
|
| - usingThreads = False
|
| - resolver = BlockingResolver()
|
| -
|
| - __name__ = "twisted.internet.reactor"
|
| -
|
| - def __init__(self):
|
| - self.threadCallQueue = []
|
| - self._eventTriggers = {}
|
| - self._pendingTimedCalls = []
|
| - self._newTimedCalls = []
|
| - self._cancellations = 0
|
| - self.running = False
|
| - self.waker = None
|
| -
|
| - self.addSystemEventTrigger('during', 'shutdown', self.crash)
|
| - self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
|
| -
|
| - if platform.supportsThreads():
|
| - self._initThreads()
|
| -
|
| - # override in subclasses
|
| -
|
| - _lock = None
|
| -
|
| - def installWaker(self):
|
| - raise NotImplementedError()
|
| -
|
| - def installResolver(self, resolver):
|
| - assert IResolverSimple.providedBy(resolver)
|
| - oldResolver = self.resolver
|
| - self.resolver = resolver
|
| - return oldResolver
|
| -
|
| - def wakeUp(self):
|
| - """Wake up the event loop."""
|
| - if not threadable.isInIOThread():
|
| - if self.waker:
|
| - self.waker.wakeUp()
|
| - # if the waker isn't installed, the reactor isn't running, and
|
| - # therefore doesn't need to be woken up
|
| -
|
| - def doIteration(self, delay):
|
| - """Do one iteration over the readers and writers we know about."""
|
| - raise NotImplementedError
|
| -
|
| - def addReader(self, reader):
|
| - raise NotImplementedError
|
| -
|
| - def addWriter(self, writer):
|
| - raise NotImplementedError
|
| -
|
| - def removeReader(self, reader):
|
| - raise NotImplementedError
|
| -
|
| - def removeWriter(self, writer):
|
| - raise NotImplementedError
|
| -
|
| - def removeAll(self):
|
| - raise NotImplementedError
|
| -
|
| -
|
| - def getReaders(self):
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def getWriters(self):
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def resolve(self, name, timeout = (1, 3, 11, 45)):
|
| - """Return a Deferred that will resolve a hostname.
|
| - """
|
| - if not name:
|
| - # XXX - This is *less than* '::', and will screw up IPv6 servers
|
| - return defer.succeed('0.0.0.0')
|
| - if abstract.isIPAddress(name):
|
| - return defer.succeed(name)
|
| - return self.resolver.getHostByName(name, timeout)
|
| -
|
| - # Installation.
|
| -
|
| - # IReactorCore
|
| -
|
| - def stop(self):
|
| - """
|
| - See twisted.internet.interfaces.IReactorCore.stop.
|
| - """
|
| - if self._stopped:
|
| - raise error.ReactorNotRunning(
|
| - "Can't stop reactor that isn't running.")
|
| - self._stopped = True
|
| - self.callLater(0, self.fireSystemEvent, "shutdown")
|
| -
|
| - def crash(self):
|
| - """
|
| - See twisted.internet.interfaces.IReactorCore.crash.
|
| - """
|
| - self.running = False
|
| -
|
| - def sigInt(self, *args):
|
| - """Handle a SIGINT interrupt.
|
| - """
|
| - log.msg("Received SIGINT, shutting down.")
|
| - self.callFromThread(self.stop)
|
| -
|
| - def sigBreak(self, *args):
|
| - """Handle a SIGBREAK interrupt.
|
| - """
|
| - log.msg("Received SIGBREAK, shutting down.")
|
| - self.callFromThread(self.stop)
|
| -
|
| - def sigTerm(self, *args):
|
| - """Handle a SIGTERM interrupt.
|
| - """
|
| - log.msg("Received SIGTERM, shutting down.")
|
| - self.callFromThread(self.stop)
|
| -
|
| - def disconnectAll(self):
|
| - """Disconnect every reader, and writer in the system.
|
| - """
|
| - selectables = self.removeAll()
|
| - for reader in selectables:
|
| - log.callWithLogger(reader,
|
| - reader.connectionLost,
|
| - failure.Failure(main.CONNECTION_LOST))
|
| -
|
| -
|
| - def iterate(self, delay=0):
|
| - """See twisted.internet.interfaces.IReactorCore.iterate.
|
| - """
|
| - self.runUntilCurrent()
|
| - self.doIteration(delay)
|
| -
|
| -
|
| - def fireSystemEvent(self, eventType):
|
| - """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
|
| - """
|
| - event = self._eventTriggers.get(eventType)
|
| - if event is not None:
|
| - event.fireEvent()
|
| -
|
| -
|
| - def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
|
| - """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
|
| - """
|
| - assert callable(_f), "%s is not callable" % _f
|
| - if _eventType not in self._eventTriggers:
|
| - self._eventTriggers[_eventType] = _ThreePhaseEvent()
|
| - return (_eventType, self._eventTriggers[_eventType].addTrigger(
|
| - _phase, _f, *args, **kw))
|
| -
|
| -
|
| - def removeSystemEventTrigger(self, triggerID):
|
| - """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
|
| - """
|
| - eventType, handle = triggerID
|
| - self._eventTriggers[eventType].removeTrigger(handle)
|
| -
|
| -
|
| - def callWhenRunning(self, _callable, *args, **kw):
|
| - """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
|
| - """
|
| - if self.running:
|
| - _callable(*args, **kw)
|
| - else:
|
| - return self.addSystemEventTrigger('after', 'startup',
|
| - _callable, *args, **kw)
|
| -
|
| - def startRunning(self):
|
| - """
|
| - Method called when reactor starts: do some initialization and fire
|
| - startup events.
|
| -
|
| - Don't call this directly, call reactor.run() instead: it should take
|
| - care of calling this.
|
| - """
|
| - if self.running:
|
| - warnings.warn(
|
| - "Reactor already running! This behavior is deprecated "
|
| - "since Twisted 8.0",
|
| - category=DeprecationWarning, stacklevel=3)
|
| - self.running = True
|
| - self._stopped = False
|
| - threadable.registerAsIOThread()
|
| - self.fireSystemEvent('startup')
|
| -
|
| - # IReactorTime
|
| -
|
| - seconds = staticmethod(runtimeSeconds)
|
| -
|
| - def callLater(self, _seconds, _f, *args, **kw):
|
| - """See twisted.internet.interfaces.IReactorTime.callLater.
|
| - """
|
| - assert callable(_f), "%s is not callable" % _f
|
| - assert sys.maxint >= _seconds >= 0, \
|
| - "%s is not greater than or equal to 0 seconds" % (_seconds,)
|
| - tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
|
| - self._cancelCallLater,
|
| - self._moveCallLaterSooner,
|
| - seconds=self.seconds)
|
| - self._newTimedCalls.append(tple)
|
| - return tple
|
| -
|
| - def _moveCallLaterSooner(self, tple):
|
| - # Linear time find: slow.
|
| - heap = self._pendingTimedCalls
|
| - try:
|
| - pos = heap.index(tple)
|
| -
|
| - # Move elt up the heap until it rests at the right place.
|
| - elt = heap[pos]
|
| - while pos != 0:
|
| - parent = (pos-1) // 2
|
| - if heap[parent] <= elt:
|
| - break
|
| - # move parent down
|
| - heap[pos] = heap[parent]
|
| - pos = parent
|
| - heap[pos] = elt
|
| - except ValueError:
|
| - # element was not found in heap - oh well...
|
| - pass
|
| -
|
| - def _cancelCallLater(self, tple):
|
| - self._cancellations+=1
|
| -
|
| - def cancelCallLater(self, callID):
|
| - """See twisted.internet.interfaces.IReactorTime.cancelCallLater.
|
| - """
|
| - # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we
|
| - # we can't get rid of it for a long time.
|
| - warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead")
|
| - callID.cancel()
|
| -
|
| - def getDelayedCalls(self):
|
| - """Return all the outstanding delayed calls in the system.
|
| - They are returned in no particular order.
|
| - This method is not efficient -- it is really only meant for
|
| - test cases."""
|
| - return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
|
| -
|
| - def _insertNewDelayedCalls(self):
|
| - for call in self._newTimedCalls:
|
| - if call.cancelled:
|
| - self._cancellations-=1
|
| - else:
|
| - call.activate_delay()
|
| - heappush(self._pendingTimedCalls, call)
|
| - self._newTimedCalls = []
|
| -
|
| - def timeout(self):
|
| - # insert new delayed calls to make sure to include them in timeout value
|
| - self._insertNewDelayedCalls()
|
| -
|
| - if not self._pendingTimedCalls:
|
| - return None
|
| -
|
| - return max(0, self._pendingTimedCalls[0].time - self.seconds())
|
| -
|
| -
|
| - def runUntilCurrent(self):
|
| - """Run all pending timed calls.
|
| - """
|
| - if self.threadCallQueue:
|
| - # Keep track of how many calls we actually make, as we're
|
| - # making them, in case another call is added to the queue
|
| - # while we're in this loop.
|
| - count = 0
|
| - total = len(self.threadCallQueue)
|
| - for (f, a, kw) in self.threadCallQueue:
|
| - try:
|
| - f(*a, **kw)
|
| - except:
|
| - log.err()
|
| - count += 1
|
| - if count == total:
|
| - break
|
| - del self.threadCallQueue[:count]
|
| - if self.threadCallQueue:
|
| - if self.waker:
|
| - self.waker.wakeUp()
|
| -
|
| - # insert new delayed calls now
|
| - self._insertNewDelayedCalls()
|
| -
|
| - now = self.seconds()
|
| - while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
|
| - call = heappop(self._pendingTimedCalls)
|
| - if call.cancelled:
|
| - self._cancellations-=1
|
| - continue
|
| -
|
| - if call.delayed_time > 0:
|
| - call.activate_delay()
|
| - heappush(self._pendingTimedCalls, call)
|
| - continue
|
| -
|
| - try:
|
| - call.called = 1
|
| - call.func(*call.args, **call.kw)
|
| - except:
|
| - log.deferr()
|
| - if hasattr(call, "creator"):
|
| - e = "\n"
|
| - e += " C: previous exception occurred in " + \
|
| - "a DelayedCall created here:\n"
|
| - e += " C:"
|
| - e += "".join(call.creator).rstrip().replace("\n","\n C:")
|
| - e += "\n"
|
| - log.msg(e)
|
| -
|
| -
|
| - if (self._cancellations > 50 and
|
| - self._cancellations > len(self._pendingTimedCalls) >> 1):
|
| - self._cancellations = 0
|
| - self._pendingTimedCalls = [x for x in self._pendingTimedCalls
|
| - if not x.cancelled]
|
| - heapify(self._pendingTimedCalls)
|
| -
|
| - # IReactorProcess
|
| -
|
| - def _checkProcessArgs(self, args, env):
|
| - """
|
| - Check for valid arguments and environment to spawnProcess.
|
| -
|
| - @return: A two element tuple giving values to use when creating the
|
| - process. The first element of the tuple is a C{list} of C{str}
|
| - giving the values for argv of the child process. The second element
|
| - of the tuple is either C{None} if C{env} was C{None} or a C{dict}
|
| - mapping C{str} environment keys to C{str} environment values.
|
| - """
|
| - # Any unicode string which Python would successfully implicitly
|
| - # encode to a byte string would have worked before these explicit
|
| - # checks were added. Anything which would have failed with a
|
| - # UnicodeEncodeError during that implicit encoding step would have
|
| - # raised an exception in the child process and that would have been
|
| - # a pain in the butt to debug.
|
| - #
|
| - # So, we will explicitly attempt the same encoding which Python
|
| - # would implicitly do later. If it fails, we will report an error
|
| - # without ever spawning a child process. If it succeeds, we'll save
|
| - # the result so that Python doesn't need to do it implicitly later.
|
| - #
|
| - # For any unicode which we can actually encode, we'll also issue a
|
| - # deprecation warning, because no one should be passing unicode here
|
| - # anyway.
|
| - #
|
| - # -exarkun
|
| - defaultEncoding = sys.getdefaultencoding()
|
| -
|
| - # Common check function
|
| - def argChecker(arg):
|
| - """
|
| - Return either a str or None. If the given value is not
|
| - allowable for some reason, None is returned. Otherwise, a
|
| - possibly different object which should be used in place of arg
|
| - is returned. This forces unicode encoding to happen now, rather
|
| - than implicitly later.
|
| - """
|
| - if isinstance(arg, unicode):
|
| - try:
|
| - arg = arg.encode(defaultEncoding)
|
| - except UnicodeEncodeError:
|
| - return None
|
| - warnings.warn(
|
| - "Argument strings and environment keys/values passed to "
|
| - "reactor.spawnProcess should be str, not unicode.",
|
| - category=DeprecationWarning,
|
| - stacklevel=4)
|
| - if isinstance(arg, str) and '\0' not in arg:
|
| - return arg
|
| - return None
|
| -
|
| - # Make a few tests to check input validity
|
| - if not isinstance(args, (tuple, list)):
|
| - raise TypeError("Arguments must be a tuple or list")
|
| -
|
| - outputArgs = []
|
| - for arg in args:
|
| - arg = argChecker(arg)
|
| - if arg is None:
|
| - raise TypeError("Arguments contain a non-string value")
|
| - else:
|
| - outputArgs.append(arg)
|
| -
|
| - outputEnv = None
|
| - if env is not None:
|
| - outputEnv = {}
|
| - for key, val in env.iteritems():
|
| - key = argChecker(key)
|
| - if key is None:
|
| - raise TypeError("Environment contains a non-string key")
|
| - val = argChecker(val)
|
| - if val is None:
|
| - raise TypeError("Environment contains a non-string value")
|
| - outputEnv[key] = val
|
| - return outputArgs, outputEnv
|
| -
|
| - # IReactorThreads
|
| - if platform.supportsThreads():
|
| - threadpool = None
|
| - # ID of the trigger stopping the threadpool
|
| - threadpoolShutdownID = None
|
| -
|
| - def _initThreads(self):
|
| - self.usingThreads = True
|
| - self.resolver = ThreadedResolver(self)
|
| - self.installWaker()
|
| -
|
| - def callFromThread(self, f, *args, **kw):
|
| - """
|
| - See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
|
| - """
|
| - assert callable(f), "%s is not callable" % (f,)
|
| - # lists are thread-safe in CPython, but not in Jython
|
| - # this is probably a bug in Jython, but until fixed this code
|
| - # won't work in Jython.
|
| - self.threadCallQueue.append((f, args, kw))
|
| - self.wakeUp()
|
| -
|
| - def _initThreadPool(self):
|
| - """
|
| - Create the threadpool accessible with callFromThread.
|
| - """
|
| - from twisted.python import threadpool
|
| - self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.reactor')
|
| - self.callWhenRunning(self.threadpool.start)
|
| - self.threadpoolShutdownID = self.addSystemEventTrigger(
|
| - 'during', 'shutdown', self._stopThreadPool)
|
| -
|
| - def _stopThreadPool(self):
|
| - """
|
| - Stop the reactor threadpool.
|
| - """
|
| - self.threadpoolShutdownID = None
|
| - self.threadpool.stop()
|
| - self.threadpool = None
|
| -
|
| - def callInThread(self, _callable, *args, **kwargs):
|
| - """
|
| - See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
|
| - """
|
| - if self.threadpool is None:
|
| - self._initThreadPool()
|
| - self.threadpool.callInThread(_callable, *args, **kwargs)
|
| -
|
| - def suggestThreadPoolSize(self, size):
|
| - """
|
| - See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
|
| - """
|
| - if size == 0 and self.threadpool is None:
|
| - return
|
| - if self.threadpool is None:
|
| - self._initThreadPool()
|
| - self.threadpool.adjustPoolsize(maxthreads=size)
|
| - else:
|
| - # This is for signal handlers.
|
| - def callFromThread(self, f, *args, **kw):
|
| - assert callable(f), "%s is not callable" % (f,)
|
| - # See comment in the other callFromThread implementation.
|
| - self.threadCallQueue.append((f, args, kw))
|
| -
|
| -if platform.supportsThreads():
|
| - classImplements(ReactorBase, IReactorThreads)
|
| -
|
| -
|
| -class BaseConnector(styles.Ephemeral):
|
| - """Basic implementation of connector.
|
| -
|
| - State can be: "connecting", "connected", "disconnected"
|
| - """
|
| -
|
| - implements(IConnector)
|
| -
|
| - timeoutID = None
|
| - factoryStarted = 0
|
| -
|
| - def __init__(self, factory, timeout, reactor):
|
| - self.state = "disconnected"
|
| - self.reactor = reactor
|
| - self.factory = factory
|
| - self.timeout = timeout
|
| -
|
| - def disconnect(self):
|
| - """Disconnect whatever our state is."""
|
| - if self.state == 'connecting':
|
| - self.stopConnecting()
|
| - elif self.state == 'connected':
|
| - self.transport.loseConnection()
|
| -
|
| - def connect(self):
|
| - """Start connection to remote server."""
|
| - if self.state != "disconnected":
|
| - raise RuntimeError, "can't connect in this state"
|
| -
|
| - self.state = "connecting"
|
| - if not self.factoryStarted:
|
| - self.factory.doStart()
|
| - self.factoryStarted = 1
|
| - self.transport = transport = self._makeTransport()
|
| - if self.timeout is not None:
|
| - self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
|
| - self.factory.startedConnecting(self)
|
| -
|
| - def stopConnecting(self):
|
| - """Stop attempting to connect."""
|
| - if self.state != "connecting":
|
| - raise error.NotConnectingError, "we're not trying to connect"
|
| -
|
| - self.state = "disconnected"
|
| - self.transport.failIfNotConnected(error.UserError())
|
| - del self.transport
|
| -
|
| - def cancelTimeout(self):
|
| - if self.timeoutID is not None:
|
| - try:
|
| - self.timeoutID.cancel()
|
| - except ValueError:
|
| - pass
|
| - del self.timeoutID
|
| -
|
| - def buildProtocol(self, addr):
|
| - self.state = "connected"
|
| - self.cancelTimeout()
|
| - return self.factory.buildProtocol(addr)
|
| -
|
| - def connectionFailed(self, reason):
|
| - self.cancelTimeout()
|
| - self.transport = None
|
| - self.state = "disconnected"
|
| - self.factory.clientConnectionFailed(self, reason)
|
| - if self.state == "disconnected":
|
| - # factory hasn't called our connect() method
|
| - self.factory.doStop()
|
| - self.factoryStarted = 0
|
| -
|
| - def connectionLost(self, reason):
|
| - self.state = "disconnected"
|
| - self.factory.clientConnectionLost(self, reason)
|
| - if self.state == "disconnected":
|
| - # factory hasn't called our connect() method
|
| - self.factory.doStop()
|
| - self.factoryStarted = 0
|
| -
|
| - def getDestination(self):
|
| - raise NotImplementedError, "implement in subclasses"
|
| -
|
| -
|
| -class BasePort(abstract.FileDescriptor):
|
| - """Basic implementation of a ListeningPort.
|
| -
|
| - Note: This does not actually implement IListeningPort.
|
| - """
|
| -
|
| - addressFamily = None
|
| - socketType = None
|
| -
|
| - def createInternetSocket(self):
|
| - s = socket.socket(self.addressFamily, self.socketType)
|
| - s.setblocking(0)
|
| - if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
|
| - old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
|
| - fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
|
| - return s
|
| -
|
| -
|
| - def doWrite(self):
|
| - """Raises a RuntimeError"""
|
| - raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
|
| -
|
| -
|
| -
|
| -class _SignalReactorMixin:
|
| - """
|
| - Private mixin to manage signals: it installs signal handlers at start time,
|
| - and define run method.
|
| -
|
| - It can only be used mixed in with L{ReactorBase}, and has to be defined
|
| - first in the inheritance (so that method resolution order finds
|
| - startRunning first).
|
| - """
|
| -
|
| - def _handleSignals(self):
|
| - """
|
| - Install the signal handlers for the Twisted event loop.
|
| - """
|
| - try:
|
| - import signal
|
| - except ImportError:
|
| - log.msg("Warning: signal module unavailable -- "
|
| - "not installing signal handlers.")
|
| - return
|
| -
|
| - if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
|
| - # only handle if there isn't already a handler, e.g. for Pdb.
|
| - signal.signal(signal.SIGINT, self.sigInt)
|
| - signal.signal(signal.SIGTERM, self.sigTerm)
|
| -
|
| - # Catch Ctrl-Break in windows
|
| - if hasattr(signal, "SIGBREAK"):
|
| - signal.signal(signal.SIGBREAK, self.sigBreak)
|
| -
|
| - if platformType == 'posix':
|
| - signal.signal(signal.SIGCHLD, self._handleSigchld)
|
| -
|
| -
|
| - def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThreads()):
|
| - """
|
| - Reap all processes on SIGCHLD.
|
| -
|
| - This gets called on SIGCHLD. We do no processing inside a signal
|
| - handler, as the calls we make here could occur between any two
|
| - python bytecode instructions. Deferring processing to the next
|
| - eventloop round prevents us from violating the state constraints
|
| - of arbitrary classes.
|
| - """
|
| - from twisted.internet.process import reapAllProcesses
|
| - if _threadSupport:
|
| - self.callFromThread(reapAllProcesses)
|
| - else:
|
| - self.callLater(0, reapAllProcesses)
|
| -
|
| -
|
| - def startRunning(self, installSignalHandlers=True):
|
| - """
|
| - Forward call to ReactorBase, arrange for signal handlers to be
|
| - installed if asked.
|
| - """
|
| - if installSignalHandlers:
|
| - # Make sure this happens before after-startup events, since the
|
| - # expectation of after-startup is that the reactor is fully
|
| - # initialized. Don't do it right away for historical reasons
|
| - # (perhaps some before-startup triggers don't want there to be a
|
| - # custom SIGCHLD handler so that they can run child processes with
|
| - # some blocking api).
|
| - self.addSystemEventTrigger(
|
| - 'during', 'startup', self._handleSignals)
|
| - ReactorBase.startRunning(self)
|
| -
|
| -
|
| - def run(self, installSignalHandlers=True):
|
| - self.startRunning(installSignalHandlers=installSignalHandlers)
|
| - self.mainLoop()
|
| -
|
| -
|
| - def mainLoop(self):
|
| - while self.running:
|
| - try:
|
| - while self.running:
|
| - # Advance simulation time in delayed event
|
| - # processors.
|
| - self.runUntilCurrent()
|
| - t2 = self.timeout()
|
| - t = self.running and t2
|
| - self.doIteration(t)
|
| - except:
|
| - log.msg("Unexpected error in main loop.")
|
| - log.err()
|
| - else:
|
| - log.msg('Main loop terminated.')
|
| -
|
| -
|
| -
|
| -__all__ = []
|
|
|