OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_defer -*- | |
2 # | |
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 """ | |
7 Support for results that aren't immediately available. | |
8 | |
9 Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>} | |
10 """ | |
11 | |
12 from __future__ import nested_scopes, generators | |
13 import traceback | |
14 import warnings | |
15 | |
16 # Twisted imports | |
17 from twisted.python import log, failure, lockfile | |
18 from twisted.python.util import unsignedID, mergeFunctionMetadata | |
19 | |
20 class AlreadyCalledError(Exception): | |
21 pass | |
22 | |
23 class TimeoutError(Exception): | |
24 pass | |
25 | |
26 def logError(err): | |
27 log.err(err) | |
28 return err | |
29 | |
30 def succeed(result): | |
31 """ | |
32 Return a Deferred that has already had '.callback(result)' called. | |
33 | |
34 This is useful when you're writing synchronous code to an | |
35 asynchronous interface: i.e., some code is calling you expecting a | |
36 Deferred result, but you don't actually need to do anything | |
37 asynchronous. Just return defer.succeed(theResult). | |
38 | |
39 See L{fail} for a version of this function that uses a failing | |
40 Deferred rather than a successful one. | |
41 | |
42 @param result: The result to give to the Deferred's 'callback' | |
43 method. | |
44 | |
45 @rtype: L{Deferred} | |
46 """ | |
47 d = Deferred() | |
48 d.callback(result) | |
49 return d | |
50 | |
51 | |
52 def fail(result=None): | |
53 """ | |
54 Return a Deferred that has already had '.errback(result)' called. | |
55 | |
56 See L{succeed}'s docstring for rationale. | |
57 | |
58 @param result: The same argument that L{Deferred.errback} takes. | |
59 | |
60 @raise NoCurrentExceptionError: If C{result} is C{None} but there is no | |
61 current exception state. | |
62 | |
63 @rtype: L{Deferred} | |
64 """ | |
65 d = Deferred() | |
66 d.errback(result) | |
67 return d | |
68 | |
69 | |
70 def execute(callable, *args, **kw): | |
71 """Create a deferred from a callable and arguments. | |
72 | |
73 Call the given function with the given arguments. Return a deferred which | |
74 has been fired with its callback as the result of that invocation or its | |
75 errback with a Failure for the exception thrown. | |
76 """ | |
77 try: | |
78 result = callable(*args, **kw) | |
79 except: | |
80 return fail() | |
81 else: | |
82 return succeed(result) | |
83 | |
84 def maybeDeferred(f, *args, **kw): | |
85 """Invoke a function that may or may not return a deferred. | |
86 | |
87 Call the given function with the given arguments. If the returned | |
88 object is a C{Deferred}, return it. If the returned object is a C{Failure}, | |
89 wrap it with C{fail} and return it. Otherwise, wrap it in C{succeed} and | |
90 return it. If an exception is raised, convert it to a C{Failure}, wrap it | |
91 in C{fail}, and then return it. | |
92 | |
93 @type f: Any callable | |
94 @param f: The callable to invoke | |
95 | |
96 @param args: The arguments to pass to C{f} | |
97 @param kw: The keyword arguments to pass to C{f} | |
98 | |
99 @rtype: C{Deferred} | |
100 @return: The result of the function call, wrapped in a C{Deferred} if | |
101 necessary. | |
102 """ | |
103 deferred = None | |
104 | |
105 try: | |
106 result = f(*args, **kw) | |
107 except: | |
108 return fail(failure.Failure()) | |
109 else: | |
110 if isinstance(result, Deferred): | |
111 return result | |
112 elif isinstance(result, failure.Failure): | |
113 return fail(result) | |
114 else: | |
115 return succeed(result) | |
116 return deferred | |
117 | |
118 def timeout(deferred): | |
119 deferred.errback(failure.Failure(TimeoutError("Callback timed out"))) | |
120 | |
121 def passthru(arg): | |
122 return arg | |
123 | |
124 def setDebugging(on): | |
125 """Enable or disable Deferred debugging. | |
126 | |
127 When debugging is on, the call stacks from creation and invocation are | |
128 recorded, and added to any AlreadyCalledErrors we raise. | |
129 """ | |
130 Deferred.debug=bool(on) | |
131 | |
132 def getDebugging(): | |
133 """Determine whether Deferred debugging is enabled. | |
134 """ | |
135 return Deferred.debug | |
136 | |
137 class Deferred: | |
138 """This is a callback which will be put off until later. | |
139 | |
140 Why do we want this? Well, in cases where a function in a threaded | |
141 program would block until it gets a result, for Twisted it should | |
142 not block. Instead, it should return a Deferred. | |
143 | |
144 This can be implemented for protocols that run over the network by | |
145 writing an asynchronous protocol for twisted.internet. For methods | |
146 that come from outside packages that are not under our control, we use | |
147 threads (see for example L{twisted.enterprise.adbapi}). | |
148 | |
149 For more information about Deferreds, see doc/howto/defer.html or | |
150 U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html} | |
151 """ | |
152 called = 0 | |
153 paused = 0 | |
154 timeoutCall = None | |
155 _debugInfo = None | |
156 | |
157 # Are we currently running a user-installed callback? Meant to prevent | |
158 # recursive running of callbacks when a reentrant call to add a callback is | |
159 # used. | |
160 _runningCallbacks = False | |
161 | |
162 # Keep this class attribute for now, for compatibility with code that | |
163 # sets it directly. | |
164 debug = False | |
165 | |
166 def __init__(self): | |
167 self.callbacks = [] | |
168 if self.debug: | |
169 self._debugInfo = DebugInfo() | |
170 self._debugInfo.creator = traceback.format_stack()[:-1] | |
171 | |
172 def addCallbacks(self, callback, errback=None, | |
173 callbackArgs=None, callbackKeywords=None, | |
174 errbackArgs=None, errbackKeywords=None): | |
175 """Add a pair of callbacks (success and error) to this Deferred. | |
176 | |
177 These will be executed when the 'master' callback is run. | |
178 """ | |
179 assert callable(callback) | |
180 assert errback == None or callable(errback) | |
181 cbs = ((callback, callbackArgs, callbackKeywords), | |
182 (errback or (passthru), errbackArgs, errbackKeywords)) | |
183 self.callbacks.append(cbs) | |
184 | |
185 if self.called: | |
186 self._runCallbacks() | |
187 return self | |
188 | |
189 def addCallback(self, callback, *args, **kw): | |
190 """Convenience method for adding just a callback. | |
191 | |
192 See L{addCallbacks}. | |
193 """ | |
194 return self.addCallbacks(callback, callbackArgs=args, | |
195 callbackKeywords=kw) | |
196 | |
197 def addErrback(self, errback, *args, **kw): | |
198 """Convenience method for adding just an errback. | |
199 | |
200 See L{addCallbacks}. | |
201 """ | |
202 return self.addCallbacks(passthru, errback, | |
203 errbackArgs=args, | |
204 errbackKeywords=kw) | |
205 | |
206 def addBoth(self, callback, *args, **kw): | |
207 """Convenience method for adding a single callable as both a callback | |
208 and an errback. | |
209 | |
210 See L{addCallbacks}. | |
211 """ | |
212 return self.addCallbacks(callback, callback, | |
213 callbackArgs=args, errbackArgs=args, | |
214 callbackKeywords=kw, errbackKeywords=kw) | |
215 | |
216 def chainDeferred(self, d): | |
217 """Chain another Deferred to this Deferred. | |
218 | |
219 This method adds callbacks to this Deferred to call d's callback or | |
220 errback, as appropriate. It is merely a shorthand way of performing | |
221 the following:: | |
222 | |
223 self.addCallbacks(d.callback, d.errback) | |
224 | |
225 When you chain a deferred d2 to another deferred d1 with | |
226 d1.chainDeferred(d2), you are making d2 participate in the callback | |
227 chain of d1. Thus any event that fires d1 will also fire d2. | |
228 However, the converse is B{not} true; if d2 is fired d1 will not be | |
229 affected. | |
230 """ | |
231 return self.addCallbacks(d.callback, d.errback) | |
232 | |
233 def callback(self, result): | |
234 """Run all success callbacks that have been added to this Deferred. | |
235 | |
236 Each callback will have its result passed as the first | |
237 argument to the next; this way, the callbacks act as a | |
238 'processing chain'. Also, if the success-callback returns a Failure | |
239 or raises an Exception, processing will continue on the *error*- | |
240 callback chain. | |
241 """ | |
242 assert not isinstance(result, Deferred) | |
243 self._startRunCallbacks(result) | |
244 | |
245 | |
246 def errback(self, fail=None): | |
247 """ | |
248 Run all error callbacks that have been added to this Deferred. | |
249 | |
250 Each callback will have its result passed as the first | |
251 argument to the next; this way, the callbacks act as a | |
252 'processing chain'. Also, if the error-callback returns a non-Failure | |
253 or doesn't raise an Exception, processing will continue on the | |
254 *success*-callback chain. | |
255 | |
256 If the argument that's passed to me is not a failure.Failure instance, | |
257 it will be embedded in one. If no argument is passed, a failure.Failure | |
258 instance will be created based on the current traceback stack. | |
259 | |
260 Passing a string as `fail' is deprecated, and will be punished with | |
261 a warning message. | |
262 | |
263 @raise NoCurrentExceptionError: If C{fail} is C{None} but there is | |
264 no current exception state. | |
265 """ | |
266 if not isinstance(fail, failure.Failure): | |
267 fail = failure.Failure(fail) | |
268 | |
269 self._startRunCallbacks(fail) | |
270 | |
271 | |
272 def pause(self): | |
273 """Stop processing on a Deferred until L{unpause}() is called. | |
274 """ | |
275 self.paused = self.paused + 1 | |
276 | |
277 | |
278 def unpause(self): | |
279 """Process all callbacks made since L{pause}() was called. | |
280 """ | |
281 self.paused = self.paused - 1 | |
282 if self.paused: | |
283 return | |
284 if self.called: | |
285 self._runCallbacks() | |
286 | |
287 def _continue(self, result): | |
288 self.result = result | |
289 self.unpause() | |
290 | |
291 def _startRunCallbacks(self, result): | |
292 if self.called: | |
293 if self.debug: | |
294 if self._debugInfo is None: | |
295 self._debugInfo = DebugInfo() | |
296 extra = "\n" + self._debugInfo._getDebugTracebacks() | |
297 raise AlreadyCalledError(extra) | |
298 raise AlreadyCalledError | |
299 if self.debug: | |
300 if self._debugInfo is None: | |
301 self._debugInfo = DebugInfo() | |
302 self._debugInfo.invoker = traceback.format_stack()[:-2] | |
303 self.called = True | |
304 self.result = result | |
305 if self.timeoutCall: | |
306 try: | |
307 self.timeoutCall.cancel() | |
308 except: | |
309 pass | |
310 | |
311 del self.timeoutCall | |
312 self._runCallbacks() | |
313 | |
314 def _runCallbacks(self): | |
315 if self._runningCallbacks: | |
316 # Don't recursively run callbacks | |
317 return | |
318 if not self.paused: | |
319 while self.callbacks: | |
320 item = self.callbacks.pop(0) | |
321 callback, args, kw = item[ | |
322 isinstance(self.result, failure.Failure)] | |
323 args = args or () | |
324 kw = kw or {} | |
325 try: | |
326 self._runningCallbacks = True | |
327 try: | |
328 self.result = callback(self.result, *args, **kw) | |
329 finally: | |
330 self._runningCallbacks = False | |
331 if isinstance(self.result, Deferred): | |
332 # note: this will cause _runCallbacks to be called | |
333 # recursively if self.result already has a result. | |
334 # This shouldn't cause any problems, since there is no | |
335 # relevant state in this stack frame at this point. | |
336 # The recursive call will continue to process | |
337 # self.callbacks until it is empty, then return here, | |
338 # where there is no more work to be done, so this call | |
339 # will return as well. | |
340 self.pause() | |
341 self.result.addBoth(self._continue) | |
342 break | |
343 except: | |
344 self.result = failure.Failure() | |
345 | |
346 if isinstance(self.result, failure.Failure): | |
347 self.result.cleanFailure() | |
348 if self._debugInfo is None: | |
349 self._debugInfo = DebugInfo() | |
350 self._debugInfo.failResult = self.result | |
351 else: | |
352 if self._debugInfo is not None: | |
353 self._debugInfo.failResult = None | |
354 | |
355 def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw): | |
356 """Set a timeout function to be triggered if I am not called. | |
357 | |
358 @param seconds: How long to wait (from now) before firing the | |
359 timeoutFunc. | |
360 | |
361 @param timeoutFunc: will receive the Deferred and *args, **kw as its | |
362 arguments. The default timeoutFunc will call the errback with a | |
363 L{TimeoutError}. | |
364 """ | |
365 warnings.warn( | |
366 "Deferred.setTimeout is deprecated. Look for timeout " | |
367 "support specific to the API you are using instead.", | |
368 DeprecationWarning, stacklevel=2) | |
369 | |
370 if self.called: | |
371 return | |
372 assert not self.timeoutCall, "Don't call setTimeout twice on the same De
ferred." | |
373 | |
374 from twisted.internet import reactor | |
375 self.timeoutCall = reactor.callLater( | |
376 seconds, | |
377 lambda: self.called or timeoutFunc(self, *args, **kw)) | |
378 return self.timeoutCall | |
379 | |
380 def __str__(self): | |
381 cname = self.__class__.__name__ | |
382 if hasattr(self, 'result'): | |
383 return "<%s at %s current result: %r>" % (cname, hex(unsignedID(sel
f)), | |
384 self.result) | |
385 return "<%s at %s>" % (cname, hex(unsignedID(self))) | |
386 __repr__ = __str__ | |
387 | |
388 class DebugInfo: | |
389 """Deferred debug helper""" | |
390 failResult = None | |
391 | |
392 def _getDebugTracebacks(self): | |
393 info = '' | |
394 if hasattr(self, "creator"): | |
395 info += " C: Deferred was created:\n C:" | |
396 info += "".join(self.creator).rstrip().replace("\n","\n C:") | |
397 info += "\n" | |
398 if hasattr(self, "invoker"): | |
399 info += " I: First Invoker was:\n I:" | |
400 info += "".join(self.invoker).rstrip().replace("\n","\n I:") | |
401 info += "\n" | |
402 return info | |
403 | |
404 def __del__(self): | |
405 """Print tracebacks and die. | |
406 | |
407 If the *last* (and I do mean *last*) callback leaves me in an error | |
408 state, print a traceback (if said errback is a Failure). | |
409 """ | |
410 if self.failResult is not None: | |
411 log.msg("Unhandled error in Deferred:", isError=True) | |
412 debugInfo = self._getDebugTracebacks() | |
413 if debugInfo != '': | |
414 log.msg("(debug: " + debugInfo + ")", isError=True) | |
415 log.err(self.failResult) | |
416 | |
417 class FirstError(Exception): | |
418 """First error to occur in a DeferredList if fireOnOneErrback is set. | |
419 | |
420 @ivar subFailure: the L{Failure} that occurred. | |
421 @ivar index: the index of the Deferred in the DeferredList where it | |
422 happened. | |
423 """ | |
424 def __init__(self, failure, index): | |
425 self.subFailure = failure | |
426 self.index = index | |
427 | |
428 def __repr__(self): | |
429 return 'FirstError(%r, %d)' % (self.subFailure, self.index) | |
430 | |
431 def __str__(self): | |
432 return repr(self) | |
433 | |
434 def __getitem__(self, index): | |
435 warnings.warn("FirstError.__getitem__ is deprecated. " | |
436 "Use attributes instead.", | |
437 category=DeprecationWarning, stacklevel=2) | |
438 return [self.subFailure, self.index][index] | |
439 | |
440 def __getslice__(self, start, stop): | |
441 warnings.warn("FirstError.__getslice__ is deprecated. " | |
442 "Use attributes instead.", | |
443 category=DeprecationWarning, stacklevel=2) | |
444 return [self.subFailure, self.index][start:stop] | |
445 | |
446 def __eq__(self, other): | |
447 if isinstance(other, tuple): | |
448 return tuple(self) == other | |
449 elif isinstance(other, FirstError): | |
450 return (self.subFailure == other.subFailure and | |
451 self.index == other.index) | |
452 return False | |
453 | |
454 class DeferredList(Deferred): | |
455 """I combine a group of deferreds into one callback. | |
456 | |
457 I track a list of L{Deferred}s for their callbacks, and make a single | |
458 callback when they have all completed, a list of (success, result) | |
459 tuples, 'success' being a boolean. | |
460 | |
461 Note that you can still use a L{Deferred} after putting it in a | |
462 DeferredList. For example, you can suppress 'Unhandled error in Deferred' | |
463 messages by adding errbacks to the Deferreds *after* putting them in the | |
464 DeferredList, as a DeferredList won't swallow the errors. (Although a more | |
465 convenient way to do this is simply to set the consumeErrors flag) | |
466 """ | |
467 | |
468 fireOnOneCallback = 0 | |
469 fireOnOneErrback = 0 | |
470 | |
471 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, | |
472 consumeErrors=0): | |
473 """Initialize a DeferredList. | |
474 | |
475 @type deferredList: C{list} of L{Deferred}s | |
476 @param deferredList: The list of deferreds to track. | |
477 @param fireOnOneCallback: (keyword param) a flag indicating that | |
478 only one callback needs to be fired for me to call | |
479 my callback | |
480 @param fireOnOneErrback: (keyword param) a flag indicating that | |
481 only one errback needs to be fired for me to call | |
482 my errback | |
483 @param consumeErrors: (keyword param) a flag indicating that any errors | |
484 raised in the original deferreds should be | |
485 consumed by this DeferredList. This is useful to | |
486 prevent spurious warnings being logged. | |
487 """ | |
488 self.resultList = [None] * len(deferredList) | |
489 Deferred.__init__(self) | |
490 if len(deferredList) == 0 and not fireOnOneCallback: | |
491 self.callback(self.resultList) | |
492 | |
493 # These flags need to be set *before* attaching callbacks to the | |
494 # deferreds, because the callbacks use these flags, and will run | |
495 # synchronously if any of the deferreds are already fired. | |
496 self.fireOnOneCallback = fireOnOneCallback | |
497 self.fireOnOneErrback = fireOnOneErrback | |
498 self.consumeErrors = consumeErrors | |
499 self.finishedCount = 0 | |
500 | |
501 index = 0 | |
502 for deferred in deferredList: | |
503 deferred.addCallbacks(self._cbDeferred, self._cbDeferred, | |
504 callbackArgs=(index,SUCCESS), | |
505 errbackArgs=(index,FAILURE)) | |
506 index = index + 1 | |
507 | |
508 def _cbDeferred(self, result, index, succeeded): | |
509 """(internal) Callback for when one of my deferreds fires. | |
510 """ | |
511 self.resultList[index] = (succeeded, result) | |
512 | |
513 self.finishedCount += 1 | |
514 if not self.called: | |
515 if succeeded == SUCCESS and self.fireOnOneCallback: | |
516 self.callback((result, index)) | |
517 elif succeeded == FAILURE and self.fireOnOneErrback: | |
518 self.errback(failure.Failure(FirstError(result, index))) | |
519 elif self.finishedCount == len(self.resultList): | |
520 self.callback(self.resultList) | |
521 | |
522 if succeeded == FAILURE and self.consumeErrors: | |
523 result = None | |
524 | |
525 return result | |
526 | |
527 | |
528 def _parseDListResult(l, fireOnOneErrback=0): | |
529 if __debug__: | |
530 for success, value in l: | |
531 assert success | |
532 return [x[1] for x in l] | |
533 | |
534 def gatherResults(deferredList): | |
535 """Returns list with result of given Deferreds. | |
536 | |
537 This builds on C{DeferredList} but is useful since you don't | |
538 need to parse the result for success/failure. | |
539 | |
540 @type deferredList: C{list} of L{Deferred}s | |
541 """ | |
542 d = DeferredList(deferredList, fireOnOneErrback=1) | |
543 d.addCallback(_parseDListResult) | |
544 return d | |
545 | |
546 # Constants for use with DeferredList | |
547 | |
548 SUCCESS = True | |
549 FAILURE = False | |
550 | |
551 | |
552 | |
553 ## deferredGenerator | |
554 | |
555 class waitForDeferred: | |
556 """ | |
557 See L{deferredGenerator}. | |
558 """ | |
559 | |
560 def __init__(self, d): | |
561 if not isinstance(d, Deferred): | |
562 raise TypeError("You must give waitForDeferred a Deferred. You gave
it %r." % (d,)) | |
563 self.d = d | |
564 | |
565 | |
566 def getResult(self): | |
567 if isinstance(self.result, failure.Failure): | |
568 self.result.raiseException() | |
569 return self.result | |
570 | |
571 | |
572 | |
573 def _deferGenerator(g, deferred): | |
574 """ | |
575 See L{deferredGenerator}. | |
576 """ | |
577 result = None | |
578 | |
579 # This function is complicated by the need to prevent unbounded recursion | |
580 # arising from repeatedly yielding immediately ready deferreds. This while | |
581 # loop and the waiting variable solve that by manually unfolding the | |
582 # recursion. | |
583 | |
584 waiting = [True, # defgen is waiting for result? | |
585 None] # result | |
586 | |
587 while 1: | |
588 try: | |
589 result = g.next() | |
590 except StopIteration: | |
591 deferred.callback(result) | |
592 return deferred | |
593 except: | |
594 deferred.errback() | |
595 return deferred | |
596 | |
597 # Deferred.callback(Deferred) raises an error; we catch this case | |
598 # early here and give a nicer error message to the user in case | |
599 # they yield a Deferred. | |
600 if isinstance(result, Deferred): | |
601 return fail(TypeError("Yield waitForDeferred(d), not d!")) | |
602 | |
603 if isinstance(result, waitForDeferred): | |
604 # a waitForDeferred was yielded, get the result. | |
605 # Pass result in so it don't get changed going around the loop | |
606 # This isn't a problem for waiting, as it's only reused if | |
607 # gotResult has already been executed. | |
608 def gotResult(r, result=result): | |
609 result.result = r | |
610 if waiting[0]: | |
611 waiting[0] = False | |
612 waiting[1] = r | |
613 else: | |
614 _deferGenerator(g, deferred) | |
615 result.d.addBoth(gotResult) | |
616 if waiting[0]: | |
617 # Haven't called back yet, set flag so that we get reinvoked | |
618 # and return from the loop | |
619 waiting[0] = False | |
620 return deferred | |
621 # Reset waiting to initial values for next loop | |
622 waiting[0] = True | |
623 waiting[1] = None | |
624 | |
625 result = None | |
626 | |
627 | |
628 | |
629 def deferredGenerator(f): | |
630 """ | |
631 Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>} | |
632 | |
633 deferredGenerator and waitForDeferred help you write Deferred-using code | |
634 that looks like a regular sequential function. If your code has a minimum | |
635 requirement of Python 2.5, consider the use of L{inlineCallbacks} instead, | |
636 which can accomplish the same thing in a more concise manner. | |
637 | |
638 There are two important functions involved: waitForDeferred, and | |
639 deferredGenerator. They are used together, like this:: | |
640 | |
641 def thingummy(): | |
642 thing = waitForDeferred(makeSomeRequestResultingInDeferred()) | |
643 yield thing | |
644 thing = thing.getResult() | |
645 print thing #the result! hoorj! | |
646 thingummy = deferredGenerator(thingummy) | |
647 | |
648 waitForDeferred returns something that you should immediately yield; when | |
649 your generator is resumed, calling thing.getResult() will either give you | |
650 the result of the Deferred if it was a success, or raise an exception if it | |
651 was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do | |
652 not call it, I{your program will not work}. | |
653 | |
654 deferredGenerator takes one of these waitForDeferred-using generator | |
655 functions and converts it into a function that returns a Deferred. The | |
656 result of the Deferred will be the last value that your generator yielded | |
657 unless the last value is a waitForDeferred instance, in which case the | |
658 result will be C{None}. If the function raises an unhandled exception, the | |
659 Deferred will errback instead. Remember that 'return result' won't work; | |
660 use 'yield result; return' in place of that. | |
661 | |
662 Note that not yielding anything from your generator will make the Deferred | |
663 result in None. Yielding a Deferred from your generator is also an error | |
664 condition; always yield waitForDeferred(d) instead. | |
665 | |
666 The Deferred returned from your deferred generator may also errback if your | |
667 generator raised an exception. For example:: | |
668 | |
669 def thingummy(): | |
670 thing = waitForDeferred(makeSomeRequestResultingInDeferred()) | |
671 yield thing | |
672 thing = thing.getResult() | |
673 if thing == 'I love Twisted': | |
674 # will become the result of the Deferred | |
675 yield 'TWISTED IS GREAT!' | |
676 return | |
677 else: | |
678 # will trigger an errback | |
679 raise Exception('DESTROY ALL LIFE') | |
680 thingummy = deferredGenerator(thingummy) | |
681 | |
682 Put succinctly, these functions connect deferred-using code with this 'fake | |
683 blocking' style in both directions: waitForDeferred converts from a | |
684 Deferred to the 'blocking' style, and deferredGenerator converts from the | |
685 'blocking' style to a Deferred. | |
686 """ | |
687 def unwindGenerator(*args, **kwargs): | |
688 return _deferGenerator(f(*args, **kwargs), Deferred()) | |
689 return mergeFunctionMetadata(f, unwindGenerator) | |
690 | |
691 | |
692 ## inlineCallbacks | |
693 | |
694 # BaseException is only in Py 2.5. | |
695 try: | |
696 BaseException | |
697 except NameError: | |
698 BaseException=Exception | |
699 | |
700 class _DefGen_Return(BaseException): | |
701 def __init__(self, value): | |
702 self.value = value | |
703 | |
704 def returnValue(val): | |
705 """ | |
706 Return val from a L{inlineCallbacks} generator. | |
707 | |
708 Note: this is currently implemented by raising an exception | |
709 derived from BaseException. You might want to change any | |
710 'except:' clauses to an 'except Exception:' clause so as not to | |
711 catch this exception. | |
712 | |
713 Also: while this function currently will work when called from | |
714 within arbitrary functions called from within the generator, do | |
715 not rely upon this behavior. | |
716 """ | |
717 raise _DefGen_Return(val) | |
718 | |
719 def _inlineCallbacks(result, g, deferred): | |
720 """ | |
721 See L{inlineCallbacks}. | |
722 """ | |
723 # This function is complicated by the need to prevent unbounded recursion | |
724 # arising from repeatedly yielding immediately ready deferreds. This while | |
725 # loop and the waiting variable solve that by manually unfolding the | |
726 # recursion. | |
727 | |
728 waiting = [True, # waiting for result? | |
729 None] # result | |
730 | |
731 while 1: | |
732 try: | |
733 # Send the last result back as the result of the yield expression. | |
734 if isinstance(result, failure.Failure): | |
735 result = result.throwExceptionIntoGenerator(g) | |
736 else: | |
737 result = g.send(result) | |
738 except StopIteration: | |
739 # fell off the end, or "return" statement | |
740 deferred.callback(None) | |
741 return deferred | |
742 except _DefGen_Return, e: | |
743 # returnValue call | |
744 deferred.callback(e.value) | |
745 return deferred | |
746 except: | |
747 deferred.errback() | |
748 return deferred | |
749 | |
750 if isinstance(result, Deferred): | |
751 # a deferred was yielded, get the result. | |
752 def gotResult(r): | |
753 if waiting[0]: | |
754 waiting[0] = False | |
755 waiting[1] = r | |
756 else: | |
757 _inlineCallbacks(r, g, deferred) | |
758 | |
759 result.addBoth(gotResult) | |
760 if waiting[0]: | |
761 # Haven't called back yet, set flag so that we get reinvoked | |
762 # and return from the loop | |
763 waiting[0] = False | |
764 return deferred | |
765 | |
766 result = waiting[1] | |
767 # Reset waiting to initial values for next loop. gotResult uses | |
768 # waiting, but this isn't a problem because gotResult is only | |
769 # executed once, and if it hasn't been executed yet, the return | |
770 # branch above would have been taken. | |
771 | |
772 | |
773 waiting[0] = True | |
774 waiting[1] = None | |
775 | |
776 | |
777 return deferred | |
778 | |
779 def inlineCallbacks(f): | |
780 """ | |
781 Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>} | |
782 | |
783 WARNING: this function will not work in Python 2.4 and earlier! | |
784 | |
785 inlineCallbacks helps you write Deferred-using code that looks like a | |
786 regular sequential function. This function uses features of Python 2.5 | |
787 generators. If you need to be compatible with Python 2.4 or before, use | |
788 the L{deferredGenerator} function instead, which accomplishes the same | |
789 thing, but with somewhat more boilerplate. For example:: | |
790 | |
791 def thingummy(): | |
792 thing = yield makeSomeRequestResultingInDeferred() | |
793 print thing #the result! hoorj! | |
794 thingummy = inlineCallbacks(thingummy) | |
795 | |
796 When you call anything that results in a Deferred, you can simply yield it; | |
797 your generator will automatically be resumed when the Deferred's result is | |
798 available. The generator will be sent the result of the Deferred with the | |
799 'send' method on generators, or if the result was a failure, 'throw'. | |
800 | |
801 Your inlineCallbacks-enabled generator will return a Deferred object, which | |
802 will result in the return value of the generator (or will fail with a | |
803 failure object if your generator raises an unhandled exception). Note that | |
804 you can't use 'return result' to return a value; use 'returnValue(result)' | |
805 instead. Falling off the end of the generator, or simply using 'return' | |
806 will cause the Deferred to have a result of None. | |
807 | |
808 The Deferred returned from your deferred generator may errback if your | |
809 generator raised an exception:: | |
810 | |
811 def thingummy(): | |
812 thing = yield makeSomeRequestResultingInDeferred() | |
813 if thing == 'I love Twisted': | |
814 # will become the result of the Deferred | |
815 returnValue('TWISTED IS GREAT!') | |
816 else: | |
817 # will trigger an errback | |
818 raise Exception('DESTROY ALL LIFE') | |
819 thingummy = inlineCallbacks(thingummy) | |
820 """ | |
821 def unwindGenerator(*args, **kwargs): | |
822 return _inlineCallbacks(None, f(*args, **kwargs), Deferred()) | |
823 return mergeFunctionMetadata(f, unwindGenerator) | |
824 | |
825 | |
826 ## DeferredLock/DeferredQueue | |
827 | |
828 class _ConcurrencyPrimitive(object): | |
829 def __init__(self): | |
830 self.waiting = [] | |
831 | |
832 def _releaseAndReturn(self, r): | |
833 self.release() | |
834 return r | |
835 | |
836 def run(*args, **kwargs): | |
837 """Acquire, run, release. | |
838 | |
839 This function takes a callable as its first argument and any | |
840 number of other positional and keyword arguments. When the | |
841 lock or semaphore is acquired, the callable will be invoked | |
842 with those arguments. | |
843 | |
844 The callable may return a Deferred; if it does, the lock or | |
845 semaphore won't be released until that Deferred fires. | |
846 | |
847 @return: Deferred of function result. | |
848 """ | |
849 if len(args) < 2: | |
850 if not args: | |
851 raise TypeError("run() takes at least 2 arguments, none given.") | |
852 raise TypeError("%s.run() takes at least 2 arguments, 1 given" % ( | |
853 args[0].__class__.__name__,)) | |
854 self, f = args[:2] | |
855 args = args[2:] | |
856 | |
857 def execute(ignoredResult): | |
858 d = maybeDeferred(f, *args, **kwargs) | |
859 d.addBoth(self._releaseAndReturn) | |
860 return d | |
861 | |
862 d = self.acquire() | |
863 d.addCallback(execute) | |
864 return d | |
865 | |
866 | |
867 class DeferredLock(_ConcurrencyPrimitive): | |
868 """ | |
869 A lock for event driven systems. | |
870 | |
871 @ivar locked: True when this Lock has been acquired, false at all | |
872 other times. Do not change this value, but it is useful to | |
873 examine for the equivalent of a \"non-blocking\" acquisition. | |
874 """ | |
875 | |
876 locked = 0 | |
877 | |
878 def acquire(self): | |
879 """Attempt to acquire the lock. | |
880 | |
881 @return: a Deferred which fires on lock acquisition. | |
882 """ | |
883 d = Deferred() | |
884 if self.locked: | |
885 self.waiting.append(d) | |
886 else: | |
887 self.locked = 1 | |
888 d.callback(self) | |
889 return d | |
890 | |
891 def release(self): | |
892 """Release the lock. | |
893 | |
894 Should be called by whomever did the acquire() when the shared | |
895 resource is free. | |
896 """ | |
897 assert self.locked, "Tried to release an unlocked lock" | |
898 self.locked = 0 | |
899 if self.waiting: | |
900 # someone is waiting to acquire lock | |
901 self.locked = 1 | |
902 d = self.waiting.pop(0) | |
903 d.callback(self) | |
904 | |
905 class DeferredSemaphore(_ConcurrencyPrimitive): | |
906 """ | |
907 A semaphore for event driven systems. | |
908 """ | |
909 | |
910 def __init__(self, tokens): | |
911 _ConcurrencyPrimitive.__init__(self) | |
912 self.tokens = tokens | |
913 self.limit = tokens | |
914 | |
915 def acquire(self): | |
916 """Attempt to acquire the token. | |
917 | |
918 @return: a Deferred which fires on token acquisition. | |
919 """ | |
920 assert self.tokens >= 0, "Internal inconsistency?? tokens should never
be negative" | |
921 d = Deferred() | |
922 if not self.tokens: | |
923 self.waiting.append(d) | |
924 else: | |
925 self.tokens = self.tokens - 1 | |
926 d.callback(self) | |
927 return d | |
928 | |
929 def release(self): | |
930 """Release the token. | |
931 | |
932 Should be called by whoever did the acquire() when the shared | |
933 resource is free. | |
934 """ | |
935 assert self.tokens < self.limit, "Someone released me too many times: to
o many tokens!" | |
936 self.tokens = self.tokens + 1 | |
937 if self.waiting: | |
938 # someone is waiting to acquire token | |
939 self.tokens = self.tokens - 1 | |
940 d = self.waiting.pop(0) | |
941 d.callback(self) | |
942 | |
943 class QueueOverflow(Exception): | |
944 pass | |
945 | |
946 class QueueUnderflow(Exception): | |
947 pass | |
948 | |
949 | |
950 class DeferredQueue(object): | |
951 """ | |
952 An event driven queue. | |
953 | |
954 Objects may be added as usual to this queue. When an attempt is | |
955 made to retrieve an object when the queue is empty, a Deferred is | |
956 returned which will fire when an object becomes available. | |
957 | |
958 @ivar size: The maximum number of objects to allow into the queue | |
959 at a time. When an attempt to add a new object would exceed this | |
960 limit, QueueOverflow is raised synchronously. None for no limit. | |
961 | |
962 @ivar backlog: The maximum number of Deferred gets to allow at | |
963 one time. When an attempt is made to get an object which would | |
964 exceed this limit, QueueUnderflow is raised synchronously. None | |
965 for no limit. | |
966 """ | |
967 | |
968 def __init__(self, size=None, backlog=None): | |
969 self.waiting = [] | |
970 self.pending = [] | |
971 self.size = size | |
972 self.backlog = backlog | |
973 | |
974 def put(self, obj): | |
975 """Add an object to this queue. | |
976 | |
977 @raise QueueOverflow: Too many objects are in this queue. | |
978 """ | |
979 if self.waiting: | |
980 self.waiting.pop(0).callback(obj) | |
981 elif self.size is None or len(self.pending) < self.size: | |
982 self.pending.append(obj) | |
983 else: | |
984 raise QueueOverflow() | |
985 | |
986 def get(self): | |
987 """Attempt to retrieve and remove an object from the queue. | |
988 | |
989 @return: a Deferred which fires with the next object available in the qu
eue. | |
990 | |
991 @raise QueueUnderflow: Too many (more than C{backlog}) | |
992 Deferreds are already waiting for an object from this queue. | |
993 """ | |
994 if self.pending: | |
995 return succeed(self.pending.pop(0)) | |
996 elif self.backlog is None or len(self.waiting) < self.backlog: | |
997 d = Deferred() | |
998 self.waiting.append(d) | |
999 return d | |
1000 else: | |
1001 raise QueueUnderflow() | |
1002 | |
1003 | |
1004 class AlreadyTryingToLockError(Exception): | |
1005 """ | |
1006 Raised when DeferredFilesystemLock.deferUntilLocked is called twice on a | |
1007 single DeferredFilesystemLock. | |
1008 """ | |
1009 | |
1010 | |
1011 class DeferredFilesystemLock(lockfile.FilesystemLock): | |
1012 """ | |
1013 A FilesystemLock that allows for a deferred to be fired when the lock is | |
1014 acquired. | |
1015 | |
1016 @ivar _scheduler: The object in charge of scheduling retries. In this | |
1017 implementation this is parameterized for testing. | |
1018 | |
1019 @ivar _interval: The retry interval for an L{IReactorTime} based scheduler. | |
1020 | |
1021 @ivar _tryLockCall: A L{DelayedCall} based on _interval that will managex | |
1022 the next retry for aquiring the lock. | |
1023 | |
1024 @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout | |
1025 argument. This is in charge of timing out our attempt to acquire the | |
1026 lock. | |
1027 """ | |
1028 _interval = 1 | |
1029 _tryLockCall = None | |
1030 _timeoutCall = None | |
1031 | |
1032 def __init__(self, name, scheduler=None): | |
1033 """ | |
1034 @param name: The name of the lock to acquire | |
1035 @param scheduler: An object which provides L{IReactorTime} | |
1036 """ | |
1037 lockfile.FilesystemLock.__init__(self, name) | |
1038 | |
1039 if scheduler is None: | |
1040 from twisted.internet import reactor | |
1041 scheduler = reactor | |
1042 | |
1043 self._scheduler = scheduler | |
1044 | |
1045 def deferUntilLocked(self, timeout=None): | |
1046 """ | |
1047 Wait until we acquire this lock. This method is not safe for | |
1048 concurrent use. | |
1049 | |
1050 @type timeout: C{float} or C{int} | |
1051 @param timeout: the number of seconds after which to time out if the | |
1052 lock has not been acquired. | |
1053 | |
1054 @return: a deferred which will callback when the lock is acquired, or | |
1055 errback with a L{TimeoutError} after timing out or an | |
1056 L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already | |
1057 been called and not successfully locked the file. | |
1058 """ | |
1059 if self._tryLockCall is not None: | |
1060 return fail( | |
1061 AlreadyTryingToLockError( | |
1062 "deferUntilLocked isn't safe for concurrent use.")) | |
1063 | |
1064 d = Deferred() | |
1065 | |
1066 def _cancelLock(): | |
1067 self._tryLockCall.cancel() | |
1068 self._tryLockCall = None | |
1069 self._timeoutCall = None | |
1070 | |
1071 if self.lock(): | |
1072 d.callback(None) | |
1073 else: | |
1074 d.errback(failure.Failure( | |
1075 TimeoutError("Timed out aquiring lock: %s after %fs" % ( | |
1076 self.name, | |
1077 timeout)))) | |
1078 | |
1079 def _tryLock(): | |
1080 if self.lock(): | |
1081 if self._timeoutCall is not None: | |
1082 self._timeoutCall.cancel() | |
1083 self._timeoutCall = None | |
1084 | |
1085 self._tryLockCall = None | |
1086 | |
1087 d.callback(None) | |
1088 else: | |
1089 if timeout is not None and self._timeoutCall is None: | |
1090 self._timeoutCall = self._scheduler.callLater( | |
1091 timeout, _cancelLock) | |
1092 | |
1093 self._tryLockCall = self._scheduler.callLater( | |
1094 self._interval, _tryLock) | |
1095 | |
1096 _tryLock() | |
1097 | |
1098 return d | |
1099 | |
1100 | |
1101 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS", | |
1102 "AlreadyCalledError", "TimeoutError", "gatherResults", | |
1103 "maybeDeferred", | |
1104 "waitForDeferred", "deferredGenerator", "inlineCallbacks", | |
1105 "DeferredLock", "DeferredSemaphore", "DeferredQueue", | |
1106 "DeferredFilesystemLock", "AlreadyTryingToLockError", | |
1107 ] | |
OLD | NEW |