Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(890)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/_threadedselect.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_internet -*-
2 # $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
3 #
4 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
5 # See LICENSE for details.
6
7 from __future__ import generators
8
9 """
10 Threaded select reactor
11
12 Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}
13
14
15 The threadedselectreactor is a specialized reactor for integrating with
16 arbitrary foreign event loop, such as those you find in GUI toolkits.
17
18 There are three things you'll need to do to use this reactor.
19
20 Install the reactor at the beginning of your program, before importing
21 the rest of Twisted::
22
23 | from twisted.internet import _threadedselect
24 | _threadedselect.install()
25
26 Interleave this reactor with your foreign event loop, at some point after
27 your event loop is initialized::
28
29 | from twisted.internet import reactor
30 | reactor.interleave(foreignEventLoopWakerFunction)
31 | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
32
33 Instead of shutting down the foreign event loop directly, shut down the
34 reactor::
35
36 | from twisted.internet import reactor
37 | reactor.stop()
38
39 In order for Twisted to do its work in the main thread (the thread that
40 interleave is called from), a waker function is necessary. The waker function
41 will be called from a "background" thread with one argument: func.
42 The waker function's purpose is to call func() from the main thread.
43 Many GUI toolkits ship with appropriate waker functions.
44 Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
45 older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
46 These would be used in place of "foreignEventLoopWakerFunction" in the above
47 example.
48
49 The other integration point at which the foreign event loop and this reactor
50 must integrate is shutdown. In order to ensure clean shutdown of Twisted,
51 you must allow for Twisted to come to a complete stop before quitting the
52 application. Typically, you will do this by setting up an after shutdown
53 trigger to stop your foreign event loop, and call reactor.stop() where you
54 would normally have initiated the shutdown procedure for the foreign event
55 loop. Shutdown functions that could be used in place of
56 "foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance
57 with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
58 """
59
60 from threading import Thread
61 from Queue import Queue, Empty
62 from time import sleep
63 import sys
64
65 from zope.interface import implements
66
67 from twisted.internet.interfaces import IReactorFDSet
68 from twisted.internet import error
69 from twisted.internet import posixbase
70 from twisted.python import log, failure, threadable
71 from twisted.persisted import styles
72 from twisted.python.runtime import platformType
73
74 import select
75 from errno import EINTR, EBADF
76
77 from twisted.internet.selectreactor import _select
78
79 # Exceptions that doSelect might return frequently
80 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
81 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
82
83 def dictRemove(dct, value):
84 try:
85 del dct[value]
86 except KeyError:
87 pass
88
89 def raiseException(e):
90 raise e
91
92 class ThreadedSelectReactor(posixbase.PosixReactorBase):
93 """A threaded select() based reactor - runs on all POSIX platforms and on
94 Win32.
95 """
96 implements(IReactorFDSet)
97
98 def __init__(self):
99 threadable.init(1)
100 self.reads = {}
101 self.writes = {}
102 self.toThreadQueue = Queue()
103 self.toMainThread = Queue()
104 self.workerThread = None
105 self.mainWaker = None
106 posixbase.PosixReactorBase.__init__(self)
107 self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)
108
109 def wakeUp(self):
110 # we want to wake up from any thread
111 self.waker.wakeUp()
112
113 def callLater(self, *args, **kw):
114 tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
115 self.wakeUp()
116 return tple
117
118 def _sendToMain(self, msg, *args):
119 #print >>sys.stderr, 'sendToMain', msg, args
120 self.toMainThread.put((msg, args))
121 if self.mainWaker is not None:
122 self.mainWaker()
123
124 def _sendToThread(self, fn, *args):
125 #print >>sys.stderr, 'sendToThread', fn, args
126 self.toThreadQueue.put((fn, args))
127
128 def _preenDescriptorsInThread(self):
129 log.msg("Malformed file descriptor found. Preening lists.")
130 readers = self.reads.keys()
131 writers = self.writes.keys()
132 self.reads.clear()
133 self.writes.clear()
134 for selDict, selList in ((self.reads, readers), (self.writes, writers)):
135 for selectable in selList:
136 try:
137 select.select([selectable], [selectable], [selectable], 0)
138 except:
139 log.msg("bad descriptor %s" % selectable)
140 else:
141 selDict[selectable] = 1
142
143 def _workerInThread(self):
144 try:
145 while 1:
146 fn, args = self.toThreadQueue.get()
147 #print >>sys.stderr, "worker got", fn, args
148 fn(*args)
149 except SystemExit:
150 pass # exception indicates this thread should exit
151 except:
152 f = failure.Failure()
153 self._sendToMain('Failure', f)
154 #print >>sys.stderr, "worker finished"
155
156 def _doSelectInThread(self, timeout):
157 """Run one iteration of the I/O monitor loop.
158
159 This will run all selectables who had input or output readiness
160 waiting for them.
161 """
162 reads = self.reads
163 writes = self.writes
164 while 1:
165 try:
166 r, w, ignored = _select(reads.keys(),
167 writes.keys(),
168 [], timeout)
169 break
170 except ValueError, ve:
171 # Possibly a file descriptor has gone negative?
172 log.err()
173 self._preenDescriptorsInThread()
174 except TypeError, te:
175 # Something *totally* invalid (object w/o fileno, non-integral
176 # result) was passed
177 log.err()
178 self._preenDescriptorsInThread()
179 except (select.error, IOError), se:
180 # select(2) encountered an error
181 if se.args[0] in (0, 2):
182 # windows does this if it got an empty list
183 if (not reads) and (not writes):
184 return
185 else:
186 raise
187 elif se.args[0] == EINTR:
188 return
189 elif se.args[0] == EBADF:
190 self._preenDescriptorsInThread()
191 else:
192 # OK, I really don't know what's going on. Blow up.
193 raise
194 self._sendToMain('Notify', r, w)
195
196 def _process_Notify(self, r, w):
197 #print >>sys.stderr, "_process_Notify"
198 reads = self.reads
199 writes = self.writes
200
201 _drdw = self._doReadOrWrite
202 _logrun = log.callWithLogger
203 for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", wr ites)):
204 for selectable in selectables:
205 # if this was disconnected in another thread, kill it.
206 if selectable not in dct:
207 continue
208 # This for pausing input when we're not ready for more.
209 _logrun(selectable, _drdw, selectable, method, dct)
210 #print >>sys.stderr, "done _process_Notify"
211
212 def _process_Failure(self, f):
213 f.raiseException()
214
215 _doIterationInThread = _doSelectInThread
216
217 def ensureWorkerThread(self):
218 if self.workerThread is None or not self.workerThread.isAlive():
219 self.workerThread = Thread(target=self._workerInThread)
220 self.workerThread.start()
221
222 def doThreadIteration(self, timeout):
223 self._sendToThread(self._doIterationInThread, timeout)
224 self.ensureWorkerThread()
225 #print >>sys.stderr, 'getting...'
226 msg, args = self.toMainThread.get()
227 #print >>sys.stderr, 'got', msg, args
228 getattr(self, '_process_' + msg)(*args)
229
230 doIteration = doThreadIteration
231
232 def _interleave(self):
233 while self.running:
234 #print >>sys.stderr, "runUntilCurrent"
235 self.runUntilCurrent()
236 t2 = self.timeout()
237 t = self.running and t2
238 self._sendToThread(self._doIterationInThread, t)
239 #print >>sys.stderr, "yielding"
240 yield None
241 #print >>sys.stderr, "fetching"
242 msg, args = self.toMainThread.get_nowait()
243 getattr(self, '_process_' + msg)(*args)
244
245 def interleave(self, waker, *args, **kw):
246 """
247 interleave(waker) interleaves this reactor with the
248 current application by moving the blocking parts of
249 the reactor (select() in this case) to a separate
250 thread. This is typically useful for integration with
251 GUI applications which have their own event loop
252 already running.
253
254 See the module docstring for more information.
255 """
256 self.startRunning(*args, **kw)
257 loop = self._interleave()
258 def mainWaker(waker=waker, loop=loop):
259 #print >>sys.stderr, "mainWaker()"
260 waker(loop.next)
261 self.mainWaker = mainWaker
262 loop.next()
263 self.ensureWorkerThread()
264
265 def _mainLoopShutdown(self):
266 self.mainWaker = None
267 if self.workerThread is not None:
268 #print >>sys.stderr, 'getting...'
269 self._sendToThread(raiseException, SystemExit)
270 self.wakeUp()
271 try:
272 while 1:
273 msg, args = self.toMainThread.get_nowait()
274 #print >>sys.stderr, "ignored:", (msg, args)
275 except Empty:
276 pass
277 self.workerThread.join()
278 self.workerThread = None
279 try:
280 while 1:
281 fn, args = self.toThreadQueue.get_nowait()
282 if fn is self._doIterationInThread:
283 log.msg('Iteration is still in the thread queue!')
284 elif fn is raiseException and args[0] is SystemExit:
285 pass
286 else:
287 fn(*args)
288 except Empty:
289 pass
290
291 def _doReadOrWrite(self, selectable, method, dict):
292 try:
293 why = getattr(selectable, method)()
294 handfn = getattr(selectable, 'fileno', None)
295 if not handfn:
296 why = _NO_FILENO
297 elif handfn() == -1:
298 why = _NO_FILEDESC
299 except:
300 why = sys.exc_info()[1]
301 log.err()
302 if why:
303 self._disconnectSelectable(selectable, why, method == "doRead")
304
305 def addReader(self, reader):
306 """Add a FileDescriptor for notification of data available to read.
307 """
308 self._sendToThread(self.reads.__setitem__, reader, 1)
309 self.wakeUp()
310
311 def addWriter(self, writer):
312 """Add a FileDescriptor for notification of data available to write.
313 """
314 self._sendToThread(self.writes.__setitem__, writer, 1)
315 self.wakeUp()
316
317 def removeReader(self, reader):
318 """Remove a Selectable for notification of data available to read.
319 """
320 self._sendToThread(dictRemove, self.reads, reader)
321
322 def removeWriter(self, writer):
323 """Remove a Selectable for notification of data available to write.
324 """
325 self._sendToThread(dictRemove, self.writes, writer)
326
327 def removeAll(self):
328 return self._removeAll(self.reads, self.writes)
329
330
331 def getReaders(self):
332 return self.reads.keys()
333
334
335 def getWriters(self):
336 return self.writes.keys()
337
338
339 def run(self, installSignalHandlers=1):
340 self.startRunning(installSignalHandlers=installSignalHandlers)
341 self.mainLoop()
342
343 def mainLoop(self):
344 q = Queue()
345 self.interleave(q.put)
346 while self.running:
347 try:
348 q.get()()
349 except StopIteration:
350 break
351
352
353
354 def install():
355 """Configure the twisted mainloop to be run using the select() reactor.
356 """
357 reactor = ThreadedSelectReactor()
358 from twisted.internet.main import installReactor
359 installReactor(reactor)
360 return reactor
361
362 __all__ = ['install']
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/_sslverify.py ('k') | third_party/twisted_8_1/twisted/internet/_win32serialport.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698