| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 # Author: Clark Evans (cce@clarkevans.com) | |
| 6 # Stability: The API is stable, but the implementation may still | |
| 7 # have one or more bugs; threads are tough. | |
| 8 # | |
| 9 | |
| 10 """ flow.thread | |
| 11 | |
| 12 Support for threads within a flow | |
| 13 """ | |
| 14 | |
| 15 from __future__ import nested_scopes | |
| 16 | |
| 17 from base import * | |
| 18 from twisted.python.failure import Failure | |
| 19 from twisted.internet import reactor | |
| 20 from time import sleep | |
| 21 | |
| 22 class Threaded(Stage): | |
| 23 """ | |
| 24 A stage which runs a blocking iterable in a separate thread | |
| 25 | |
| 26 This stage tunnels output from an iterable executed in a separate thread to | |
| 27 the main thread. This process is carried out by a result buffer, and | |
| 28 returning Cooperate if the buffer is empty. The wrapped iterable's | |
| 29 __iter__ and next() methods will only be invoked in the spawned thread. | |
| 30 | |
| 31 This can be used in one of two ways, first, it can be extended via | |
| 32 inheritance; with the functionality of the inherited code implementing | |
| 33 next(), and using init() for initialization code to be run in the thread. | |
| 34 | |
| 35 If the iterable happens to have a chunked attribute, and that attribute is | |
| 36 true, then this wrapper will assume that data arrives in chunks via a | |
| 37 sequence instead of by values. | |
| 38 | |
| 39 For example:: | |
| 40 | |
| 41 from __future__ import generators | |
| 42 from twisted.internet import reactor, defer | |
| 43 from twisted.flow import flow | |
| 44 from twisted.flow.threads import Threaded | |
| 45 | |
| 46 def countSleep(index): | |
| 47 from time import sleep | |
| 48 for index in range(index): | |
| 49 sleep(.3) | |
| 50 print "sleep", index | |
| 51 yield index | |
| 52 | |
| 53 def countCooperate(index): | |
| 54 for index in range(index): | |
| 55 yield flow.Cooperate(.1) | |
| 56 print "cooperate", index | |
| 57 yield "coop %s" % index | |
| 58 | |
| 59 d = flow.Deferred( flow.Merge( | |
| 60 Threaded(countSleep(5)), | |
| 61 countCooperate(5))) | |
| 62 | |
| 63 def prn(x): | |
| 64 print x | |
| 65 reactor.stop() | |
| 66 d.addCallback(prn) | |
| 67 reactor.run() | |
| 68 """ | |
| 69 class Instruction(CallLater): | |
| 70 def __init__(self): | |
| 71 self.callable = None | |
| 72 self.immediate = False | |
| 73 def callLater(self, callable): | |
| 74 if self.immediate: | |
| 75 reactor.callLater(0,callable) | |
| 76 else: | |
| 77 self.callable = callable | |
| 78 def __call__(self): | |
| 79 callable = self.callable | |
| 80 if callable: | |
| 81 self.callable = None | |
| 82 callable() | |
| 83 | |
| 84 def __init__(self, iterable, *trap): | |
| 85 Stage.__init__(self, trap) | |
| 86 self._iterable = iterable | |
| 87 self._cooperate = Threaded.Instruction() | |
| 88 self.srcchunked = getattr(iterable, 'chunked', False) | |
| 89 reactor.callInThread(self._process) | |
| 90 | |
| 91 def _process_result(self, val): | |
| 92 if self.srcchunked: | |
| 93 self.results.extend(val) | |
| 94 else: | |
| 95 self.results.append(val) | |
| 96 self._cooperate() | |
| 97 | |
| 98 def _stopping(self): | |
| 99 self.stop = True | |
| 100 self._cooperate() | |
| 101 | |
| 102 def _process(self): | |
| 103 try: | |
| 104 self._iterable = iter(self._iterable) | |
| 105 except: | |
| 106 self.failure = Failure() | |
| 107 else: | |
| 108 try: | |
| 109 while True: | |
| 110 val = self._iterable.next() | |
| 111 reactor.callFromThread(self._process_result, val) | |
| 112 except StopIteration: | |
| 113 reactor.callFromThread(self._stopping) | |
| 114 except: | |
| 115 self.failure = Failure() | |
| 116 reactor.callFromThread(self._cooperate) | |
| 117 self._cooperate.immediate = True | |
| 118 | |
| 119 def _yield(self): | |
| 120 if self.results or self.stop or self.failure: | |
| 121 return | |
| 122 return self._cooperate | |
| 123 | |
| 124 | |
| 125 class QueryIterator: | |
| 126 """ | |
| 127 Converts a database query into a result iterator | |
| 128 | |
| 129 Example usage:: | |
| 130 | |
| 131 from __future__ import generators | |
| 132 from twisted.enterprise import adbapi | |
| 133 from twisted.internet import reactor | |
| 134 from twisted.flow import flow | |
| 135 from twisted.flow.threads import QueryIterator, Threaded | |
| 136 | |
| 137 dbpool = adbapi.ConnectionPool("SomeDriver",host='localhost', | |
| 138 db='Database',user='User',passwd='Password') | |
| 139 | |
| 140 # # I test with... | |
| 141 # from pyPgSQL import PgSQL | |
| 142 # dbpool = PgSQL | |
| 143 | |
| 144 sql = ''' | |
| 145 (SELECT 'one') | |
| 146 UNION ALL | |
| 147 (SELECT 'two') | |
| 148 UNION ALL | |
| 149 (SELECT 'three') | |
| 150 ''' | |
| 151 def consumer(): | |
| 152 print "executing" | |
| 153 query = Threaded(QueryIterator(dbpool, sql)) | |
| 154 print "yielding" | |
| 155 yield query | |
| 156 print "done yeilding" | |
| 157 for row in query: | |
| 158 print "Processed result : ", row | |
| 159 yield query | |
| 160 | |
| 161 from twisted.internet import reactor | |
| 162 def finish(result): | |
| 163 print "Deferred Complete : ", result | |
| 164 reactor.stop() | |
| 165 f = flow.Deferred(consumer()) | |
| 166 f.addBoth(finish) | |
| 167 reactor.run() | |
| 168 """ | |
| 169 | |
| 170 def __init__(self, pool, sql, fetchmany=False, fetchall=False): | |
| 171 self.curs = None | |
| 172 self.sql = sql | |
| 173 self.pool = pool | |
| 174 if fetchmany: | |
| 175 self.next = self.next_fetchmany | |
| 176 self.chunked = True | |
| 177 if fetchall: | |
| 178 self.next = self.next_fetchall | |
| 179 self.chunked = True | |
| 180 | |
| 181 def __iter__(self): | |
| 182 self.conn = self.pool.connect() | |
| 183 self.curs = self.conn.cursor() | |
| 184 self.curs.execute(self.sql) | |
| 185 return self | |
| 186 | |
| 187 def next_fetchall(self): | |
| 188 if self.curs: | |
| 189 ret = self.curs.fetchall() | |
| 190 self.curs = None | |
| 191 self.conn = None | |
| 192 return ret | |
| 193 raise StopIteration | |
| 194 | |
| 195 def next_fetchmany(self): | |
| 196 ret = self.curs.fetchmany() | |
| 197 if not ret: | |
| 198 self.curs = None | |
| 199 self.conn = None | |
| 200 raise StopIteration | |
| 201 return ret | |
| 202 | |
| 203 def next(self): | |
| 204 ret = self.curs.fetchone() | |
| 205 if not ret: | |
| 206 self.curs = None | |
| 207 self.conn = None | |
| 208 raise StopIteration | |
| 209 return ret | |
| 210 | |
| OLD | NEW |