OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark Evans (cce@clarkevans.com) | |
6 # Stability: The API is stable, but the implementation may still | |
7 # have one or more bugs; threads are tough. | |
8 # | |
9 | |
10 """ flow.thread | |
11 | |
12 Support for threads within a flow | |
13 """ | |
14 | |
15 from __future__ import nested_scopes | |
16 | |
17 from base import * | |
18 from twisted.python.failure import Failure | |
19 from twisted.internet import reactor | |
20 from time import sleep | |
21 | |
22 class Threaded(Stage): | |
23 """ | |
24 A stage which runs a blocking iterable in a separate thread | |
25 | |
26 This stage tunnels output from an iterable executed in a separate thread to | |
27 the main thread. This process is carried out by a result buffer, and | |
28 returning Cooperate if the buffer is empty. The wrapped iterable's | |
29 __iter__ and next() methods will only be invoked in the spawned thread. | |
30 | |
31 This can be used in one of two ways, first, it can be extended via | |
32 inheritance; with the functionality of the inherited code implementing | |
33 next(), and using init() for initialization code to be run in the thread. | |
34 | |
35 If the iterable happens to have a chunked attribute, and that attribute is | |
36 true, then this wrapper will assume that data arrives in chunks via a | |
37 sequence instead of by values. | |
38 | |
39 For example:: | |
40 | |
41 from __future__ import generators | |
42 from twisted.internet import reactor, defer | |
43 from twisted.flow import flow | |
44 from twisted.flow.threads import Threaded | |
45 | |
46 def countSleep(index): | |
47 from time import sleep | |
48 for index in range(index): | |
49 sleep(.3) | |
50 print "sleep", index | |
51 yield index | |
52 | |
53 def countCooperate(index): | |
54 for index in range(index): | |
55 yield flow.Cooperate(.1) | |
56 print "cooperate", index | |
57 yield "coop %s" % index | |
58 | |
59 d = flow.Deferred( flow.Merge( | |
60 Threaded(countSleep(5)), | |
61 countCooperate(5))) | |
62 | |
63 def prn(x): | |
64 print x | |
65 reactor.stop() | |
66 d.addCallback(prn) | |
67 reactor.run() | |
68 """ | |
69 class Instruction(CallLater): | |
70 def __init__(self): | |
71 self.callable = None | |
72 self.immediate = False | |
73 def callLater(self, callable): | |
74 if self.immediate: | |
75 reactor.callLater(0,callable) | |
76 else: | |
77 self.callable = callable | |
78 def __call__(self): | |
79 callable = self.callable | |
80 if callable: | |
81 self.callable = None | |
82 callable() | |
83 | |
84 def __init__(self, iterable, *trap): | |
85 Stage.__init__(self, trap) | |
86 self._iterable = iterable | |
87 self._cooperate = Threaded.Instruction() | |
88 self.srcchunked = getattr(iterable, 'chunked', False) | |
89 reactor.callInThread(self._process) | |
90 | |
91 def _process_result(self, val): | |
92 if self.srcchunked: | |
93 self.results.extend(val) | |
94 else: | |
95 self.results.append(val) | |
96 self._cooperate() | |
97 | |
98 def _stopping(self): | |
99 self.stop = True | |
100 self._cooperate() | |
101 | |
102 def _process(self): | |
103 try: | |
104 self._iterable = iter(self._iterable) | |
105 except: | |
106 self.failure = Failure() | |
107 else: | |
108 try: | |
109 while True: | |
110 val = self._iterable.next() | |
111 reactor.callFromThread(self._process_result, val) | |
112 except StopIteration: | |
113 reactor.callFromThread(self._stopping) | |
114 except: | |
115 self.failure = Failure() | |
116 reactor.callFromThread(self._cooperate) | |
117 self._cooperate.immediate = True | |
118 | |
119 def _yield(self): | |
120 if self.results or self.stop or self.failure: | |
121 return | |
122 return self._cooperate | |
123 | |
124 | |
125 class QueryIterator: | |
126 """ | |
127 Converts a database query into a result iterator | |
128 | |
129 Example usage:: | |
130 | |
131 from __future__ import generators | |
132 from twisted.enterprise import adbapi | |
133 from twisted.internet import reactor | |
134 from twisted.flow import flow | |
135 from twisted.flow.threads import QueryIterator, Threaded | |
136 | |
137 dbpool = adbapi.ConnectionPool("SomeDriver",host='localhost', | |
138 db='Database',user='User',passwd='Password') | |
139 | |
140 # # I test with... | |
141 # from pyPgSQL import PgSQL | |
142 # dbpool = PgSQL | |
143 | |
144 sql = ''' | |
145 (SELECT 'one') | |
146 UNION ALL | |
147 (SELECT 'two') | |
148 UNION ALL | |
149 (SELECT 'three') | |
150 ''' | |
151 def consumer(): | |
152 print "executing" | |
153 query = Threaded(QueryIterator(dbpool, sql)) | |
154 print "yielding" | |
155 yield query | |
156 print "done yeilding" | |
157 for row in query: | |
158 print "Processed result : ", row | |
159 yield query | |
160 | |
161 from twisted.internet import reactor | |
162 def finish(result): | |
163 print "Deferred Complete : ", result | |
164 reactor.stop() | |
165 f = flow.Deferred(consumer()) | |
166 f.addBoth(finish) | |
167 reactor.run() | |
168 """ | |
169 | |
170 def __init__(self, pool, sql, fetchmany=False, fetchall=False): | |
171 self.curs = None | |
172 self.sql = sql | |
173 self.pool = pool | |
174 if fetchmany: | |
175 self.next = self.next_fetchmany | |
176 self.chunked = True | |
177 if fetchall: | |
178 self.next = self.next_fetchall | |
179 self.chunked = True | |
180 | |
181 def __iter__(self): | |
182 self.conn = self.pool.connect() | |
183 self.curs = self.conn.cursor() | |
184 self.curs.execute(self.sql) | |
185 return self | |
186 | |
187 def next_fetchall(self): | |
188 if self.curs: | |
189 ret = self.curs.fetchall() | |
190 self.curs = None | |
191 self.conn = None | |
192 return ret | |
193 raise StopIteration | |
194 | |
195 def next_fetchmany(self): | |
196 ret = self.curs.fetchmany() | |
197 if not ret: | |
198 self.curs = None | |
199 self.conn = None | |
200 raise StopIteration | |
201 return ret | |
202 | |
203 def next(self): | |
204 ret = self.curs.fetchone() | |
205 if not ret: | |
206 self.curs = None | |
207 self.conn = None | |
208 raise StopIteration | |
209 return ret | |
210 | |
OLD | NEW |