Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: third_party/twisted_8_1/twisted/flow/threads.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 #
5 # Author: Clark Evans (cce@clarkevans.com)
6 # Stability: The API is stable, but the implementation may still
7 # have one or more bugs; threads are tough.
8 #
9
10 """ flow.thread
11
12 Support for threads within a flow
13 """
14
15 from __future__ import nested_scopes
16
17 from base import *
18 from twisted.python.failure import Failure
19 from twisted.internet import reactor
20 from time import sleep
21
22 class Threaded(Stage):
23 """
24 A stage which runs a blocking iterable in a separate thread
25
26 This stage tunnels output from an iterable executed in a separate thread to
27 the main thread. This process is carried out by a result buffer, and
28 returning Cooperate if the buffer is empty. The wrapped iterable's
29 __iter__ and next() methods will only be invoked in the spawned thread.
30
31 This can be used in one of two ways, first, it can be extended via
32 inheritance; with the functionality of the inherited code implementing
33 next(), and using init() for initialization code to be run in the thread.
34
35 If the iterable happens to have a chunked attribute, and that attribute is
36 true, then this wrapper will assume that data arrives in chunks via a
37 sequence instead of by values.
38
39 For example::
40
41 from __future__ import generators
42 from twisted.internet import reactor, defer
43 from twisted.flow import flow
44 from twisted.flow.threads import Threaded
45
46 def countSleep(index):
47 from time import sleep
48 for index in range(index):
49 sleep(.3)
50 print "sleep", index
51 yield index
52
53 def countCooperate(index):
54 for index in range(index):
55 yield flow.Cooperate(.1)
56 print "cooperate", index
57 yield "coop %s" % index
58
59 d = flow.Deferred( flow.Merge(
60 Threaded(countSleep(5)),
61 countCooperate(5)))
62
63 def prn(x):
64 print x
65 reactor.stop()
66 d.addCallback(prn)
67 reactor.run()
68 """
69 class Instruction(CallLater):
70 def __init__(self):
71 self.callable = None
72 self.immediate = False
73 def callLater(self, callable):
74 if self.immediate:
75 reactor.callLater(0,callable)
76 else:
77 self.callable = callable
78 def __call__(self):
79 callable = self.callable
80 if callable:
81 self.callable = None
82 callable()
83
84 def __init__(self, iterable, *trap):
85 Stage.__init__(self, trap)
86 self._iterable = iterable
87 self._cooperate = Threaded.Instruction()
88 self.srcchunked = getattr(iterable, 'chunked', False)
89 reactor.callInThread(self._process)
90
91 def _process_result(self, val):
92 if self.srcchunked:
93 self.results.extend(val)
94 else:
95 self.results.append(val)
96 self._cooperate()
97
98 def _stopping(self):
99 self.stop = True
100 self._cooperate()
101
102 def _process(self):
103 try:
104 self._iterable = iter(self._iterable)
105 except:
106 self.failure = Failure()
107 else:
108 try:
109 while True:
110 val = self._iterable.next()
111 reactor.callFromThread(self._process_result, val)
112 except StopIteration:
113 reactor.callFromThread(self._stopping)
114 except:
115 self.failure = Failure()
116 reactor.callFromThread(self._cooperate)
117 self._cooperate.immediate = True
118
119 def _yield(self):
120 if self.results or self.stop or self.failure:
121 return
122 return self._cooperate
123
124
125 class QueryIterator:
126 """
127 Converts a database query into a result iterator
128
129 Example usage::
130
131 from __future__ import generators
132 from twisted.enterprise import adbapi
133 from twisted.internet import reactor
134 from twisted.flow import flow
135 from twisted.flow.threads import QueryIterator, Threaded
136
137 dbpool = adbapi.ConnectionPool("SomeDriver",host='localhost',
138 db='Database',user='User',passwd='Password')
139
140 # # I test with...
141 # from pyPgSQL import PgSQL
142 # dbpool = PgSQL
143
144 sql = '''
145 (SELECT 'one')
146 UNION ALL
147 (SELECT 'two')
148 UNION ALL
149 (SELECT 'three')
150 '''
151 def consumer():
152 print "executing"
153 query = Threaded(QueryIterator(dbpool, sql))
154 print "yielding"
155 yield query
156 print "done yeilding"
157 for row in query:
158 print "Processed result : ", row
159 yield query
160
161 from twisted.internet import reactor
162 def finish(result):
163 print "Deferred Complete : ", result
164 reactor.stop()
165 f = flow.Deferred(consumer())
166 f.addBoth(finish)
167 reactor.run()
168 """
169
170 def __init__(self, pool, sql, fetchmany=False, fetchall=False):
171 self.curs = None
172 self.sql = sql
173 self.pool = pool
174 if fetchmany:
175 self.next = self.next_fetchmany
176 self.chunked = True
177 if fetchall:
178 self.next = self.next_fetchall
179 self.chunked = True
180
181 def __iter__(self):
182 self.conn = self.pool.connect()
183 self.curs = self.conn.cursor()
184 self.curs.execute(self.sql)
185 return self
186
187 def next_fetchall(self):
188 if self.curs:
189 ret = self.curs.fetchall()
190 self.curs = None
191 self.conn = None
192 return ret
193 raise StopIteration
194
195 def next_fetchmany(self):
196 ret = self.curs.fetchmany()
197 if not ret:
198 self.curs = None
199 self.conn = None
200 raise StopIteration
201 return ret
202
203 def next(self):
204 ret = self.curs.fetchone()
205 if not ret:
206 self.curs = None
207 self.conn = None
208 raise StopIteration
209 return ret
210
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/flow/test/test_flow.py ('k') | third_party/twisted_8_1/twisted/flow/topfiles/README » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698