Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(218)

Side by Side Diff: third_party/twisted_8_1/twisted/flow/stage.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 #
5 # Author: Clark Evans (cce@clarkevans.com)
6 #
7
8 """
9 flow.stage
10
11 Various stages for manipulating data flows, in particular, those stages which
12 take more than one input stages or alternative input, such as a callback.
13 """
14
15 from base import *
16 from wrap import wrap
17 from twisted.python.failure import Failure
18
19 class Map(Stage):
20 """
21 flow equivalent to map: Map(function, stage, ... )
22
23 Apply a function to every item yielded and yield the results. If
24 additional stages are passed, the function must take that many arguments
25 and is applied to the items of all lists in parallel. If a list is shorter
26 than another, it is assumed to be extended with None items. If the
27 function is None, the identity function is assumed; if there are multiple
28 list arguments, Map stage returns a sequence consisting of tuples
29 containing the corresponding items from all lists.
30
31 For example::
32
33 def fn(val):
34 return val + 10
35
36 source = flow.Map(fn,range(4))
37 printFlow(source)
38 """
39 def __init__(self, func, stage, *stages):
40 Stage.__init__(self)
41 self.func = func
42 self._stage = [wrap(stage)]
43 for stage in stages:
44 self._stage.append(wrap(stage))
45 self._index = 0
46
47 def _yield(self):
48 if self.results or self.stop or self.failure:
49 return
50 if not self._index:
51 self._curr = []
52 self._done = True
53 while self._index < len(self._stage):
54 idx = self._index
55 curr = self._stage[idx]
56 instruction = curr._yield()
57 if instruction:
58 return instruction
59 if curr.results:
60 self._curr.append(curr.results.pop(0))
61 self._index += 1
62 self._done = False
63 continue
64 if curr.stop:
65 self._curr.append(None)
66 self._index += 1
67 continue
68 if curr.failure:
69 self.failure = curr.failure
70 return
71 raise AssertionError("flow.Map ; no results, stop or failure?")
72 if self._done:
73 self.stop = 1
74 return
75 curr = tuple(self._curr)
76 if self.func:
77 try:
78 curr = self.func(*curr)
79 except Failure, fail:
80 self.failure = fail
81 return
82 except:
83 self.failure = Failure()
84 return
85 self.results.append(curr)
86 self._index = 0
87
88 class Zip(Map):
89 """
90 Zips two or more stages into a stream of N tuples
91
92 For example::
93
94 source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"])
95 printFlow(source)
96
97 """
98 def __init__(self, *stages):
99 Map.__init__(self, None, stages[0], *stages[1:])
100
101 class Concurrent(Stage):
102 """
103 Executes stages concurrently
104
105 This stage allows two or more stages (branches) to be executed at the same
106 time. It returns each stage as it becomes available. This can be used if
107 you have N callbacks, and you want to yield and wait for the first
108 available one that produces results. Once a stage is retuned, its next()
109 method should be used to extract the value for the stage.
110 """
111
112 class Instruction(CallLater):
113 def __init__(self, inst):
114 self.inst = inst
115 def callLater(self, callable):
116 for inst in self.inst:
117 inst.callLater(callable)
118
119 def __init__(self, *stages):
120 Stage.__init__(self)
121 self._stages = []
122 for stage in stages:
123 self._stages.append(wrap(stage))
124
125 def _yield(self):
126 if self.results or self.stop or self.failure:
127 return
128 stages = self._stages
129 later = []
130 exit = None
131 while stages:
132 if stages[0] is exit:
133 if self.results:
134 return
135 break
136 curr = stages.pop(0)
137 instruction = curr._yield()
138 if curr.results:
139 self.results.append(curr)
140 if curr.failure:
141 self.failure = curr.failure
142 return
143 if curr.stop:
144 exit = None
145 if self.results:
146 return
147 continue
148 stages.append(curr)
149 if not exit:
150 exit = curr
151 if instruction:
152 if isinstance(instruction, CallLater):
153 if instruction not in later:
154 later.append(instruction)
155 continue
156 raise Unsupported(instruction)
157 if later:
158 return Concurrent.Instruction(later)
159 self.stop = True
160
161 class Merge(Stage):
162 """
163 Merges two or more Stages results into a single stream
164
165 For example::
166
167 source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"])
168 printFlow(source)
169 """
170 def __init__(self, *stages):
171 Stage.__init__(self)
172 self.concurrent = Concurrent(*stages)
173
174 def _yield(self):
175 if self.results or self.stop or self.failure:
176 return
177 instruction = self.concurrent._yield()
178 if instruction:
179 return instruction
180 for stage in self.concurrent.results:
181 self.results.extend(stage.results)
182 stage.results = []
183 self.concurrent.results = []
184 if self.concurrent.stop:
185 self.stop = True
186 self.failure = self.concurrent.failure
187
188 class Callback(Stage):
189 """
190 Converts a single-thread push interface into a pull interface.
191
192 Once this stage is constructed, its result, errback, and finish member
193 variables may be called by a producer. The results of which can be
194 obtained by yielding the Callback and then calling next().
195
196 For example::
197
198 source = flow.Callback()
199 reactor.callLater(0, lambda: source.result("one"))
200 reactor.callLater(.5, lambda: source.result("two"))
201 reactor.callLater(1, lambda: source.finish())
202 printFlow(source)
203
204 """
205 # TODO: Potentially rename this 'Consumer' and make it
206 # comply with protocols.IConsumer
207 # TODO: Make the inverse stage, which is an IProducer
208 class Instruction(CallLater):
209 def __init__(self):
210 self.flow = lambda: True
211 def callLater(self, callable):
212 self.flow = callable
213 def __init__(self, *trap):
214 Stage.__init__(self, *trap)
215 self._finished = False
216 self._cooperate = Callback.Instruction()
217 def result(self,result):
218 """ called by the producer to indicate a successful result """
219 self.results.append(result)
220 self._cooperate.flow()
221 def finish(self):
222 """ called by producer to indicate successful stream completion """
223 assert not self.failure, "failed streams should not be finished"
224 self._finished = True
225 self._cooperate.flow()
226 def errback(self, fail):
227 """ called by the producer in case of Failure """
228 self.failure = fail
229 self._cooperate.flow()
230 def _yield(self):
231 if self.results or self.stop or self.failure:
232 return
233 if not self.results:
234 if self._finished:
235 self.stop = True
236 return
237 return self._cooperate
238 __call__ = result
239
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/flow/protocol.py ('k') | third_party/twisted_8_1/twisted/flow/test/__init__.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698