| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_internet -*- | |
| 2 # $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $ | |
| 3 # | |
| 4 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 5 # See LICENSE for details. | |
| 6 | |
| 7 from __future__ import generators | |
| 8 | |
| 9 """ | |
| 10 Threaded select reactor | |
| 11 | |
| 12 Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>} | |
| 13 | |
| 14 | |
| 15 The threadedselectreactor is a specialized reactor for integrating with | |
| 16 arbitrary foreign event loop, such as those you find in GUI toolkits. | |
| 17 | |
| 18 There are three things you'll need to do to use this reactor. | |
| 19 | |
| 20 Install the reactor at the beginning of your program, before importing | |
| 21 the rest of Twisted:: | |
| 22 | |
| 23 | from twisted.internet import _threadedselect | |
| 24 | _threadedselect.install() | |
| 25 | |
| 26 Interleave this reactor with your foreign event loop, at some point after | |
| 27 your event loop is initialized:: | |
| 28 | |
| 29 | from twisted.internet import reactor | |
| 30 | reactor.interleave(foreignEventLoopWakerFunction) | |
| 31 | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop) | |
| 32 | |
| 33 Instead of shutting down the foreign event loop directly, shut down the | |
| 34 reactor:: | |
| 35 | |
| 36 | from twisted.internet import reactor | |
| 37 | reactor.stop() | |
| 38 | |
| 39 In order for Twisted to do its work in the main thread (the thread that | |
| 40 interleave is called from), a waker function is necessary. The waker function | |
| 41 will be called from a "background" thread with one argument: func. | |
| 42 The waker function's purpose is to call func() from the main thread. | |
| 43 Many GUI toolkits ship with appropriate waker functions. | |
| 44 Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in | |
| 45 older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter. | |
| 46 These would be used in place of "foreignEventLoopWakerFunction" in the above | |
| 47 example. | |
| 48 | |
| 49 The other integration point at which the foreign event loop and this reactor | |
| 50 must integrate is shutdown. In order to ensure clean shutdown of Twisted, | |
| 51 you must allow for Twisted to come to a complete stop before quitting the | |
| 52 application. Typically, you will do this by setting up an after shutdown | |
| 53 trigger to stop your foreign event loop, and call reactor.stop() where you | |
| 54 would normally have initiated the shutdown procedure for the foreign event | |
| 55 loop. Shutdown functions that could be used in place of | |
| 56 "foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance | |
| 57 with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function. | |
| 58 """ | |
| 59 | |
| 60 from threading import Thread | |
| 61 from Queue import Queue, Empty | |
| 62 from time import sleep | |
| 63 import sys | |
| 64 | |
| 65 from zope.interface import implements | |
| 66 | |
| 67 from twisted.internet.interfaces import IReactorFDSet | |
| 68 from twisted.internet import error | |
| 69 from twisted.internet import posixbase | |
| 70 from twisted.python import log, failure, threadable | |
| 71 from twisted.persisted import styles | |
| 72 from twisted.python.runtime import platformType | |
| 73 | |
| 74 import select | |
| 75 from errno import EINTR, EBADF | |
| 76 | |
| 77 from twisted.internet.selectreactor import _select | |
| 78 | |
| 79 # Exceptions that doSelect might return frequently | |
| 80 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method') | |
| 81 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away') | |
| 82 | |
| 83 def dictRemove(dct, value): | |
| 84 try: | |
| 85 del dct[value] | |
| 86 except KeyError: | |
| 87 pass | |
| 88 | |
| 89 def raiseException(e): | |
| 90 raise e | |
| 91 | |
| 92 class ThreadedSelectReactor(posixbase.PosixReactorBase): | |
| 93 """A threaded select() based reactor - runs on all POSIX platforms and on | |
| 94 Win32. | |
| 95 """ | |
| 96 implements(IReactorFDSet) | |
| 97 | |
| 98 def __init__(self): | |
| 99 threadable.init(1) | |
| 100 self.reads = {} | |
| 101 self.writes = {} | |
| 102 self.toThreadQueue = Queue() | |
| 103 self.toMainThread = Queue() | |
| 104 self.workerThread = None | |
| 105 self.mainWaker = None | |
| 106 posixbase.PosixReactorBase.__init__(self) | |
| 107 self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown) | |
| 108 | |
| 109 def wakeUp(self): | |
| 110 # we want to wake up from any thread | |
| 111 self.waker.wakeUp() | |
| 112 | |
| 113 def callLater(self, *args, **kw): | |
| 114 tple = posixbase.PosixReactorBase.callLater(self, *args, **kw) | |
| 115 self.wakeUp() | |
| 116 return tple | |
| 117 | |
| 118 def _sendToMain(self, msg, *args): | |
| 119 #print >>sys.stderr, 'sendToMain', msg, args | |
| 120 self.toMainThread.put((msg, args)) | |
| 121 if self.mainWaker is not None: | |
| 122 self.mainWaker() | |
| 123 | |
| 124 def _sendToThread(self, fn, *args): | |
| 125 #print >>sys.stderr, 'sendToThread', fn, args | |
| 126 self.toThreadQueue.put((fn, args)) | |
| 127 | |
| 128 def _preenDescriptorsInThread(self): | |
| 129 log.msg("Malformed file descriptor found. Preening lists.") | |
| 130 readers = self.reads.keys() | |
| 131 writers = self.writes.keys() | |
| 132 self.reads.clear() | |
| 133 self.writes.clear() | |
| 134 for selDict, selList in ((self.reads, readers), (self.writes, writers)): | |
| 135 for selectable in selList: | |
| 136 try: | |
| 137 select.select([selectable], [selectable], [selectable], 0) | |
| 138 except: | |
| 139 log.msg("bad descriptor %s" % selectable) | |
| 140 else: | |
| 141 selDict[selectable] = 1 | |
| 142 | |
| 143 def _workerInThread(self): | |
| 144 try: | |
| 145 while 1: | |
| 146 fn, args = self.toThreadQueue.get() | |
| 147 #print >>sys.stderr, "worker got", fn, args | |
| 148 fn(*args) | |
| 149 except SystemExit: | |
| 150 pass # exception indicates this thread should exit | |
| 151 except: | |
| 152 f = failure.Failure() | |
| 153 self._sendToMain('Failure', f) | |
| 154 #print >>sys.stderr, "worker finished" | |
| 155 | |
| 156 def _doSelectInThread(self, timeout): | |
| 157 """Run one iteration of the I/O monitor loop. | |
| 158 | |
| 159 This will run all selectables who had input or output readiness | |
| 160 waiting for them. | |
| 161 """ | |
| 162 reads = self.reads | |
| 163 writes = self.writes | |
| 164 while 1: | |
| 165 try: | |
| 166 r, w, ignored = _select(reads.keys(), | |
| 167 writes.keys(), | |
| 168 [], timeout) | |
| 169 break | |
| 170 except ValueError, ve: | |
| 171 # Possibly a file descriptor has gone negative? | |
| 172 log.err() | |
| 173 self._preenDescriptorsInThread() | |
| 174 except TypeError, te: | |
| 175 # Something *totally* invalid (object w/o fileno, non-integral | |
| 176 # result) was passed | |
| 177 log.err() | |
| 178 self._preenDescriptorsInThread() | |
| 179 except (select.error, IOError), se: | |
| 180 # select(2) encountered an error | |
| 181 if se.args[0] in (0, 2): | |
| 182 # windows does this if it got an empty list | |
| 183 if (not reads) and (not writes): | |
| 184 return | |
| 185 else: | |
| 186 raise | |
| 187 elif se.args[0] == EINTR: | |
| 188 return | |
| 189 elif se.args[0] == EBADF: | |
| 190 self._preenDescriptorsInThread() | |
| 191 else: | |
| 192 # OK, I really don't know what's going on. Blow up. | |
| 193 raise | |
| 194 self._sendToMain('Notify', r, w) | |
| 195 | |
| 196 def _process_Notify(self, r, w): | |
| 197 #print >>sys.stderr, "_process_Notify" | |
| 198 reads = self.reads | |
| 199 writes = self.writes | |
| 200 | |
| 201 _drdw = self._doReadOrWrite | |
| 202 _logrun = log.callWithLogger | |
| 203 for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", wr
ites)): | |
| 204 for selectable in selectables: | |
| 205 # if this was disconnected in another thread, kill it. | |
| 206 if selectable not in dct: | |
| 207 continue | |
| 208 # This for pausing input when we're not ready for more. | |
| 209 _logrun(selectable, _drdw, selectable, method, dct) | |
| 210 #print >>sys.stderr, "done _process_Notify" | |
| 211 | |
| 212 def _process_Failure(self, f): | |
| 213 f.raiseException() | |
| 214 | |
| 215 _doIterationInThread = _doSelectInThread | |
| 216 | |
| 217 def ensureWorkerThread(self): | |
| 218 if self.workerThread is None or not self.workerThread.isAlive(): | |
| 219 self.workerThread = Thread(target=self._workerInThread) | |
| 220 self.workerThread.start() | |
| 221 | |
| 222 def doThreadIteration(self, timeout): | |
| 223 self._sendToThread(self._doIterationInThread, timeout) | |
| 224 self.ensureWorkerThread() | |
| 225 #print >>sys.stderr, 'getting...' | |
| 226 msg, args = self.toMainThread.get() | |
| 227 #print >>sys.stderr, 'got', msg, args | |
| 228 getattr(self, '_process_' + msg)(*args) | |
| 229 | |
| 230 doIteration = doThreadIteration | |
| 231 | |
| 232 def _interleave(self): | |
| 233 while self.running: | |
| 234 #print >>sys.stderr, "runUntilCurrent" | |
| 235 self.runUntilCurrent() | |
| 236 t2 = self.timeout() | |
| 237 t = self.running and t2 | |
| 238 self._sendToThread(self._doIterationInThread, t) | |
| 239 #print >>sys.stderr, "yielding" | |
| 240 yield None | |
| 241 #print >>sys.stderr, "fetching" | |
| 242 msg, args = self.toMainThread.get_nowait() | |
| 243 getattr(self, '_process_' + msg)(*args) | |
| 244 | |
| 245 def interleave(self, waker, *args, **kw): | |
| 246 """ | |
| 247 interleave(waker) interleaves this reactor with the | |
| 248 current application by moving the blocking parts of | |
| 249 the reactor (select() in this case) to a separate | |
| 250 thread. This is typically useful for integration with | |
| 251 GUI applications which have their own event loop | |
| 252 already running. | |
| 253 | |
| 254 See the module docstring for more information. | |
| 255 """ | |
| 256 self.startRunning(*args, **kw) | |
| 257 loop = self._interleave() | |
| 258 def mainWaker(waker=waker, loop=loop): | |
| 259 #print >>sys.stderr, "mainWaker()" | |
| 260 waker(loop.next) | |
| 261 self.mainWaker = mainWaker | |
| 262 loop.next() | |
| 263 self.ensureWorkerThread() | |
| 264 | |
| 265 def _mainLoopShutdown(self): | |
| 266 self.mainWaker = None | |
| 267 if self.workerThread is not None: | |
| 268 #print >>sys.stderr, 'getting...' | |
| 269 self._sendToThread(raiseException, SystemExit) | |
| 270 self.wakeUp() | |
| 271 try: | |
| 272 while 1: | |
| 273 msg, args = self.toMainThread.get_nowait() | |
| 274 #print >>sys.stderr, "ignored:", (msg, args) | |
| 275 except Empty: | |
| 276 pass | |
| 277 self.workerThread.join() | |
| 278 self.workerThread = None | |
| 279 try: | |
| 280 while 1: | |
| 281 fn, args = self.toThreadQueue.get_nowait() | |
| 282 if fn is self._doIterationInThread: | |
| 283 log.msg('Iteration is still in the thread queue!') | |
| 284 elif fn is raiseException and args[0] is SystemExit: | |
| 285 pass | |
| 286 else: | |
| 287 fn(*args) | |
| 288 except Empty: | |
| 289 pass | |
| 290 | |
| 291 def _doReadOrWrite(self, selectable, method, dict): | |
| 292 try: | |
| 293 why = getattr(selectable, method)() | |
| 294 handfn = getattr(selectable, 'fileno', None) | |
| 295 if not handfn: | |
| 296 why = _NO_FILENO | |
| 297 elif handfn() == -1: | |
| 298 why = _NO_FILEDESC | |
| 299 except: | |
| 300 why = sys.exc_info()[1] | |
| 301 log.err() | |
| 302 if why: | |
| 303 self._disconnectSelectable(selectable, why, method == "doRead") | |
| 304 | |
| 305 def addReader(self, reader): | |
| 306 """Add a FileDescriptor for notification of data available to read. | |
| 307 """ | |
| 308 self._sendToThread(self.reads.__setitem__, reader, 1) | |
| 309 self.wakeUp() | |
| 310 | |
| 311 def addWriter(self, writer): | |
| 312 """Add a FileDescriptor for notification of data available to write. | |
| 313 """ | |
| 314 self._sendToThread(self.writes.__setitem__, writer, 1) | |
| 315 self.wakeUp() | |
| 316 | |
| 317 def removeReader(self, reader): | |
| 318 """Remove a Selectable for notification of data available to read. | |
| 319 """ | |
| 320 self._sendToThread(dictRemove, self.reads, reader) | |
| 321 | |
| 322 def removeWriter(self, writer): | |
| 323 """Remove a Selectable for notification of data available to write. | |
| 324 """ | |
| 325 self._sendToThread(dictRemove, self.writes, writer) | |
| 326 | |
| 327 def removeAll(self): | |
| 328 return self._removeAll(self.reads, self.writes) | |
| 329 | |
| 330 | |
| 331 def getReaders(self): | |
| 332 return self.reads.keys() | |
| 333 | |
| 334 | |
| 335 def getWriters(self): | |
| 336 return self.writes.keys() | |
| 337 | |
| 338 | |
| 339 def run(self, installSignalHandlers=1): | |
| 340 self.startRunning(installSignalHandlers=installSignalHandlers) | |
| 341 self.mainLoop() | |
| 342 | |
| 343 def mainLoop(self): | |
| 344 q = Queue() | |
| 345 self.interleave(q.put) | |
| 346 while self.running: | |
| 347 try: | |
| 348 q.get()() | |
| 349 except StopIteration: | |
| 350 break | |
| 351 | |
| 352 | |
| 353 | |
| 354 def install(): | |
| 355 """Configure the twisted mainloop to be run using the select() reactor. | |
| 356 """ | |
| 357 reactor = ThreadedSelectReactor() | |
| 358 from twisted.internet.main import installReactor | |
| 359 installReactor(reactor) | |
| 360 return reactor | |
| 361 | |
| 362 __all__ = ['install'] | |
| OLD | NEW |