| Index: third_party/twisted_8_1/twisted/internet/defer.py
|
| diff --git a/third_party/twisted_8_1/twisted/internet/defer.py b/third_party/twisted_8_1/twisted/internet/defer.py
|
| deleted file mode 100644
|
| index 8980a13e5205ad79fc04070f039426024f4ce83b..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/internet/defer.py
|
| +++ /dev/null
|
| @@ -1,1107 +0,0 @@
|
| -# -*- test-case-name: twisted.test.test_defer -*-
|
| -#
|
| -# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -"""
|
| -Support for results that aren't immediately available.
|
| -
|
| -Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>}
|
| -"""
|
| -
|
| -from __future__ import nested_scopes, generators
|
| -import traceback
|
| -import warnings
|
| -
|
| -# Twisted imports
|
| -from twisted.python import log, failure, lockfile
|
| -from twisted.python.util import unsignedID, mergeFunctionMetadata
|
| -
|
| -class AlreadyCalledError(Exception):
|
| - pass
|
| -
|
| -class TimeoutError(Exception):
|
| - pass
|
| -
|
| -def logError(err):
|
| - log.err(err)
|
| - return err
|
| -
|
| -def succeed(result):
|
| - """
|
| - Return a Deferred that has already had '.callback(result)' called.
|
| -
|
| - This is useful when you're writing synchronous code to an
|
| - asynchronous interface: i.e., some code is calling you expecting a
|
| - Deferred result, but you don't actually need to do anything
|
| - asynchronous. Just return defer.succeed(theResult).
|
| -
|
| - See L{fail} for a version of this function that uses a failing
|
| - Deferred rather than a successful one.
|
| -
|
| - @param result: The result to give to the Deferred's 'callback'
|
| - method.
|
| -
|
| - @rtype: L{Deferred}
|
| - """
|
| - d = Deferred()
|
| - d.callback(result)
|
| - return d
|
| -
|
| -
|
| -def fail(result=None):
|
| - """
|
| - Return a Deferred that has already had '.errback(result)' called.
|
| -
|
| - See L{succeed}'s docstring for rationale.
|
| -
|
| - @param result: The same argument that L{Deferred.errback} takes.
|
| -
|
| - @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
|
| - current exception state.
|
| -
|
| - @rtype: L{Deferred}
|
| - """
|
| - d = Deferred()
|
| - d.errback(result)
|
| - return d
|
| -
|
| -
|
| -def execute(callable, *args, **kw):
|
| - """Create a deferred from a callable and arguments.
|
| -
|
| - Call the given function with the given arguments. Return a deferred which
|
| - has been fired with its callback as the result of that invocation or its
|
| - errback with a Failure for the exception thrown.
|
| - """
|
| - try:
|
| - result = callable(*args, **kw)
|
| - except:
|
| - return fail()
|
| - else:
|
| - return succeed(result)
|
| -
|
| -def maybeDeferred(f, *args, **kw):
|
| - """Invoke a function that may or may not return a deferred.
|
| -
|
| - Call the given function with the given arguments. If the returned
|
| - object is a C{Deferred}, return it. If the returned object is a C{Failure},
|
| - wrap it with C{fail} and return it. Otherwise, wrap it in C{succeed} and
|
| - return it. If an exception is raised, convert it to a C{Failure}, wrap it
|
| - in C{fail}, and then return it.
|
| -
|
| - @type f: Any callable
|
| - @param f: The callable to invoke
|
| -
|
| - @param args: The arguments to pass to C{f}
|
| - @param kw: The keyword arguments to pass to C{f}
|
| -
|
| - @rtype: C{Deferred}
|
| - @return: The result of the function call, wrapped in a C{Deferred} if
|
| - necessary.
|
| - """
|
| - deferred = None
|
| -
|
| - try:
|
| - result = f(*args, **kw)
|
| - except:
|
| - return fail(failure.Failure())
|
| - else:
|
| - if isinstance(result, Deferred):
|
| - return result
|
| - elif isinstance(result, failure.Failure):
|
| - return fail(result)
|
| - else:
|
| - return succeed(result)
|
| - return deferred
|
| -
|
| -def timeout(deferred):
|
| - deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
|
| -
|
| -def passthru(arg):
|
| - return arg
|
| -
|
| -def setDebugging(on):
|
| - """Enable or disable Deferred debugging.
|
| -
|
| - When debugging is on, the call stacks from creation and invocation are
|
| - recorded, and added to any AlreadyCalledErrors we raise.
|
| - """
|
| - Deferred.debug=bool(on)
|
| -
|
| -def getDebugging():
|
| - """Determine whether Deferred debugging is enabled.
|
| - """
|
| - return Deferred.debug
|
| -
|
| -class Deferred:
|
| - """This is a callback which will be put off until later.
|
| -
|
| - Why do we want this? Well, in cases where a function in a threaded
|
| - program would block until it gets a result, for Twisted it should
|
| - not block. Instead, it should return a Deferred.
|
| -
|
| - This can be implemented for protocols that run over the network by
|
| - writing an asynchronous protocol for twisted.internet. For methods
|
| - that come from outside packages that are not under our control, we use
|
| - threads (see for example L{twisted.enterprise.adbapi}).
|
| -
|
| - For more information about Deferreds, see doc/howto/defer.html or
|
| - U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}
|
| - """
|
| - called = 0
|
| - paused = 0
|
| - timeoutCall = None
|
| - _debugInfo = None
|
| -
|
| - # Are we currently running a user-installed callback? Meant to prevent
|
| - # recursive running of callbacks when a reentrant call to add a callback is
|
| - # used.
|
| - _runningCallbacks = False
|
| -
|
| - # Keep this class attribute for now, for compatibility with code that
|
| - # sets it directly.
|
| - debug = False
|
| -
|
| - def __init__(self):
|
| - self.callbacks = []
|
| - if self.debug:
|
| - self._debugInfo = DebugInfo()
|
| - self._debugInfo.creator = traceback.format_stack()[:-1]
|
| -
|
| - def addCallbacks(self, callback, errback=None,
|
| - callbackArgs=None, callbackKeywords=None,
|
| - errbackArgs=None, errbackKeywords=None):
|
| - """Add a pair of callbacks (success and error) to this Deferred.
|
| -
|
| - These will be executed when the 'master' callback is run.
|
| - """
|
| - assert callable(callback)
|
| - assert errback == None or callable(errback)
|
| - cbs = ((callback, callbackArgs, callbackKeywords),
|
| - (errback or (passthru), errbackArgs, errbackKeywords))
|
| - self.callbacks.append(cbs)
|
| -
|
| - if self.called:
|
| - self._runCallbacks()
|
| - return self
|
| -
|
| - def addCallback(self, callback, *args, **kw):
|
| - """Convenience method for adding just a callback.
|
| -
|
| - See L{addCallbacks}.
|
| - """
|
| - return self.addCallbacks(callback, callbackArgs=args,
|
| - callbackKeywords=kw)
|
| -
|
| - def addErrback(self, errback, *args, **kw):
|
| - """Convenience method for adding just an errback.
|
| -
|
| - See L{addCallbacks}.
|
| - """
|
| - return self.addCallbacks(passthru, errback,
|
| - errbackArgs=args,
|
| - errbackKeywords=kw)
|
| -
|
| - def addBoth(self, callback, *args, **kw):
|
| - """Convenience method for adding a single callable as both a callback
|
| - and an errback.
|
| -
|
| - See L{addCallbacks}.
|
| - """
|
| - return self.addCallbacks(callback, callback,
|
| - callbackArgs=args, errbackArgs=args,
|
| - callbackKeywords=kw, errbackKeywords=kw)
|
| -
|
| - def chainDeferred(self, d):
|
| - """Chain another Deferred to this Deferred.
|
| -
|
| - This method adds callbacks to this Deferred to call d's callback or
|
| - errback, as appropriate. It is merely a shorthand way of performing
|
| - the following::
|
| -
|
| - self.addCallbacks(d.callback, d.errback)
|
| -
|
| - When you chain a deferred d2 to another deferred d1 with
|
| - d1.chainDeferred(d2), you are making d2 participate in the callback
|
| - chain of d1. Thus any event that fires d1 will also fire d2.
|
| - However, the converse is B{not} true; if d2 is fired d1 will not be
|
| - affected.
|
| - """
|
| - return self.addCallbacks(d.callback, d.errback)
|
| -
|
| - def callback(self, result):
|
| - """Run all success callbacks that have been added to this Deferred.
|
| -
|
| - Each callback will have its result passed as the first
|
| - argument to the next; this way, the callbacks act as a
|
| - 'processing chain'. Also, if the success-callback returns a Failure
|
| - or raises an Exception, processing will continue on the *error*-
|
| - callback chain.
|
| - """
|
| - assert not isinstance(result, Deferred)
|
| - self._startRunCallbacks(result)
|
| -
|
| -
|
| - def errback(self, fail=None):
|
| - """
|
| - Run all error callbacks that have been added to this Deferred.
|
| -
|
| - Each callback will have its result passed as the first
|
| - argument to the next; this way, the callbacks act as a
|
| - 'processing chain'. Also, if the error-callback returns a non-Failure
|
| - or doesn't raise an Exception, processing will continue on the
|
| - *success*-callback chain.
|
| -
|
| - If the argument that's passed to me is not a failure.Failure instance,
|
| - it will be embedded in one. If no argument is passed, a failure.Failure
|
| - instance will be created based on the current traceback stack.
|
| -
|
| - Passing a string as `fail' is deprecated, and will be punished with
|
| - a warning message.
|
| -
|
| - @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
|
| - no current exception state.
|
| - """
|
| - if not isinstance(fail, failure.Failure):
|
| - fail = failure.Failure(fail)
|
| -
|
| - self._startRunCallbacks(fail)
|
| -
|
| -
|
| - def pause(self):
|
| - """Stop processing on a Deferred until L{unpause}() is called.
|
| - """
|
| - self.paused = self.paused + 1
|
| -
|
| -
|
| - def unpause(self):
|
| - """Process all callbacks made since L{pause}() was called.
|
| - """
|
| - self.paused = self.paused - 1
|
| - if self.paused:
|
| - return
|
| - if self.called:
|
| - self._runCallbacks()
|
| -
|
| - def _continue(self, result):
|
| - self.result = result
|
| - self.unpause()
|
| -
|
| - def _startRunCallbacks(self, result):
|
| - if self.called:
|
| - if self.debug:
|
| - if self._debugInfo is None:
|
| - self._debugInfo = DebugInfo()
|
| - extra = "\n" + self._debugInfo._getDebugTracebacks()
|
| - raise AlreadyCalledError(extra)
|
| - raise AlreadyCalledError
|
| - if self.debug:
|
| - if self._debugInfo is None:
|
| - self._debugInfo = DebugInfo()
|
| - self._debugInfo.invoker = traceback.format_stack()[:-2]
|
| - self.called = True
|
| - self.result = result
|
| - if self.timeoutCall:
|
| - try:
|
| - self.timeoutCall.cancel()
|
| - except:
|
| - pass
|
| -
|
| - del self.timeoutCall
|
| - self._runCallbacks()
|
| -
|
| - def _runCallbacks(self):
|
| - if self._runningCallbacks:
|
| - # Don't recursively run callbacks
|
| - return
|
| - if not self.paused:
|
| - while self.callbacks:
|
| - item = self.callbacks.pop(0)
|
| - callback, args, kw = item[
|
| - isinstance(self.result, failure.Failure)]
|
| - args = args or ()
|
| - kw = kw or {}
|
| - try:
|
| - self._runningCallbacks = True
|
| - try:
|
| - self.result = callback(self.result, *args, **kw)
|
| - finally:
|
| - self._runningCallbacks = False
|
| - if isinstance(self.result, Deferred):
|
| - # note: this will cause _runCallbacks to be called
|
| - # recursively if self.result already has a result.
|
| - # This shouldn't cause any problems, since there is no
|
| - # relevant state in this stack frame at this point.
|
| - # The recursive call will continue to process
|
| - # self.callbacks until it is empty, then return here,
|
| - # where there is no more work to be done, so this call
|
| - # will return as well.
|
| - self.pause()
|
| - self.result.addBoth(self._continue)
|
| - break
|
| - except:
|
| - self.result = failure.Failure()
|
| -
|
| - if isinstance(self.result, failure.Failure):
|
| - self.result.cleanFailure()
|
| - if self._debugInfo is None:
|
| - self._debugInfo = DebugInfo()
|
| - self._debugInfo.failResult = self.result
|
| - else:
|
| - if self._debugInfo is not None:
|
| - self._debugInfo.failResult = None
|
| -
|
| - def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
|
| - """Set a timeout function to be triggered if I am not called.
|
| -
|
| - @param seconds: How long to wait (from now) before firing the
|
| - timeoutFunc.
|
| -
|
| - @param timeoutFunc: will receive the Deferred and *args, **kw as its
|
| - arguments. The default timeoutFunc will call the errback with a
|
| - L{TimeoutError}.
|
| - """
|
| - warnings.warn(
|
| - "Deferred.setTimeout is deprecated. Look for timeout "
|
| - "support specific to the API you are using instead.",
|
| - DeprecationWarning, stacklevel=2)
|
| -
|
| - if self.called:
|
| - return
|
| - assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."
|
| -
|
| - from twisted.internet import reactor
|
| - self.timeoutCall = reactor.callLater(
|
| - seconds,
|
| - lambda: self.called or timeoutFunc(self, *args, **kw))
|
| - return self.timeoutCall
|
| -
|
| - def __str__(self):
|
| - cname = self.__class__.__name__
|
| - if hasattr(self, 'result'):
|
| - return "<%s at %s current result: %r>" % (cname, hex(unsignedID(self)),
|
| - self.result)
|
| - return "<%s at %s>" % (cname, hex(unsignedID(self)))
|
| - __repr__ = __str__
|
| -
|
| -class DebugInfo:
|
| - """Deferred debug helper"""
|
| - failResult = None
|
| -
|
| - def _getDebugTracebacks(self):
|
| - info = ''
|
| - if hasattr(self, "creator"):
|
| - info += " C: Deferred was created:\n C:"
|
| - info += "".join(self.creator).rstrip().replace("\n","\n C:")
|
| - info += "\n"
|
| - if hasattr(self, "invoker"):
|
| - info += " I: First Invoker was:\n I:"
|
| - info += "".join(self.invoker).rstrip().replace("\n","\n I:")
|
| - info += "\n"
|
| - return info
|
| -
|
| - def __del__(self):
|
| - """Print tracebacks and die.
|
| -
|
| - If the *last* (and I do mean *last*) callback leaves me in an error
|
| - state, print a traceback (if said errback is a Failure).
|
| - """
|
| - if self.failResult is not None:
|
| - log.msg("Unhandled error in Deferred:", isError=True)
|
| - debugInfo = self._getDebugTracebacks()
|
| - if debugInfo != '':
|
| - log.msg("(debug: " + debugInfo + ")", isError=True)
|
| - log.err(self.failResult)
|
| -
|
| -class FirstError(Exception):
|
| - """First error to occur in a DeferredList if fireOnOneErrback is set.
|
| -
|
| - @ivar subFailure: the L{Failure} that occurred.
|
| - @ivar index: the index of the Deferred in the DeferredList where it
|
| - happened.
|
| - """
|
| - def __init__(self, failure, index):
|
| - self.subFailure = failure
|
| - self.index = index
|
| -
|
| - def __repr__(self):
|
| - return 'FirstError(%r, %d)' % (self.subFailure, self.index)
|
| -
|
| - def __str__(self):
|
| - return repr(self)
|
| -
|
| - def __getitem__(self, index):
|
| - warnings.warn("FirstError.__getitem__ is deprecated. "
|
| - "Use attributes instead.",
|
| - category=DeprecationWarning, stacklevel=2)
|
| - return [self.subFailure, self.index][index]
|
| -
|
| - def __getslice__(self, start, stop):
|
| - warnings.warn("FirstError.__getslice__ is deprecated. "
|
| - "Use attributes instead.",
|
| - category=DeprecationWarning, stacklevel=2)
|
| - return [self.subFailure, self.index][start:stop]
|
| -
|
| - def __eq__(self, other):
|
| - if isinstance(other, tuple):
|
| - return tuple(self) == other
|
| - elif isinstance(other, FirstError):
|
| - return (self.subFailure == other.subFailure and
|
| - self.index == other.index)
|
| - return False
|
| -
|
| -class DeferredList(Deferred):
|
| - """I combine a group of deferreds into one callback.
|
| -
|
| - I track a list of L{Deferred}s for their callbacks, and make a single
|
| - callback when they have all completed, a list of (success, result)
|
| - tuples, 'success' being a boolean.
|
| -
|
| - Note that you can still use a L{Deferred} after putting it in a
|
| - DeferredList. For example, you can suppress 'Unhandled error in Deferred'
|
| - messages by adding errbacks to the Deferreds *after* putting them in the
|
| - DeferredList, as a DeferredList won't swallow the errors. (Although a more
|
| - convenient way to do this is simply to set the consumeErrors flag)
|
| - """
|
| -
|
| - fireOnOneCallback = 0
|
| - fireOnOneErrback = 0
|
| -
|
| - def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
|
| - consumeErrors=0):
|
| - """Initialize a DeferredList.
|
| -
|
| - @type deferredList: C{list} of L{Deferred}s
|
| - @param deferredList: The list of deferreds to track.
|
| - @param fireOnOneCallback: (keyword param) a flag indicating that
|
| - only one callback needs to be fired for me to call
|
| - my callback
|
| - @param fireOnOneErrback: (keyword param) a flag indicating that
|
| - only one errback needs to be fired for me to call
|
| - my errback
|
| - @param consumeErrors: (keyword param) a flag indicating that any errors
|
| - raised in the original deferreds should be
|
| - consumed by this DeferredList. This is useful to
|
| - prevent spurious warnings being logged.
|
| - """
|
| - self.resultList = [None] * len(deferredList)
|
| - Deferred.__init__(self)
|
| - if len(deferredList) == 0 and not fireOnOneCallback:
|
| - self.callback(self.resultList)
|
| -
|
| - # These flags need to be set *before* attaching callbacks to the
|
| - # deferreds, because the callbacks use these flags, and will run
|
| - # synchronously if any of the deferreds are already fired.
|
| - self.fireOnOneCallback = fireOnOneCallback
|
| - self.fireOnOneErrback = fireOnOneErrback
|
| - self.consumeErrors = consumeErrors
|
| - self.finishedCount = 0
|
| -
|
| - index = 0
|
| - for deferred in deferredList:
|
| - deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
|
| - callbackArgs=(index,SUCCESS),
|
| - errbackArgs=(index,FAILURE))
|
| - index = index + 1
|
| -
|
| - def _cbDeferred(self, result, index, succeeded):
|
| - """(internal) Callback for when one of my deferreds fires.
|
| - """
|
| - self.resultList[index] = (succeeded, result)
|
| -
|
| - self.finishedCount += 1
|
| - if not self.called:
|
| - if succeeded == SUCCESS and self.fireOnOneCallback:
|
| - self.callback((result, index))
|
| - elif succeeded == FAILURE and self.fireOnOneErrback:
|
| - self.errback(failure.Failure(FirstError(result, index)))
|
| - elif self.finishedCount == len(self.resultList):
|
| - self.callback(self.resultList)
|
| -
|
| - if succeeded == FAILURE and self.consumeErrors:
|
| - result = None
|
| -
|
| - return result
|
| -
|
| -
|
| -def _parseDListResult(l, fireOnOneErrback=0):
|
| - if __debug__:
|
| - for success, value in l:
|
| - assert success
|
| - return [x[1] for x in l]
|
| -
|
| -def gatherResults(deferredList):
|
| - """Returns list with result of given Deferreds.
|
| -
|
| - This builds on C{DeferredList} but is useful since you don't
|
| - need to parse the result for success/failure.
|
| -
|
| - @type deferredList: C{list} of L{Deferred}s
|
| - """
|
| - d = DeferredList(deferredList, fireOnOneErrback=1)
|
| - d.addCallback(_parseDListResult)
|
| - return d
|
| -
|
| -# Constants for use with DeferredList
|
| -
|
| -SUCCESS = True
|
| -FAILURE = False
|
| -
|
| -
|
| -
|
| -## deferredGenerator
|
| -
|
| -class waitForDeferred:
|
| - """
|
| - See L{deferredGenerator}.
|
| - """
|
| -
|
| - def __init__(self, d):
|
| - if not isinstance(d, Deferred):
|
| - raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
|
| - self.d = d
|
| -
|
| -
|
| - def getResult(self):
|
| - if isinstance(self.result, failure.Failure):
|
| - self.result.raiseException()
|
| - return self.result
|
| -
|
| -
|
| -
|
| -def _deferGenerator(g, deferred):
|
| - """
|
| - See L{deferredGenerator}.
|
| - """
|
| - result = None
|
| -
|
| - # This function is complicated by the need to prevent unbounded recursion
|
| - # arising from repeatedly yielding immediately ready deferreds. This while
|
| - # loop and the waiting variable solve that by manually unfolding the
|
| - # recursion.
|
| -
|
| - waiting = [True, # defgen is waiting for result?
|
| - None] # result
|
| -
|
| - while 1:
|
| - try:
|
| - result = g.next()
|
| - except StopIteration:
|
| - deferred.callback(result)
|
| - return deferred
|
| - except:
|
| - deferred.errback()
|
| - return deferred
|
| -
|
| - # Deferred.callback(Deferred) raises an error; we catch this case
|
| - # early here and give a nicer error message to the user in case
|
| - # they yield a Deferred.
|
| - if isinstance(result, Deferred):
|
| - return fail(TypeError("Yield waitForDeferred(d), not d!"))
|
| -
|
| - if isinstance(result, waitForDeferred):
|
| - # a waitForDeferred was yielded, get the result.
|
| - # Pass result in so it don't get changed going around the loop
|
| - # This isn't a problem for waiting, as it's only reused if
|
| - # gotResult has already been executed.
|
| - def gotResult(r, result=result):
|
| - result.result = r
|
| - if waiting[0]:
|
| - waiting[0] = False
|
| - waiting[1] = r
|
| - else:
|
| - _deferGenerator(g, deferred)
|
| - result.d.addBoth(gotResult)
|
| - if waiting[0]:
|
| - # Haven't called back yet, set flag so that we get reinvoked
|
| - # and return from the loop
|
| - waiting[0] = False
|
| - return deferred
|
| - # Reset waiting to initial values for next loop
|
| - waiting[0] = True
|
| - waiting[1] = None
|
| -
|
| - result = None
|
| -
|
| -
|
| -
|
| -def deferredGenerator(f):
|
| - """
|
| - Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}
|
| -
|
| - deferredGenerator and waitForDeferred help you write Deferred-using code
|
| - that looks like a regular sequential function. If your code has a minimum
|
| - requirement of Python 2.5, consider the use of L{inlineCallbacks} instead,
|
| - which can accomplish the same thing in a more concise manner.
|
| -
|
| - There are two important functions involved: waitForDeferred, and
|
| - deferredGenerator. They are used together, like this::
|
| -
|
| - def thingummy():
|
| - thing = waitForDeferred(makeSomeRequestResultingInDeferred())
|
| - yield thing
|
| - thing = thing.getResult()
|
| - print thing #the result! hoorj!
|
| - thingummy = deferredGenerator(thingummy)
|
| -
|
| - waitForDeferred returns something that you should immediately yield; when
|
| - your generator is resumed, calling thing.getResult() will either give you
|
| - the result of the Deferred if it was a success, or raise an exception if it
|
| - was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do
|
| - not call it, I{your program will not work}.
|
| -
|
| - deferredGenerator takes one of these waitForDeferred-using generator
|
| - functions and converts it into a function that returns a Deferred. The
|
| - result of the Deferred will be the last value that your generator yielded
|
| - unless the last value is a waitForDeferred instance, in which case the
|
| - result will be C{None}. If the function raises an unhandled exception, the
|
| - Deferred will errback instead. Remember that 'return result' won't work;
|
| - use 'yield result; return' in place of that.
|
| -
|
| - Note that not yielding anything from your generator will make the Deferred
|
| - result in None. Yielding a Deferred from your generator is also an error
|
| - condition; always yield waitForDeferred(d) instead.
|
| -
|
| - The Deferred returned from your deferred generator may also errback if your
|
| - generator raised an exception. For example::
|
| -
|
| - def thingummy():
|
| - thing = waitForDeferred(makeSomeRequestResultingInDeferred())
|
| - yield thing
|
| - thing = thing.getResult()
|
| - if thing == 'I love Twisted':
|
| - # will become the result of the Deferred
|
| - yield 'TWISTED IS GREAT!'
|
| - return
|
| - else:
|
| - # will trigger an errback
|
| - raise Exception('DESTROY ALL LIFE')
|
| - thingummy = deferredGenerator(thingummy)
|
| -
|
| - Put succinctly, these functions connect deferred-using code with this 'fake
|
| - blocking' style in both directions: waitForDeferred converts from a
|
| - Deferred to the 'blocking' style, and deferredGenerator converts from the
|
| - 'blocking' style to a Deferred.
|
| - """
|
| - def unwindGenerator(*args, **kwargs):
|
| - return _deferGenerator(f(*args, **kwargs), Deferred())
|
| - return mergeFunctionMetadata(f, unwindGenerator)
|
| -
|
| -
|
| -## inlineCallbacks
|
| -
|
| -# BaseException is only in Py 2.5.
|
| -try:
|
| - BaseException
|
| -except NameError:
|
| - BaseException=Exception
|
| -
|
| -class _DefGen_Return(BaseException):
|
| - def __init__(self, value):
|
| - self.value = value
|
| -
|
| -def returnValue(val):
|
| - """
|
| - Return val from a L{inlineCallbacks} generator.
|
| -
|
| - Note: this is currently implemented by raising an exception
|
| - derived from BaseException. You might want to change any
|
| - 'except:' clauses to an 'except Exception:' clause so as not to
|
| - catch this exception.
|
| -
|
| - Also: while this function currently will work when called from
|
| - within arbitrary functions called from within the generator, do
|
| - not rely upon this behavior.
|
| - """
|
| - raise _DefGen_Return(val)
|
| -
|
| -def _inlineCallbacks(result, g, deferred):
|
| - """
|
| - See L{inlineCallbacks}.
|
| - """
|
| - # This function is complicated by the need to prevent unbounded recursion
|
| - # arising from repeatedly yielding immediately ready deferreds. This while
|
| - # loop and the waiting variable solve that by manually unfolding the
|
| - # recursion.
|
| -
|
| - waiting = [True, # waiting for result?
|
| - None] # result
|
| -
|
| - while 1:
|
| - try:
|
| - # Send the last result back as the result of the yield expression.
|
| - if isinstance(result, failure.Failure):
|
| - result = result.throwExceptionIntoGenerator(g)
|
| - else:
|
| - result = g.send(result)
|
| - except StopIteration:
|
| - # fell off the end, or "return" statement
|
| - deferred.callback(None)
|
| - return deferred
|
| - except _DefGen_Return, e:
|
| - # returnValue call
|
| - deferred.callback(e.value)
|
| - return deferred
|
| - except:
|
| - deferred.errback()
|
| - return deferred
|
| -
|
| - if isinstance(result, Deferred):
|
| - # a deferred was yielded, get the result.
|
| - def gotResult(r):
|
| - if waiting[0]:
|
| - waiting[0] = False
|
| - waiting[1] = r
|
| - else:
|
| - _inlineCallbacks(r, g, deferred)
|
| -
|
| - result.addBoth(gotResult)
|
| - if waiting[0]:
|
| - # Haven't called back yet, set flag so that we get reinvoked
|
| - # and return from the loop
|
| - waiting[0] = False
|
| - return deferred
|
| -
|
| - result = waiting[1]
|
| - # Reset waiting to initial values for next loop. gotResult uses
|
| - # waiting, but this isn't a problem because gotResult is only
|
| - # executed once, and if it hasn't been executed yet, the return
|
| - # branch above would have been taken.
|
| -
|
| -
|
| - waiting[0] = True
|
| - waiting[1] = None
|
| -
|
| -
|
| - return deferred
|
| -
|
| -def inlineCallbacks(f):
|
| - """
|
| - Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}
|
| -
|
| - WARNING: this function will not work in Python 2.4 and earlier!
|
| -
|
| - inlineCallbacks helps you write Deferred-using code that looks like a
|
| - regular sequential function. This function uses features of Python 2.5
|
| - generators. If you need to be compatible with Python 2.4 or before, use
|
| - the L{deferredGenerator} function instead, which accomplishes the same
|
| - thing, but with somewhat more boilerplate. For example::
|
| -
|
| - def thingummy():
|
| - thing = yield makeSomeRequestResultingInDeferred()
|
| - print thing #the result! hoorj!
|
| - thingummy = inlineCallbacks(thingummy)
|
| -
|
| - When you call anything that results in a Deferred, you can simply yield it;
|
| - your generator will automatically be resumed when the Deferred's result is
|
| - available. The generator will be sent the result of the Deferred with the
|
| - 'send' method on generators, or if the result was a failure, 'throw'.
|
| -
|
| - Your inlineCallbacks-enabled generator will return a Deferred object, which
|
| - will result in the return value of the generator (or will fail with a
|
| - failure object if your generator raises an unhandled exception). Note that
|
| - you can't use 'return result' to return a value; use 'returnValue(result)'
|
| - instead. Falling off the end of the generator, or simply using 'return'
|
| - will cause the Deferred to have a result of None.
|
| -
|
| - The Deferred returned from your deferred generator may errback if your
|
| - generator raised an exception::
|
| -
|
| - def thingummy():
|
| - thing = yield makeSomeRequestResultingInDeferred()
|
| - if thing == 'I love Twisted':
|
| - # will become the result of the Deferred
|
| - returnValue('TWISTED IS GREAT!')
|
| - else:
|
| - # will trigger an errback
|
| - raise Exception('DESTROY ALL LIFE')
|
| - thingummy = inlineCallbacks(thingummy)
|
| - """
|
| - def unwindGenerator(*args, **kwargs):
|
| - return _inlineCallbacks(None, f(*args, **kwargs), Deferred())
|
| - return mergeFunctionMetadata(f, unwindGenerator)
|
| -
|
| -
|
| -## DeferredLock/DeferredQueue
|
| -
|
| -class _ConcurrencyPrimitive(object):
|
| - def __init__(self):
|
| - self.waiting = []
|
| -
|
| - def _releaseAndReturn(self, r):
|
| - self.release()
|
| - return r
|
| -
|
| - def run(*args, **kwargs):
|
| - """Acquire, run, release.
|
| -
|
| - This function takes a callable as its first argument and any
|
| - number of other positional and keyword arguments. When the
|
| - lock or semaphore is acquired, the callable will be invoked
|
| - with those arguments.
|
| -
|
| - The callable may return a Deferred; if it does, the lock or
|
| - semaphore won't be released until that Deferred fires.
|
| -
|
| - @return: Deferred of function result.
|
| - """
|
| - if len(args) < 2:
|
| - if not args:
|
| - raise TypeError("run() takes at least 2 arguments, none given.")
|
| - raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
|
| - args[0].__class__.__name__,))
|
| - self, f = args[:2]
|
| - args = args[2:]
|
| -
|
| - def execute(ignoredResult):
|
| - d = maybeDeferred(f, *args, **kwargs)
|
| - d.addBoth(self._releaseAndReturn)
|
| - return d
|
| -
|
| - d = self.acquire()
|
| - d.addCallback(execute)
|
| - return d
|
| -
|
| -
|
| -class DeferredLock(_ConcurrencyPrimitive):
|
| - """
|
| - A lock for event driven systems.
|
| -
|
| - @ivar locked: True when this Lock has been acquired, false at all
|
| - other times. Do not change this value, but it is useful to
|
| - examine for the equivalent of a \"non-blocking\" acquisition.
|
| - """
|
| -
|
| - locked = 0
|
| -
|
| - def acquire(self):
|
| - """Attempt to acquire the lock.
|
| -
|
| - @return: a Deferred which fires on lock acquisition.
|
| - """
|
| - d = Deferred()
|
| - if self.locked:
|
| - self.waiting.append(d)
|
| - else:
|
| - self.locked = 1
|
| - d.callback(self)
|
| - return d
|
| -
|
| - def release(self):
|
| - """Release the lock.
|
| -
|
| - Should be called by whomever did the acquire() when the shared
|
| - resource is free.
|
| - """
|
| - assert self.locked, "Tried to release an unlocked lock"
|
| - self.locked = 0
|
| - if self.waiting:
|
| - # someone is waiting to acquire lock
|
| - self.locked = 1
|
| - d = self.waiting.pop(0)
|
| - d.callback(self)
|
| -
|
| -class DeferredSemaphore(_ConcurrencyPrimitive):
|
| - """
|
| - A semaphore for event driven systems.
|
| - """
|
| -
|
| - def __init__(self, tokens):
|
| - _ConcurrencyPrimitive.__init__(self)
|
| - self.tokens = tokens
|
| - self.limit = tokens
|
| -
|
| - def acquire(self):
|
| - """Attempt to acquire the token.
|
| -
|
| - @return: a Deferred which fires on token acquisition.
|
| - """
|
| - assert self.tokens >= 0, "Internal inconsistency?? tokens should never be negative"
|
| - d = Deferred()
|
| - if not self.tokens:
|
| - self.waiting.append(d)
|
| - else:
|
| - self.tokens = self.tokens - 1
|
| - d.callback(self)
|
| - return d
|
| -
|
| - def release(self):
|
| - """Release the token.
|
| -
|
| - Should be called by whoever did the acquire() when the shared
|
| - resource is free.
|
| - """
|
| - assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
|
| - self.tokens = self.tokens + 1
|
| - if self.waiting:
|
| - # someone is waiting to acquire token
|
| - self.tokens = self.tokens - 1
|
| - d = self.waiting.pop(0)
|
| - d.callback(self)
|
| -
|
| -class QueueOverflow(Exception):
|
| - pass
|
| -
|
| -class QueueUnderflow(Exception):
|
| - pass
|
| -
|
| -
|
| -class DeferredQueue(object):
|
| - """
|
| - An event driven queue.
|
| -
|
| - Objects may be added as usual to this queue. When an attempt is
|
| - made to retrieve an object when the queue is empty, a Deferred is
|
| - returned which will fire when an object becomes available.
|
| -
|
| - @ivar size: The maximum number of objects to allow into the queue
|
| - at a time. When an attempt to add a new object would exceed this
|
| - limit, QueueOverflow is raised synchronously. None for no limit.
|
| -
|
| - @ivar backlog: The maximum number of Deferred gets to allow at
|
| - one time. When an attempt is made to get an object which would
|
| - exceed this limit, QueueUnderflow is raised synchronously. None
|
| - for no limit.
|
| - """
|
| -
|
| - def __init__(self, size=None, backlog=None):
|
| - self.waiting = []
|
| - self.pending = []
|
| - self.size = size
|
| - self.backlog = backlog
|
| -
|
| - def put(self, obj):
|
| - """Add an object to this queue.
|
| -
|
| - @raise QueueOverflow: Too many objects are in this queue.
|
| - """
|
| - if self.waiting:
|
| - self.waiting.pop(0).callback(obj)
|
| - elif self.size is None or len(self.pending) < self.size:
|
| - self.pending.append(obj)
|
| - else:
|
| - raise QueueOverflow()
|
| -
|
| - def get(self):
|
| - """Attempt to retrieve and remove an object from the queue.
|
| -
|
| - @return: a Deferred which fires with the next object available in the queue.
|
| -
|
| - @raise QueueUnderflow: Too many (more than C{backlog})
|
| - Deferreds are already waiting for an object from this queue.
|
| - """
|
| - if self.pending:
|
| - return succeed(self.pending.pop(0))
|
| - elif self.backlog is None or len(self.waiting) < self.backlog:
|
| - d = Deferred()
|
| - self.waiting.append(d)
|
| - return d
|
| - else:
|
| - raise QueueUnderflow()
|
| -
|
| -
|
| -class AlreadyTryingToLockError(Exception):
|
| - """
|
| - Raised when DeferredFilesystemLock.deferUntilLocked is called twice on a
|
| - single DeferredFilesystemLock.
|
| - """
|
| -
|
| -
|
| -class DeferredFilesystemLock(lockfile.FilesystemLock):
|
| - """
|
| - A FilesystemLock that allows for a deferred to be fired when the lock is
|
| - acquired.
|
| -
|
| - @ivar _scheduler: The object in charge of scheduling retries. In this
|
| - implementation this is parameterized for testing.
|
| -
|
| - @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
|
| -
|
| - @ivar _tryLockCall: A L{DelayedCall} based on _interval that will managex
|
| - the next retry for aquiring the lock.
|
| -
|
| - @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout
|
| - argument. This is in charge of timing out our attempt to acquire the
|
| - lock.
|
| - """
|
| - _interval = 1
|
| - _tryLockCall = None
|
| - _timeoutCall = None
|
| -
|
| - def __init__(self, name, scheduler=None):
|
| - """
|
| - @param name: The name of the lock to acquire
|
| - @param scheduler: An object which provides L{IReactorTime}
|
| - """
|
| - lockfile.FilesystemLock.__init__(self, name)
|
| -
|
| - if scheduler is None:
|
| - from twisted.internet import reactor
|
| - scheduler = reactor
|
| -
|
| - self._scheduler = scheduler
|
| -
|
| - def deferUntilLocked(self, timeout=None):
|
| - """
|
| - Wait until we acquire this lock. This method is not safe for
|
| - concurrent use.
|
| -
|
| - @type timeout: C{float} or C{int}
|
| - @param timeout: the number of seconds after which to time out if the
|
| - lock has not been acquired.
|
| -
|
| - @return: a deferred which will callback when the lock is acquired, or
|
| - errback with a L{TimeoutError} after timing out or an
|
| - L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
|
| - been called and not successfully locked the file.
|
| - """
|
| - if self._tryLockCall is not None:
|
| - return fail(
|
| - AlreadyTryingToLockError(
|
| - "deferUntilLocked isn't safe for concurrent use."))
|
| -
|
| - d = Deferred()
|
| -
|
| - def _cancelLock():
|
| - self._tryLockCall.cancel()
|
| - self._tryLockCall = None
|
| - self._timeoutCall = None
|
| -
|
| - if self.lock():
|
| - d.callback(None)
|
| - else:
|
| - d.errback(failure.Failure(
|
| - TimeoutError("Timed out aquiring lock: %s after %fs" % (
|
| - self.name,
|
| - timeout))))
|
| -
|
| - def _tryLock():
|
| - if self.lock():
|
| - if self._timeoutCall is not None:
|
| - self._timeoutCall.cancel()
|
| - self._timeoutCall = None
|
| -
|
| - self._tryLockCall = None
|
| -
|
| - d.callback(None)
|
| - else:
|
| - if timeout is not None and self._timeoutCall is None:
|
| - self._timeoutCall = self._scheduler.callLater(
|
| - timeout, _cancelLock)
|
| -
|
| - self._tryLockCall = self._scheduler.callLater(
|
| - self._interval, _tryLock)
|
| -
|
| - _tryLock()
|
| -
|
| - return d
|
| -
|
| -
|
| -__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
|
| - "AlreadyCalledError", "TimeoutError", "gatherResults",
|
| - "maybeDeferred",
|
| - "waitForDeferred", "deferredGenerator", "inlineCallbacks",
|
| - "DeferredLock", "DeferredSemaphore", "DeferredQueue",
|
| - "DeferredFilesystemLock", "AlreadyTryingToLockError",
|
| - ]
|
|
|