Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/defer.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_defer -*-
2 #
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6 """
7 Support for results that aren't immediately available.
8
9 Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>}
10 """
11
12 from __future__ import nested_scopes, generators
13 import traceback
14 import warnings
15
16 # Twisted imports
17 from twisted.python import log, failure, lockfile
18 from twisted.python.util import unsignedID, mergeFunctionMetadata
19
20 class AlreadyCalledError(Exception):
21 pass
22
23 class TimeoutError(Exception):
24 pass
25
26 def logError(err):
27 log.err(err)
28 return err
29
30 def succeed(result):
31 """
32 Return a Deferred that has already had '.callback(result)' called.
33
34 This is useful when you're writing synchronous code to an
35 asynchronous interface: i.e., some code is calling you expecting a
36 Deferred result, but you don't actually need to do anything
37 asynchronous. Just return defer.succeed(theResult).
38
39 See L{fail} for a version of this function that uses a failing
40 Deferred rather than a successful one.
41
42 @param result: The result to give to the Deferred's 'callback'
43 method.
44
45 @rtype: L{Deferred}
46 """
47 d = Deferred()
48 d.callback(result)
49 return d
50
51
52 def fail(result=None):
53 """
54 Return a Deferred that has already had '.errback(result)' called.
55
56 See L{succeed}'s docstring for rationale.
57
58 @param result: The same argument that L{Deferred.errback} takes.
59
60 @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
61 current exception state.
62
63 @rtype: L{Deferred}
64 """
65 d = Deferred()
66 d.errback(result)
67 return d
68
69
70 def execute(callable, *args, **kw):
71 """Create a deferred from a callable and arguments.
72
73 Call the given function with the given arguments. Return a deferred which
74 has been fired with its callback as the result of that invocation or its
75 errback with a Failure for the exception thrown.
76 """
77 try:
78 result = callable(*args, **kw)
79 except:
80 return fail()
81 else:
82 return succeed(result)
83
84 def maybeDeferred(f, *args, **kw):
85 """Invoke a function that may or may not return a deferred.
86
87 Call the given function with the given arguments. If the returned
88 object is a C{Deferred}, return it. If the returned object is a C{Failure},
89 wrap it with C{fail} and return it. Otherwise, wrap it in C{succeed} and
90 return it. If an exception is raised, convert it to a C{Failure}, wrap it
91 in C{fail}, and then return it.
92
93 @type f: Any callable
94 @param f: The callable to invoke
95
96 @param args: The arguments to pass to C{f}
97 @param kw: The keyword arguments to pass to C{f}
98
99 @rtype: C{Deferred}
100 @return: The result of the function call, wrapped in a C{Deferred} if
101 necessary.
102 """
103 deferred = None
104
105 try:
106 result = f(*args, **kw)
107 except:
108 return fail(failure.Failure())
109 else:
110 if isinstance(result, Deferred):
111 return result
112 elif isinstance(result, failure.Failure):
113 return fail(result)
114 else:
115 return succeed(result)
116 return deferred
117
118 def timeout(deferred):
119 deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
120
121 def passthru(arg):
122 return arg
123
124 def setDebugging(on):
125 """Enable or disable Deferred debugging.
126
127 When debugging is on, the call stacks from creation and invocation are
128 recorded, and added to any AlreadyCalledErrors we raise.
129 """
130 Deferred.debug=bool(on)
131
132 def getDebugging():
133 """Determine whether Deferred debugging is enabled.
134 """
135 return Deferred.debug
136
137 class Deferred:
138 """This is a callback which will be put off until later.
139
140 Why do we want this? Well, in cases where a function in a threaded
141 program would block until it gets a result, for Twisted it should
142 not block. Instead, it should return a Deferred.
143
144 This can be implemented for protocols that run over the network by
145 writing an asynchronous protocol for twisted.internet. For methods
146 that come from outside packages that are not under our control, we use
147 threads (see for example L{twisted.enterprise.adbapi}).
148
149 For more information about Deferreds, see doc/howto/defer.html or
150 U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}
151 """
152 called = 0
153 paused = 0
154 timeoutCall = None
155 _debugInfo = None
156
157 # Are we currently running a user-installed callback? Meant to prevent
158 # recursive running of callbacks when a reentrant call to add a callback is
159 # used.
160 _runningCallbacks = False
161
162 # Keep this class attribute for now, for compatibility with code that
163 # sets it directly.
164 debug = False
165
166 def __init__(self):
167 self.callbacks = []
168 if self.debug:
169 self._debugInfo = DebugInfo()
170 self._debugInfo.creator = traceback.format_stack()[:-1]
171
172 def addCallbacks(self, callback, errback=None,
173 callbackArgs=None, callbackKeywords=None,
174 errbackArgs=None, errbackKeywords=None):
175 """Add a pair of callbacks (success and error) to this Deferred.
176
177 These will be executed when the 'master' callback is run.
178 """
179 assert callable(callback)
180 assert errback == None or callable(errback)
181 cbs = ((callback, callbackArgs, callbackKeywords),
182 (errback or (passthru), errbackArgs, errbackKeywords))
183 self.callbacks.append(cbs)
184
185 if self.called:
186 self._runCallbacks()
187 return self
188
189 def addCallback(self, callback, *args, **kw):
190 """Convenience method for adding just a callback.
191
192 See L{addCallbacks}.
193 """
194 return self.addCallbacks(callback, callbackArgs=args,
195 callbackKeywords=kw)
196
197 def addErrback(self, errback, *args, **kw):
198 """Convenience method for adding just an errback.
199
200 See L{addCallbacks}.
201 """
202 return self.addCallbacks(passthru, errback,
203 errbackArgs=args,
204 errbackKeywords=kw)
205
206 def addBoth(self, callback, *args, **kw):
207 """Convenience method for adding a single callable as both a callback
208 and an errback.
209
210 See L{addCallbacks}.
211 """
212 return self.addCallbacks(callback, callback,
213 callbackArgs=args, errbackArgs=args,
214 callbackKeywords=kw, errbackKeywords=kw)
215
216 def chainDeferred(self, d):
217 """Chain another Deferred to this Deferred.
218
219 This method adds callbacks to this Deferred to call d's callback or
220 errback, as appropriate. It is merely a shorthand way of performing
221 the following::
222
223 self.addCallbacks(d.callback, d.errback)
224
225 When you chain a deferred d2 to another deferred d1 with
226 d1.chainDeferred(d2), you are making d2 participate in the callback
227 chain of d1. Thus any event that fires d1 will also fire d2.
228 However, the converse is B{not} true; if d2 is fired d1 will not be
229 affected.
230 """
231 return self.addCallbacks(d.callback, d.errback)
232
233 def callback(self, result):
234 """Run all success callbacks that have been added to this Deferred.
235
236 Each callback will have its result passed as the first
237 argument to the next; this way, the callbacks act as a
238 'processing chain'. Also, if the success-callback returns a Failure
239 or raises an Exception, processing will continue on the *error*-
240 callback chain.
241 """
242 assert not isinstance(result, Deferred)
243 self._startRunCallbacks(result)
244
245
246 def errback(self, fail=None):
247 """
248 Run all error callbacks that have been added to this Deferred.
249
250 Each callback will have its result passed as the first
251 argument to the next; this way, the callbacks act as a
252 'processing chain'. Also, if the error-callback returns a non-Failure
253 or doesn't raise an Exception, processing will continue on the
254 *success*-callback chain.
255
256 If the argument that's passed to me is not a failure.Failure instance,
257 it will be embedded in one. If no argument is passed, a failure.Failure
258 instance will be created based on the current traceback stack.
259
260 Passing a string as `fail' is deprecated, and will be punished with
261 a warning message.
262
263 @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
264 no current exception state.
265 """
266 if not isinstance(fail, failure.Failure):
267 fail = failure.Failure(fail)
268
269 self._startRunCallbacks(fail)
270
271
272 def pause(self):
273 """Stop processing on a Deferred until L{unpause}() is called.
274 """
275 self.paused = self.paused + 1
276
277
278 def unpause(self):
279 """Process all callbacks made since L{pause}() was called.
280 """
281 self.paused = self.paused - 1
282 if self.paused:
283 return
284 if self.called:
285 self._runCallbacks()
286
287 def _continue(self, result):
288 self.result = result
289 self.unpause()
290
291 def _startRunCallbacks(self, result):
292 if self.called:
293 if self.debug:
294 if self._debugInfo is None:
295 self._debugInfo = DebugInfo()
296 extra = "\n" + self._debugInfo._getDebugTracebacks()
297 raise AlreadyCalledError(extra)
298 raise AlreadyCalledError
299 if self.debug:
300 if self._debugInfo is None:
301 self._debugInfo = DebugInfo()
302 self._debugInfo.invoker = traceback.format_stack()[:-2]
303 self.called = True
304 self.result = result
305 if self.timeoutCall:
306 try:
307 self.timeoutCall.cancel()
308 except:
309 pass
310
311 del self.timeoutCall
312 self._runCallbacks()
313
314 def _runCallbacks(self):
315 if self._runningCallbacks:
316 # Don't recursively run callbacks
317 return
318 if not self.paused:
319 while self.callbacks:
320 item = self.callbacks.pop(0)
321 callback, args, kw = item[
322 isinstance(self.result, failure.Failure)]
323 args = args or ()
324 kw = kw or {}
325 try:
326 self._runningCallbacks = True
327 try:
328 self.result = callback(self.result, *args, **kw)
329 finally:
330 self._runningCallbacks = False
331 if isinstance(self.result, Deferred):
332 # note: this will cause _runCallbacks to be called
333 # recursively if self.result already has a result.
334 # This shouldn't cause any problems, since there is no
335 # relevant state in this stack frame at this point.
336 # The recursive call will continue to process
337 # self.callbacks until it is empty, then return here,
338 # where there is no more work to be done, so this call
339 # will return as well.
340 self.pause()
341 self.result.addBoth(self._continue)
342 break
343 except:
344 self.result = failure.Failure()
345
346 if isinstance(self.result, failure.Failure):
347 self.result.cleanFailure()
348 if self._debugInfo is None:
349 self._debugInfo = DebugInfo()
350 self._debugInfo.failResult = self.result
351 else:
352 if self._debugInfo is not None:
353 self._debugInfo.failResult = None
354
355 def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
356 """Set a timeout function to be triggered if I am not called.
357
358 @param seconds: How long to wait (from now) before firing the
359 timeoutFunc.
360
361 @param timeoutFunc: will receive the Deferred and *args, **kw as its
362 arguments. The default timeoutFunc will call the errback with a
363 L{TimeoutError}.
364 """
365 warnings.warn(
366 "Deferred.setTimeout is deprecated. Look for timeout "
367 "support specific to the API you are using instead.",
368 DeprecationWarning, stacklevel=2)
369
370 if self.called:
371 return
372 assert not self.timeoutCall, "Don't call setTimeout twice on the same De ferred."
373
374 from twisted.internet import reactor
375 self.timeoutCall = reactor.callLater(
376 seconds,
377 lambda: self.called or timeoutFunc(self, *args, **kw))
378 return self.timeoutCall
379
380 def __str__(self):
381 cname = self.__class__.__name__
382 if hasattr(self, 'result'):
383 return "<%s at %s current result: %r>" % (cname, hex(unsignedID(sel f)),
384 self.result)
385 return "<%s at %s>" % (cname, hex(unsignedID(self)))
386 __repr__ = __str__
387
388 class DebugInfo:
389 """Deferred debug helper"""
390 failResult = None
391
392 def _getDebugTracebacks(self):
393 info = ''
394 if hasattr(self, "creator"):
395 info += " C: Deferred was created:\n C:"
396 info += "".join(self.creator).rstrip().replace("\n","\n C:")
397 info += "\n"
398 if hasattr(self, "invoker"):
399 info += " I: First Invoker was:\n I:"
400 info += "".join(self.invoker).rstrip().replace("\n","\n I:")
401 info += "\n"
402 return info
403
404 def __del__(self):
405 """Print tracebacks and die.
406
407 If the *last* (and I do mean *last*) callback leaves me in an error
408 state, print a traceback (if said errback is a Failure).
409 """
410 if self.failResult is not None:
411 log.msg("Unhandled error in Deferred:", isError=True)
412 debugInfo = self._getDebugTracebacks()
413 if debugInfo != '':
414 log.msg("(debug: " + debugInfo + ")", isError=True)
415 log.err(self.failResult)
416
417 class FirstError(Exception):
418 """First error to occur in a DeferredList if fireOnOneErrback is set.
419
420 @ivar subFailure: the L{Failure} that occurred.
421 @ivar index: the index of the Deferred in the DeferredList where it
422 happened.
423 """
424 def __init__(self, failure, index):
425 self.subFailure = failure
426 self.index = index
427
428 def __repr__(self):
429 return 'FirstError(%r, %d)' % (self.subFailure, self.index)
430
431 def __str__(self):
432 return repr(self)
433
434 def __getitem__(self, index):
435 warnings.warn("FirstError.__getitem__ is deprecated. "
436 "Use attributes instead.",
437 category=DeprecationWarning, stacklevel=2)
438 return [self.subFailure, self.index][index]
439
440 def __getslice__(self, start, stop):
441 warnings.warn("FirstError.__getslice__ is deprecated. "
442 "Use attributes instead.",
443 category=DeprecationWarning, stacklevel=2)
444 return [self.subFailure, self.index][start:stop]
445
446 def __eq__(self, other):
447 if isinstance(other, tuple):
448 return tuple(self) == other
449 elif isinstance(other, FirstError):
450 return (self.subFailure == other.subFailure and
451 self.index == other.index)
452 return False
453
454 class DeferredList(Deferred):
455 """I combine a group of deferreds into one callback.
456
457 I track a list of L{Deferred}s for their callbacks, and make a single
458 callback when they have all completed, a list of (success, result)
459 tuples, 'success' being a boolean.
460
461 Note that you can still use a L{Deferred} after putting it in a
462 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
463 messages by adding errbacks to the Deferreds *after* putting them in the
464 DeferredList, as a DeferredList won't swallow the errors. (Although a more
465 convenient way to do this is simply to set the consumeErrors flag)
466 """
467
468 fireOnOneCallback = 0
469 fireOnOneErrback = 0
470
471 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
472 consumeErrors=0):
473 """Initialize a DeferredList.
474
475 @type deferredList: C{list} of L{Deferred}s
476 @param deferredList: The list of deferreds to track.
477 @param fireOnOneCallback: (keyword param) a flag indicating that
478 only one callback needs to be fired for me to call
479 my callback
480 @param fireOnOneErrback: (keyword param) a flag indicating that
481 only one errback needs to be fired for me to call
482 my errback
483 @param consumeErrors: (keyword param) a flag indicating that any errors
484 raised in the original deferreds should be
485 consumed by this DeferredList. This is useful to
486 prevent spurious warnings being logged.
487 """
488 self.resultList = [None] * len(deferredList)
489 Deferred.__init__(self)
490 if len(deferredList) == 0 and not fireOnOneCallback:
491 self.callback(self.resultList)
492
493 # These flags need to be set *before* attaching callbacks to the
494 # deferreds, because the callbacks use these flags, and will run
495 # synchronously if any of the deferreds are already fired.
496 self.fireOnOneCallback = fireOnOneCallback
497 self.fireOnOneErrback = fireOnOneErrback
498 self.consumeErrors = consumeErrors
499 self.finishedCount = 0
500
501 index = 0
502 for deferred in deferredList:
503 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
504 callbackArgs=(index,SUCCESS),
505 errbackArgs=(index,FAILURE))
506 index = index + 1
507
508 def _cbDeferred(self, result, index, succeeded):
509 """(internal) Callback for when one of my deferreds fires.
510 """
511 self.resultList[index] = (succeeded, result)
512
513 self.finishedCount += 1
514 if not self.called:
515 if succeeded == SUCCESS and self.fireOnOneCallback:
516 self.callback((result, index))
517 elif succeeded == FAILURE and self.fireOnOneErrback:
518 self.errback(failure.Failure(FirstError(result, index)))
519 elif self.finishedCount == len(self.resultList):
520 self.callback(self.resultList)
521
522 if succeeded == FAILURE and self.consumeErrors:
523 result = None
524
525 return result
526
527
528 def _parseDListResult(l, fireOnOneErrback=0):
529 if __debug__:
530 for success, value in l:
531 assert success
532 return [x[1] for x in l]
533
534 def gatherResults(deferredList):
535 """Returns list with result of given Deferreds.
536
537 This builds on C{DeferredList} but is useful since you don't
538 need to parse the result for success/failure.
539
540 @type deferredList: C{list} of L{Deferred}s
541 """
542 d = DeferredList(deferredList, fireOnOneErrback=1)
543 d.addCallback(_parseDListResult)
544 return d
545
546 # Constants for use with DeferredList
547
548 SUCCESS = True
549 FAILURE = False
550
551
552
553 ## deferredGenerator
554
555 class waitForDeferred:
556 """
557 See L{deferredGenerator}.
558 """
559
560 def __init__(self, d):
561 if not isinstance(d, Deferred):
562 raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
563 self.d = d
564
565
566 def getResult(self):
567 if isinstance(self.result, failure.Failure):
568 self.result.raiseException()
569 return self.result
570
571
572
573 def _deferGenerator(g, deferred):
574 """
575 See L{deferredGenerator}.
576 """
577 result = None
578
579 # This function is complicated by the need to prevent unbounded recursion
580 # arising from repeatedly yielding immediately ready deferreds. This while
581 # loop and the waiting variable solve that by manually unfolding the
582 # recursion.
583
584 waiting = [True, # defgen is waiting for result?
585 None] # result
586
587 while 1:
588 try:
589 result = g.next()
590 except StopIteration:
591 deferred.callback(result)
592 return deferred
593 except:
594 deferred.errback()
595 return deferred
596
597 # Deferred.callback(Deferred) raises an error; we catch this case
598 # early here and give a nicer error message to the user in case
599 # they yield a Deferred.
600 if isinstance(result, Deferred):
601 return fail(TypeError("Yield waitForDeferred(d), not d!"))
602
603 if isinstance(result, waitForDeferred):
604 # a waitForDeferred was yielded, get the result.
605 # Pass result in so it don't get changed going around the loop
606 # This isn't a problem for waiting, as it's only reused if
607 # gotResult has already been executed.
608 def gotResult(r, result=result):
609 result.result = r
610 if waiting[0]:
611 waiting[0] = False
612 waiting[1] = r
613 else:
614 _deferGenerator(g, deferred)
615 result.d.addBoth(gotResult)
616 if waiting[0]:
617 # Haven't called back yet, set flag so that we get reinvoked
618 # and return from the loop
619 waiting[0] = False
620 return deferred
621 # Reset waiting to initial values for next loop
622 waiting[0] = True
623 waiting[1] = None
624
625 result = None
626
627
628
629 def deferredGenerator(f):
630 """
631 Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}
632
633 deferredGenerator and waitForDeferred help you write Deferred-using code
634 that looks like a regular sequential function. If your code has a minimum
635 requirement of Python 2.5, consider the use of L{inlineCallbacks} instead,
636 which can accomplish the same thing in a more concise manner.
637
638 There are two important functions involved: waitForDeferred, and
639 deferredGenerator. They are used together, like this::
640
641 def thingummy():
642 thing = waitForDeferred(makeSomeRequestResultingInDeferred())
643 yield thing
644 thing = thing.getResult()
645 print thing #the result! hoorj!
646 thingummy = deferredGenerator(thingummy)
647
648 waitForDeferred returns something that you should immediately yield; when
649 your generator is resumed, calling thing.getResult() will either give you
650 the result of the Deferred if it was a success, or raise an exception if it
651 was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do
652 not call it, I{your program will not work}.
653
654 deferredGenerator takes one of these waitForDeferred-using generator
655 functions and converts it into a function that returns a Deferred. The
656 result of the Deferred will be the last value that your generator yielded
657 unless the last value is a waitForDeferred instance, in which case the
658 result will be C{None}. If the function raises an unhandled exception, the
659 Deferred will errback instead. Remember that 'return result' won't work;
660 use 'yield result; return' in place of that.
661
662 Note that not yielding anything from your generator will make the Deferred
663 result in None. Yielding a Deferred from your generator is also an error
664 condition; always yield waitForDeferred(d) instead.
665
666 The Deferred returned from your deferred generator may also errback if your
667 generator raised an exception. For example::
668
669 def thingummy():
670 thing = waitForDeferred(makeSomeRequestResultingInDeferred())
671 yield thing
672 thing = thing.getResult()
673 if thing == 'I love Twisted':
674 # will become the result of the Deferred
675 yield 'TWISTED IS GREAT!'
676 return
677 else:
678 # will trigger an errback
679 raise Exception('DESTROY ALL LIFE')
680 thingummy = deferredGenerator(thingummy)
681
682 Put succinctly, these functions connect deferred-using code with this 'fake
683 blocking' style in both directions: waitForDeferred converts from a
684 Deferred to the 'blocking' style, and deferredGenerator converts from the
685 'blocking' style to a Deferred.
686 """
687 def unwindGenerator(*args, **kwargs):
688 return _deferGenerator(f(*args, **kwargs), Deferred())
689 return mergeFunctionMetadata(f, unwindGenerator)
690
691
692 ## inlineCallbacks
693
694 # BaseException is only in Py 2.5.
695 try:
696 BaseException
697 except NameError:
698 BaseException=Exception
699
700 class _DefGen_Return(BaseException):
701 def __init__(self, value):
702 self.value = value
703
704 def returnValue(val):
705 """
706 Return val from a L{inlineCallbacks} generator.
707
708 Note: this is currently implemented by raising an exception
709 derived from BaseException. You might want to change any
710 'except:' clauses to an 'except Exception:' clause so as not to
711 catch this exception.
712
713 Also: while this function currently will work when called from
714 within arbitrary functions called from within the generator, do
715 not rely upon this behavior.
716 """
717 raise _DefGen_Return(val)
718
719 def _inlineCallbacks(result, g, deferred):
720 """
721 See L{inlineCallbacks}.
722 """
723 # This function is complicated by the need to prevent unbounded recursion
724 # arising from repeatedly yielding immediately ready deferreds. This while
725 # loop and the waiting variable solve that by manually unfolding the
726 # recursion.
727
728 waiting = [True, # waiting for result?
729 None] # result
730
731 while 1:
732 try:
733 # Send the last result back as the result of the yield expression.
734 if isinstance(result, failure.Failure):
735 result = result.throwExceptionIntoGenerator(g)
736 else:
737 result = g.send(result)
738 except StopIteration:
739 # fell off the end, or "return" statement
740 deferred.callback(None)
741 return deferred
742 except _DefGen_Return, e:
743 # returnValue call
744 deferred.callback(e.value)
745 return deferred
746 except:
747 deferred.errback()
748 return deferred
749
750 if isinstance(result, Deferred):
751 # a deferred was yielded, get the result.
752 def gotResult(r):
753 if waiting[0]:
754 waiting[0] = False
755 waiting[1] = r
756 else:
757 _inlineCallbacks(r, g, deferred)
758
759 result.addBoth(gotResult)
760 if waiting[0]:
761 # Haven't called back yet, set flag so that we get reinvoked
762 # and return from the loop
763 waiting[0] = False
764 return deferred
765
766 result = waiting[1]
767 # Reset waiting to initial values for next loop. gotResult uses
768 # waiting, but this isn't a problem because gotResult is only
769 # executed once, and if it hasn't been executed yet, the return
770 # branch above would have been taken.
771
772
773 waiting[0] = True
774 waiting[1] = None
775
776
777 return deferred
778
779 def inlineCallbacks(f):
780 """
781 Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}
782
783 WARNING: this function will not work in Python 2.4 and earlier!
784
785 inlineCallbacks helps you write Deferred-using code that looks like a
786 regular sequential function. This function uses features of Python 2.5
787 generators. If you need to be compatible with Python 2.4 or before, use
788 the L{deferredGenerator} function instead, which accomplishes the same
789 thing, but with somewhat more boilerplate. For example::
790
791 def thingummy():
792 thing = yield makeSomeRequestResultingInDeferred()
793 print thing #the result! hoorj!
794 thingummy = inlineCallbacks(thingummy)
795
796 When you call anything that results in a Deferred, you can simply yield it;
797 your generator will automatically be resumed when the Deferred's result is
798 available. The generator will be sent the result of the Deferred with the
799 'send' method on generators, or if the result was a failure, 'throw'.
800
801 Your inlineCallbacks-enabled generator will return a Deferred object, which
802 will result in the return value of the generator (or will fail with a
803 failure object if your generator raises an unhandled exception). Note that
804 you can't use 'return result' to return a value; use 'returnValue(result)'
805 instead. Falling off the end of the generator, or simply using 'return'
806 will cause the Deferred to have a result of None.
807
808 The Deferred returned from your deferred generator may errback if your
809 generator raised an exception::
810
811 def thingummy():
812 thing = yield makeSomeRequestResultingInDeferred()
813 if thing == 'I love Twisted':
814 # will become the result of the Deferred
815 returnValue('TWISTED IS GREAT!')
816 else:
817 # will trigger an errback
818 raise Exception('DESTROY ALL LIFE')
819 thingummy = inlineCallbacks(thingummy)
820 """
821 def unwindGenerator(*args, **kwargs):
822 return _inlineCallbacks(None, f(*args, **kwargs), Deferred())
823 return mergeFunctionMetadata(f, unwindGenerator)
824
825
826 ## DeferredLock/DeferredQueue
827
828 class _ConcurrencyPrimitive(object):
829 def __init__(self):
830 self.waiting = []
831
832 def _releaseAndReturn(self, r):
833 self.release()
834 return r
835
836 def run(*args, **kwargs):
837 """Acquire, run, release.
838
839 This function takes a callable as its first argument and any
840 number of other positional and keyword arguments. When the
841 lock or semaphore is acquired, the callable will be invoked
842 with those arguments.
843
844 The callable may return a Deferred; if it does, the lock or
845 semaphore won't be released until that Deferred fires.
846
847 @return: Deferred of function result.
848 """
849 if len(args) < 2:
850 if not args:
851 raise TypeError("run() takes at least 2 arguments, none given.")
852 raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
853 args[0].__class__.__name__,))
854 self, f = args[:2]
855 args = args[2:]
856
857 def execute(ignoredResult):
858 d = maybeDeferred(f, *args, **kwargs)
859 d.addBoth(self._releaseAndReturn)
860 return d
861
862 d = self.acquire()
863 d.addCallback(execute)
864 return d
865
866
867 class DeferredLock(_ConcurrencyPrimitive):
868 """
869 A lock for event driven systems.
870
871 @ivar locked: True when this Lock has been acquired, false at all
872 other times. Do not change this value, but it is useful to
873 examine for the equivalent of a \"non-blocking\" acquisition.
874 """
875
876 locked = 0
877
878 def acquire(self):
879 """Attempt to acquire the lock.
880
881 @return: a Deferred which fires on lock acquisition.
882 """
883 d = Deferred()
884 if self.locked:
885 self.waiting.append(d)
886 else:
887 self.locked = 1
888 d.callback(self)
889 return d
890
891 def release(self):
892 """Release the lock.
893
894 Should be called by whomever did the acquire() when the shared
895 resource is free.
896 """
897 assert self.locked, "Tried to release an unlocked lock"
898 self.locked = 0
899 if self.waiting:
900 # someone is waiting to acquire lock
901 self.locked = 1
902 d = self.waiting.pop(0)
903 d.callback(self)
904
905 class DeferredSemaphore(_ConcurrencyPrimitive):
906 """
907 A semaphore for event driven systems.
908 """
909
910 def __init__(self, tokens):
911 _ConcurrencyPrimitive.__init__(self)
912 self.tokens = tokens
913 self.limit = tokens
914
915 def acquire(self):
916 """Attempt to acquire the token.
917
918 @return: a Deferred which fires on token acquisition.
919 """
920 assert self.tokens >= 0, "Internal inconsistency?? tokens should never be negative"
921 d = Deferred()
922 if not self.tokens:
923 self.waiting.append(d)
924 else:
925 self.tokens = self.tokens - 1
926 d.callback(self)
927 return d
928
929 def release(self):
930 """Release the token.
931
932 Should be called by whoever did the acquire() when the shared
933 resource is free.
934 """
935 assert self.tokens < self.limit, "Someone released me too many times: to o many tokens!"
936 self.tokens = self.tokens + 1
937 if self.waiting:
938 # someone is waiting to acquire token
939 self.tokens = self.tokens - 1
940 d = self.waiting.pop(0)
941 d.callback(self)
942
943 class QueueOverflow(Exception):
944 pass
945
946 class QueueUnderflow(Exception):
947 pass
948
949
950 class DeferredQueue(object):
951 """
952 An event driven queue.
953
954 Objects may be added as usual to this queue. When an attempt is
955 made to retrieve an object when the queue is empty, a Deferred is
956 returned which will fire when an object becomes available.
957
958 @ivar size: The maximum number of objects to allow into the queue
959 at a time. When an attempt to add a new object would exceed this
960 limit, QueueOverflow is raised synchronously. None for no limit.
961
962 @ivar backlog: The maximum number of Deferred gets to allow at
963 one time. When an attempt is made to get an object which would
964 exceed this limit, QueueUnderflow is raised synchronously. None
965 for no limit.
966 """
967
968 def __init__(self, size=None, backlog=None):
969 self.waiting = []
970 self.pending = []
971 self.size = size
972 self.backlog = backlog
973
974 def put(self, obj):
975 """Add an object to this queue.
976
977 @raise QueueOverflow: Too many objects are in this queue.
978 """
979 if self.waiting:
980 self.waiting.pop(0).callback(obj)
981 elif self.size is None or len(self.pending) < self.size:
982 self.pending.append(obj)
983 else:
984 raise QueueOverflow()
985
986 def get(self):
987 """Attempt to retrieve and remove an object from the queue.
988
989 @return: a Deferred which fires with the next object available in the qu eue.
990
991 @raise QueueUnderflow: Too many (more than C{backlog})
992 Deferreds are already waiting for an object from this queue.
993 """
994 if self.pending:
995 return succeed(self.pending.pop(0))
996 elif self.backlog is None or len(self.waiting) < self.backlog:
997 d = Deferred()
998 self.waiting.append(d)
999 return d
1000 else:
1001 raise QueueUnderflow()
1002
1003
1004 class AlreadyTryingToLockError(Exception):
1005 """
1006 Raised when DeferredFilesystemLock.deferUntilLocked is called twice on a
1007 single DeferredFilesystemLock.
1008 """
1009
1010
1011 class DeferredFilesystemLock(lockfile.FilesystemLock):
1012 """
1013 A FilesystemLock that allows for a deferred to be fired when the lock is
1014 acquired.
1015
1016 @ivar _scheduler: The object in charge of scheduling retries. In this
1017 implementation this is parameterized for testing.
1018
1019 @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
1020
1021 @ivar _tryLockCall: A L{DelayedCall} based on _interval that will managex
1022 the next retry for aquiring the lock.
1023
1024 @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout
1025 argument. This is in charge of timing out our attempt to acquire the
1026 lock.
1027 """
1028 _interval = 1
1029 _tryLockCall = None
1030 _timeoutCall = None
1031
1032 def __init__(self, name, scheduler=None):
1033 """
1034 @param name: The name of the lock to acquire
1035 @param scheduler: An object which provides L{IReactorTime}
1036 """
1037 lockfile.FilesystemLock.__init__(self, name)
1038
1039 if scheduler is None:
1040 from twisted.internet import reactor
1041 scheduler = reactor
1042
1043 self._scheduler = scheduler
1044
1045 def deferUntilLocked(self, timeout=None):
1046 """
1047 Wait until we acquire this lock. This method is not safe for
1048 concurrent use.
1049
1050 @type timeout: C{float} or C{int}
1051 @param timeout: the number of seconds after which to time out if the
1052 lock has not been acquired.
1053
1054 @return: a deferred which will callback when the lock is acquired, or
1055 errback with a L{TimeoutError} after timing out or an
1056 L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
1057 been called and not successfully locked the file.
1058 """
1059 if self._tryLockCall is not None:
1060 return fail(
1061 AlreadyTryingToLockError(
1062 "deferUntilLocked isn't safe for concurrent use."))
1063
1064 d = Deferred()
1065
1066 def _cancelLock():
1067 self._tryLockCall.cancel()
1068 self._tryLockCall = None
1069 self._timeoutCall = None
1070
1071 if self.lock():
1072 d.callback(None)
1073 else:
1074 d.errback(failure.Failure(
1075 TimeoutError("Timed out aquiring lock: %s after %fs" % (
1076 self.name,
1077 timeout))))
1078
1079 def _tryLock():
1080 if self.lock():
1081 if self._timeoutCall is not None:
1082 self._timeoutCall.cancel()
1083 self._timeoutCall = None
1084
1085 self._tryLockCall = None
1086
1087 d.callback(None)
1088 else:
1089 if timeout is not None and self._timeoutCall is None:
1090 self._timeoutCall = self._scheduler.callLater(
1091 timeout, _cancelLock)
1092
1093 self._tryLockCall = self._scheduler.callLater(
1094 self._interval, _tryLock)
1095
1096 _tryLock()
1097
1098 return d
1099
1100
1101 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
1102 "AlreadyCalledError", "TimeoutError", "gatherResults",
1103 "maybeDeferred",
1104 "waitForDeferred", "deferredGenerator", "inlineCallbacks",
1105 "DeferredLock", "DeferredSemaphore", "DeferredQueue",
1106 "DeferredFilesystemLock", "AlreadyTryingToLockError",
1107 ]
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/default.py ('k') | third_party/twisted_8_1/twisted/internet/epollreactor.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698