| Index: third_party/twisted_8_1/twisted/flow/threads.py
|
| diff --git a/third_party/twisted_8_1/twisted/flow/threads.py b/third_party/twisted_8_1/twisted/flow/threads.py
|
| deleted file mode 100644
|
| index b6a74fec843b004078fd7cf4fa70bee31d9cb2ed..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/flow/threads.py
|
| +++ /dev/null
|
| @@ -1,210 +0,0 @@
|
| -# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -#
|
| -# Author: Clark Evans (cce@clarkevans.com)
|
| -# Stability: The API is stable, but the implementation may still
|
| -# have one or more bugs; threads are tough.
|
| -#
|
| -
|
| -""" flow.thread
|
| -
|
| - Support for threads within a flow
|
| -"""
|
| -
|
| -from __future__ import nested_scopes
|
| -
|
| -from base import *
|
| -from twisted.python.failure import Failure
|
| -from twisted.internet import reactor
|
| -from time import sleep
|
| -
|
| -class Threaded(Stage):
|
| - """
|
| - A stage which runs a blocking iterable in a separate thread
|
| -
|
| - This stage tunnels output from an iterable executed in a separate thread to
|
| - the main thread. This process is carried out by a result buffer, and
|
| - returning Cooperate if the buffer is empty. The wrapped iterable's
|
| - __iter__ and next() methods will only be invoked in the spawned thread.
|
| -
|
| - This can be used in one of two ways, first, it can be extended via
|
| - inheritance; with the functionality of the inherited code implementing
|
| - next(), and using init() for initialization code to be run in the thread.
|
| -
|
| - If the iterable happens to have a chunked attribute, and that attribute is
|
| - true, then this wrapper will assume that data arrives in chunks via a
|
| - sequence instead of by values.
|
| -
|
| - For example::
|
| -
|
| - from __future__ import generators
|
| - from twisted.internet import reactor, defer
|
| - from twisted.flow import flow
|
| - from twisted.flow.threads import Threaded
|
| -
|
| - def countSleep(index):
|
| - from time import sleep
|
| - for index in range(index):
|
| - sleep(.3)
|
| - print "sleep", index
|
| - yield index
|
| -
|
| - def countCooperate(index):
|
| - for index in range(index):
|
| - yield flow.Cooperate(.1)
|
| - print "cooperate", index
|
| - yield "coop %s" % index
|
| -
|
| - d = flow.Deferred( flow.Merge(
|
| - Threaded(countSleep(5)),
|
| - countCooperate(5)))
|
| -
|
| - def prn(x):
|
| - print x
|
| - reactor.stop()
|
| - d.addCallback(prn)
|
| - reactor.run()
|
| - """
|
| - class Instruction(CallLater):
|
| - def __init__(self):
|
| - self.callable = None
|
| - self.immediate = False
|
| - def callLater(self, callable):
|
| - if self.immediate:
|
| - reactor.callLater(0,callable)
|
| - else:
|
| - self.callable = callable
|
| - def __call__(self):
|
| - callable = self.callable
|
| - if callable:
|
| - self.callable = None
|
| - callable()
|
| -
|
| - def __init__(self, iterable, *trap):
|
| - Stage.__init__(self, trap)
|
| - self._iterable = iterable
|
| - self._cooperate = Threaded.Instruction()
|
| - self.srcchunked = getattr(iterable, 'chunked', False)
|
| - reactor.callInThread(self._process)
|
| -
|
| - def _process_result(self, val):
|
| - if self.srcchunked:
|
| - self.results.extend(val)
|
| - else:
|
| - self.results.append(val)
|
| - self._cooperate()
|
| -
|
| - def _stopping(self):
|
| - self.stop = True
|
| - self._cooperate()
|
| -
|
| - def _process(self):
|
| - try:
|
| - self._iterable = iter(self._iterable)
|
| - except:
|
| - self.failure = Failure()
|
| - else:
|
| - try:
|
| - while True:
|
| - val = self._iterable.next()
|
| - reactor.callFromThread(self._process_result, val)
|
| - except StopIteration:
|
| - reactor.callFromThread(self._stopping)
|
| - except:
|
| - self.failure = Failure()
|
| - reactor.callFromThread(self._cooperate)
|
| - self._cooperate.immediate = True
|
| -
|
| - def _yield(self):
|
| - if self.results or self.stop or self.failure:
|
| - return
|
| - return self._cooperate
|
| -
|
| -
|
| -class QueryIterator:
|
| - """
|
| - Converts a database query into a result iterator
|
| -
|
| - Example usage::
|
| -
|
| - from __future__ import generators
|
| - from twisted.enterprise import adbapi
|
| - from twisted.internet import reactor
|
| - from twisted.flow import flow
|
| - from twisted.flow.threads import QueryIterator, Threaded
|
| -
|
| - dbpool = adbapi.ConnectionPool("SomeDriver",host='localhost',
|
| - db='Database',user='User',passwd='Password')
|
| -
|
| - # # I test with...
|
| - # from pyPgSQL import PgSQL
|
| - # dbpool = PgSQL
|
| -
|
| - sql = '''
|
| - (SELECT 'one')
|
| - UNION ALL
|
| - (SELECT 'two')
|
| - UNION ALL
|
| - (SELECT 'three')
|
| - '''
|
| - def consumer():
|
| - print "executing"
|
| - query = Threaded(QueryIterator(dbpool, sql))
|
| - print "yielding"
|
| - yield query
|
| - print "done yeilding"
|
| - for row in query:
|
| - print "Processed result : ", row
|
| - yield query
|
| -
|
| - from twisted.internet import reactor
|
| - def finish(result):
|
| - print "Deferred Complete : ", result
|
| - reactor.stop()
|
| - f = flow.Deferred(consumer())
|
| - f.addBoth(finish)
|
| - reactor.run()
|
| - """
|
| -
|
| - def __init__(self, pool, sql, fetchmany=False, fetchall=False):
|
| - self.curs = None
|
| - self.sql = sql
|
| - self.pool = pool
|
| - if fetchmany:
|
| - self.next = self.next_fetchmany
|
| - self.chunked = True
|
| - if fetchall:
|
| - self.next = self.next_fetchall
|
| - self.chunked = True
|
| -
|
| - def __iter__(self):
|
| - self.conn = self.pool.connect()
|
| - self.curs = self.conn.cursor()
|
| - self.curs.execute(self.sql)
|
| - return self
|
| -
|
| - def next_fetchall(self):
|
| - if self.curs:
|
| - ret = self.curs.fetchall()
|
| - self.curs = None
|
| - self.conn = None
|
| - return ret
|
| - raise StopIteration
|
| -
|
| - def next_fetchmany(self):
|
| - ret = self.curs.fetchmany()
|
| - if not ret:
|
| - self.curs = None
|
| - self.conn = None
|
| - raise StopIteration
|
| - return ret
|
| -
|
| - def next(self):
|
| - ret = self.curs.fetchone()
|
| - if not ret:
|
| - self.curs = None
|
| - self.conn = None
|
| - raise StopIteration
|
| - return ret
|
| -
|
|
|