| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 # Author: Clark Evans (cce@clarkevans.com) | |
| 6 # | |
| 7 | |
| 8 """ | |
| 9 flow.controller | |
| 10 | |
| 11 This implements the various flow controllers, that is, those things which run | |
| 12 the flow stack. | |
| 13 """ | |
| 14 | |
| 15 from base import * | |
| 16 from wrap import wrap | |
| 17 from twisted.internet import defer | |
| 18 | |
| 19 class Block(Controller,Stage): | |
| 20 """ | |
| 21 A controller which blocks on Cooperate events | |
| 22 | |
| 23 This converts a Stage into an iterable which can be used directly in python | |
| 24 for loops and other iteratable constructs. It does this by eating any | |
| 25 Cooperate values and sleeping. This is largely helpful for testing or | |
| 26 within a threaded environment. It converts other stages into one which | |
| 27 does not emit cooperate events, ie:: | |
| 28 | |
| 29 [1,2, Cooperate(), 3] => [1,2,3] | |
| 30 """ | |
| 31 def __init__(self, stage, *trap): | |
| 32 Stage.__init__(self) | |
| 33 self._stage = wrap(stage,*trap) | |
| 34 self.block = time.sleep | |
| 35 | |
| 36 def next(self): | |
| 37 """ fetch the next value from the Stage flow """ | |
| 38 stage = self._stage | |
| 39 while True: | |
| 40 result = stage._yield() | |
| 41 if result: | |
| 42 if isinstance(result, Cooperate): | |
| 43 if result.__class__ == Cooperate: | |
| 44 self.block(result.timeout) | |
| 45 continue | |
| 46 raise Unsupported(result) | |
| 47 return stage.next() | |
| 48 | |
| 49 class Deferred(Controller, defer.Deferred): | |
| 50 """ | |
| 51 wraps up a Stage with a Deferred interface | |
| 52 | |
| 53 In this version, the results of the Stage are used to construct a list of | |
| 54 results and then sent to deferred. Further, in this version Cooperate is | |
| 55 implemented via reactor's callLater. | |
| 56 | |
| 57 For example:: | |
| 58 | |
| 59 from twisted.internet import reactor | |
| 60 from twisted.flow import flow | |
| 61 | |
| 62 def res(x): print x | |
| 63 d = flow.Deferred([1,2,3]) | |
| 64 d.addCallback(res) | |
| 65 reactor.iterate() | |
| 66 """ | |
| 67 def __init__(self, stage, *trap): | |
| 68 defer.Deferred.__init__(self) | |
| 69 self._results = [] | |
| 70 self._stage = wrap(stage, *trap) | |
| 71 self._execute() | |
| 72 | |
| 73 def results(self, results): | |
| 74 self._results.extend(results) | |
| 75 | |
| 76 def _execute(self, dummy = None): | |
| 77 cmd = self._stage | |
| 78 while True: | |
| 79 result = cmd._yield() | |
| 80 if cmd.results: | |
| 81 self.results(cmd.results) | |
| 82 cmd.results = [] | |
| 83 if cmd.stop: | |
| 84 if not self.called: | |
| 85 self.callback(self._results) | |
| 86 return | |
| 87 if cmd.failure: | |
| 88 cmd.stop = True | |
| 89 if cmd._trap: | |
| 90 error = cmd.failure.check(*cmd._trap) | |
| 91 if error: | |
| 92 self._results.append(error) | |
| 93 continue | |
| 94 self.errback(cmd.failure) | |
| 95 return | |
| 96 if result: | |
| 97 if isinstance(result, CallLater): | |
| 98 result.callLater(self._execute) | |
| 99 return | |
| 100 raise Unsupported(result) | |
| 101 | |
| OLD | NEW |