OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_internet -*- | |
2 # $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $ | |
3 # | |
4 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
5 # See LICENSE for details. | |
6 | |
7 from __future__ import generators | |
8 | |
9 """ | |
10 Threaded select reactor | |
11 | |
12 Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>} | |
13 | |
14 | |
15 The threadedselectreactor is a specialized reactor for integrating with | |
16 arbitrary foreign event loop, such as those you find in GUI toolkits. | |
17 | |
18 There are three things you'll need to do to use this reactor. | |
19 | |
20 Install the reactor at the beginning of your program, before importing | |
21 the rest of Twisted:: | |
22 | |
23 | from twisted.internet import _threadedselect | |
24 | _threadedselect.install() | |
25 | |
26 Interleave this reactor with your foreign event loop, at some point after | |
27 your event loop is initialized:: | |
28 | |
29 | from twisted.internet import reactor | |
30 | reactor.interleave(foreignEventLoopWakerFunction) | |
31 | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop) | |
32 | |
33 Instead of shutting down the foreign event loop directly, shut down the | |
34 reactor:: | |
35 | |
36 | from twisted.internet import reactor | |
37 | reactor.stop() | |
38 | |
39 In order for Twisted to do its work in the main thread (the thread that | |
40 interleave is called from), a waker function is necessary. The waker function | |
41 will be called from a "background" thread with one argument: func. | |
42 The waker function's purpose is to call func() from the main thread. | |
43 Many GUI toolkits ship with appropriate waker functions. | |
44 Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in | |
45 older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter. | |
46 These would be used in place of "foreignEventLoopWakerFunction" in the above | |
47 example. | |
48 | |
49 The other integration point at which the foreign event loop and this reactor | |
50 must integrate is shutdown. In order to ensure clean shutdown of Twisted, | |
51 you must allow for Twisted to come to a complete stop before quitting the | |
52 application. Typically, you will do this by setting up an after shutdown | |
53 trigger to stop your foreign event loop, and call reactor.stop() where you | |
54 would normally have initiated the shutdown procedure for the foreign event | |
55 loop. Shutdown functions that could be used in place of | |
56 "foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance | |
57 with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function. | |
58 """ | |
59 | |
60 from threading import Thread | |
61 from Queue import Queue, Empty | |
62 from time import sleep | |
63 import sys | |
64 | |
65 from zope.interface import implements | |
66 | |
67 from twisted.internet.interfaces import IReactorFDSet | |
68 from twisted.internet import error | |
69 from twisted.internet import posixbase | |
70 from twisted.python import log, failure, threadable | |
71 from twisted.persisted import styles | |
72 from twisted.python.runtime import platformType | |
73 | |
74 import select | |
75 from errno import EINTR, EBADF | |
76 | |
77 from twisted.internet.selectreactor import _select | |
78 | |
79 # Exceptions that doSelect might return frequently | |
80 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method') | |
81 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away') | |
82 | |
83 def dictRemove(dct, value): | |
84 try: | |
85 del dct[value] | |
86 except KeyError: | |
87 pass | |
88 | |
89 def raiseException(e): | |
90 raise e | |
91 | |
92 class ThreadedSelectReactor(posixbase.PosixReactorBase): | |
93 """A threaded select() based reactor - runs on all POSIX platforms and on | |
94 Win32. | |
95 """ | |
96 implements(IReactorFDSet) | |
97 | |
98 def __init__(self): | |
99 threadable.init(1) | |
100 self.reads = {} | |
101 self.writes = {} | |
102 self.toThreadQueue = Queue() | |
103 self.toMainThread = Queue() | |
104 self.workerThread = None | |
105 self.mainWaker = None | |
106 posixbase.PosixReactorBase.__init__(self) | |
107 self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown) | |
108 | |
109 def wakeUp(self): | |
110 # we want to wake up from any thread | |
111 self.waker.wakeUp() | |
112 | |
113 def callLater(self, *args, **kw): | |
114 tple = posixbase.PosixReactorBase.callLater(self, *args, **kw) | |
115 self.wakeUp() | |
116 return tple | |
117 | |
118 def _sendToMain(self, msg, *args): | |
119 #print >>sys.stderr, 'sendToMain', msg, args | |
120 self.toMainThread.put((msg, args)) | |
121 if self.mainWaker is not None: | |
122 self.mainWaker() | |
123 | |
124 def _sendToThread(self, fn, *args): | |
125 #print >>sys.stderr, 'sendToThread', fn, args | |
126 self.toThreadQueue.put((fn, args)) | |
127 | |
128 def _preenDescriptorsInThread(self): | |
129 log.msg("Malformed file descriptor found. Preening lists.") | |
130 readers = self.reads.keys() | |
131 writers = self.writes.keys() | |
132 self.reads.clear() | |
133 self.writes.clear() | |
134 for selDict, selList in ((self.reads, readers), (self.writes, writers)): | |
135 for selectable in selList: | |
136 try: | |
137 select.select([selectable], [selectable], [selectable], 0) | |
138 except: | |
139 log.msg("bad descriptor %s" % selectable) | |
140 else: | |
141 selDict[selectable] = 1 | |
142 | |
143 def _workerInThread(self): | |
144 try: | |
145 while 1: | |
146 fn, args = self.toThreadQueue.get() | |
147 #print >>sys.stderr, "worker got", fn, args | |
148 fn(*args) | |
149 except SystemExit: | |
150 pass # exception indicates this thread should exit | |
151 except: | |
152 f = failure.Failure() | |
153 self._sendToMain('Failure', f) | |
154 #print >>sys.stderr, "worker finished" | |
155 | |
156 def _doSelectInThread(self, timeout): | |
157 """Run one iteration of the I/O monitor loop. | |
158 | |
159 This will run all selectables who had input or output readiness | |
160 waiting for them. | |
161 """ | |
162 reads = self.reads | |
163 writes = self.writes | |
164 while 1: | |
165 try: | |
166 r, w, ignored = _select(reads.keys(), | |
167 writes.keys(), | |
168 [], timeout) | |
169 break | |
170 except ValueError, ve: | |
171 # Possibly a file descriptor has gone negative? | |
172 log.err() | |
173 self._preenDescriptorsInThread() | |
174 except TypeError, te: | |
175 # Something *totally* invalid (object w/o fileno, non-integral | |
176 # result) was passed | |
177 log.err() | |
178 self._preenDescriptorsInThread() | |
179 except (select.error, IOError), se: | |
180 # select(2) encountered an error | |
181 if se.args[0] in (0, 2): | |
182 # windows does this if it got an empty list | |
183 if (not reads) and (not writes): | |
184 return | |
185 else: | |
186 raise | |
187 elif se.args[0] == EINTR: | |
188 return | |
189 elif se.args[0] == EBADF: | |
190 self._preenDescriptorsInThread() | |
191 else: | |
192 # OK, I really don't know what's going on. Blow up. | |
193 raise | |
194 self._sendToMain('Notify', r, w) | |
195 | |
196 def _process_Notify(self, r, w): | |
197 #print >>sys.stderr, "_process_Notify" | |
198 reads = self.reads | |
199 writes = self.writes | |
200 | |
201 _drdw = self._doReadOrWrite | |
202 _logrun = log.callWithLogger | |
203 for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", wr
ites)): | |
204 for selectable in selectables: | |
205 # if this was disconnected in another thread, kill it. | |
206 if selectable not in dct: | |
207 continue | |
208 # This for pausing input when we're not ready for more. | |
209 _logrun(selectable, _drdw, selectable, method, dct) | |
210 #print >>sys.stderr, "done _process_Notify" | |
211 | |
212 def _process_Failure(self, f): | |
213 f.raiseException() | |
214 | |
215 _doIterationInThread = _doSelectInThread | |
216 | |
217 def ensureWorkerThread(self): | |
218 if self.workerThread is None or not self.workerThread.isAlive(): | |
219 self.workerThread = Thread(target=self._workerInThread) | |
220 self.workerThread.start() | |
221 | |
222 def doThreadIteration(self, timeout): | |
223 self._sendToThread(self._doIterationInThread, timeout) | |
224 self.ensureWorkerThread() | |
225 #print >>sys.stderr, 'getting...' | |
226 msg, args = self.toMainThread.get() | |
227 #print >>sys.stderr, 'got', msg, args | |
228 getattr(self, '_process_' + msg)(*args) | |
229 | |
230 doIteration = doThreadIteration | |
231 | |
232 def _interleave(self): | |
233 while self.running: | |
234 #print >>sys.stderr, "runUntilCurrent" | |
235 self.runUntilCurrent() | |
236 t2 = self.timeout() | |
237 t = self.running and t2 | |
238 self._sendToThread(self._doIterationInThread, t) | |
239 #print >>sys.stderr, "yielding" | |
240 yield None | |
241 #print >>sys.stderr, "fetching" | |
242 msg, args = self.toMainThread.get_nowait() | |
243 getattr(self, '_process_' + msg)(*args) | |
244 | |
245 def interleave(self, waker, *args, **kw): | |
246 """ | |
247 interleave(waker) interleaves this reactor with the | |
248 current application by moving the blocking parts of | |
249 the reactor (select() in this case) to a separate | |
250 thread. This is typically useful for integration with | |
251 GUI applications which have their own event loop | |
252 already running. | |
253 | |
254 See the module docstring for more information. | |
255 """ | |
256 self.startRunning(*args, **kw) | |
257 loop = self._interleave() | |
258 def mainWaker(waker=waker, loop=loop): | |
259 #print >>sys.stderr, "mainWaker()" | |
260 waker(loop.next) | |
261 self.mainWaker = mainWaker | |
262 loop.next() | |
263 self.ensureWorkerThread() | |
264 | |
265 def _mainLoopShutdown(self): | |
266 self.mainWaker = None | |
267 if self.workerThread is not None: | |
268 #print >>sys.stderr, 'getting...' | |
269 self._sendToThread(raiseException, SystemExit) | |
270 self.wakeUp() | |
271 try: | |
272 while 1: | |
273 msg, args = self.toMainThread.get_nowait() | |
274 #print >>sys.stderr, "ignored:", (msg, args) | |
275 except Empty: | |
276 pass | |
277 self.workerThread.join() | |
278 self.workerThread = None | |
279 try: | |
280 while 1: | |
281 fn, args = self.toThreadQueue.get_nowait() | |
282 if fn is self._doIterationInThread: | |
283 log.msg('Iteration is still in the thread queue!') | |
284 elif fn is raiseException and args[0] is SystemExit: | |
285 pass | |
286 else: | |
287 fn(*args) | |
288 except Empty: | |
289 pass | |
290 | |
291 def _doReadOrWrite(self, selectable, method, dict): | |
292 try: | |
293 why = getattr(selectable, method)() | |
294 handfn = getattr(selectable, 'fileno', None) | |
295 if not handfn: | |
296 why = _NO_FILENO | |
297 elif handfn() == -1: | |
298 why = _NO_FILEDESC | |
299 except: | |
300 why = sys.exc_info()[1] | |
301 log.err() | |
302 if why: | |
303 self._disconnectSelectable(selectable, why, method == "doRead") | |
304 | |
305 def addReader(self, reader): | |
306 """Add a FileDescriptor for notification of data available to read. | |
307 """ | |
308 self._sendToThread(self.reads.__setitem__, reader, 1) | |
309 self.wakeUp() | |
310 | |
311 def addWriter(self, writer): | |
312 """Add a FileDescriptor for notification of data available to write. | |
313 """ | |
314 self._sendToThread(self.writes.__setitem__, writer, 1) | |
315 self.wakeUp() | |
316 | |
317 def removeReader(self, reader): | |
318 """Remove a Selectable for notification of data available to read. | |
319 """ | |
320 self._sendToThread(dictRemove, self.reads, reader) | |
321 | |
322 def removeWriter(self, writer): | |
323 """Remove a Selectable for notification of data available to write. | |
324 """ | |
325 self._sendToThread(dictRemove, self.writes, writer) | |
326 | |
327 def removeAll(self): | |
328 return self._removeAll(self.reads, self.writes) | |
329 | |
330 | |
331 def getReaders(self): | |
332 return self.reads.keys() | |
333 | |
334 | |
335 def getWriters(self): | |
336 return self.writes.keys() | |
337 | |
338 | |
339 def run(self, installSignalHandlers=1): | |
340 self.startRunning(installSignalHandlers=installSignalHandlers) | |
341 self.mainLoop() | |
342 | |
343 def mainLoop(self): | |
344 q = Queue() | |
345 self.interleave(q.put) | |
346 while self.running: | |
347 try: | |
348 q.get()() | |
349 except StopIteration: | |
350 break | |
351 | |
352 | |
353 | |
354 def install(): | |
355 """Configure the twisted mainloop to be run using the select() reactor. | |
356 """ | |
357 reactor = ThreadedSelectReactor() | |
358 from twisted.internet.main import installReactor | |
359 installReactor(reactor) | |
360 return reactor | |
361 | |
362 __all__ = ['install'] | |
OLD | NEW |