OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark Evans (cce@clarkevans.com) | |
6 # | |
7 """ | |
8 flow.base | |
9 | |
10 This module contains the core exceptions and base classes in the flow module. | |
11 See flow.flow for more detailed information | |
12 """ | |
13 | |
14 import twisted.python.compat | |
15 from twisted.internet import reactor | |
16 import time | |
17 | |
18 # | |
19 # Exceptions used within flow | |
20 # | |
21 class Unsupported(NotImplementedError): | |
22 """ Indicates that the given stage does not know what to do | |
23 with the flow instruction that was returned. | |
24 """ | |
25 def __init__(self, inst): | |
26 msg = "Unsupported flow instruction: %s " % repr(inst) | |
27 TypeError.__init__(self,msg) | |
28 | |
29 class NotReadyError(RuntimeError): | |
30 """ Raised when a stage has not been subject to a yield """ | |
31 pass | |
32 | |
33 # | |
34 # Abstract/Base Classes | |
35 # | |
36 | |
37 class Instruction: | |
38 """ Has special meaning when yielded in a flow """ | |
39 pass | |
40 | |
41 class Controller: | |
42 """ | |
43 Flow controller | |
44 | |
45 At the base of every flow, is a controller class which interprets the | |
46 instructions, especially the CallLater instructions. This is primarly just | |
47 a marker class to denote which classes consume Instruction events. If a | |
48 controller cannot handle a particular instruction, it raises the | |
49 Unsupported exception. | |
50 """ | |
51 pass | |
52 | |
53 class CallLater(Instruction): | |
54 """ | |
55 Instruction to support callbacks | |
56 | |
57 This is the instruction which is returned during the yield of the _Deferred | |
58 and Callback stage. The underlying flow driver should call the 'callLater' | |
59 function with the callable to be executed after each callback. | |
60 """ | |
61 def callLater(self, callable): | |
62 pass | |
63 | |
64 class Cooperate(CallLater): | |
65 """ | |
66 Requests that processing be paused so other tasks can resume | |
67 | |
68 Yield this object when the current chain would block or periodically during | |
69 an intensive processing task. The flow mechanism uses these objects to | |
70 signal that the current processing chain should be paused and resumed | |
71 later. This allows other delayed operations to be processed, etc. Usage | |
72 is quite simple:: | |
73 | |
74 # within some generator wrapped by a Controller | |
75 yield Cooperate(1) # yield for a second or more | |
76 | |
77 """ | |
78 def __init__(self, timeout = 0): | |
79 self.timeout = timeout | |
80 def callLater(self, callable): | |
81 reactor.callLater(self.timeout, callable) | |
82 | |
83 class Stage(Instruction): | |
84 """ | |
85 Abstract base defining protocol for iterator/generators in a flow | |
86 | |
87 This is the primary component in the flow system, it is an iterable object | |
88 which must be passed to a yield statement before each call to next(). | |
89 Usage:: | |
90 | |
91 iterable = DerivedStage( ... , SpamError, EggsError)) | |
92 yield iterable | |
93 for result in iterable: | |
94 # handle good result, or SpamError or EggsError | |
95 yield iterable | |
96 | |
97 Alternatively, when inside a generator, the next() method can be used | |
98 directly. In this case, if no results are available, StopIteration is | |
99 raised, and if left uncaught, will nicely end the generator. Of course, | |
100 unexpected failures are raised. This technique is especially useful when | |
101 pulling from more than one stage at a time. For example:: | |
102 | |
103 def someGenerator(): | |
104 iterable = SomeStage( ... , SpamError, EggsError) | |
105 while True: | |
106 yield iterable | |
107 result = iterable.next() | |
108 # handle good result or SpamError or EggsError | |
109 | |
110 For many generators, the results become available in chunks of rows. While | |
111 the default value is to get one row at a time, there is a 'chunked' | |
112 property which allows them to be returned via the next() method as many | |
113 rows rather than row by row. For example:: | |
114 | |
115 iterable = DerivedStage(...) | |
116 iterable.chunked = True | |
117 for results in iterable: | |
118 for result in results: | |
119 # handle good result | |
120 yield iterable | |
121 | |
122 For those wishing more control at the cost of a painful experience, the | |
123 following member variables can be used to great effect:: | |
124 | |
125 - results: This is a list of results produced by the generator, they | |
126 can be fetched one by one using next() or in a group | |
127 together. If no results were produced, then this is an | |
128 empty list. These results should be removed from the list | |
129 after they are read; or, after reading all of the results | |
130 set to an empty list | |
131 | |
132 - stop: This is true if the underlying generator has finished execution | |
133 (raised a StopIteration or returned). Note that several | |
134 results may exist, and stop may be true. | |
135 | |
136 - failure: If the generator produced an exception, then it is wrapped | |
137 as a Failure object and put here. Note that several results | |
138 may have been produced before the failure. To ensure that | |
139 the failure isn't accidently reported twice, it is | |
140 adviseable to set stop to True. | |
141 | |
142 The order in which these member variables is used is *critical* for | |
143 proper adherance to the flow protocol. First, all successful | |
144 results should be handled. Second, the iterable should be checked | |
145 to see if it is finished. Third, a failure should be checked; | |
146 while handling a failure, either the loop should be exited, or | |
147 the iterable's stop member should be set. For example:: | |
148 | |
149 iterable = SomeStage(...) | |
150 while True: | |
151 yield iterable | |
152 if iterable.results: | |
153 for result in iterable.results: | |
154 # handle good result | |
155 iterable.results = [] | |
156 if iterable.stop: | |
157 break | |
158 if iterable.failure: | |
159 iterable.stop = True | |
160 # handle iterable.failure | |
161 break | |
162 """ | |
163 def __init__(self, *trap): | |
164 self._trap = trap | |
165 self.stop = False | |
166 self.failure = None | |
167 self.results = [] | |
168 self.chunked = False | |
169 | |
170 def __iter__(self): | |
171 return self | |
172 | |
173 def next(self): | |
174 """ | |
175 return current result | |
176 | |
177 This is the primary function to be called to retrieve the current | |
178 result. It complies with the iterator protocol by raising | |
179 StopIteration when the stage is complete. It also raises an exception | |
180 if it is called before the stage is yielded. | |
181 """ | |
182 if self.results: | |
183 if self.chunked: | |
184 ret = self.results | |
185 self.results = [] | |
186 return ret | |
187 else: | |
188 return self.results.pop(0) | |
189 if self.stop: | |
190 raise StopIteration() | |
191 | |
192 if self.failure: | |
193 self.stop = True | |
194 | |
195 cr = self.failure.check(*self._trap) | |
196 | |
197 if cr: | |
198 return cr | |
199 | |
200 self.failure.raiseException() | |
201 | |
202 raise NotReadyError("Must 'yield' this object before calling next()") | |
203 | |
204 def _yield(self): | |
205 """ | |
206 executed during a yield statement by previous stage | |
207 | |
208 This method is private within the scope of the flow module, it is used | |
209 by one stage in the flow to ask a subsequent stage to produce its | |
210 value. The result of the yield is then stored in self.result and is an | |
211 instance of Failure if a problem occurred. | |
212 """ | |
213 raise NotImplementedError | |
OLD | NEW |