| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_defer -*- | |
| 2 # | |
| 3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 """ | |
| 7 Support for results that aren't immediately available. | |
| 8 | |
| 9 Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>} | |
| 10 """ | |
| 11 | |
| 12 from __future__ import nested_scopes, generators | |
| 13 import traceback | |
| 14 import warnings | |
| 15 | |
| 16 # Twisted imports | |
| 17 from twisted.python import log, failure, lockfile | |
| 18 from twisted.python.util import unsignedID, mergeFunctionMetadata | |
| 19 | |
| 20 class AlreadyCalledError(Exception): | |
| 21 pass | |
| 22 | |
| 23 class TimeoutError(Exception): | |
| 24 pass | |
| 25 | |
| 26 def logError(err): | |
| 27 log.err(err) | |
| 28 return err | |
| 29 | |
| 30 def succeed(result): | |
| 31 """ | |
| 32 Return a Deferred that has already had '.callback(result)' called. | |
| 33 | |
| 34 This is useful when you're writing synchronous code to an | |
| 35 asynchronous interface: i.e., some code is calling you expecting a | |
| 36 Deferred result, but you don't actually need to do anything | |
| 37 asynchronous. Just return defer.succeed(theResult). | |
| 38 | |
| 39 See L{fail} for a version of this function that uses a failing | |
| 40 Deferred rather than a successful one. | |
| 41 | |
| 42 @param result: The result to give to the Deferred's 'callback' | |
| 43 method. | |
| 44 | |
| 45 @rtype: L{Deferred} | |
| 46 """ | |
| 47 d = Deferred() | |
| 48 d.callback(result) | |
| 49 return d | |
| 50 | |
| 51 | |
| 52 def fail(result=None): | |
| 53 """ | |
| 54 Return a Deferred that has already had '.errback(result)' called. | |
| 55 | |
| 56 See L{succeed}'s docstring for rationale. | |
| 57 | |
| 58 @param result: The same argument that L{Deferred.errback} takes. | |
| 59 | |
| 60 @raise NoCurrentExceptionError: If C{result} is C{None} but there is no | |
| 61 current exception state. | |
| 62 | |
| 63 @rtype: L{Deferred} | |
| 64 """ | |
| 65 d = Deferred() | |
| 66 d.errback(result) | |
| 67 return d | |
| 68 | |
| 69 | |
| 70 def execute(callable, *args, **kw): | |
| 71 """Create a deferred from a callable and arguments. | |
| 72 | |
| 73 Call the given function with the given arguments. Return a deferred which | |
| 74 has been fired with its callback as the result of that invocation or its | |
| 75 errback with a Failure for the exception thrown. | |
| 76 """ | |
| 77 try: | |
| 78 result = callable(*args, **kw) | |
| 79 except: | |
| 80 return fail() | |
| 81 else: | |
| 82 return succeed(result) | |
| 83 | |
| 84 def maybeDeferred(f, *args, **kw): | |
| 85 """Invoke a function that may or may not return a deferred. | |
| 86 | |
| 87 Call the given function with the given arguments. If the returned | |
| 88 object is a C{Deferred}, return it. If the returned object is a C{Failure}, | |
| 89 wrap it with C{fail} and return it. Otherwise, wrap it in C{succeed} and | |
| 90 return it. If an exception is raised, convert it to a C{Failure}, wrap it | |
| 91 in C{fail}, and then return it. | |
| 92 | |
| 93 @type f: Any callable | |
| 94 @param f: The callable to invoke | |
| 95 | |
| 96 @param args: The arguments to pass to C{f} | |
| 97 @param kw: The keyword arguments to pass to C{f} | |
| 98 | |
| 99 @rtype: C{Deferred} | |
| 100 @return: The result of the function call, wrapped in a C{Deferred} if | |
| 101 necessary. | |
| 102 """ | |
| 103 deferred = None | |
| 104 | |
| 105 try: | |
| 106 result = f(*args, **kw) | |
| 107 except: | |
| 108 return fail(failure.Failure()) | |
| 109 else: | |
| 110 if isinstance(result, Deferred): | |
| 111 return result | |
| 112 elif isinstance(result, failure.Failure): | |
| 113 return fail(result) | |
| 114 else: | |
| 115 return succeed(result) | |
| 116 return deferred | |
| 117 | |
| 118 def timeout(deferred): | |
| 119 deferred.errback(failure.Failure(TimeoutError("Callback timed out"))) | |
| 120 | |
| 121 def passthru(arg): | |
| 122 return arg | |
| 123 | |
| 124 def setDebugging(on): | |
| 125 """Enable or disable Deferred debugging. | |
| 126 | |
| 127 When debugging is on, the call stacks from creation and invocation are | |
| 128 recorded, and added to any AlreadyCalledErrors we raise. | |
| 129 """ | |
| 130 Deferred.debug=bool(on) | |
| 131 | |
| 132 def getDebugging(): | |
| 133 """Determine whether Deferred debugging is enabled. | |
| 134 """ | |
| 135 return Deferred.debug | |
| 136 | |
| 137 class Deferred: | |
| 138 """This is a callback which will be put off until later. | |
| 139 | |
| 140 Why do we want this? Well, in cases where a function in a threaded | |
| 141 program would block until it gets a result, for Twisted it should | |
| 142 not block. Instead, it should return a Deferred. | |
| 143 | |
| 144 This can be implemented for protocols that run over the network by | |
| 145 writing an asynchronous protocol for twisted.internet. For methods | |
| 146 that come from outside packages that are not under our control, we use | |
| 147 threads (see for example L{twisted.enterprise.adbapi}). | |
| 148 | |
| 149 For more information about Deferreds, see doc/howto/defer.html or | |
| 150 U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html} | |
| 151 """ | |
| 152 called = 0 | |
| 153 paused = 0 | |
| 154 timeoutCall = None | |
| 155 _debugInfo = None | |
| 156 | |
| 157 # Are we currently running a user-installed callback? Meant to prevent | |
| 158 # recursive running of callbacks when a reentrant call to add a callback is | |
| 159 # used. | |
| 160 _runningCallbacks = False | |
| 161 | |
| 162 # Keep this class attribute for now, for compatibility with code that | |
| 163 # sets it directly. | |
| 164 debug = False | |
| 165 | |
| 166 def __init__(self): | |
| 167 self.callbacks = [] | |
| 168 if self.debug: | |
| 169 self._debugInfo = DebugInfo() | |
| 170 self._debugInfo.creator = traceback.format_stack()[:-1] | |
| 171 | |
| 172 def addCallbacks(self, callback, errback=None, | |
| 173 callbackArgs=None, callbackKeywords=None, | |
| 174 errbackArgs=None, errbackKeywords=None): | |
| 175 """Add a pair of callbacks (success and error) to this Deferred. | |
| 176 | |
| 177 These will be executed when the 'master' callback is run. | |
| 178 """ | |
| 179 assert callable(callback) | |
| 180 assert errback == None or callable(errback) | |
| 181 cbs = ((callback, callbackArgs, callbackKeywords), | |
| 182 (errback or (passthru), errbackArgs, errbackKeywords)) | |
| 183 self.callbacks.append(cbs) | |
| 184 | |
| 185 if self.called: | |
| 186 self._runCallbacks() | |
| 187 return self | |
| 188 | |
| 189 def addCallback(self, callback, *args, **kw): | |
| 190 """Convenience method for adding just a callback. | |
| 191 | |
| 192 See L{addCallbacks}. | |
| 193 """ | |
| 194 return self.addCallbacks(callback, callbackArgs=args, | |
| 195 callbackKeywords=kw) | |
| 196 | |
| 197 def addErrback(self, errback, *args, **kw): | |
| 198 """Convenience method for adding just an errback. | |
| 199 | |
| 200 See L{addCallbacks}. | |
| 201 """ | |
| 202 return self.addCallbacks(passthru, errback, | |
| 203 errbackArgs=args, | |
| 204 errbackKeywords=kw) | |
| 205 | |
| 206 def addBoth(self, callback, *args, **kw): | |
| 207 """Convenience method for adding a single callable as both a callback | |
| 208 and an errback. | |
| 209 | |
| 210 See L{addCallbacks}. | |
| 211 """ | |
| 212 return self.addCallbacks(callback, callback, | |
| 213 callbackArgs=args, errbackArgs=args, | |
| 214 callbackKeywords=kw, errbackKeywords=kw) | |
| 215 | |
| 216 def chainDeferred(self, d): | |
| 217 """Chain another Deferred to this Deferred. | |
| 218 | |
| 219 This method adds callbacks to this Deferred to call d's callback or | |
| 220 errback, as appropriate. It is merely a shorthand way of performing | |
| 221 the following:: | |
| 222 | |
| 223 self.addCallbacks(d.callback, d.errback) | |
| 224 | |
| 225 When you chain a deferred d2 to another deferred d1 with | |
| 226 d1.chainDeferred(d2), you are making d2 participate in the callback | |
| 227 chain of d1. Thus any event that fires d1 will also fire d2. | |
| 228 However, the converse is B{not} true; if d2 is fired d1 will not be | |
| 229 affected. | |
| 230 """ | |
| 231 return self.addCallbacks(d.callback, d.errback) | |
| 232 | |
| 233 def callback(self, result): | |
| 234 """Run all success callbacks that have been added to this Deferred. | |
| 235 | |
| 236 Each callback will have its result passed as the first | |
| 237 argument to the next; this way, the callbacks act as a | |
| 238 'processing chain'. Also, if the success-callback returns a Failure | |
| 239 or raises an Exception, processing will continue on the *error*- | |
| 240 callback chain. | |
| 241 """ | |
| 242 assert not isinstance(result, Deferred) | |
| 243 self._startRunCallbacks(result) | |
| 244 | |
| 245 | |
| 246 def errback(self, fail=None): | |
| 247 """ | |
| 248 Run all error callbacks that have been added to this Deferred. | |
| 249 | |
| 250 Each callback will have its result passed as the first | |
| 251 argument to the next; this way, the callbacks act as a | |
| 252 'processing chain'. Also, if the error-callback returns a non-Failure | |
| 253 or doesn't raise an Exception, processing will continue on the | |
| 254 *success*-callback chain. | |
| 255 | |
| 256 If the argument that's passed to me is not a failure.Failure instance, | |
| 257 it will be embedded in one. If no argument is passed, a failure.Failure | |
| 258 instance will be created based on the current traceback stack. | |
| 259 | |
| 260 Passing a string as `fail' is deprecated, and will be punished with | |
| 261 a warning message. | |
| 262 | |
| 263 @raise NoCurrentExceptionError: If C{fail} is C{None} but there is | |
| 264 no current exception state. | |
| 265 """ | |
| 266 if not isinstance(fail, failure.Failure): | |
| 267 fail = failure.Failure(fail) | |
| 268 | |
| 269 self._startRunCallbacks(fail) | |
| 270 | |
| 271 | |
| 272 def pause(self): | |
| 273 """Stop processing on a Deferred until L{unpause}() is called. | |
| 274 """ | |
| 275 self.paused = self.paused + 1 | |
| 276 | |
| 277 | |
| 278 def unpause(self): | |
| 279 """Process all callbacks made since L{pause}() was called. | |
| 280 """ | |
| 281 self.paused = self.paused - 1 | |
| 282 if self.paused: | |
| 283 return | |
| 284 if self.called: | |
| 285 self._runCallbacks() | |
| 286 | |
| 287 def _continue(self, result): | |
| 288 self.result = result | |
| 289 self.unpause() | |
| 290 | |
| 291 def _startRunCallbacks(self, result): | |
| 292 if self.called: | |
| 293 if self.debug: | |
| 294 if self._debugInfo is None: | |
| 295 self._debugInfo = DebugInfo() | |
| 296 extra = "\n" + self._debugInfo._getDebugTracebacks() | |
| 297 raise AlreadyCalledError(extra) | |
| 298 raise AlreadyCalledError | |
| 299 if self.debug: | |
| 300 if self._debugInfo is None: | |
| 301 self._debugInfo = DebugInfo() | |
| 302 self._debugInfo.invoker = traceback.format_stack()[:-2] | |
| 303 self.called = True | |
| 304 self.result = result | |
| 305 if self.timeoutCall: | |
| 306 try: | |
| 307 self.timeoutCall.cancel() | |
| 308 except: | |
| 309 pass | |
| 310 | |
| 311 del self.timeoutCall | |
| 312 self._runCallbacks() | |
| 313 | |
| 314 def _runCallbacks(self): | |
| 315 if self._runningCallbacks: | |
| 316 # Don't recursively run callbacks | |
| 317 return | |
| 318 if not self.paused: | |
| 319 while self.callbacks: | |
| 320 item = self.callbacks.pop(0) | |
| 321 callback, args, kw = item[ | |
| 322 isinstance(self.result, failure.Failure)] | |
| 323 args = args or () | |
| 324 kw = kw or {} | |
| 325 try: | |
| 326 self._runningCallbacks = True | |
| 327 try: | |
| 328 self.result = callback(self.result, *args, **kw) | |
| 329 finally: | |
| 330 self._runningCallbacks = False | |
| 331 if isinstance(self.result, Deferred): | |
| 332 # note: this will cause _runCallbacks to be called | |
| 333 # recursively if self.result already has a result. | |
| 334 # This shouldn't cause any problems, since there is no | |
| 335 # relevant state in this stack frame at this point. | |
| 336 # The recursive call will continue to process | |
| 337 # self.callbacks until it is empty, then return here, | |
| 338 # where there is no more work to be done, so this call | |
| 339 # will return as well. | |
| 340 self.pause() | |
| 341 self.result.addBoth(self._continue) | |
| 342 break | |
| 343 except: | |
| 344 self.result = failure.Failure() | |
| 345 | |
| 346 if isinstance(self.result, failure.Failure): | |
| 347 self.result.cleanFailure() | |
| 348 if self._debugInfo is None: | |
| 349 self._debugInfo = DebugInfo() | |
| 350 self._debugInfo.failResult = self.result | |
| 351 else: | |
| 352 if self._debugInfo is not None: | |
| 353 self._debugInfo.failResult = None | |
| 354 | |
| 355 def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw): | |
| 356 """Set a timeout function to be triggered if I am not called. | |
| 357 | |
| 358 @param seconds: How long to wait (from now) before firing the | |
| 359 timeoutFunc. | |
| 360 | |
| 361 @param timeoutFunc: will receive the Deferred and *args, **kw as its | |
| 362 arguments. The default timeoutFunc will call the errback with a | |
| 363 L{TimeoutError}. | |
| 364 """ | |
| 365 warnings.warn( | |
| 366 "Deferred.setTimeout is deprecated. Look for timeout " | |
| 367 "support specific to the API you are using instead.", | |
| 368 DeprecationWarning, stacklevel=2) | |
| 369 | |
| 370 if self.called: | |
| 371 return | |
| 372 assert not self.timeoutCall, "Don't call setTimeout twice on the same De
ferred." | |
| 373 | |
| 374 from twisted.internet import reactor | |
| 375 self.timeoutCall = reactor.callLater( | |
| 376 seconds, | |
| 377 lambda: self.called or timeoutFunc(self, *args, **kw)) | |
| 378 return self.timeoutCall | |
| 379 | |
| 380 def __str__(self): | |
| 381 cname = self.__class__.__name__ | |
| 382 if hasattr(self, 'result'): | |
| 383 return "<%s at %s current result: %r>" % (cname, hex(unsignedID(sel
f)), | |
| 384 self.result) | |
| 385 return "<%s at %s>" % (cname, hex(unsignedID(self))) | |
| 386 __repr__ = __str__ | |
| 387 | |
| 388 class DebugInfo: | |
| 389 """Deferred debug helper""" | |
| 390 failResult = None | |
| 391 | |
| 392 def _getDebugTracebacks(self): | |
| 393 info = '' | |
| 394 if hasattr(self, "creator"): | |
| 395 info += " C: Deferred was created:\n C:" | |
| 396 info += "".join(self.creator).rstrip().replace("\n","\n C:") | |
| 397 info += "\n" | |
| 398 if hasattr(self, "invoker"): | |
| 399 info += " I: First Invoker was:\n I:" | |
| 400 info += "".join(self.invoker).rstrip().replace("\n","\n I:") | |
| 401 info += "\n" | |
| 402 return info | |
| 403 | |
| 404 def __del__(self): | |
| 405 """Print tracebacks and die. | |
| 406 | |
| 407 If the *last* (and I do mean *last*) callback leaves me in an error | |
| 408 state, print a traceback (if said errback is a Failure). | |
| 409 """ | |
| 410 if self.failResult is not None: | |
| 411 log.msg("Unhandled error in Deferred:", isError=True) | |
| 412 debugInfo = self._getDebugTracebacks() | |
| 413 if debugInfo != '': | |
| 414 log.msg("(debug: " + debugInfo + ")", isError=True) | |
| 415 log.err(self.failResult) | |
| 416 | |
| 417 class FirstError(Exception): | |
| 418 """First error to occur in a DeferredList if fireOnOneErrback is set. | |
| 419 | |
| 420 @ivar subFailure: the L{Failure} that occurred. | |
| 421 @ivar index: the index of the Deferred in the DeferredList where it | |
| 422 happened. | |
| 423 """ | |
| 424 def __init__(self, failure, index): | |
| 425 self.subFailure = failure | |
| 426 self.index = index | |
| 427 | |
| 428 def __repr__(self): | |
| 429 return 'FirstError(%r, %d)' % (self.subFailure, self.index) | |
| 430 | |
| 431 def __str__(self): | |
| 432 return repr(self) | |
| 433 | |
| 434 def __getitem__(self, index): | |
| 435 warnings.warn("FirstError.__getitem__ is deprecated. " | |
| 436 "Use attributes instead.", | |
| 437 category=DeprecationWarning, stacklevel=2) | |
| 438 return [self.subFailure, self.index][index] | |
| 439 | |
| 440 def __getslice__(self, start, stop): | |
| 441 warnings.warn("FirstError.__getslice__ is deprecated. " | |
| 442 "Use attributes instead.", | |
| 443 category=DeprecationWarning, stacklevel=2) | |
| 444 return [self.subFailure, self.index][start:stop] | |
| 445 | |
| 446 def __eq__(self, other): | |
| 447 if isinstance(other, tuple): | |
| 448 return tuple(self) == other | |
| 449 elif isinstance(other, FirstError): | |
| 450 return (self.subFailure == other.subFailure and | |
| 451 self.index == other.index) | |
| 452 return False | |
| 453 | |
| 454 class DeferredList(Deferred): | |
| 455 """I combine a group of deferreds into one callback. | |
| 456 | |
| 457 I track a list of L{Deferred}s for their callbacks, and make a single | |
| 458 callback when they have all completed, a list of (success, result) | |
| 459 tuples, 'success' being a boolean. | |
| 460 | |
| 461 Note that you can still use a L{Deferred} after putting it in a | |
| 462 DeferredList. For example, you can suppress 'Unhandled error in Deferred' | |
| 463 messages by adding errbacks to the Deferreds *after* putting them in the | |
| 464 DeferredList, as a DeferredList won't swallow the errors. (Although a more | |
| 465 convenient way to do this is simply to set the consumeErrors flag) | |
| 466 """ | |
| 467 | |
| 468 fireOnOneCallback = 0 | |
| 469 fireOnOneErrback = 0 | |
| 470 | |
| 471 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, | |
| 472 consumeErrors=0): | |
| 473 """Initialize a DeferredList. | |
| 474 | |
| 475 @type deferredList: C{list} of L{Deferred}s | |
| 476 @param deferredList: The list of deferreds to track. | |
| 477 @param fireOnOneCallback: (keyword param) a flag indicating that | |
| 478 only one callback needs to be fired for me to call | |
| 479 my callback | |
| 480 @param fireOnOneErrback: (keyword param) a flag indicating that | |
| 481 only one errback needs to be fired for me to call | |
| 482 my errback | |
| 483 @param consumeErrors: (keyword param) a flag indicating that any errors | |
| 484 raised in the original deferreds should be | |
| 485 consumed by this DeferredList. This is useful to | |
| 486 prevent spurious warnings being logged. | |
| 487 """ | |
| 488 self.resultList = [None] * len(deferredList) | |
| 489 Deferred.__init__(self) | |
| 490 if len(deferredList) == 0 and not fireOnOneCallback: | |
| 491 self.callback(self.resultList) | |
| 492 | |
| 493 # These flags need to be set *before* attaching callbacks to the | |
| 494 # deferreds, because the callbacks use these flags, and will run | |
| 495 # synchronously if any of the deferreds are already fired. | |
| 496 self.fireOnOneCallback = fireOnOneCallback | |
| 497 self.fireOnOneErrback = fireOnOneErrback | |
| 498 self.consumeErrors = consumeErrors | |
| 499 self.finishedCount = 0 | |
| 500 | |
| 501 index = 0 | |
| 502 for deferred in deferredList: | |
| 503 deferred.addCallbacks(self._cbDeferred, self._cbDeferred, | |
| 504 callbackArgs=(index,SUCCESS), | |
| 505 errbackArgs=(index,FAILURE)) | |
| 506 index = index + 1 | |
| 507 | |
| 508 def _cbDeferred(self, result, index, succeeded): | |
| 509 """(internal) Callback for when one of my deferreds fires. | |
| 510 """ | |
| 511 self.resultList[index] = (succeeded, result) | |
| 512 | |
| 513 self.finishedCount += 1 | |
| 514 if not self.called: | |
| 515 if succeeded == SUCCESS and self.fireOnOneCallback: | |
| 516 self.callback((result, index)) | |
| 517 elif succeeded == FAILURE and self.fireOnOneErrback: | |
| 518 self.errback(failure.Failure(FirstError(result, index))) | |
| 519 elif self.finishedCount == len(self.resultList): | |
| 520 self.callback(self.resultList) | |
| 521 | |
| 522 if succeeded == FAILURE and self.consumeErrors: | |
| 523 result = None | |
| 524 | |
| 525 return result | |
| 526 | |
| 527 | |
| 528 def _parseDListResult(l, fireOnOneErrback=0): | |
| 529 if __debug__: | |
| 530 for success, value in l: | |
| 531 assert success | |
| 532 return [x[1] for x in l] | |
| 533 | |
| 534 def gatherResults(deferredList): | |
| 535 """Returns list with result of given Deferreds. | |
| 536 | |
| 537 This builds on C{DeferredList} but is useful since you don't | |
| 538 need to parse the result for success/failure. | |
| 539 | |
| 540 @type deferredList: C{list} of L{Deferred}s | |
| 541 """ | |
| 542 d = DeferredList(deferredList, fireOnOneErrback=1) | |
| 543 d.addCallback(_parseDListResult) | |
| 544 return d | |
| 545 | |
| 546 # Constants for use with DeferredList | |
| 547 | |
| 548 SUCCESS = True | |
| 549 FAILURE = False | |
| 550 | |
| 551 | |
| 552 | |
| 553 ## deferredGenerator | |
| 554 | |
| 555 class waitForDeferred: | |
| 556 """ | |
| 557 See L{deferredGenerator}. | |
| 558 """ | |
| 559 | |
| 560 def __init__(self, d): | |
| 561 if not isinstance(d, Deferred): | |
| 562 raise TypeError("You must give waitForDeferred a Deferred. You gave
it %r." % (d,)) | |
| 563 self.d = d | |
| 564 | |
| 565 | |
| 566 def getResult(self): | |
| 567 if isinstance(self.result, failure.Failure): | |
| 568 self.result.raiseException() | |
| 569 return self.result | |
| 570 | |
| 571 | |
| 572 | |
| 573 def _deferGenerator(g, deferred): | |
| 574 """ | |
| 575 See L{deferredGenerator}. | |
| 576 """ | |
| 577 result = None | |
| 578 | |
| 579 # This function is complicated by the need to prevent unbounded recursion | |
| 580 # arising from repeatedly yielding immediately ready deferreds. This while | |
| 581 # loop and the waiting variable solve that by manually unfolding the | |
| 582 # recursion. | |
| 583 | |
| 584 waiting = [True, # defgen is waiting for result? | |
| 585 None] # result | |
| 586 | |
| 587 while 1: | |
| 588 try: | |
| 589 result = g.next() | |
| 590 except StopIteration: | |
| 591 deferred.callback(result) | |
| 592 return deferred | |
| 593 except: | |
| 594 deferred.errback() | |
| 595 return deferred | |
| 596 | |
| 597 # Deferred.callback(Deferred) raises an error; we catch this case | |
| 598 # early here and give a nicer error message to the user in case | |
| 599 # they yield a Deferred. | |
| 600 if isinstance(result, Deferred): | |
| 601 return fail(TypeError("Yield waitForDeferred(d), not d!")) | |
| 602 | |
| 603 if isinstance(result, waitForDeferred): | |
| 604 # a waitForDeferred was yielded, get the result. | |
| 605 # Pass result in so it don't get changed going around the loop | |
| 606 # This isn't a problem for waiting, as it's only reused if | |
| 607 # gotResult has already been executed. | |
| 608 def gotResult(r, result=result): | |
| 609 result.result = r | |
| 610 if waiting[0]: | |
| 611 waiting[0] = False | |
| 612 waiting[1] = r | |
| 613 else: | |
| 614 _deferGenerator(g, deferred) | |
| 615 result.d.addBoth(gotResult) | |
| 616 if waiting[0]: | |
| 617 # Haven't called back yet, set flag so that we get reinvoked | |
| 618 # and return from the loop | |
| 619 waiting[0] = False | |
| 620 return deferred | |
| 621 # Reset waiting to initial values for next loop | |
| 622 waiting[0] = True | |
| 623 waiting[1] = None | |
| 624 | |
| 625 result = None | |
| 626 | |
| 627 | |
| 628 | |
| 629 def deferredGenerator(f): | |
| 630 """ | |
| 631 Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>} | |
| 632 | |
| 633 deferredGenerator and waitForDeferred help you write Deferred-using code | |
| 634 that looks like a regular sequential function. If your code has a minimum | |
| 635 requirement of Python 2.5, consider the use of L{inlineCallbacks} instead, | |
| 636 which can accomplish the same thing in a more concise manner. | |
| 637 | |
| 638 There are two important functions involved: waitForDeferred, and | |
| 639 deferredGenerator. They are used together, like this:: | |
| 640 | |
| 641 def thingummy(): | |
| 642 thing = waitForDeferred(makeSomeRequestResultingInDeferred()) | |
| 643 yield thing | |
| 644 thing = thing.getResult() | |
| 645 print thing #the result! hoorj! | |
| 646 thingummy = deferredGenerator(thingummy) | |
| 647 | |
| 648 waitForDeferred returns something that you should immediately yield; when | |
| 649 your generator is resumed, calling thing.getResult() will either give you | |
| 650 the result of the Deferred if it was a success, or raise an exception if it | |
| 651 was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do | |
| 652 not call it, I{your program will not work}. | |
| 653 | |
| 654 deferredGenerator takes one of these waitForDeferred-using generator | |
| 655 functions and converts it into a function that returns a Deferred. The | |
| 656 result of the Deferred will be the last value that your generator yielded | |
| 657 unless the last value is a waitForDeferred instance, in which case the | |
| 658 result will be C{None}. If the function raises an unhandled exception, the | |
| 659 Deferred will errback instead. Remember that 'return result' won't work; | |
| 660 use 'yield result; return' in place of that. | |
| 661 | |
| 662 Note that not yielding anything from your generator will make the Deferred | |
| 663 result in None. Yielding a Deferred from your generator is also an error | |
| 664 condition; always yield waitForDeferred(d) instead. | |
| 665 | |
| 666 The Deferred returned from your deferred generator may also errback if your | |
| 667 generator raised an exception. For example:: | |
| 668 | |
| 669 def thingummy(): | |
| 670 thing = waitForDeferred(makeSomeRequestResultingInDeferred()) | |
| 671 yield thing | |
| 672 thing = thing.getResult() | |
| 673 if thing == 'I love Twisted': | |
| 674 # will become the result of the Deferred | |
| 675 yield 'TWISTED IS GREAT!' | |
| 676 return | |
| 677 else: | |
| 678 # will trigger an errback | |
| 679 raise Exception('DESTROY ALL LIFE') | |
| 680 thingummy = deferredGenerator(thingummy) | |
| 681 | |
| 682 Put succinctly, these functions connect deferred-using code with this 'fake | |
| 683 blocking' style in both directions: waitForDeferred converts from a | |
| 684 Deferred to the 'blocking' style, and deferredGenerator converts from the | |
| 685 'blocking' style to a Deferred. | |
| 686 """ | |
| 687 def unwindGenerator(*args, **kwargs): | |
| 688 return _deferGenerator(f(*args, **kwargs), Deferred()) | |
| 689 return mergeFunctionMetadata(f, unwindGenerator) | |
| 690 | |
| 691 | |
| 692 ## inlineCallbacks | |
| 693 | |
| 694 # BaseException is only in Py 2.5. | |
| 695 try: | |
| 696 BaseException | |
| 697 except NameError: | |
| 698 BaseException=Exception | |
| 699 | |
| 700 class _DefGen_Return(BaseException): | |
| 701 def __init__(self, value): | |
| 702 self.value = value | |
| 703 | |
| 704 def returnValue(val): | |
| 705 """ | |
| 706 Return val from a L{inlineCallbacks} generator. | |
| 707 | |
| 708 Note: this is currently implemented by raising an exception | |
| 709 derived from BaseException. You might want to change any | |
| 710 'except:' clauses to an 'except Exception:' clause so as not to | |
| 711 catch this exception. | |
| 712 | |
| 713 Also: while this function currently will work when called from | |
| 714 within arbitrary functions called from within the generator, do | |
| 715 not rely upon this behavior. | |
| 716 """ | |
| 717 raise _DefGen_Return(val) | |
| 718 | |
| 719 def _inlineCallbacks(result, g, deferred): | |
| 720 """ | |
| 721 See L{inlineCallbacks}. | |
| 722 """ | |
| 723 # This function is complicated by the need to prevent unbounded recursion | |
| 724 # arising from repeatedly yielding immediately ready deferreds. This while | |
| 725 # loop and the waiting variable solve that by manually unfolding the | |
| 726 # recursion. | |
| 727 | |
| 728 waiting = [True, # waiting for result? | |
| 729 None] # result | |
| 730 | |
| 731 while 1: | |
| 732 try: | |
| 733 # Send the last result back as the result of the yield expression. | |
| 734 if isinstance(result, failure.Failure): | |
| 735 result = result.throwExceptionIntoGenerator(g) | |
| 736 else: | |
| 737 result = g.send(result) | |
| 738 except StopIteration: | |
| 739 # fell off the end, or "return" statement | |
| 740 deferred.callback(None) | |
| 741 return deferred | |
| 742 except _DefGen_Return, e: | |
| 743 # returnValue call | |
| 744 deferred.callback(e.value) | |
| 745 return deferred | |
| 746 except: | |
| 747 deferred.errback() | |
| 748 return deferred | |
| 749 | |
| 750 if isinstance(result, Deferred): | |
| 751 # a deferred was yielded, get the result. | |
| 752 def gotResult(r): | |
| 753 if waiting[0]: | |
| 754 waiting[0] = False | |
| 755 waiting[1] = r | |
| 756 else: | |
| 757 _inlineCallbacks(r, g, deferred) | |
| 758 | |
| 759 result.addBoth(gotResult) | |
| 760 if waiting[0]: | |
| 761 # Haven't called back yet, set flag so that we get reinvoked | |
| 762 # and return from the loop | |
| 763 waiting[0] = False | |
| 764 return deferred | |
| 765 | |
| 766 result = waiting[1] | |
| 767 # Reset waiting to initial values for next loop. gotResult uses | |
| 768 # waiting, but this isn't a problem because gotResult is only | |
| 769 # executed once, and if it hasn't been executed yet, the return | |
| 770 # branch above would have been taken. | |
| 771 | |
| 772 | |
| 773 waiting[0] = True | |
| 774 waiting[1] = None | |
| 775 | |
| 776 | |
| 777 return deferred | |
| 778 | |
| 779 def inlineCallbacks(f): | |
| 780 """ | |
| 781 Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>} | |
| 782 | |
| 783 WARNING: this function will not work in Python 2.4 and earlier! | |
| 784 | |
| 785 inlineCallbacks helps you write Deferred-using code that looks like a | |
| 786 regular sequential function. This function uses features of Python 2.5 | |
| 787 generators. If you need to be compatible with Python 2.4 or before, use | |
| 788 the L{deferredGenerator} function instead, which accomplishes the same | |
| 789 thing, but with somewhat more boilerplate. For example:: | |
| 790 | |
| 791 def thingummy(): | |
| 792 thing = yield makeSomeRequestResultingInDeferred() | |
| 793 print thing #the result! hoorj! | |
| 794 thingummy = inlineCallbacks(thingummy) | |
| 795 | |
| 796 When you call anything that results in a Deferred, you can simply yield it; | |
| 797 your generator will automatically be resumed when the Deferred's result is | |
| 798 available. The generator will be sent the result of the Deferred with the | |
| 799 'send' method on generators, or if the result was a failure, 'throw'. | |
| 800 | |
| 801 Your inlineCallbacks-enabled generator will return a Deferred object, which | |
| 802 will result in the return value of the generator (or will fail with a | |
| 803 failure object if your generator raises an unhandled exception). Note that | |
| 804 you can't use 'return result' to return a value; use 'returnValue(result)' | |
| 805 instead. Falling off the end of the generator, or simply using 'return' | |
| 806 will cause the Deferred to have a result of None. | |
| 807 | |
| 808 The Deferred returned from your deferred generator may errback if your | |
| 809 generator raised an exception:: | |
| 810 | |
| 811 def thingummy(): | |
| 812 thing = yield makeSomeRequestResultingInDeferred() | |
| 813 if thing == 'I love Twisted': | |
| 814 # will become the result of the Deferred | |
| 815 returnValue('TWISTED IS GREAT!') | |
| 816 else: | |
| 817 # will trigger an errback | |
| 818 raise Exception('DESTROY ALL LIFE') | |
| 819 thingummy = inlineCallbacks(thingummy) | |
| 820 """ | |
| 821 def unwindGenerator(*args, **kwargs): | |
| 822 return _inlineCallbacks(None, f(*args, **kwargs), Deferred()) | |
| 823 return mergeFunctionMetadata(f, unwindGenerator) | |
| 824 | |
| 825 | |
| 826 ## DeferredLock/DeferredQueue | |
| 827 | |
| 828 class _ConcurrencyPrimitive(object): | |
| 829 def __init__(self): | |
| 830 self.waiting = [] | |
| 831 | |
| 832 def _releaseAndReturn(self, r): | |
| 833 self.release() | |
| 834 return r | |
| 835 | |
| 836 def run(*args, **kwargs): | |
| 837 """Acquire, run, release. | |
| 838 | |
| 839 This function takes a callable as its first argument and any | |
| 840 number of other positional and keyword arguments. When the | |
| 841 lock or semaphore is acquired, the callable will be invoked | |
| 842 with those arguments. | |
| 843 | |
| 844 The callable may return a Deferred; if it does, the lock or | |
| 845 semaphore won't be released until that Deferred fires. | |
| 846 | |
| 847 @return: Deferred of function result. | |
| 848 """ | |
| 849 if len(args) < 2: | |
| 850 if not args: | |
| 851 raise TypeError("run() takes at least 2 arguments, none given.") | |
| 852 raise TypeError("%s.run() takes at least 2 arguments, 1 given" % ( | |
| 853 args[0].__class__.__name__,)) | |
| 854 self, f = args[:2] | |
| 855 args = args[2:] | |
| 856 | |
| 857 def execute(ignoredResult): | |
| 858 d = maybeDeferred(f, *args, **kwargs) | |
| 859 d.addBoth(self._releaseAndReturn) | |
| 860 return d | |
| 861 | |
| 862 d = self.acquire() | |
| 863 d.addCallback(execute) | |
| 864 return d | |
| 865 | |
| 866 | |
| 867 class DeferredLock(_ConcurrencyPrimitive): | |
| 868 """ | |
| 869 A lock for event driven systems. | |
| 870 | |
| 871 @ivar locked: True when this Lock has been acquired, false at all | |
| 872 other times. Do not change this value, but it is useful to | |
| 873 examine for the equivalent of a \"non-blocking\" acquisition. | |
| 874 """ | |
| 875 | |
| 876 locked = 0 | |
| 877 | |
| 878 def acquire(self): | |
| 879 """Attempt to acquire the lock. | |
| 880 | |
| 881 @return: a Deferred which fires on lock acquisition. | |
| 882 """ | |
| 883 d = Deferred() | |
| 884 if self.locked: | |
| 885 self.waiting.append(d) | |
| 886 else: | |
| 887 self.locked = 1 | |
| 888 d.callback(self) | |
| 889 return d | |
| 890 | |
| 891 def release(self): | |
| 892 """Release the lock. | |
| 893 | |
| 894 Should be called by whomever did the acquire() when the shared | |
| 895 resource is free. | |
| 896 """ | |
| 897 assert self.locked, "Tried to release an unlocked lock" | |
| 898 self.locked = 0 | |
| 899 if self.waiting: | |
| 900 # someone is waiting to acquire lock | |
| 901 self.locked = 1 | |
| 902 d = self.waiting.pop(0) | |
| 903 d.callback(self) | |
| 904 | |
| 905 class DeferredSemaphore(_ConcurrencyPrimitive): | |
| 906 """ | |
| 907 A semaphore for event driven systems. | |
| 908 """ | |
| 909 | |
| 910 def __init__(self, tokens): | |
| 911 _ConcurrencyPrimitive.__init__(self) | |
| 912 self.tokens = tokens | |
| 913 self.limit = tokens | |
| 914 | |
| 915 def acquire(self): | |
| 916 """Attempt to acquire the token. | |
| 917 | |
| 918 @return: a Deferred which fires on token acquisition. | |
| 919 """ | |
| 920 assert self.tokens >= 0, "Internal inconsistency?? tokens should never
be negative" | |
| 921 d = Deferred() | |
| 922 if not self.tokens: | |
| 923 self.waiting.append(d) | |
| 924 else: | |
| 925 self.tokens = self.tokens - 1 | |
| 926 d.callback(self) | |
| 927 return d | |
| 928 | |
| 929 def release(self): | |
| 930 """Release the token. | |
| 931 | |
| 932 Should be called by whoever did the acquire() when the shared | |
| 933 resource is free. | |
| 934 """ | |
| 935 assert self.tokens < self.limit, "Someone released me too many times: to
o many tokens!" | |
| 936 self.tokens = self.tokens + 1 | |
| 937 if self.waiting: | |
| 938 # someone is waiting to acquire token | |
| 939 self.tokens = self.tokens - 1 | |
| 940 d = self.waiting.pop(0) | |
| 941 d.callback(self) | |
| 942 | |
| 943 class QueueOverflow(Exception): | |
| 944 pass | |
| 945 | |
| 946 class QueueUnderflow(Exception): | |
| 947 pass | |
| 948 | |
| 949 | |
| 950 class DeferredQueue(object): | |
| 951 """ | |
| 952 An event driven queue. | |
| 953 | |
| 954 Objects may be added as usual to this queue. When an attempt is | |
| 955 made to retrieve an object when the queue is empty, a Deferred is | |
| 956 returned which will fire when an object becomes available. | |
| 957 | |
| 958 @ivar size: The maximum number of objects to allow into the queue | |
| 959 at a time. When an attempt to add a new object would exceed this | |
| 960 limit, QueueOverflow is raised synchronously. None for no limit. | |
| 961 | |
| 962 @ivar backlog: The maximum number of Deferred gets to allow at | |
| 963 one time. When an attempt is made to get an object which would | |
| 964 exceed this limit, QueueUnderflow is raised synchronously. None | |
| 965 for no limit. | |
| 966 """ | |
| 967 | |
| 968 def __init__(self, size=None, backlog=None): | |
| 969 self.waiting = [] | |
| 970 self.pending = [] | |
| 971 self.size = size | |
| 972 self.backlog = backlog | |
| 973 | |
| 974 def put(self, obj): | |
| 975 """Add an object to this queue. | |
| 976 | |
| 977 @raise QueueOverflow: Too many objects are in this queue. | |
| 978 """ | |
| 979 if self.waiting: | |
| 980 self.waiting.pop(0).callback(obj) | |
| 981 elif self.size is None or len(self.pending) < self.size: | |
| 982 self.pending.append(obj) | |
| 983 else: | |
| 984 raise QueueOverflow() | |
| 985 | |
| 986 def get(self): | |
| 987 """Attempt to retrieve and remove an object from the queue. | |
| 988 | |
| 989 @return: a Deferred which fires with the next object available in the qu
eue. | |
| 990 | |
| 991 @raise QueueUnderflow: Too many (more than C{backlog}) | |
| 992 Deferreds are already waiting for an object from this queue. | |
| 993 """ | |
| 994 if self.pending: | |
| 995 return succeed(self.pending.pop(0)) | |
| 996 elif self.backlog is None or len(self.waiting) < self.backlog: | |
| 997 d = Deferred() | |
| 998 self.waiting.append(d) | |
| 999 return d | |
| 1000 else: | |
| 1001 raise QueueUnderflow() | |
| 1002 | |
| 1003 | |
| 1004 class AlreadyTryingToLockError(Exception): | |
| 1005 """ | |
| 1006 Raised when DeferredFilesystemLock.deferUntilLocked is called twice on a | |
| 1007 single DeferredFilesystemLock. | |
| 1008 """ | |
| 1009 | |
| 1010 | |
| 1011 class DeferredFilesystemLock(lockfile.FilesystemLock): | |
| 1012 """ | |
| 1013 A FilesystemLock that allows for a deferred to be fired when the lock is | |
| 1014 acquired. | |
| 1015 | |
| 1016 @ivar _scheduler: The object in charge of scheduling retries. In this | |
| 1017 implementation this is parameterized for testing. | |
| 1018 | |
| 1019 @ivar _interval: The retry interval for an L{IReactorTime} based scheduler. | |
| 1020 | |
| 1021 @ivar _tryLockCall: A L{DelayedCall} based on _interval that will managex | |
| 1022 the next retry for aquiring the lock. | |
| 1023 | |
| 1024 @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout | |
| 1025 argument. This is in charge of timing out our attempt to acquire the | |
| 1026 lock. | |
| 1027 """ | |
| 1028 _interval = 1 | |
| 1029 _tryLockCall = None | |
| 1030 _timeoutCall = None | |
| 1031 | |
| 1032 def __init__(self, name, scheduler=None): | |
| 1033 """ | |
| 1034 @param name: The name of the lock to acquire | |
| 1035 @param scheduler: An object which provides L{IReactorTime} | |
| 1036 """ | |
| 1037 lockfile.FilesystemLock.__init__(self, name) | |
| 1038 | |
| 1039 if scheduler is None: | |
| 1040 from twisted.internet import reactor | |
| 1041 scheduler = reactor | |
| 1042 | |
| 1043 self._scheduler = scheduler | |
| 1044 | |
| 1045 def deferUntilLocked(self, timeout=None): | |
| 1046 """ | |
| 1047 Wait until we acquire this lock. This method is not safe for | |
| 1048 concurrent use. | |
| 1049 | |
| 1050 @type timeout: C{float} or C{int} | |
| 1051 @param timeout: the number of seconds after which to time out if the | |
| 1052 lock has not been acquired. | |
| 1053 | |
| 1054 @return: a deferred which will callback when the lock is acquired, or | |
| 1055 errback with a L{TimeoutError} after timing out or an | |
| 1056 L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already | |
| 1057 been called and not successfully locked the file. | |
| 1058 """ | |
| 1059 if self._tryLockCall is not None: | |
| 1060 return fail( | |
| 1061 AlreadyTryingToLockError( | |
| 1062 "deferUntilLocked isn't safe for concurrent use.")) | |
| 1063 | |
| 1064 d = Deferred() | |
| 1065 | |
| 1066 def _cancelLock(): | |
| 1067 self._tryLockCall.cancel() | |
| 1068 self._tryLockCall = None | |
| 1069 self._timeoutCall = None | |
| 1070 | |
| 1071 if self.lock(): | |
| 1072 d.callback(None) | |
| 1073 else: | |
| 1074 d.errback(failure.Failure( | |
| 1075 TimeoutError("Timed out aquiring lock: %s after %fs" % ( | |
| 1076 self.name, | |
| 1077 timeout)))) | |
| 1078 | |
| 1079 def _tryLock(): | |
| 1080 if self.lock(): | |
| 1081 if self._timeoutCall is not None: | |
| 1082 self._timeoutCall.cancel() | |
| 1083 self._timeoutCall = None | |
| 1084 | |
| 1085 self._tryLockCall = None | |
| 1086 | |
| 1087 d.callback(None) | |
| 1088 else: | |
| 1089 if timeout is not None and self._timeoutCall is None: | |
| 1090 self._timeoutCall = self._scheduler.callLater( | |
| 1091 timeout, _cancelLock) | |
| 1092 | |
| 1093 self._tryLockCall = self._scheduler.callLater( | |
| 1094 self._interval, _tryLock) | |
| 1095 | |
| 1096 _tryLock() | |
| 1097 | |
| 1098 return d | |
| 1099 | |
| 1100 | |
| 1101 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS", | |
| 1102 "AlreadyCalledError", "TimeoutError", "gatherResults", | |
| 1103 "maybeDeferred", | |
| 1104 "waitForDeferred", "deferredGenerator", "inlineCallbacks", | |
| 1105 "DeferredLock", "DeferredSemaphore", "DeferredQueue", | |
| 1106 "DeferredFilesystemLock", "AlreadyTryingToLockError", | |
| 1107 ] | |
| OLD | NEW |