OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark Evans (cce@clarkevans.com) | |
6 # | |
7 | |
8 """ | |
9 flow.controller | |
10 | |
11 This implements the various flow controllers, that is, those things which run | |
12 the flow stack. | |
13 """ | |
14 | |
15 from base import * | |
16 from wrap import wrap | |
17 from twisted.internet import defer | |
18 | |
19 class Block(Controller,Stage): | |
20 """ | |
21 A controller which blocks on Cooperate events | |
22 | |
23 This converts a Stage into an iterable which can be used directly in python | |
24 for loops and other iteratable constructs. It does this by eating any | |
25 Cooperate values and sleeping. This is largely helpful for testing or | |
26 within a threaded environment. It converts other stages into one which | |
27 does not emit cooperate events, ie:: | |
28 | |
29 [1,2, Cooperate(), 3] => [1,2,3] | |
30 """ | |
31 def __init__(self, stage, *trap): | |
32 Stage.__init__(self) | |
33 self._stage = wrap(stage,*trap) | |
34 self.block = time.sleep | |
35 | |
36 def next(self): | |
37 """ fetch the next value from the Stage flow """ | |
38 stage = self._stage | |
39 while True: | |
40 result = stage._yield() | |
41 if result: | |
42 if isinstance(result, Cooperate): | |
43 if result.__class__ == Cooperate: | |
44 self.block(result.timeout) | |
45 continue | |
46 raise Unsupported(result) | |
47 return stage.next() | |
48 | |
49 class Deferred(Controller, defer.Deferred): | |
50 """ | |
51 wraps up a Stage with a Deferred interface | |
52 | |
53 In this version, the results of the Stage are used to construct a list of | |
54 results and then sent to deferred. Further, in this version Cooperate is | |
55 implemented via reactor's callLater. | |
56 | |
57 For example:: | |
58 | |
59 from twisted.internet import reactor | |
60 from twisted.flow import flow | |
61 | |
62 def res(x): print x | |
63 d = flow.Deferred([1,2,3]) | |
64 d.addCallback(res) | |
65 reactor.iterate() | |
66 """ | |
67 def __init__(self, stage, *trap): | |
68 defer.Deferred.__init__(self) | |
69 self._results = [] | |
70 self._stage = wrap(stage, *trap) | |
71 self._execute() | |
72 | |
73 def results(self, results): | |
74 self._results.extend(results) | |
75 | |
76 def _execute(self, dummy = None): | |
77 cmd = self._stage | |
78 while True: | |
79 result = cmd._yield() | |
80 if cmd.results: | |
81 self.results(cmd.results) | |
82 cmd.results = [] | |
83 if cmd.stop: | |
84 if not self.called: | |
85 self.callback(self._results) | |
86 return | |
87 if cmd.failure: | |
88 cmd.stop = True | |
89 if cmd._trap: | |
90 error = cmd.failure.check(*cmd._trap) | |
91 if error: | |
92 self._results.append(error) | |
93 continue | |
94 self.errback(cmd.failure) | |
95 return | |
96 if result: | |
97 if isinstance(result, CallLater): | |
98 result.callLater(self._execute) | |
99 return | |
100 raise Unsupported(result) | |
101 | |
OLD | NEW |