| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 # Author: Clark Evans (cce@clarkevans.com) | |
| 6 # | |
| 7 | |
| 8 """ | |
| 9 flow.stage | |
| 10 | |
| 11 Various stages for manipulating data flows, in particular, those stages which | |
| 12 take more than one input stages or alternative input, such as a callback. | |
| 13 """ | |
| 14 | |
| 15 from base import * | |
| 16 from wrap import wrap | |
| 17 from twisted.python.failure import Failure | |
| 18 | |
| 19 class Map(Stage): | |
| 20 """ | |
| 21 flow equivalent to map: Map(function, stage, ... ) | |
| 22 | |
| 23 Apply a function to every item yielded and yield the results. If | |
| 24 additional stages are passed, the function must take that many arguments | |
| 25 and is applied to the items of all lists in parallel. If a list is shorter | |
| 26 than another, it is assumed to be extended with None items. If the | |
| 27 function is None, the identity function is assumed; if there are multiple | |
| 28 list arguments, Map stage returns a sequence consisting of tuples | |
| 29 containing the corresponding items from all lists. | |
| 30 | |
| 31 For example:: | |
| 32 | |
| 33 def fn(val): | |
| 34 return val + 10 | |
| 35 | |
| 36 source = flow.Map(fn,range(4)) | |
| 37 printFlow(source) | |
| 38 """ | |
| 39 def __init__(self, func, stage, *stages): | |
| 40 Stage.__init__(self) | |
| 41 self.func = func | |
| 42 self._stage = [wrap(stage)] | |
| 43 for stage in stages: | |
| 44 self._stage.append(wrap(stage)) | |
| 45 self._index = 0 | |
| 46 | |
| 47 def _yield(self): | |
| 48 if self.results or self.stop or self.failure: | |
| 49 return | |
| 50 if not self._index: | |
| 51 self._curr = [] | |
| 52 self._done = True | |
| 53 while self._index < len(self._stage): | |
| 54 idx = self._index | |
| 55 curr = self._stage[idx] | |
| 56 instruction = curr._yield() | |
| 57 if instruction: | |
| 58 return instruction | |
| 59 if curr.results: | |
| 60 self._curr.append(curr.results.pop(0)) | |
| 61 self._index += 1 | |
| 62 self._done = False | |
| 63 continue | |
| 64 if curr.stop: | |
| 65 self._curr.append(None) | |
| 66 self._index += 1 | |
| 67 continue | |
| 68 if curr.failure: | |
| 69 self.failure = curr.failure | |
| 70 return | |
| 71 raise AssertionError("flow.Map ; no results, stop or failure?") | |
| 72 if self._done: | |
| 73 self.stop = 1 | |
| 74 return | |
| 75 curr = tuple(self._curr) | |
| 76 if self.func: | |
| 77 try: | |
| 78 curr = self.func(*curr) | |
| 79 except Failure, fail: | |
| 80 self.failure = fail | |
| 81 return | |
| 82 except: | |
| 83 self.failure = Failure() | |
| 84 return | |
| 85 self.results.append(curr) | |
| 86 self._index = 0 | |
| 87 | |
| 88 class Zip(Map): | |
| 89 """ | |
| 90 Zips two or more stages into a stream of N tuples | |
| 91 | |
| 92 For example:: | |
| 93 | |
| 94 source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"]) | |
| 95 printFlow(source) | |
| 96 | |
| 97 """ | |
| 98 def __init__(self, *stages): | |
| 99 Map.__init__(self, None, stages[0], *stages[1:]) | |
| 100 | |
| 101 class Concurrent(Stage): | |
| 102 """ | |
| 103 Executes stages concurrently | |
| 104 | |
| 105 This stage allows two or more stages (branches) to be executed at the same | |
| 106 time. It returns each stage as it becomes available. This can be used if | |
| 107 you have N callbacks, and you want to yield and wait for the first | |
| 108 available one that produces results. Once a stage is retuned, its next() | |
| 109 method should be used to extract the value for the stage. | |
| 110 """ | |
| 111 | |
| 112 class Instruction(CallLater): | |
| 113 def __init__(self, inst): | |
| 114 self.inst = inst | |
| 115 def callLater(self, callable): | |
| 116 for inst in self.inst: | |
| 117 inst.callLater(callable) | |
| 118 | |
| 119 def __init__(self, *stages): | |
| 120 Stage.__init__(self) | |
| 121 self._stages = [] | |
| 122 for stage in stages: | |
| 123 self._stages.append(wrap(stage)) | |
| 124 | |
| 125 def _yield(self): | |
| 126 if self.results or self.stop or self.failure: | |
| 127 return | |
| 128 stages = self._stages | |
| 129 later = [] | |
| 130 exit = None | |
| 131 while stages: | |
| 132 if stages[0] is exit: | |
| 133 if self.results: | |
| 134 return | |
| 135 break | |
| 136 curr = stages.pop(0) | |
| 137 instruction = curr._yield() | |
| 138 if curr.results: | |
| 139 self.results.append(curr) | |
| 140 if curr.failure: | |
| 141 self.failure = curr.failure | |
| 142 return | |
| 143 if curr.stop: | |
| 144 exit = None | |
| 145 if self.results: | |
| 146 return | |
| 147 continue | |
| 148 stages.append(curr) | |
| 149 if not exit: | |
| 150 exit = curr | |
| 151 if instruction: | |
| 152 if isinstance(instruction, CallLater): | |
| 153 if instruction not in later: | |
| 154 later.append(instruction) | |
| 155 continue | |
| 156 raise Unsupported(instruction) | |
| 157 if later: | |
| 158 return Concurrent.Instruction(later) | |
| 159 self.stop = True | |
| 160 | |
| 161 class Merge(Stage): | |
| 162 """ | |
| 163 Merges two or more Stages results into a single stream | |
| 164 | |
| 165 For example:: | |
| 166 | |
| 167 source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"]) | |
| 168 printFlow(source) | |
| 169 """ | |
| 170 def __init__(self, *stages): | |
| 171 Stage.__init__(self) | |
| 172 self.concurrent = Concurrent(*stages) | |
| 173 | |
| 174 def _yield(self): | |
| 175 if self.results or self.stop or self.failure: | |
| 176 return | |
| 177 instruction = self.concurrent._yield() | |
| 178 if instruction: | |
| 179 return instruction | |
| 180 for stage in self.concurrent.results: | |
| 181 self.results.extend(stage.results) | |
| 182 stage.results = [] | |
| 183 self.concurrent.results = [] | |
| 184 if self.concurrent.stop: | |
| 185 self.stop = True | |
| 186 self.failure = self.concurrent.failure | |
| 187 | |
| 188 class Callback(Stage): | |
| 189 """ | |
| 190 Converts a single-thread push interface into a pull interface. | |
| 191 | |
| 192 Once this stage is constructed, its result, errback, and finish member | |
| 193 variables may be called by a producer. The results of which can be | |
| 194 obtained by yielding the Callback and then calling next(). | |
| 195 | |
| 196 For example:: | |
| 197 | |
| 198 source = flow.Callback() | |
| 199 reactor.callLater(0, lambda: source.result("one")) | |
| 200 reactor.callLater(.5, lambda: source.result("two")) | |
| 201 reactor.callLater(1, lambda: source.finish()) | |
| 202 printFlow(source) | |
| 203 | |
| 204 """ | |
| 205 # TODO: Potentially rename this 'Consumer' and make it | |
| 206 # comply with protocols.IConsumer | |
| 207 # TODO: Make the inverse stage, which is an IProducer | |
| 208 class Instruction(CallLater): | |
| 209 def __init__(self): | |
| 210 self.flow = lambda: True | |
| 211 def callLater(self, callable): | |
| 212 self.flow = callable | |
| 213 def __init__(self, *trap): | |
| 214 Stage.__init__(self, *trap) | |
| 215 self._finished = False | |
| 216 self._cooperate = Callback.Instruction() | |
| 217 def result(self,result): | |
| 218 """ called by the producer to indicate a successful result """ | |
| 219 self.results.append(result) | |
| 220 self._cooperate.flow() | |
| 221 def finish(self): | |
| 222 """ called by producer to indicate successful stream completion """ | |
| 223 assert not self.failure, "failed streams should not be finished" | |
| 224 self._finished = True | |
| 225 self._cooperate.flow() | |
| 226 def errback(self, fail): | |
| 227 """ called by the producer in case of Failure """ | |
| 228 self.failure = fail | |
| 229 self._cooperate.flow() | |
| 230 def _yield(self): | |
| 231 if self.results or self.stop or self.failure: | |
| 232 return | |
| 233 if not self.results: | |
| 234 if self._finished: | |
| 235 self.stop = True | |
| 236 return | |
| 237 return self._cooperate | |
| 238 __call__ = result | |
| 239 | |
| OLD | NEW |