OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark Evans (cce@clarkevans.com) | |
6 # | |
7 | |
8 """ | |
9 flow.stage | |
10 | |
11 Various stages for manipulating data flows, in particular, those stages which | |
12 take more than one input stages or alternative input, such as a callback. | |
13 """ | |
14 | |
15 from base import * | |
16 from wrap import wrap | |
17 from twisted.python.failure import Failure | |
18 | |
19 class Map(Stage): | |
20 """ | |
21 flow equivalent to map: Map(function, stage, ... ) | |
22 | |
23 Apply a function to every item yielded and yield the results. If | |
24 additional stages are passed, the function must take that many arguments | |
25 and is applied to the items of all lists in parallel. If a list is shorter | |
26 than another, it is assumed to be extended with None items. If the | |
27 function is None, the identity function is assumed; if there are multiple | |
28 list arguments, Map stage returns a sequence consisting of tuples | |
29 containing the corresponding items from all lists. | |
30 | |
31 For example:: | |
32 | |
33 def fn(val): | |
34 return val + 10 | |
35 | |
36 source = flow.Map(fn,range(4)) | |
37 printFlow(source) | |
38 """ | |
39 def __init__(self, func, stage, *stages): | |
40 Stage.__init__(self) | |
41 self.func = func | |
42 self._stage = [wrap(stage)] | |
43 for stage in stages: | |
44 self._stage.append(wrap(stage)) | |
45 self._index = 0 | |
46 | |
47 def _yield(self): | |
48 if self.results or self.stop or self.failure: | |
49 return | |
50 if not self._index: | |
51 self._curr = [] | |
52 self._done = True | |
53 while self._index < len(self._stage): | |
54 idx = self._index | |
55 curr = self._stage[idx] | |
56 instruction = curr._yield() | |
57 if instruction: | |
58 return instruction | |
59 if curr.results: | |
60 self._curr.append(curr.results.pop(0)) | |
61 self._index += 1 | |
62 self._done = False | |
63 continue | |
64 if curr.stop: | |
65 self._curr.append(None) | |
66 self._index += 1 | |
67 continue | |
68 if curr.failure: | |
69 self.failure = curr.failure | |
70 return | |
71 raise AssertionError("flow.Map ; no results, stop or failure?") | |
72 if self._done: | |
73 self.stop = 1 | |
74 return | |
75 curr = tuple(self._curr) | |
76 if self.func: | |
77 try: | |
78 curr = self.func(*curr) | |
79 except Failure, fail: | |
80 self.failure = fail | |
81 return | |
82 except: | |
83 self.failure = Failure() | |
84 return | |
85 self.results.append(curr) | |
86 self._index = 0 | |
87 | |
88 class Zip(Map): | |
89 """ | |
90 Zips two or more stages into a stream of N tuples | |
91 | |
92 For example:: | |
93 | |
94 source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"]) | |
95 printFlow(source) | |
96 | |
97 """ | |
98 def __init__(self, *stages): | |
99 Map.__init__(self, None, stages[0], *stages[1:]) | |
100 | |
101 class Concurrent(Stage): | |
102 """ | |
103 Executes stages concurrently | |
104 | |
105 This stage allows two or more stages (branches) to be executed at the same | |
106 time. It returns each stage as it becomes available. This can be used if | |
107 you have N callbacks, and you want to yield and wait for the first | |
108 available one that produces results. Once a stage is retuned, its next() | |
109 method should be used to extract the value for the stage. | |
110 """ | |
111 | |
112 class Instruction(CallLater): | |
113 def __init__(self, inst): | |
114 self.inst = inst | |
115 def callLater(self, callable): | |
116 for inst in self.inst: | |
117 inst.callLater(callable) | |
118 | |
119 def __init__(self, *stages): | |
120 Stage.__init__(self) | |
121 self._stages = [] | |
122 for stage in stages: | |
123 self._stages.append(wrap(stage)) | |
124 | |
125 def _yield(self): | |
126 if self.results or self.stop or self.failure: | |
127 return | |
128 stages = self._stages | |
129 later = [] | |
130 exit = None | |
131 while stages: | |
132 if stages[0] is exit: | |
133 if self.results: | |
134 return | |
135 break | |
136 curr = stages.pop(0) | |
137 instruction = curr._yield() | |
138 if curr.results: | |
139 self.results.append(curr) | |
140 if curr.failure: | |
141 self.failure = curr.failure | |
142 return | |
143 if curr.stop: | |
144 exit = None | |
145 if self.results: | |
146 return | |
147 continue | |
148 stages.append(curr) | |
149 if not exit: | |
150 exit = curr | |
151 if instruction: | |
152 if isinstance(instruction, CallLater): | |
153 if instruction not in later: | |
154 later.append(instruction) | |
155 continue | |
156 raise Unsupported(instruction) | |
157 if later: | |
158 return Concurrent.Instruction(later) | |
159 self.stop = True | |
160 | |
161 class Merge(Stage): | |
162 """ | |
163 Merges two or more Stages results into a single stream | |
164 | |
165 For example:: | |
166 | |
167 source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"]) | |
168 printFlow(source) | |
169 """ | |
170 def __init__(self, *stages): | |
171 Stage.__init__(self) | |
172 self.concurrent = Concurrent(*stages) | |
173 | |
174 def _yield(self): | |
175 if self.results or self.stop or self.failure: | |
176 return | |
177 instruction = self.concurrent._yield() | |
178 if instruction: | |
179 return instruction | |
180 for stage in self.concurrent.results: | |
181 self.results.extend(stage.results) | |
182 stage.results = [] | |
183 self.concurrent.results = [] | |
184 if self.concurrent.stop: | |
185 self.stop = True | |
186 self.failure = self.concurrent.failure | |
187 | |
188 class Callback(Stage): | |
189 """ | |
190 Converts a single-thread push interface into a pull interface. | |
191 | |
192 Once this stage is constructed, its result, errback, and finish member | |
193 variables may be called by a producer. The results of which can be | |
194 obtained by yielding the Callback and then calling next(). | |
195 | |
196 For example:: | |
197 | |
198 source = flow.Callback() | |
199 reactor.callLater(0, lambda: source.result("one")) | |
200 reactor.callLater(.5, lambda: source.result("two")) | |
201 reactor.callLater(1, lambda: source.finish()) | |
202 printFlow(source) | |
203 | |
204 """ | |
205 # TODO: Potentially rename this 'Consumer' and make it | |
206 # comply with protocols.IConsumer | |
207 # TODO: Make the inverse stage, which is an IProducer | |
208 class Instruction(CallLater): | |
209 def __init__(self): | |
210 self.flow = lambda: True | |
211 def callLater(self, callable): | |
212 self.flow = callable | |
213 def __init__(self, *trap): | |
214 Stage.__init__(self, *trap) | |
215 self._finished = False | |
216 self._cooperate = Callback.Instruction() | |
217 def result(self,result): | |
218 """ called by the producer to indicate a successful result """ | |
219 self.results.append(result) | |
220 self._cooperate.flow() | |
221 def finish(self): | |
222 """ called by producer to indicate successful stream completion """ | |
223 assert not self.failure, "failed streams should not be finished" | |
224 self._finished = True | |
225 self._cooperate.flow() | |
226 def errback(self, fail): | |
227 """ called by the producer in case of Failure """ | |
228 self.failure = fail | |
229 self._cooperate.flow() | |
230 def _yield(self): | |
231 if self.results or self.stop or self.failure: | |
232 return | |
233 if not self.results: | |
234 if self._finished: | |
235 self.stop = True | |
236 return | |
237 return self._cooperate | |
238 __call__ = result | |
239 | |
OLD | NEW |