Index: third_party/twisted_8_1/twisted/internet/base.py |
diff --git a/third_party/twisted_8_1/twisted/internet/base.py b/third_party/twisted_8_1/twisted/internet/base.py |
deleted file mode 100644 |
index 0d700f8fed259656312159c2f3f2267291ee1f8e..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/internet/base.py |
+++ /dev/null |
@@ -1,1069 +0,0 @@ |
-# -*- test-case-name: twisted.test.test_internet -*- |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-""" |
-Very basic functionality for a Reactor implementation. |
- |
-Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} |
-""" |
- |
-import socket # needed only for sync-dns |
-from zope.interface import implements, classImplements |
- |
-import sys |
-import warnings |
-import operator |
-from heapq import heappush, heappop, heapify |
- |
-try: |
- import fcntl |
-except ImportError: |
- fcntl = None |
-import traceback |
- |
-from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads |
-from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver |
-from twisted.internet.interfaces import IConnector, IDelayedCall |
-from twisted.internet import main, error, abstract, defer, threads |
-from twisted.python import log, failure, reflect |
-from twisted.python.runtime import seconds as runtimeSeconds, platform, platformType |
-from twisted.internet.defer import Deferred, DeferredList |
-from twisted.persisted import styles |
- |
-# This import is for side-effects! Even if you don't see any code using it |
-# in this module, don't delete it. |
-from twisted.python import threadable |
- |
- |
-class DelayedCall(styles.Ephemeral): |
- |
- implements(IDelayedCall) |
- # enable .debug to record creator call stack, and it will be logged if |
- # an exception occurs while the function is being run |
- debug = False |
- _str = None |
- |
- def __init__(self, time, func, args, kw, cancel, reset, |
- seconds=runtimeSeconds): |
- """ |
- @param time: Seconds from the epoch at which to call C{func}. |
- @param func: The callable to call. |
- @param args: The positional arguments to pass to the callable. |
- @param kw: The keyword arguments to pass to the callable. |
- @param cancel: A callable which will be called with this |
- DelayedCall before cancellation. |
- @param reset: A callable which will be called with this |
- DelayedCall after changing this DelayedCall's scheduled |
- execution time. The callable should adjust any necessary |
- scheduling details to ensure this DelayedCall is invoked |
- at the new appropriate time. |
- @param seconds: If provided, a no-argument callable which will be |
- used to determine the current time any time that information is |
- needed. |
- """ |
- self.time, self.func, self.args, self.kw = time, func, args, kw |
- self.resetter = reset |
- self.canceller = cancel |
- self.seconds = seconds |
- self.cancelled = self.called = 0 |
- self.delayed_time = 0 |
- if self.debug: |
- self.creator = traceback.format_stack()[:-2] |
- |
- def getTime(self): |
- """Return the time at which this call will fire |
- |
- @rtype: C{float} |
- @return: The number of seconds after the epoch at which this call is |
- scheduled to be made. |
- """ |
- return self.time + self.delayed_time |
- |
- def cancel(self): |
- """Unschedule this call |
- |
- @raise AlreadyCancelled: Raised if this call has already been |
- unscheduled. |
- |
- @raise AlreadyCalled: Raised if this call has already been made. |
- """ |
- if self.cancelled: |
- raise error.AlreadyCancelled |
- elif self.called: |
- raise error.AlreadyCalled |
- else: |
- self.canceller(self) |
- self.cancelled = 1 |
- if self.debug: |
- self._str = str(self) |
- del self.func, self.args, self.kw |
- |
- def reset(self, secondsFromNow): |
- """Reschedule this call for a different time |
- |
- @type secondsFromNow: C{float} |
- @param secondsFromNow: The number of seconds from the time of the |
- C{reset} call at which this call will be scheduled. |
- |
- @raise AlreadyCancelled: Raised if this call has been cancelled. |
- @raise AlreadyCalled: Raised if this call has already been made. |
- """ |
- if self.cancelled: |
- raise error.AlreadyCancelled |
- elif self.called: |
- raise error.AlreadyCalled |
- else: |
- newTime = self.seconds() + secondsFromNow |
- if newTime < self.time: |
- self.delayed_time = 0 |
- self.time = newTime |
- self.resetter(self) |
- else: |
- self.delayed_time = newTime - self.time |
- |
- def delay(self, secondsLater): |
- """Reschedule this call for a later time |
- |
- @type secondsLater: C{float} |
- @param secondsLater: The number of seconds after the originally |
- scheduled time for which to reschedule this call. |
- |
- @raise AlreadyCancelled: Raised if this call has been cancelled. |
- @raise AlreadyCalled: Raised if this call has already been made. |
- """ |
- if self.cancelled: |
- raise error.AlreadyCancelled |
- elif self.called: |
- raise error.AlreadyCalled |
- else: |
- self.delayed_time += secondsLater |
- if self.delayed_time < 0: |
- self.activate_delay() |
- self.resetter(self) |
- |
- def activate_delay(self): |
- self.time += self.delayed_time |
- self.delayed_time = 0 |
- |
- def active(self): |
- """Determine whether this call is still pending |
- |
- @rtype: C{bool} |
- @return: True if this call has not yet been made or cancelled, |
- False otherwise. |
- """ |
- return not (self.cancelled or self.called) |
- |
- def __le__(self, other): |
- return self.time <= other.time |
- |
- def __str__(self): |
- if self._str is not None: |
- return self._str |
- if hasattr(self, 'func'): |
- if hasattr(self.func, 'func_name'): |
- func = self.func.func_name |
- if hasattr(self.func, 'im_class'): |
- func = self.func.im_class.__name__ + '.' + func |
- else: |
- func = reflect.safe_repr(self.func) |
- else: |
- func = None |
- |
- now = self.seconds() |
- L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % ( |
- id(self), self.time - now, self.called, self.cancelled)] |
- if func is not None: |
- L.extend((" ", func, "(")) |
- if self.args: |
- L.append(", ".join([reflect.safe_repr(e) for e in self.args])) |
- if self.kw: |
- L.append(", ") |
- if self.kw: |
- L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()])) |
- L.append(")") |
- |
- if self.debug: |
- L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.creator))) |
- L.append('>') |
- |
- return "".join(L) |
- |
- |
-class ThreadedResolver: |
- implements(IResolverSimple) |
- |
- def __init__(self, reactor): |
- self.reactor = reactor |
- self._runningQueries = {} |
- |
- def _fail(self, name, err): |
- err = error.DNSLookupError("address %r not found: %s" % (name, err)) |
- return failure.Failure(err) |
- |
- def _cleanup(self, name, lookupDeferred): |
- userDeferred, cancelCall = self._runningQueries[lookupDeferred] |
- del self._runningQueries[lookupDeferred] |
- userDeferred.errback(self._fail(name, "timeout error")) |
- |
- def _checkTimeout(self, result, name, lookupDeferred): |
- try: |
- userDeferred, cancelCall = self._runningQueries[lookupDeferred] |
- except KeyError: |
- pass |
- else: |
- del self._runningQueries[lookupDeferred] |
- cancelCall.cancel() |
- |
- if isinstance(result, failure.Failure): |
- userDeferred.errback(self._fail(name, result.getErrorMessage())) |
- else: |
- userDeferred.callback(result) |
- |
- def getHostByName(self, name, timeout = (1, 3, 11, 45)): |
- if timeout: |
- timeoutDelay = reduce(operator.add, timeout) |
- else: |
- timeoutDelay = 60 |
- userDeferred = defer.Deferred() |
- lookupDeferred = threads.deferToThread(socket.gethostbyname, name) |
- cancelCall = self.reactor.callLater( |
- timeoutDelay, self._cleanup, name, lookupDeferred) |
- self._runningQueries[lookupDeferred] = (userDeferred, cancelCall) |
- lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred) |
- return userDeferred |
- |
-class BlockingResolver: |
- implements(IResolverSimple) |
- |
- def getHostByName(self, name, timeout = (1, 3, 11, 45)): |
- try: |
- address = socket.gethostbyname(name) |
- except socket.error: |
- msg = "address %r not found" % (name,) |
- err = error.DNSLookupError(msg) |
- return defer.fail(err) |
- else: |
- return defer.succeed(address) |
- |
- |
-class _ThreePhaseEvent(object): |
- """ |
- Collection of callables (with arguments) which can be invoked as a group in |
- a particular order. |
- |
- This provides the underlying implementation for the reactor's system event |
- triggers. An instance of this class tracks triggers for all phases of a |
- single type of event. |
- |
- @ivar before: A list of the before-phase triggers containing three-tuples |
- of a callable, a tuple of positional arguments, and a dict of keyword |
- arguments |
- |
- @ivar finishedBefore: A list of the before-phase triggers which have |
- already been executed. This is only populated in the C{'BEFORE'} state. |
- |
- @ivar during: A list of the during-phase triggers containing three-tuples |
- of a callable, a tuple of positional arguments, and a dict of keyword |
- arguments |
- |
- @ivar after: A list of the after-phase triggers containing three-tuples |
- of a callable, a tuple of positional arguments, and a dict of keyword |
- arguments |
- |
- @ivar state: A string indicating what is currently going on with this |
- object. One of C{'BASE'} (for when nothing in particular is happening; |
- this is the initial value), C{'BEFORE'} (when the before-phase triggers |
- are in the process of being executed). |
- """ |
- def __init__(self): |
- self.before = [] |
- self.during = [] |
- self.after = [] |
- self.state = 'BASE' |
- |
- |
- def addTrigger(self, phase, callable, *args, **kwargs): |
- """ |
- Add a trigger to the indicate phase. |
- |
- @param phase: One of C{'before'}, C{'during'}, or C{'after'}. |
- |
- @param callable: An object to be called when this event is triggered. |
- @param *args: Positional arguments to pass to C{callable}. |
- @param **kwargs: Keyword arguments to pass to C{callable}. |
- |
- @return: An opaque handle which may be passed to L{removeTrigger} to |
- reverse the effects of calling this method. |
- """ |
- if phase not in ('before', 'during', 'after'): |
- raise KeyError("invalid phase") |
- getattr(self, phase).append((callable, args, kwargs)) |
- return phase, callable, args, kwargs |
- |
- |
- def removeTrigger(self, handle): |
- """ |
- Remove a previously added trigger callable. |
- |
- @param handle: An object previously returned by L{addTrigger}. The |
- trigger added by that call will be removed. |
- |
- @raise ValueError: If the trigger associated with C{handle} has already |
- been removed or if C{handle} is not a valid handle. |
- """ |
- return getattr(self, 'removeTrigger_' + self.state)(handle) |
- |
- |
- def removeTrigger_BASE(self, handle): |
- """ |
- Just try to remove the trigger. |
- |
- @see removeTrigger |
- """ |
- try: |
- phase, callable, args, kwargs = handle |
- except (TypeError, ValueError), e: |
- raise ValueError("invalid trigger handle") |
- else: |
- if phase not in ('before', 'during', 'after'): |
- raise KeyError("invalid phase") |
- getattr(self, phase).remove((callable, args, kwargs)) |
- |
- |
- def removeTrigger_BEFORE(self, handle): |
- """ |
- Remove the trigger if it has yet to be executed, otherwise emit a |
- warning that in the future an exception will be raised when removing an |
- already-executed trigger. |
- |
- @see removeTrigger |
- """ |
- phase, callable, args, kwargs = handle |
- if phase != 'before': |
- return self.removeTrigger_BASE(handle) |
- if (callable, args, kwargs) in self.finishedBefore: |
- warnings.warn( |
- "Removing already-fired system event triggers will raise an " |
- "exception in a future version of Twisted.", |
- category=DeprecationWarning, |
- stacklevel=3) |
- else: |
- self.removeTrigger_BASE(handle) |
- |
- |
- def fireEvent(self): |
- """ |
- Call the triggers added to this event. |
- """ |
- self.state = 'BEFORE' |
- self.finishedBefore = [] |
- beforeResults = [] |
- while self.before: |
- callable, args, kwargs = self.before.pop(0) |
- self.finishedBefore.append((callable, args, kwargs)) |
- try: |
- result = callable(*args, **kwargs) |
- except: |
- log.err() |
- else: |
- if isinstance(result, Deferred): |
- beforeResults.append(result) |
- DeferredList(beforeResults).addCallback(self._continueFiring) |
- |
- |
- def _continueFiring(self, ignored): |
- """ |
- Call the during and after phase triggers for this event. |
- """ |
- self.state = 'BASE' |
- self.finishedBefore = [] |
- for phase in self.during, self.after: |
- while phase: |
- callable, args, kwargs = phase.pop(0) |
- try: |
- callable(*args, **kwargs) |
- except: |
- log.err() |
- |
- |
- |
-class ReactorBase(object): |
- """ |
- Default base class for Reactors. |
- |
- @type _stopped: C{bool} |
- @ivar _stopped: A flag which is true between paired calls to C{reactor.run} |
- and C{reactor.stop}. |
- """ |
- |
- implements(IReactorCore, IReactorTime, IReactorPluggableResolver) |
- |
- _stopped = True |
- installed = False |
- usingThreads = False |
- resolver = BlockingResolver() |
- |
- __name__ = "twisted.internet.reactor" |
- |
- def __init__(self): |
- self.threadCallQueue = [] |
- self._eventTriggers = {} |
- self._pendingTimedCalls = [] |
- self._newTimedCalls = [] |
- self._cancellations = 0 |
- self.running = False |
- self.waker = None |
- |
- self.addSystemEventTrigger('during', 'shutdown', self.crash) |
- self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) |
- |
- if platform.supportsThreads(): |
- self._initThreads() |
- |
- # override in subclasses |
- |
- _lock = None |
- |
- def installWaker(self): |
- raise NotImplementedError() |
- |
- def installResolver(self, resolver): |
- assert IResolverSimple.providedBy(resolver) |
- oldResolver = self.resolver |
- self.resolver = resolver |
- return oldResolver |
- |
- def wakeUp(self): |
- """Wake up the event loop.""" |
- if not threadable.isInIOThread(): |
- if self.waker: |
- self.waker.wakeUp() |
- # if the waker isn't installed, the reactor isn't running, and |
- # therefore doesn't need to be woken up |
- |
- def doIteration(self, delay): |
- """Do one iteration over the readers and writers we know about.""" |
- raise NotImplementedError |
- |
- def addReader(self, reader): |
- raise NotImplementedError |
- |
- def addWriter(self, writer): |
- raise NotImplementedError |
- |
- def removeReader(self, reader): |
- raise NotImplementedError |
- |
- def removeWriter(self, writer): |
- raise NotImplementedError |
- |
- def removeAll(self): |
- raise NotImplementedError |
- |
- |
- def getReaders(self): |
- raise NotImplementedError() |
- |
- |
- def getWriters(self): |
- raise NotImplementedError() |
- |
- |
- def resolve(self, name, timeout = (1, 3, 11, 45)): |
- """Return a Deferred that will resolve a hostname. |
- """ |
- if not name: |
- # XXX - This is *less than* '::', and will screw up IPv6 servers |
- return defer.succeed('0.0.0.0') |
- if abstract.isIPAddress(name): |
- return defer.succeed(name) |
- return self.resolver.getHostByName(name, timeout) |
- |
- # Installation. |
- |
- # IReactorCore |
- |
- def stop(self): |
- """ |
- See twisted.internet.interfaces.IReactorCore.stop. |
- """ |
- if self._stopped: |
- raise error.ReactorNotRunning( |
- "Can't stop reactor that isn't running.") |
- self._stopped = True |
- self.callLater(0, self.fireSystemEvent, "shutdown") |
- |
- def crash(self): |
- """ |
- See twisted.internet.interfaces.IReactorCore.crash. |
- """ |
- self.running = False |
- |
- def sigInt(self, *args): |
- """Handle a SIGINT interrupt. |
- """ |
- log.msg("Received SIGINT, shutting down.") |
- self.callFromThread(self.stop) |
- |
- def sigBreak(self, *args): |
- """Handle a SIGBREAK interrupt. |
- """ |
- log.msg("Received SIGBREAK, shutting down.") |
- self.callFromThread(self.stop) |
- |
- def sigTerm(self, *args): |
- """Handle a SIGTERM interrupt. |
- """ |
- log.msg("Received SIGTERM, shutting down.") |
- self.callFromThread(self.stop) |
- |
- def disconnectAll(self): |
- """Disconnect every reader, and writer in the system. |
- """ |
- selectables = self.removeAll() |
- for reader in selectables: |
- log.callWithLogger(reader, |
- reader.connectionLost, |
- failure.Failure(main.CONNECTION_LOST)) |
- |
- |
- def iterate(self, delay=0): |
- """See twisted.internet.interfaces.IReactorCore.iterate. |
- """ |
- self.runUntilCurrent() |
- self.doIteration(delay) |
- |
- |
- def fireSystemEvent(self, eventType): |
- """See twisted.internet.interfaces.IReactorCore.fireSystemEvent. |
- """ |
- event = self._eventTriggers.get(eventType) |
- if event is not None: |
- event.fireEvent() |
- |
- |
- def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw): |
- """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger. |
- """ |
- assert callable(_f), "%s is not callable" % _f |
- if _eventType not in self._eventTriggers: |
- self._eventTriggers[_eventType] = _ThreePhaseEvent() |
- return (_eventType, self._eventTriggers[_eventType].addTrigger( |
- _phase, _f, *args, **kw)) |
- |
- |
- def removeSystemEventTrigger(self, triggerID): |
- """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger. |
- """ |
- eventType, handle = triggerID |
- self._eventTriggers[eventType].removeTrigger(handle) |
- |
- |
- def callWhenRunning(self, _callable, *args, **kw): |
- """See twisted.internet.interfaces.IReactorCore.callWhenRunning. |
- """ |
- if self.running: |
- _callable(*args, **kw) |
- else: |
- return self.addSystemEventTrigger('after', 'startup', |
- _callable, *args, **kw) |
- |
- def startRunning(self): |
- """ |
- Method called when reactor starts: do some initialization and fire |
- startup events. |
- |
- Don't call this directly, call reactor.run() instead: it should take |
- care of calling this. |
- """ |
- if self.running: |
- warnings.warn( |
- "Reactor already running! This behavior is deprecated " |
- "since Twisted 8.0", |
- category=DeprecationWarning, stacklevel=3) |
- self.running = True |
- self._stopped = False |
- threadable.registerAsIOThread() |
- self.fireSystemEvent('startup') |
- |
- # IReactorTime |
- |
- seconds = staticmethod(runtimeSeconds) |
- |
- def callLater(self, _seconds, _f, *args, **kw): |
- """See twisted.internet.interfaces.IReactorTime.callLater. |
- """ |
- assert callable(_f), "%s is not callable" % _f |
- assert sys.maxint >= _seconds >= 0, \ |
- "%s is not greater than or equal to 0 seconds" % (_seconds,) |
- tple = DelayedCall(self.seconds() + _seconds, _f, args, kw, |
- self._cancelCallLater, |
- self._moveCallLaterSooner, |
- seconds=self.seconds) |
- self._newTimedCalls.append(tple) |
- return tple |
- |
- def _moveCallLaterSooner(self, tple): |
- # Linear time find: slow. |
- heap = self._pendingTimedCalls |
- try: |
- pos = heap.index(tple) |
- |
- # Move elt up the heap until it rests at the right place. |
- elt = heap[pos] |
- while pos != 0: |
- parent = (pos-1) // 2 |
- if heap[parent] <= elt: |
- break |
- # move parent down |
- heap[pos] = heap[parent] |
- pos = parent |
- heap[pos] = elt |
- except ValueError: |
- # element was not found in heap - oh well... |
- pass |
- |
- def _cancelCallLater(self, tple): |
- self._cancellations+=1 |
- |
- def cancelCallLater(self, callID): |
- """See twisted.internet.interfaces.IReactorTime.cancelCallLater. |
- """ |
- # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we |
- # we can't get rid of it for a long time. |
- warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead") |
- callID.cancel() |
- |
- def getDelayedCalls(self): |
- """Return all the outstanding delayed calls in the system. |
- They are returned in no particular order. |
- This method is not efficient -- it is really only meant for |
- test cases.""" |
- return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled] |
- |
- def _insertNewDelayedCalls(self): |
- for call in self._newTimedCalls: |
- if call.cancelled: |
- self._cancellations-=1 |
- else: |
- call.activate_delay() |
- heappush(self._pendingTimedCalls, call) |
- self._newTimedCalls = [] |
- |
- def timeout(self): |
- # insert new delayed calls to make sure to include them in timeout value |
- self._insertNewDelayedCalls() |
- |
- if not self._pendingTimedCalls: |
- return None |
- |
- return max(0, self._pendingTimedCalls[0].time - self.seconds()) |
- |
- |
- def runUntilCurrent(self): |
- """Run all pending timed calls. |
- """ |
- if self.threadCallQueue: |
- # Keep track of how many calls we actually make, as we're |
- # making them, in case another call is added to the queue |
- # while we're in this loop. |
- count = 0 |
- total = len(self.threadCallQueue) |
- for (f, a, kw) in self.threadCallQueue: |
- try: |
- f(*a, **kw) |
- except: |
- log.err() |
- count += 1 |
- if count == total: |
- break |
- del self.threadCallQueue[:count] |
- if self.threadCallQueue: |
- if self.waker: |
- self.waker.wakeUp() |
- |
- # insert new delayed calls now |
- self._insertNewDelayedCalls() |
- |
- now = self.seconds() |
- while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now): |
- call = heappop(self._pendingTimedCalls) |
- if call.cancelled: |
- self._cancellations-=1 |
- continue |
- |
- if call.delayed_time > 0: |
- call.activate_delay() |
- heappush(self._pendingTimedCalls, call) |
- continue |
- |
- try: |
- call.called = 1 |
- call.func(*call.args, **call.kw) |
- except: |
- log.deferr() |
- if hasattr(call, "creator"): |
- e = "\n" |
- e += " C: previous exception occurred in " + \ |
- "a DelayedCall created here:\n" |
- e += " C:" |
- e += "".join(call.creator).rstrip().replace("\n","\n C:") |
- e += "\n" |
- log.msg(e) |
- |
- |
- if (self._cancellations > 50 and |
- self._cancellations > len(self._pendingTimedCalls) >> 1): |
- self._cancellations = 0 |
- self._pendingTimedCalls = [x for x in self._pendingTimedCalls |
- if not x.cancelled] |
- heapify(self._pendingTimedCalls) |
- |
- # IReactorProcess |
- |
- def _checkProcessArgs(self, args, env): |
- """ |
- Check for valid arguments and environment to spawnProcess. |
- |
- @return: A two element tuple giving values to use when creating the |
- process. The first element of the tuple is a C{list} of C{str} |
- giving the values for argv of the child process. The second element |
- of the tuple is either C{None} if C{env} was C{None} or a C{dict} |
- mapping C{str} environment keys to C{str} environment values. |
- """ |
- # Any unicode string which Python would successfully implicitly |
- # encode to a byte string would have worked before these explicit |
- # checks were added. Anything which would have failed with a |
- # UnicodeEncodeError during that implicit encoding step would have |
- # raised an exception in the child process and that would have been |
- # a pain in the butt to debug. |
- # |
- # So, we will explicitly attempt the same encoding which Python |
- # would implicitly do later. If it fails, we will report an error |
- # without ever spawning a child process. If it succeeds, we'll save |
- # the result so that Python doesn't need to do it implicitly later. |
- # |
- # For any unicode which we can actually encode, we'll also issue a |
- # deprecation warning, because no one should be passing unicode here |
- # anyway. |
- # |
- # -exarkun |
- defaultEncoding = sys.getdefaultencoding() |
- |
- # Common check function |
- def argChecker(arg): |
- """ |
- Return either a str or None. If the given value is not |
- allowable for some reason, None is returned. Otherwise, a |
- possibly different object which should be used in place of arg |
- is returned. This forces unicode encoding to happen now, rather |
- than implicitly later. |
- """ |
- if isinstance(arg, unicode): |
- try: |
- arg = arg.encode(defaultEncoding) |
- except UnicodeEncodeError: |
- return None |
- warnings.warn( |
- "Argument strings and environment keys/values passed to " |
- "reactor.spawnProcess should be str, not unicode.", |
- category=DeprecationWarning, |
- stacklevel=4) |
- if isinstance(arg, str) and '\0' not in arg: |
- return arg |
- return None |
- |
- # Make a few tests to check input validity |
- if not isinstance(args, (tuple, list)): |
- raise TypeError("Arguments must be a tuple or list") |
- |
- outputArgs = [] |
- for arg in args: |
- arg = argChecker(arg) |
- if arg is None: |
- raise TypeError("Arguments contain a non-string value") |
- else: |
- outputArgs.append(arg) |
- |
- outputEnv = None |
- if env is not None: |
- outputEnv = {} |
- for key, val in env.iteritems(): |
- key = argChecker(key) |
- if key is None: |
- raise TypeError("Environment contains a non-string key") |
- val = argChecker(val) |
- if val is None: |
- raise TypeError("Environment contains a non-string value") |
- outputEnv[key] = val |
- return outputArgs, outputEnv |
- |
- # IReactorThreads |
- if platform.supportsThreads(): |
- threadpool = None |
- # ID of the trigger stopping the threadpool |
- threadpoolShutdownID = None |
- |
- def _initThreads(self): |
- self.usingThreads = True |
- self.resolver = ThreadedResolver(self) |
- self.installWaker() |
- |
- def callFromThread(self, f, *args, **kw): |
- """ |
- See L{twisted.internet.interfaces.IReactorThreads.callFromThread}. |
- """ |
- assert callable(f), "%s is not callable" % (f,) |
- # lists are thread-safe in CPython, but not in Jython |
- # this is probably a bug in Jython, but until fixed this code |
- # won't work in Jython. |
- self.threadCallQueue.append((f, args, kw)) |
- self.wakeUp() |
- |
- def _initThreadPool(self): |
- """ |
- Create the threadpool accessible with callFromThread. |
- """ |
- from twisted.python import threadpool |
- self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.reactor') |
- self.callWhenRunning(self.threadpool.start) |
- self.threadpoolShutdownID = self.addSystemEventTrigger( |
- 'during', 'shutdown', self._stopThreadPool) |
- |
- def _stopThreadPool(self): |
- """ |
- Stop the reactor threadpool. |
- """ |
- self.threadpoolShutdownID = None |
- self.threadpool.stop() |
- self.threadpool = None |
- |
- def callInThread(self, _callable, *args, **kwargs): |
- """ |
- See L{twisted.internet.interfaces.IReactorThreads.callInThread}. |
- """ |
- if self.threadpool is None: |
- self._initThreadPool() |
- self.threadpool.callInThread(_callable, *args, **kwargs) |
- |
- def suggestThreadPoolSize(self, size): |
- """ |
- See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}. |
- """ |
- if size == 0 and self.threadpool is None: |
- return |
- if self.threadpool is None: |
- self._initThreadPool() |
- self.threadpool.adjustPoolsize(maxthreads=size) |
- else: |
- # This is for signal handlers. |
- def callFromThread(self, f, *args, **kw): |
- assert callable(f), "%s is not callable" % (f,) |
- # See comment in the other callFromThread implementation. |
- self.threadCallQueue.append((f, args, kw)) |
- |
-if platform.supportsThreads(): |
- classImplements(ReactorBase, IReactorThreads) |
- |
- |
-class BaseConnector(styles.Ephemeral): |
- """Basic implementation of connector. |
- |
- State can be: "connecting", "connected", "disconnected" |
- """ |
- |
- implements(IConnector) |
- |
- timeoutID = None |
- factoryStarted = 0 |
- |
- def __init__(self, factory, timeout, reactor): |
- self.state = "disconnected" |
- self.reactor = reactor |
- self.factory = factory |
- self.timeout = timeout |
- |
- def disconnect(self): |
- """Disconnect whatever our state is.""" |
- if self.state == 'connecting': |
- self.stopConnecting() |
- elif self.state == 'connected': |
- self.transport.loseConnection() |
- |
- def connect(self): |
- """Start connection to remote server.""" |
- if self.state != "disconnected": |
- raise RuntimeError, "can't connect in this state" |
- |
- self.state = "connecting" |
- if not self.factoryStarted: |
- self.factory.doStart() |
- self.factoryStarted = 1 |
- self.transport = transport = self._makeTransport() |
- if self.timeout is not None: |
- self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError()) |
- self.factory.startedConnecting(self) |
- |
- def stopConnecting(self): |
- """Stop attempting to connect.""" |
- if self.state != "connecting": |
- raise error.NotConnectingError, "we're not trying to connect" |
- |
- self.state = "disconnected" |
- self.transport.failIfNotConnected(error.UserError()) |
- del self.transport |
- |
- def cancelTimeout(self): |
- if self.timeoutID is not None: |
- try: |
- self.timeoutID.cancel() |
- except ValueError: |
- pass |
- del self.timeoutID |
- |
- def buildProtocol(self, addr): |
- self.state = "connected" |
- self.cancelTimeout() |
- return self.factory.buildProtocol(addr) |
- |
- def connectionFailed(self, reason): |
- self.cancelTimeout() |
- self.transport = None |
- self.state = "disconnected" |
- self.factory.clientConnectionFailed(self, reason) |
- if self.state == "disconnected": |
- # factory hasn't called our connect() method |
- self.factory.doStop() |
- self.factoryStarted = 0 |
- |
- def connectionLost(self, reason): |
- self.state = "disconnected" |
- self.factory.clientConnectionLost(self, reason) |
- if self.state == "disconnected": |
- # factory hasn't called our connect() method |
- self.factory.doStop() |
- self.factoryStarted = 0 |
- |
- def getDestination(self): |
- raise NotImplementedError, "implement in subclasses" |
- |
- |
-class BasePort(abstract.FileDescriptor): |
- """Basic implementation of a ListeningPort. |
- |
- Note: This does not actually implement IListeningPort. |
- """ |
- |
- addressFamily = None |
- socketType = None |
- |
- def createInternetSocket(self): |
- s = socket.socket(self.addressFamily, self.socketType) |
- s.setblocking(0) |
- if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): |
- old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD) |
- fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) |
- return s |
- |
- |
- def doWrite(self): |
- """Raises a RuntimeError""" |
- raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__) |
- |
- |
- |
-class _SignalReactorMixin: |
- """ |
- Private mixin to manage signals: it installs signal handlers at start time, |
- and define run method. |
- |
- It can only be used mixed in with L{ReactorBase}, and has to be defined |
- first in the inheritance (so that method resolution order finds |
- startRunning first). |
- """ |
- |
- def _handleSignals(self): |
- """ |
- Install the signal handlers for the Twisted event loop. |
- """ |
- try: |
- import signal |
- except ImportError: |
- log.msg("Warning: signal module unavailable -- " |
- "not installing signal handlers.") |
- return |
- |
- if signal.getsignal(signal.SIGINT) == signal.default_int_handler: |
- # only handle if there isn't already a handler, e.g. for Pdb. |
- signal.signal(signal.SIGINT, self.sigInt) |
- signal.signal(signal.SIGTERM, self.sigTerm) |
- |
- # Catch Ctrl-Break in windows |
- if hasattr(signal, "SIGBREAK"): |
- signal.signal(signal.SIGBREAK, self.sigBreak) |
- |
- if platformType == 'posix': |
- signal.signal(signal.SIGCHLD, self._handleSigchld) |
- |
- |
- def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThreads()): |
- """ |
- Reap all processes on SIGCHLD. |
- |
- This gets called on SIGCHLD. We do no processing inside a signal |
- handler, as the calls we make here could occur between any two |
- python bytecode instructions. Deferring processing to the next |
- eventloop round prevents us from violating the state constraints |
- of arbitrary classes. |
- """ |
- from twisted.internet.process import reapAllProcesses |
- if _threadSupport: |
- self.callFromThread(reapAllProcesses) |
- else: |
- self.callLater(0, reapAllProcesses) |
- |
- |
- def startRunning(self, installSignalHandlers=True): |
- """ |
- Forward call to ReactorBase, arrange for signal handlers to be |
- installed if asked. |
- """ |
- if installSignalHandlers: |
- # Make sure this happens before after-startup events, since the |
- # expectation of after-startup is that the reactor is fully |
- # initialized. Don't do it right away for historical reasons |
- # (perhaps some before-startup triggers don't want there to be a |
- # custom SIGCHLD handler so that they can run child processes with |
- # some blocking api). |
- self.addSystemEventTrigger( |
- 'during', 'startup', self._handleSignals) |
- ReactorBase.startRunning(self) |
- |
- |
- def run(self, installSignalHandlers=True): |
- self.startRunning(installSignalHandlers=installSignalHandlers) |
- self.mainLoop() |
- |
- |
- def mainLoop(self): |
- while self.running: |
- try: |
- while self.running: |
- # Advance simulation time in delayed event |
- # processors. |
- self.runUntilCurrent() |
- t2 = self.timeout() |
- t = self.running and t2 |
- self.doIteration(t) |
- except: |
- log.msg("Unexpected error in main loop.") |
- log.err() |
- else: |
- log.msg('Main loop terminated.') |
- |
- |
- |
-__all__ = [] |