OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 | |
5 """ | |
6 This module provides support for Twisted to interact with CoreFoundation | |
7 CFRunLoops. This includes Cocoa's NSRunLoop. | |
8 | |
9 In order to use this support, simply do the following:: | |
10 | |
11 | from twisted.internet import cfreactor | |
12 | cfreactor.install() | |
13 | |
14 Then use the twisted.internet APIs as usual. The other methods here are not | |
15 intended to be called directly under normal use. However, install can take | |
16 a runLoop kwarg, and run will take a withRunLoop arg if you need to explicitly | |
17 pass a CFRunLoop for some reason. Otherwise it will make a pretty good guess | |
18 as to which runLoop you want (the current NSRunLoop if PyObjC is imported, | |
19 otherwise the current CFRunLoop. Either way, if one doesn't exist, it will | |
20 be created). | |
21 | |
22 Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>} | |
23 """ | |
24 | |
25 __all__ = ['install'] | |
26 | |
27 import sys | |
28 | |
29 # hints for py2app | |
30 import Carbon.CF | |
31 import traceback | |
32 | |
33 import cfsupport as cf | |
34 | |
35 from zope.interface import implements | |
36 | |
37 from twisted.python import log, threadable, failure | |
38 from twisted.internet.interfaces import IReactorFDSet | |
39 from twisted.internet import posixbase, error | |
40 from weakref import WeakKeyDictionary | |
41 from Foundation import NSRunLoop | |
42 from AppKit import NSApp | |
43 | |
44 # cache two extremely common "failures" without traceback info | |
45 _faildict = { | |
46 error.ConnectionDone: failure.Failure(error.ConnectionDone()), | |
47 error.ConnectionLost: failure.Failure(error.ConnectionLost()), | |
48 } | |
49 | |
50 class SelectableSocketWrapper(object): | |
51 _objCache = WeakKeyDictionary() | |
52 | |
53 cf = None | |
54 def socketWrapperForReactorAndObject(klass, reactor, obj): | |
55 _objCache = klass._objCache | |
56 if obj in _objCache: | |
57 return _objCache[obj] | |
58 v = _objCache[obj] = klass(reactor, obj) | |
59 return v | |
60 socketWrapperForReactorAndObject = classmethod(socketWrapperForReactorAndObj
ect) | |
61 | |
62 def __init__(self, reactor, obj): | |
63 if self.cf: | |
64 raise ValueError, "This socket wrapper is already initialized" | |
65 self.reactor = reactor | |
66 self.obj = obj | |
67 obj._orig_ssw_connectionLost = obj.connectionLost | |
68 obj.connectionLost = self.objConnectionLost | |
69 self.fd = obj.fileno() | |
70 self.writing = False | |
71 self.reading = False | |
72 self.wouldRead = False | |
73 self.wouldWrite = False | |
74 self.cf = cf.PyCFSocket(obj.fileno(), self.doRead, self.doWrite, self.do
Connect) | |
75 self.cf.stopWriting() | |
76 reactor.getRunLoop().addSocket(self.cf) | |
77 | |
78 def __repr__(self): | |
79 return 'SSW(fd=%r r=%r w=%r x=%08x o=%08x)' % (self.fd, int(self.reading
), int(self.writing), id(self), id(self.obj)) | |
80 | |
81 def objConnectionLost(self, *args, **kwargs): | |
82 obj = self.obj | |
83 self.reactor.removeReader(obj) | |
84 self.reactor.removeWriter(obj) | |
85 obj.connectionLost = obj._orig_ssw_connectionLost | |
86 obj.connectionLost(*args, **kwargs) | |
87 try: | |
88 del self._objCache[obj] | |
89 except: | |
90 pass | |
91 self.obj = None | |
92 self.cf = None | |
93 | |
94 def doConnect(self, why): | |
95 pass | |
96 | |
97 def startReading(self): | |
98 self.cf.startReading() | |
99 self.reading = True | |
100 if self.wouldRead: | |
101 if not self.reactor.running: | |
102 self.reactor.callLater(0, self.doRead) | |
103 else: | |
104 self.doRead() | |
105 self.wouldRead = False | |
106 return self | |
107 | |
108 def stopReading(self): | |
109 self.cf.stopReading() | |
110 self.reading = False | |
111 self.wouldRead = False | |
112 return self | |
113 | |
114 def startWriting(self): | |
115 self.cf.startWriting() | |
116 self.writing = True | |
117 if self.wouldWrite: | |
118 if not self.reactor.running: | |
119 self.reactor.callLater(0, self.doWrite) | |
120 else: | |
121 self.doWrite() | |
122 self.wouldWrite = False | |
123 return self | |
124 | |
125 def stopWriting(self): | |
126 self.cf.stopWriting() | |
127 self.writing = False | |
128 self.wouldWrite = False | |
129 | |
130 def _finishReadOrWrite(self, fn, faildict=_faildict): | |
131 try: | |
132 why = fn() | |
133 except: | |
134 why = sys.exc_info()[1] | |
135 log.err() | |
136 if why: | |
137 try: | |
138 f = faildict.get(why.__class__) or failure.Failure(why) | |
139 self.objConnectionLost(f) | |
140 except: | |
141 log.err() | |
142 if self.reactor.running: | |
143 self.reactor.simulate() | |
144 | |
145 def doRead(self): | |
146 obj = self.obj | |
147 if not obj: | |
148 return | |
149 if not self.reading: | |
150 self.wouldRead = True | |
151 if self.reactor.running: | |
152 self.reactor.simulate() | |
153 return | |
154 self._finishReadOrWrite(obj.doRead) | |
155 | |
156 def doWrite(self): | |
157 obj = self.obj | |
158 if not obj: | |
159 return | |
160 if not self.writing: | |
161 self.wouldWrite = True | |
162 if self.reactor.running: | |
163 self.reactor.simulate() | |
164 return | |
165 self._finishReadOrWrite(obj.doWrite) | |
166 | |
167 def __hash__(self): | |
168 return hash(self.fd) | |
169 | |
170 class CFReactor(posixbase.PosixReactorBase): | |
171 implements(IReactorFDSet) | |
172 # how long to poll if we're don't care about signals | |
173 longIntervalOfTime = 60.0 | |
174 | |
175 # how long we should poll if we do care about signals | |
176 shortIntervalOfTime = 1.0 | |
177 | |
178 # don't set this | |
179 pollInterval = longIntervalOfTime | |
180 | |
181 def __init__(self, runLoop=None): | |
182 self.readers = {} | |
183 self.writers = {} | |
184 self.running = 0 | |
185 self.crashing = False | |
186 self._doRunUntilCurrent = True | |
187 self.timer = None | |
188 self.runLoop = None | |
189 self.nsRunLoop = None | |
190 self.didStartRunLoop = False | |
191 if runLoop is not None: | |
192 self.getRunLoop(runLoop) | |
193 posixbase.PosixReactorBase.__init__(self) | |
194 | |
195 def getRunLoop(self, runLoop=None): | |
196 if self.runLoop is None: | |
197 self.nsRunLoop = runLoop or NSRunLoop.currentRunLoop() | |
198 self.runLoop = cf.PyCFRunLoop(self.nsRunLoop.getCFRunLoop()) | |
199 return self.runLoop | |
200 | |
201 def addReader(self, reader): | |
202 self.readers[reader] = SelectableSocketWrapper.socketWrapperForReactorAn
dObject(self, reader).startReading() | |
203 | |
204 def addWriter(self, writer): | |
205 self.writers[writer] = SelectableSocketWrapper.socketWrapperForReactorAn
dObject(self, writer).startWriting() | |
206 | |
207 def removeReader(self, reader): | |
208 wrapped = self.readers.get(reader, None) | |
209 if wrapped is not None: | |
210 del self.readers[reader] | |
211 wrapped.stopReading() | |
212 | |
213 def removeWriter(self, writer): | |
214 wrapped = self.writers.get(writer, None) | |
215 if wrapped is not None: | |
216 del self.writers[writer] | |
217 wrapped.stopWriting() | |
218 | |
219 | |
220 def getReaders(self): | |
221 return self.readers.keys() | |
222 | |
223 | |
224 def getWriters(self): | |
225 return self.writers.keys() | |
226 | |
227 | |
228 def removeAll(self): | |
229 r = self.readers.keys() | |
230 for s in self.readers.itervalues(): | |
231 s.stopReading() | |
232 for s in self.writers.itervalues(): | |
233 s.stopWriting() | |
234 self.readers.clear() | |
235 self.writers.clear() | |
236 return r | |
237 | |
238 def run(self, installSignalHandlers=1, withRunLoop=None): | |
239 if self.running: | |
240 raise ValueError, "Reactor already running" | |
241 if installSignalHandlers: | |
242 self.pollInterval = self.shortIntervalOfTime | |
243 runLoop = self.getRunLoop(withRunLoop) | |
244 self._startup() | |
245 | |
246 self.startRunning(installSignalHandlers=installSignalHandlers) | |
247 | |
248 self.running = True | |
249 if NSApp() is None and self.nsRunLoop.currentMode() is None: | |
250 # Most of the time the NSRunLoop will have already started, | |
251 # but in this case it wasn't. | |
252 runLoop.run() | |
253 self.crashing = False | |
254 self.didStartRunLoop = True | |
255 | |
256 def callLater(self, howlong, *args, **kwargs): | |
257 rval = posixbase.PosixReactorBase.callLater(self, howlong, *args, **kwar
gs) | |
258 if self.timer: | |
259 timeout = self.timeout() | |
260 if timeout is None: | |
261 timeout = howlong | |
262 sleepUntil = cf.now() + min(timeout, howlong) | |
263 if sleepUntil < self.timer.getNextFireDate(): | |
264 self.timer.setNextFireDate(sleepUntil) | |
265 else: | |
266 pass | |
267 return rval | |
268 | |
269 def iterate(self, howlong=0.0): | |
270 if self.running: | |
271 raise ValueError, "Can't iterate a running reactor" | |
272 self.runUntilCurrent() | |
273 self.doIteration(howlong) | |
274 | |
275 def doIteration(self, howlong): | |
276 if self.running: | |
277 raise ValueError, "Can't iterate a running reactor" | |
278 howlong = howlong or 0.01 | |
279 pi = self.pollInterval | |
280 self.pollInterval = howlong | |
281 self._doRunUntilCurrent = False | |
282 self.run() | |
283 self._doRunUntilCurrent = True | |
284 self.pollInterval = pi | |
285 | |
286 def simulate(self): | |
287 if self.crashing: | |
288 return | |
289 if not self.running: | |
290 raise ValueError, "You can't simulate a stopped reactor" | |
291 if self._doRunUntilCurrent: | |
292 self.runUntilCurrent() | |
293 if self.crashing: | |
294 return | |
295 if self.timer is None: | |
296 return | |
297 nap = self.timeout() | |
298 if nap is None: | |
299 nap = self.pollInterval | |
300 else: | |
301 nap = min(self.pollInterval, nap) | |
302 if self.running: | |
303 self.timer.setNextFireDate(cf.now() + nap) | |
304 if not self._doRunUntilCurrent: | |
305 self.crash() | |
306 | |
307 def _startup(self): | |
308 if self.running: | |
309 raise ValueError, "Can't bootstrap a running reactor" | |
310 self.timer = cf.PyCFRunLoopTimer(cf.now(), self.pollInterval, self.simul
ate) | |
311 self.runLoop.addTimer(self.timer) | |
312 | |
313 def cleanup(self): | |
314 pass | |
315 | |
316 def sigInt(self, *args): | |
317 self.callLater(0.0, self.stop) | |
318 | |
319 def crash(self): | |
320 if not self.running: | |
321 raise ValueError, "Can't crash a stopped reactor" | |
322 posixbase.PosixReactorBase.crash(self) | |
323 self.crashing = True | |
324 if self.timer is not None: | |
325 self.runLoop.removeTimer(self.timer) | |
326 self.timer = None | |
327 if self.didStartRunLoop: | |
328 self.runLoop.stop() | |
329 | |
330 def stop(self): | |
331 if not self.running: | |
332 raise ValueError, "Can't stop a stopped reactor" | |
333 posixbase.PosixReactorBase.stop(self) | |
334 | |
335 def install(runLoop=None): | |
336 """Configure the twisted mainloop to be run inside CFRunLoop. | |
337 """ | |
338 reactor = CFReactor(runLoop=runLoop) | |
339 reactor.addSystemEventTrigger('after', 'shutdown', reactor.cleanup) | |
340 from twisted.internet.main import installReactor | |
341 installReactor(reactor) | |
342 return reactor | |
OLD | NEW |