Index: third_party/twisted_8_1/twisted/internet/_threadedselect.py |
diff --git a/third_party/twisted_8_1/twisted/internet/_threadedselect.py b/third_party/twisted_8_1/twisted/internet/_threadedselect.py |
deleted file mode 100644 |
index 9dc90a0f8eba5c70cef446f5ff7dd0b5ca7d0125..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/internet/_threadedselect.py |
+++ /dev/null |
@@ -1,362 +0,0 @@ |
-# -*- test-case-name: twisted.test.test_internet -*- |
-# $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $ |
-# |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-from __future__ import generators |
- |
-""" |
-Threaded select reactor |
- |
-Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>} |
- |
- |
-The threadedselectreactor is a specialized reactor for integrating with |
-arbitrary foreign event loop, such as those you find in GUI toolkits. |
- |
-There are three things you'll need to do to use this reactor. |
- |
-Install the reactor at the beginning of your program, before importing |
-the rest of Twisted:: |
- |
- | from twisted.internet import _threadedselect |
- | _threadedselect.install() |
- |
-Interleave this reactor with your foreign event loop, at some point after |
-your event loop is initialized:: |
- |
- | from twisted.internet import reactor |
- | reactor.interleave(foreignEventLoopWakerFunction) |
- | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop) |
- |
-Instead of shutting down the foreign event loop directly, shut down the |
-reactor:: |
- |
- | from twisted.internet import reactor |
- | reactor.stop() |
- |
-In order for Twisted to do its work in the main thread (the thread that |
-interleave is called from), a waker function is necessary. The waker function |
-will be called from a "background" thread with one argument: func. |
-The waker function's purpose is to call func() from the main thread. |
-Many GUI toolkits ship with appropriate waker functions. |
-Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in |
-older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter. |
-These would be used in place of "foreignEventLoopWakerFunction" in the above |
-example. |
- |
-The other integration point at which the foreign event loop and this reactor |
-must integrate is shutdown. In order to ensure clean shutdown of Twisted, |
-you must allow for Twisted to come to a complete stop before quitting the |
-application. Typically, you will do this by setting up an after shutdown |
-trigger to stop your foreign event loop, and call reactor.stop() where you |
-would normally have initiated the shutdown procedure for the foreign event |
-loop. Shutdown functions that could be used in place of |
-"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance |
-with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function. |
-""" |
- |
-from threading import Thread |
-from Queue import Queue, Empty |
-from time import sleep |
-import sys |
- |
-from zope.interface import implements |
- |
-from twisted.internet.interfaces import IReactorFDSet |
-from twisted.internet import error |
-from twisted.internet import posixbase |
-from twisted.python import log, failure, threadable |
-from twisted.persisted import styles |
-from twisted.python.runtime import platformType |
- |
-import select |
-from errno import EINTR, EBADF |
- |
-from twisted.internet.selectreactor import _select |
- |
-# Exceptions that doSelect might return frequently |
-_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method') |
-_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away') |
- |
-def dictRemove(dct, value): |
- try: |
- del dct[value] |
- except KeyError: |
- pass |
- |
-def raiseException(e): |
- raise e |
- |
-class ThreadedSelectReactor(posixbase.PosixReactorBase): |
- """A threaded select() based reactor - runs on all POSIX platforms and on |
- Win32. |
- """ |
- implements(IReactorFDSet) |
- |
- def __init__(self): |
- threadable.init(1) |
- self.reads = {} |
- self.writes = {} |
- self.toThreadQueue = Queue() |
- self.toMainThread = Queue() |
- self.workerThread = None |
- self.mainWaker = None |
- posixbase.PosixReactorBase.__init__(self) |
- self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown) |
- |
- def wakeUp(self): |
- # we want to wake up from any thread |
- self.waker.wakeUp() |
- |
- def callLater(self, *args, **kw): |
- tple = posixbase.PosixReactorBase.callLater(self, *args, **kw) |
- self.wakeUp() |
- return tple |
- |
- def _sendToMain(self, msg, *args): |
- #print >>sys.stderr, 'sendToMain', msg, args |
- self.toMainThread.put((msg, args)) |
- if self.mainWaker is not None: |
- self.mainWaker() |
- |
- def _sendToThread(self, fn, *args): |
- #print >>sys.stderr, 'sendToThread', fn, args |
- self.toThreadQueue.put((fn, args)) |
- |
- def _preenDescriptorsInThread(self): |
- log.msg("Malformed file descriptor found. Preening lists.") |
- readers = self.reads.keys() |
- writers = self.writes.keys() |
- self.reads.clear() |
- self.writes.clear() |
- for selDict, selList in ((self.reads, readers), (self.writes, writers)): |
- for selectable in selList: |
- try: |
- select.select([selectable], [selectable], [selectable], 0) |
- except: |
- log.msg("bad descriptor %s" % selectable) |
- else: |
- selDict[selectable] = 1 |
- |
- def _workerInThread(self): |
- try: |
- while 1: |
- fn, args = self.toThreadQueue.get() |
- #print >>sys.stderr, "worker got", fn, args |
- fn(*args) |
- except SystemExit: |
- pass # exception indicates this thread should exit |
- except: |
- f = failure.Failure() |
- self._sendToMain('Failure', f) |
- #print >>sys.stderr, "worker finished" |
- |
- def _doSelectInThread(self, timeout): |
- """Run one iteration of the I/O monitor loop. |
- |
- This will run all selectables who had input or output readiness |
- waiting for them. |
- """ |
- reads = self.reads |
- writes = self.writes |
- while 1: |
- try: |
- r, w, ignored = _select(reads.keys(), |
- writes.keys(), |
- [], timeout) |
- break |
- except ValueError, ve: |
- # Possibly a file descriptor has gone negative? |
- log.err() |
- self._preenDescriptorsInThread() |
- except TypeError, te: |
- # Something *totally* invalid (object w/o fileno, non-integral |
- # result) was passed |
- log.err() |
- self._preenDescriptorsInThread() |
- except (select.error, IOError), se: |
- # select(2) encountered an error |
- if se.args[0] in (0, 2): |
- # windows does this if it got an empty list |
- if (not reads) and (not writes): |
- return |
- else: |
- raise |
- elif se.args[0] == EINTR: |
- return |
- elif se.args[0] == EBADF: |
- self._preenDescriptorsInThread() |
- else: |
- # OK, I really don't know what's going on. Blow up. |
- raise |
- self._sendToMain('Notify', r, w) |
- |
- def _process_Notify(self, r, w): |
- #print >>sys.stderr, "_process_Notify" |
- reads = self.reads |
- writes = self.writes |
- |
- _drdw = self._doReadOrWrite |
- _logrun = log.callWithLogger |
- for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)): |
- for selectable in selectables: |
- # if this was disconnected in another thread, kill it. |
- if selectable not in dct: |
- continue |
- # This for pausing input when we're not ready for more. |
- _logrun(selectable, _drdw, selectable, method, dct) |
- #print >>sys.stderr, "done _process_Notify" |
- |
- def _process_Failure(self, f): |
- f.raiseException() |
- |
- _doIterationInThread = _doSelectInThread |
- |
- def ensureWorkerThread(self): |
- if self.workerThread is None or not self.workerThread.isAlive(): |
- self.workerThread = Thread(target=self._workerInThread) |
- self.workerThread.start() |
- |
- def doThreadIteration(self, timeout): |
- self._sendToThread(self._doIterationInThread, timeout) |
- self.ensureWorkerThread() |
- #print >>sys.stderr, 'getting...' |
- msg, args = self.toMainThread.get() |
- #print >>sys.stderr, 'got', msg, args |
- getattr(self, '_process_' + msg)(*args) |
- |
- doIteration = doThreadIteration |
- |
- def _interleave(self): |
- while self.running: |
- #print >>sys.stderr, "runUntilCurrent" |
- self.runUntilCurrent() |
- t2 = self.timeout() |
- t = self.running and t2 |
- self._sendToThread(self._doIterationInThread, t) |
- #print >>sys.stderr, "yielding" |
- yield None |
- #print >>sys.stderr, "fetching" |
- msg, args = self.toMainThread.get_nowait() |
- getattr(self, '_process_' + msg)(*args) |
- |
- def interleave(self, waker, *args, **kw): |
- """ |
- interleave(waker) interleaves this reactor with the |
- current application by moving the blocking parts of |
- the reactor (select() in this case) to a separate |
- thread. This is typically useful for integration with |
- GUI applications which have their own event loop |
- already running. |
- |
- See the module docstring for more information. |
- """ |
- self.startRunning(*args, **kw) |
- loop = self._interleave() |
- def mainWaker(waker=waker, loop=loop): |
- #print >>sys.stderr, "mainWaker()" |
- waker(loop.next) |
- self.mainWaker = mainWaker |
- loop.next() |
- self.ensureWorkerThread() |
- |
- def _mainLoopShutdown(self): |
- self.mainWaker = None |
- if self.workerThread is not None: |
- #print >>sys.stderr, 'getting...' |
- self._sendToThread(raiseException, SystemExit) |
- self.wakeUp() |
- try: |
- while 1: |
- msg, args = self.toMainThread.get_nowait() |
- #print >>sys.stderr, "ignored:", (msg, args) |
- except Empty: |
- pass |
- self.workerThread.join() |
- self.workerThread = None |
- try: |
- while 1: |
- fn, args = self.toThreadQueue.get_nowait() |
- if fn is self._doIterationInThread: |
- log.msg('Iteration is still in the thread queue!') |
- elif fn is raiseException and args[0] is SystemExit: |
- pass |
- else: |
- fn(*args) |
- except Empty: |
- pass |
- |
- def _doReadOrWrite(self, selectable, method, dict): |
- try: |
- why = getattr(selectable, method)() |
- handfn = getattr(selectable, 'fileno', None) |
- if not handfn: |
- why = _NO_FILENO |
- elif handfn() == -1: |
- why = _NO_FILEDESC |
- except: |
- why = sys.exc_info()[1] |
- log.err() |
- if why: |
- self._disconnectSelectable(selectable, why, method == "doRead") |
- |
- def addReader(self, reader): |
- """Add a FileDescriptor for notification of data available to read. |
- """ |
- self._sendToThread(self.reads.__setitem__, reader, 1) |
- self.wakeUp() |
- |
- def addWriter(self, writer): |
- """Add a FileDescriptor for notification of data available to write. |
- """ |
- self._sendToThread(self.writes.__setitem__, writer, 1) |
- self.wakeUp() |
- |
- def removeReader(self, reader): |
- """Remove a Selectable for notification of data available to read. |
- """ |
- self._sendToThread(dictRemove, self.reads, reader) |
- |
- def removeWriter(self, writer): |
- """Remove a Selectable for notification of data available to write. |
- """ |
- self._sendToThread(dictRemove, self.writes, writer) |
- |
- def removeAll(self): |
- return self._removeAll(self.reads, self.writes) |
- |
- |
- def getReaders(self): |
- return self.reads.keys() |
- |
- |
- def getWriters(self): |
- return self.writes.keys() |
- |
- |
- def run(self, installSignalHandlers=1): |
- self.startRunning(installSignalHandlers=installSignalHandlers) |
- self.mainLoop() |
- |
- def mainLoop(self): |
- q = Queue() |
- self.interleave(q.put) |
- while self.running: |
- try: |
- q.get()() |
- except StopIteration: |
- break |
- |
- |
- |
-def install(): |
- """Configure the twisted mainloop to be run using the select() reactor. |
- """ |
- reactor = ThreadedSelectReactor() |
- from twisted.internet.main import installReactor |
- installReactor(reactor) |
- return reactor |
- |
-__all__ = ['install'] |