OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_internet -*- | |
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 | |
6 """ | |
7 Very basic functionality for a Reactor implementation. | |
8 | |
9 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
10 """ | |
11 | |
12 import socket # needed only for sync-dns | |
13 from zope.interface import implements, classImplements | |
14 | |
15 import sys | |
16 import warnings | |
17 import operator | |
18 from heapq import heappush, heappop, heapify | |
19 | |
20 try: | |
21 import fcntl | |
22 except ImportError: | |
23 fcntl = None | |
24 import traceback | |
25 | |
26 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThre
ads | |
27 from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolv
er | |
28 from twisted.internet.interfaces import IConnector, IDelayedCall | |
29 from twisted.internet import main, error, abstract, defer, threads | |
30 from twisted.python import log, failure, reflect | |
31 from twisted.python.runtime import seconds as runtimeSeconds, platform, platform
Type | |
32 from twisted.internet.defer import Deferred, DeferredList | |
33 from twisted.persisted import styles | |
34 | |
35 # This import is for side-effects! Even if you don't see any code using it | |
36 # in this module, don't delete it. | |
37 from twisted.python import threadable | |
38 | |
39 | |
40 class DelayedCall(styles.Ephemeral): | |
41 | |
42 implements(IDelayedCall) | |
43 # enable .debug to record creator call stack, and it will be logged if | |
44 # an exception occurs while the function is being run | |
45 debug = False | |
46 _str = None | |
47 | |
48 def __init__(self, time, func, args, kw, cancel, reset, | |
49 seconds=runtimeSeconds): | |
50 """ | |
51 @param time: Seconds from the epoch at which to call C{func}. | |
52 @param func: The callable to call. | |
53 @param args: The positional arguments to pass to the callable. | |
54 @param kw: The keyword arguments to pass to the callable. | |
55 @param cancel: A callable which will be called with this | |
56 DelayedCall before cancellation. | |
57 @param reset: A callable which will be called with this | |
58 DelayedCall after changing this DelayedCall's scheduled | |
59 execution time. The callable should adjust any necessary | |
60 scheduling details to ensure this DelayedCall is invoked | |
61 at the new appropriate time. | |
62 @param seconds: If provided, a no-argument callable which will be | |
63 used to determine the current time any time that information is | |
64 needed. | |
65 """ | |
66 self.time, self.func, self.args, self.kw = time, func, args, kw | |
67 self.resetter = reset | |
68 self.canceller = cancel | |
69 self.seconds = seconds | |
70 self.cancelled = self.called = 0 | |
71 self.delayed_time = 0 | |
72 if self.debug: | |
73 self.creator = traceback.format_stack()[:-2] | |
74 | |
75 def getTime(self): | |
76 """Return the time at which this call will fire | |
77 | |
78 @rtype: C{float} | |
79 @return: The number of seconds after the epoch at which this call is | |
80 scheduled to be made. | |
81 """ | |
82 return self.time + self.delayed_time | |
83 | |
84 def cancel(self): | |
85 """Unschedule this call | |
86 | |
87 @raise AlreadyCancelled: Raised if this call has already been | |
88 unscheduled. | |
89 | |
90 @raise AlreadyCalled: Raised if this call has already been made. | |
91 """ | |
92 if self.cancelled: | |
93 raise error.AlreadyCancelled | |
94 elif self.called: | |
95 raise error.AlreadyCalled | |
96 else: | |
97 self.canceller(self) | |
98 self.cancelled = 1 | |
99 if self.debug: | |
100 self._str = str(self) | |
101 del self.func, self.args, self.kw | |
102 | |
103 def reset(self, secondsFromNow): | |
104 """Reschedule this call for a different time | |
105 | |
106 @type secondsFromNow: C{float} | |
107 @param secondsFromNow: The number of seconds from the time of the | |
108 C{reset} call at which this call will be scheduled. | |
109 | |
110 @raise AlreadyCancelled: Raised if this call has been cancelled. | |
111 @raise AlreadyCalled: Raised if this call has already been made. | |
112 """ | |
113 if self.cancelled: | |
114 raise error.AlreadyCancelled | |
115 elif self.called: | |
116 raise error.AlreadyCalled | |
117 else: | |
118 newTime = self.seconds() + secondsFromNow | |
119 if newTime < self.time: | |
120 self.delayed_time = 0 | |
121 self.time = newTime | |
122 self.resetter(self) | |
123 else: | |
124 self.delayed_time = newTime - self.time | |
125 | |
126 def delay(self, secondsLater): | |
127 """Reschedule this call for a later time | |
128 | |
129 @type secondsLater: C{float} | |
130 @param secondsLater: The number of seconds after the originally | |
131 scheduled time for which to reschedule this call. | |
132 | |
133 @raise AlreadyCancelled: Raised if this call has been cancelled. | |
134 @raise AlreadyCalled: Raised if this call has already been made. | |
135 """ | |
136 if self.cancelled: | |
137 raise error.AlreadyCancelled | |
138 elif self.called: | |
139 raise error.AlreadyCalled | |
140 else: | |
141 self.delayed_time += secondsLater | |
142 if self.delayed_time < 0: | |
143 self.activate_delay() | |
144 self.resetter(self) | |
145 | |
146 def activate_delay(self): | |
147 self.time += self.delayed_time | |
148 self.delayed_time = 0 | |
149 | |
150 def active(self): | |
151 """Determine whether this call is still pending | |
152 | |
153 @rtype: C{bool} | |
154 @return: True if this call has not yet been made or cancelled, | |
155 False otherwise. | |
156 """ | |
157 return not (self.cancelled or self.called) | |
158 | |
159 def __le__(self, other): | |
160 return self.time <= other.time | |
161 | |
162 def __str__(self): | |
163 if self._str is not None: | |
164 return self._str | |
165 if hasattr(self, 'func'): | |
166 if hasattr(self.func, 'func_name'): | |
167 func = self.func.func_name | |
168 if hasattr(self.func, 'im_class'): | |
169 func = self.func.im_class.__name__ + '.' + func | |
170 else: | |
171 func = reflect.safe_repr(self.func) | |
172 else: | |
173 func = None | |
174 | |
175 now = self.seconds() | |
176 L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % ( | |
177 id(self), self.time - now, self.called, self.cancelled)] | |
178 if func is not None: | |
179 L.extend((" ", func, "(")) | |
180 if self.args: | |
181 L.append(", ".join([reflect.safe_repr(e) for e in self.args])) | |
182 if self.kw: | |
183 L.append(", ") | |
184 if self.kw: | |
185 L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k,
v) in self.kw.iteritems()])) | |
186 L.append(")") | |
187 | |
188 if self.debug: | |
189 L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.cre
ator))) | |
190 L.append('>') | |
191 | |
192 return "".join(L) | |
193 | |
194 | |
195 class ThreadedResolver: | |
196 implements(IResolverSimple) | |
197 | |
198 def __init__(self, reactor): | |
199 self.reactor = reactor | |
200 self._runningQueries = {} | |
201 | |
202 def _fail(self, name, err): | |
203 err = error.DNSLookupError("address %r not found: %s" % (name, err)) | |
204 return failure.Failure(err) | |
205 | |
206 def _cleanup(self, name, lookupDeferred): | |
207 userDeferred, cancelCall = self._runningQueries[lookupDeferred] | |
208 del self._runningQueries[lookupDeferred] | |
209 userDeferred.errback(self._fail(name, "timeout error")) | |
210 | |
211 def _checkTimeout(self, result, name, lookupDeferred): | |
212 try: | |
213 userDeferred, cancelCall = self._runningQueries[lookupDeferred] | |
214 except KeyError: | |
215 pass | |
216 else: | |
217 del self._runningQueries[lookupDeferred] | |
218 cancelCall.cancel() | |
219 | |
220 if isinstance(result, failure.Failure): | |
221 userDeferred.errback(self._fail(name, result.getErrorMessage())) | |
222 else: | |
223 userDeferred.callback(result) | |
224 | |
225 def getHostByName(self, name, timeout = (1, 3, 11, 45)): | |
226 if timeout: | |
227 timeoutDelay = reduce(operator.add, timeout) | |
228 else: | |
229 timeoutDelay = 60 | |
230 userDeferred = defer.Deferred() | |
231 lookupDeferred = threads.deferToThread(socket.gethostbyname, name) | |
232 cancelCall = self.reactor.callLater( | |
233 timeoutDelay, self._cleanup, name, lookupDeferred) | |
234 self._runningQueries[lookupDeferred] = (userDeferred, cancelCall) | |
235 lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred) | |
236 return userDeferred | |
237 | |
238 class BlockingResolver: | |
239 implements(IResolverSimple) | |
240 | |
241 def getHostByName(self, name, timeout = (1, 3, 11, 45)): | |
242 try: | |
243 address = socket.gethostbyname(name) | |
244 except socket.error: | |
245 msg = "address %r not found" % (name,) | |
246 err = error.DNSLookupError(msg) | |
247 return defer.fail(err) | |
248 else: | |
249 return defer.succeed(address) | |
250 | |
251 | |
252 class _ThreePhaseEvent(object): | |
253 """ | |
254 Collection of callables (with arguments) which can be invoked as a group in | |
255 a particular order. | |
256 | |
257 This provides the underlying implementation for the reactor's system event | |
258 triggers. An instance of this class tracks triggers for all phases of a | |
259 single type of event. | |
260 | |
261 @ivar before: A list of the before-phase triggers containing three-tuples | |
262 of a callable, a tuple of positional arguments, and a dict of keyword | |
263 arguments | |
264 | |
265 @ivar finishedBefore: A list of the before-phase triggers which have | |
266 already been executed. This is only populated in the C{'BEFORE'} state. | |
267 | |
268 @ivar during: A list of the during-phase triggers containing three-tuples | |
269 of a callable, a tuple of positional arguments, and a dict of keyword | |
270 arguments | |
271 | |
272 @ivar after: A list of the after-phase triggers containing three-tuples | |
273 of a callable, a tuple of positional arguments, and a dict of keyword | |
274 arguments | |
275 | |
276 @ivar state: A string indicating what is currently going on with this | |
277 object. One of C{'BASE'} (for when nothing in particular is happening; | |
278 this is the initial value), C{'BEFORE'} (when the before-phase triggers | |
279 are in the process of being executed). | |
280 """ | |
281 def __init__(self): | |
282 self.before = [] | |
283 self.during = [] | |
284 self.after = [] | |
285 self.state = 'BASE' | |
286 | |
287 | |
288 def addTrigger(self, phase, callable, *args, **kwargs): | |
289 """ | |
290 Add a trigger to the indicate phase. | |
291 | |
292 @param phase: One of C{'before'}, C{'during'}, or C{'after'}. | |
293 | |
294 @param callable: An object to be called when this event is triggered. | |
295 @param *args: Positional arguments to pass to C{callable}. | |
296 @param **kwargs: Keyword arguments to pass to C{callable}. | |
297 | |
298 @return: An opaque handle which may be passed to L{removeTrigger} to | |
299 reverse the effects of calling this method. | |
300 """ | |
301 if phase not in ('before', 'during', 'after'): | |
302 raise KeyError("invalid phase") | |
303 getattr(self, phase).append((callable, args, kwargs)) | |
304 return phase, callable, args, kwargs | |
305 | |
306 | |
307 def removeTrigger(self, handle): | |
308 """ | |
309 Remove a previously added trigger callable. | |
310 | |
311 @param handle: An object previously returned by L{addTrigger}. The | |
312 trigger added by that call will be removed. | |
313 | |
314 @raise ValueError: If the trigger associated with C{handle} has already | |
315 been removed or if C{handle} is not a valid handle. | |
316 """ | |
317 return getattr(self, 'removeTrigger_' + self.state)(handle) | |
318 | |
319 | |
320 def removeTrigger_BASE(self, handle): | |
321 """ | |
322 Just try to remove the trigger. | |
323 | |
324 @see removeTrigger | |
325 """ | |
326 try: | |
327 phase, callable, args, kwargs = handle | |
328 except (TypeError, ValueError), e: | |
329 raise ValueError("invalid trigger handle") | |
330 else: | |
331 if phase not in ('before', 'during', 'after'): | |
332 raise KeyError("invalid phase") | |
333 getattr(self, phase).remove((callable, args, kwargs)) | |
334 | |
335 | |
336 def removeTrigger_BEFORE(self, handle): | |
337 """ | |
338 Remove the trigger if it has yet to be executed, otherwise emit a | |
339 warning that in the future an exception will be raised when removing an | |
340 already-executed trigger. | |
341 | |
342 @see removeTrigger | |
343 """ | |
344 phase, callable, args, kwargs = handle | |
345 if phase != 'before': | |
346 return self.removeTrigger_BASE(handle) | |
347 if (callable, args, kwargs) in self.finishedBefore: | |
348 warnings.warn( | |
349 "Removing already-fired system event triggers will raise an " | |
350 "exception in a future version of Twisted.", | |
351 category=DeprecationWarning, | |
352 stacklevel=3) | |
353 else: | |
354 self.removeTrigger_BASE(handle) | |
355 | |
356 | |
357 def fireEvent(self): | |
358 """ | |
359 Call the triggers added to this event. | |
360 """ | |
361 self.state = 'BEFORE' | |
362 self.finishedBefore = [] | |
363 beforeResults = [] | |
364 while self.before: | |
365 callable, args, kwargs = self.before.pop(0) | |
366 self.finishedBefore.append((callable, args, kwargs)) | |
367 try: | |
368 result = callable(*args, **kwargs) | |
369 except: | |
370 log.err() | |
371 else: | |
372 if isinstance(result, Deferred): | |
373 beforeResults.append(result) | |
374 DeferredList(beforeResults).addCallback(self._continueFiring) | |
375 | |
376 | |
377 def _continueFiring(self, ignored): | |
378 """ | |
379 Call the during and after phase triggers for this event. | |
380 """ | |
381 self.state = 'BASE' | |
382 self.finishedBefore = [] | |
383 for phase in self.during, self.after: | |
384 while phase: | |
385 callable, args, kwargs = phase.pop(0) | |
386 try: | |
387 callable(*args, **kwargs) | |
388 except: | |
389 log.err() | |
390 | |
391 | |
392 | |
393 class ReactorBase(object): | |
394 """ | |
395 Default base class for Reactors. | |
396 | |
397 @type _stopped: C{bool} | |
398 @ivar _stopped: A flag which is true between paired calls to C{reactor.run} | |
399 and C{reactor.stop}. | |
400 """ | |
401 | |
402 implements(IReactorCore, IReactorTime, IReactorPluggableResolver) | |
403 | |
404 _stopped = True | |
405 installed = False | |
406 usingThreads = False | |
407 resolver = BlockingResolver() | |
408 | |
409 __name__ = "twisted.internet.reactor" | |
410 | |
411 def __init__(self): | |
412 self.threadCallQueue = [] | |
413 self._eventTriggers = {} | |
414 self._pendingTimedCalls = [] | |
415 self._newTimedCalls = [] | |
416 self._cancellations = 0 | |
417 self.running = False | |
418 self.waker = None | |
419 | |
420 self.addSystemEventTrigger('during', 'shutdown', self.crash) | |
421 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) | |
422 | |
423 if platform.supportsThreads(): | |
424 self._initThreads() | |
425 | |
426 # override in subclasses | |
427 | |
428 _lock = None | |
429 | |
430 def installWaker(self): | |
431 raise NotImplementedError() | |
432 | |
433 def installResolver(self, resolver): | |
434 assert IResolverSimple.providedBy(resolver) | |
435 oldResolver = self.resolver | |
436 self.resolver = resolver | |
437 return oldResolver | |
438 | |
439 def wakeUp(self): | |
440 """Wake up the event loop.""" | |
441 if not threadable.isInIOThread(): | |
442 if self.waker: | |
443 self.waker.wakeUp() | |
444 # if the waker isn't installed, the reactor isn't running, and | |
445 # therefore doesn't need to be woken up | |
446 | |
447 def doIteration(self, delay): | |
448 """Do one iteration over the readers and writers we know about.""" | |
449 raise NotImplementedError | |
450 | |
451 def addReader(self, reader): | |
452 raise NotImplementedError | |
453 | |
454 def addWriter(self, writer): | |
455 raise NotImplementedError | |
456 | |
457 def removeReader(self, reader): | |
458 raise NotImplementedError | |
459 | |
460 def removeWriter(self, writer): | |
461 raise NotImplementedError | |
462 | |
463 def removeAll(self): | |
464 raise NotImplementedError | |
465 | |
466 | |
467 def getReaders(self): | |
468 raise NotImplementedError() | |
469 | |
470 | |
471 def getWriters(self): | |
472 raise NotImplementedError() | |
473 | |
474 | |
475 def resolve(self, name, timeout = (1, 3, 11, 45)): | |
476 """Return a Deferred that will resolve a hostname. | |
477 """ | |
478 if not name: | |
479 # XXX - This is *less than* '::', and will screw up IPv6 servers | |
480 return defer.succeed('0.0.0.0') | |
481 if abstract.isIPAddress(name): | |
482 return defer.succeed(name) | |
483 return self.resolver.getHostByName(name, timeout) | |
484 | |
485 # Installation. | |
486 | |
487 # IReactorCore | |
488 | |
489 def stop(self): | |
490 """ | |
491 See twisted.internet.interfaces.IReactorCore.stop. | |
492 """ | |
493 if self._stopped: | |
494 raise error.ReactorNotRunning( | |
495 "Can't stop reactor that isn't running.") | |
496 self._stopped = True | |
497 self.callLater(0, self.fireSystemEvent, "shutdown") | |
498 | |
499 def crash(self): | |
500 """ | |
501 See twisted.internet.interfaces.IReactorCore.crash. | |
502 """ | |
503 self.running = False | |
504 | |
505 def sigInt(self, *args): | |
506 """Handle a SIGINT interrupt. | |
507 """ | |
508 log.msg("Received SIGINT, shutting down.") | |
509 self.callFromThread(self.stop) | |
510 | |
511 def sigBreak(self, *args): | |
512 """Handle a SIGBREAK interrupt. | |
513 """ | |
514 log.msg("Received SIGBREAK, shutting down.") | |
515 self.callFromThread(self.stop) | |
516 | |
517 def sigTerm(self, *args): | |
518 """Handle a SIGTERM interrupt. | |
519 """ | |
520 log.msg("Received SIGTERM, shutting down.") | |
521 self.callFromThread(self.stop) | |
522 | |
523 def disconnectAll(self): | |
524 """Disconnect every reader, and writer in the system. | |
525 """ | |
526 selectables = self.removeAll() | |
527 for reader in selectables: | |
528 log.callWithLogger(reader, | |
529 reader.connectionLost, | |
530 failure.Failure(main.CONNECTION_LOST)) | |
531 | |
532 | |
533 def iterate(self, delay=0): | |
534 """See twisted.internet.interfaces.IReactorCore.iterate. | |
535 """ | |
536 self.runUntilCurrent() | |
537 self.doIteration(delay) | |
538 | |
539 | |
540 def fireSystemEvent(self, eventType): | |
541 """See twisted.internet.interfaces.IReactorCore.fireSystemEvent. | |
542 """ | |
543 event = self._eventTriggers.get(eventType) | |
544 if event is not None: | |
545 event.fireEvent() | |
546 | |
547 | |
548 def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw): | |
549 """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger. | |
550 """ | |
551 assert callable(_f), "%s is not callable" % _f | |
552 if _eventType not in self._eventTriggers: | |
553 self._eventTriggers[_eventType] = _ThreePhaseEvent() | |
554 return (_eventType, self._eventTriggers[_eventType].addTrigger( | |
555 _phase, _f, *args, **kw)) | |
556 | |
557 | |
558 def removeSystemEventTrigger(self, triggerID): | |
559 """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger
. | |
560 """ | |
561 eventType, handle = triggerID | |
562 self._eventTriggers[eventType].removeTrigger(handle) | |
563 | |
564 | |
565 def callWhenRunning(self, _callable, *args, **kw): | |
566 """See twisted.internet.interfaces.IReactorCore.callWhenRunning. | |
567 """ | |
568 if self.running: | |
569 _callable(*args, **kw) | |
570 else: | |
571 return self.addSystemEventTrigger('after', 'startup', | |
572 _callable, *args, **kw) | |
573 | |
574 def startRunning(self): | |
575 """ | |
576 Method called when reactor starts: do some initialization and fire | |
577 startup events. | |
578 | |
579 Don't call this directly, call reactor.run() instead: it should take | |
580 care of calling this. | |
581 """ | |
582 if self.running: | |
583 warnings.warn( | |
584 "Reactor already running! This behavior is deprecated " | |
585 "since Twisted 8.0", | |
586 category=DeprecationWarning, stacklevel=3) | |
587 self.running = True | |
588 self._stopped = False | |
589 threadable.registerAsIOThread() | |
590 self.fireSystemEvent('startup') | |
591 | |
592 # IReactorTime | |
593 | |
594 seconds = staticmethod(runtimeSeconds) | |
595 | |
596 def callLater(self, _seconds, _f, *args, **kw): | |
597 """See twisted.internet.interfaces.IReactorTime.callLater. | |
598 """ | |
599 assert callable(_f), "%s is not callable" % _f | |
600 assert sys.maxint >= _seconds >= 0, \ | |
601 "%s is not greater than or equal to 0 seconds" % (_seconds,) | |
602 tple = DelayedCall(self.seconds() + _seconds, _f, args, kw, | |
603 self._cancelCallLater, | |
604 self._moveCallLaterSooner, | |
605 seconds=self.seconds) | |
606 self._newTimedCalls.append(tple) | |
607 return tple | |
608 | |
609 def _moveCallLaterSooner(self, tple): | |
610 # Linear time find: slow. | |
611 heap = self._pendingTimedCalls | |
612 try: | |
613 pos = heap.index(tple) | |
614 | |
615 # Move elt up the heap until it rests at the right place. | |
616 elt = heap[pos] | |
617 while pos != 0: | |
618 parent = (pos-1) // 2 | |
619 if heap[parent] <= elt: | |
620 break | |
621 # move parent down | |
622 heap[pos] = heap[parent] | |
623 pos = parent | |
624 heap[pos] = elt | |
625 except ValueError: | |
626 # element was not found in heap - oh well... | |
627 pass | |
628 | |
629 def _cancelCallLater(self, tple): | |
630 self._cancellations+=1 | |
631 | |
632 def cancelCallLater(self, callID): | |
633 """See twisted.internet.interfaces.IReactorTime.cancelCallLater. | |
634 """ | |
635 # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we | |
636 # we can't get rid of it for a long time. | |
637 warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callI
D.cancel() instead") | |
638 callID.cancel() | |
639 | |
640 def getDelayedCalls(self): | |
641 """Return all the outstanding delayed calls in the system. | |
642 They are returned in no particular order. | |
643 This method is not efficient -- it is really only meant for | |
644 test cases.""" | |
645 return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if no
t x.cancelled] | |
646 | |
647 def _insertNewDelayedCalls(self): | |
648 for call in self._newTimedCalls: | |
649 if call.cancelled: | |
650 self._cancellations-=1 | |
651 else: | |
652 call.activate_delay() | |
653 heappush(self._pendingTimedCalls, call) | |
654 self._newTimedCalls = [] | |
655 | |
656 def timeout(self): | |
657 # insert new delayed calls to make sure to include them in timeout value | |
658 self._insertNewDelayedCalls() | |
659 | |
660 if not self._pendingTimedCalls: | |
661 return None | |
662 | |
663 return max(0, self._pendingTimedCalls[0].time - self.seconds()) | |
664 | |
665 | |
666 def runUntilCurrent(self): | |
667 """Run all pending timed calls. | |
668 """ | |
669 if self.threadCallQueue: | |
670 # Keep track of how many calls we actually make, as we're | |
671 # making them, in case another call is added to the queue | |
672 # while we're in this loop. | |
673 count = 0 | |
674 total = len(self.threadCallQueue) | |
675 for (f, a, kw) in self.threadCallQueue: | |
676 try: | |
677 f(*a, **kw) | |
678 except: | |
679 log.err() | |
680 count += 1 | |
681 if count == total: | |
682 break | |
683 del self.threadCallQueue[:count] | |
684 if self.threadCallQueue: | |
685 if self.waker: | |
686 self.waker.wakeUp() | |
687 | |
688 # insert new delayed calls now | |
689 self._insertNewDelayedCalls() | |
690 | |
691 now = self.seconds() | |
692 while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= no
w): | |
693 call = heappop(self._pendingTimedCalls) | |
694 if call.cancelled: | |
695 self._cancellations-=1 | |
696 continue | |
697 | |
698 if call.delayed_time > 0: | |
699 call.activate_delay() | |
700 heappush(self._pendingTimedCalls, call) | |
701 continue | |
702 | |
703 try: | |
704 call.called = 1 | |
705 call.func(*call.args, **call.kw) | |
706 except: | |
707 log.deferr() | |
708 if hasattr(call, "creator"): | |
709 e = "\n" | |
710 e += " C: previous exception occurred in " + \ | |
711 "a DelayedCall created here:\n" | |
712 e += " C:" | |
713 e += "".join(call.creator).rstrip().replace("\n","\n C:") | |
714 e += "\n" | |
715 log.msg(e) | |
716 | |
717 | |
718 if (self._cancellations > 50 and | |
719 self._cancellations > len(self._pendingTimedCalls) >> 1): | |
720 self._cancellations = 0 | |
721 self._pendingTimedCalls = [x for x in self._pendingTimedCalls | |
722 if not x.cancelled] | |
723 heapify(self._pendingTimedCalls) | |
724 | |
725 # IReactorProcess | |
726 | |
727 def _checkProcessArgs(self, args, env): | |
728 """ | |
729 Check for valid arguments and environment to spawnProcess. | |
730 | |
731 @return: A two element tuple giving values to use when creating the | |
732 process. The first element of the tuple is a C{list} of C{str} | |
733 giving the values for argv of the child process. The second element | |
734 of the tuple is either C{None} if C{env} was C{None} or a C{dict} | |
735 mapping C{str} environment keys to C{str} environment values. | |
736 """ | |
737 # Any unicode string which Python would successfully implicitly | |
738 # encode to a byte string would have worked before these explicit | |
739 # checks were added. Anything which would have failed with a | |
740 # UnicodeEncodeError during that implicit encoding step would have | |
741 # raised an exception in the child process and that would have been | |
742 # a pain in the butt to debug. | |
743 # | |
744 # So, we will explicitly attempt the same encoding which Python | |
745 # would implicitly do later. If it fails, we will report an error | |
746 # without ever spawning a child process. If it succeeds, we'll save | |
747 # the result so that Python doesn't need to do it implicitly later. | |
748 # | |
749 # For any unicode which we can actually encode, we'll also issue a | |
750 # deprecation warning, because no one should be passing unicode here | |
751 # anyway. | |
752 # | |
753 # -exarkun | |
754 defaultEncoding = sys.getdefaultencoding() | |
755 | |
756 # Common check function | |
757 def argChecker(arg): | |
758 """ | |
759 Return either a str or None. If the given value is not | |
760 allowable for some reason, None is returned. Otherwise, a | |
761 possibly different object which should be used in place of arg | |
762 is returned. This forces unicode encoding to happen now, rather | |
763 than implicitly later. | |
764 """ | |
765 if isinstance(arg, unicode): | |
766 try: | |
767 arg = arg.encode(defaultEncoding) | |
768 except UnicodeEncodeError: | |
769 return None | |
770 warnings.warn( | |
771 "Argument strings and environment keys/values passed to " | |
772 "reactor.spawnProcess should be str, not unicode.", | |
773 category=DeprecationWarning, | |
774 stacklevel=4) | |
775 if isinstance(arg, str) and '\0' not in arg: | |
776 return arg | |
777 return None | |
778 | |
779 # Make a few tests to check input validity | |
780 if not isinstance(args, (tuple, list)): | |
781 raise TypeError("Arguments must be a tuple or list") | |
782 | |
783 outputArgs = [] | |
784 for arg in args: | |
785 arg = argChecker(arg) | |
786 if arg is None: | |
787 raise TypeError("Arguments contain a non-string value") | |
788 else: | |
789 outputArgs.append(arg) | |
790 | |
791 outputEnv = None | |
792 if env is not None: | |
793 outputEnv = {} | |
794 for key, val in env.iteritems(): | |
795 key = argChecker(key) | |
796 if key is None: | |
797 raise TypeError("Environment contains a non-string key") | |
798 val = argChecker(val) | |
799 if val is None: | |
800 raise TypeError("Environment contains a non-string value") | |
801 outputEnv[key] = val | |
802 return outputArgs, outputEnv | |
803 | |
804 # IReactorThreads | |
805 if platform.supportsThreads(): | |
806 threadpool = None | |
807 # ID of the trigger stopping the threadpool | |
808 threadpoolShutdownID = None | |
809 | |
810 def _initThreads(self): | |
811 self.usingThreads = True | |
812 self.resolver = ThreadedResolver(self) | |
813 self.installWaker() | |
814 | |
815 def callFromThread(self, f, *args, **kw): | |
816 """ | |
817 See L{twisted.internet.interfaces.IReactorThreads.callFromThread}. | |
818 """ | |
819 assert callable(f), "%s is not callable" % (f,) | |
820 # lists are thread-safe in CPython, but not in Jython | |
821 # this is probably a bug in Jython, but until fixed this code | |
822 # won't work in Jython. | |
823 self.threadCallQueue.append((f, args, kw)) | |
824 self.wakeUp() | |
825 | |
826 def _initThreadPool(self): | |
827 """ | |
828 Create the threadpool accessible with callFromThread. | |
829 """ | |
830 from twisted.python import threadpool | |
831 self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.rea
ctor') | |
832 self.callWhenRunning(self.threadpool.start) | |
833 self.threadpoolShutdownID = self.addSystemEventTrigger( | |
834 'during', 'shutdown', self._stopThreadPool) | |
835 | |
836 def _stopThreadPool(self): | |
837 """ | |
838 Stop the reactor threadpool. | |
839 """ | |
840 self.threadpoolShutdownID = None | |
841 self.threadpool.stop() | |
842 self.threadpool = None | |
843 | |
844 def callInThread(self, _callable, *args, **kwargs): | |
845 """ | |
846 See L{twisted.internet.interfaces.IReactorThreads.callInThread}. | |
847 """ | |
848 if self.threadpool is None: | |
849 self._initThreadPool() | |
850 self.threadpool.callInThread(_callable, *args, **kwargs) | |
851 | |
852 def suggestThreadPoolSize(self, size): | |
853 """ | |
854 See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolS
ize}. | |
855 """ | |
856 if size == 0 and self.threadpool is None: | |
857 return | |
858 if self.threadpool is None: | |
859 self._initThreadPool() | |
860 self.threadpool.adjustPoolsize(maxthreads=size) | |
861 else: | |
862 # This is for signal handlers. | |
863 def callFromThread(self, f, *args, **kw): | |
864 assert callable(f), "%s is not callable" % (f,) | |
865 # See comment in the other callFromThread implementation. | |
866 self.threadCallQueue.append((f, args, kw)) | |
867 | |
868 if platform.supportsThreads(): | |
869 classImplements(ReactorBase, IReactorThreads) | |
870 | |
871 | |
872 class BaseConnector(styles.Ephemeral): | |
873 """Basic implementation of connector. | |
874 | |
875 State can be: "connecting", "connected", "disconnected" | |
876 """ | |
877 | |
878 implements(IConnector) | |
879 | |
880 timeoutID = None | |
881 factoryStarted = 0 | |
882 | |
883 def __init__(self, factory, timeout, reactor): | |
884 self.state = "disconnected" | |
885 self.reactor = reactor | |
886 self.factory = factory | |
887 self.timeout = timeout | |
888 | |
889 def disconnect(self): | |
890 """Disconnect whatever our state is.""" | |
891 if self.state == 'connecting': | |
892 self.stopConnecting() | |
893 elif self.state == 'connected': | |
894 self.transport.loseConnection() | |
895 | |
896 def connect(self): | |
897 """Start connection to remote server.""" | |
898 if self.state != "disconnected": | |
899 raise RuntimeError, "can't connect in this state" | |
900 | |
901 self.state = "connecting" | |
902 if not self.factoryStarted: | |
903 self.factory.doStart() | |
904 self.factoryStarted = 1 | |
905 self.transport = transport = self._makeTransport() | |
906 if self.timeout is not None: | |
907 self.timeoutID = self.reactor.callLater(self.timeout, transport.fail
IfNotConnected, error.TimeoutError()) | |
908 self.factory.startedConnecting(self) | |
909 | |
910 def stopConnecting(self): | |
911 """Stop attempting to connect.""" | |
912 if self.state != "connecting": | |
913 raise error.NotConnectingError, "we're not trying to connect" | |
914 | |
915 self.state = "disconnected" | |
916 self.transport.failIfNotConnected(error.UserError()) | |
917 del self.transport | |
918 | |
919 def cancelTimeout(self): | |
920 if self.timeoutID is not None: | |
921 try: | |
922 self.timeoutID.cancel() | |
923 except ValueError: | |
924 pass | |
925 del self.timeoutID | |
926 | |
927 def buildProtocol(self, addr): | |
928 self.state = "connected" | |
929 self.cancelTimeout() | |
930 return self.factory.buildProtocol(addr) | |
931 | |
932 def connectionFailed(self, reason): | |
933 self.cancelTimeout() | |
934 self.transport = None | |
935 self.state = "disconnected" | |
936 self.factory.clientConnectionFailed(self, reason) | |
937 if self.state == "disconnected": | |
938 # factory hasn't called our connect() method | |
939 self.factory.doStop() | |
940 self.factoryStarted = 0 | |
941 | |
942 def connectionLost(self, reason): | |
943 self.state = "disconnected" | |
944 self.factory.clientConnectionLost(self, reason) | |
945 if self.state == "disconnected": | |
946 # factory hasn't called our connect() method | |
947 self.factory.doStop() | |
948 self.factoryStarted = 0 | |
949 | |
950 def getDestination(self): | |
951 raise NotImplementedError, "implement in subclasses" | |
952 | |
953 | |
954 class BasePort(abstract.FileDescriptor): | |
955 """Basic implementation of a ListeningPort. | |
956 | |
957 Note: This does not actually implement IListeningPort. | |
958 """ | |
959 | |
960 addressFamily = None | |
961 socketType = None | |
962 | |
963 def createInternetSocket(self): | |
964 s = socket.socket(self.addressFamily, self.socketType) | |
965 s.setblocking(0) | |
966 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): | |
967 old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD) | |
968 fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) | |
969 return s | |
970 | |
971 | |
972 def doWrite(self): | |
973 """Raises a RuntimeError""" | |
974 raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class
__) | |
975 | |
976 | |
977 | |
978 class _SignalReactorMixin: | |
979 """ | |
980 Private mixin to manage signals: it installs signal handlers at start time, | |
981 and define run method. | |
982 | |
983 It can only be used mixed in with L{ReactorBase}, and has to be defined | |
984 first in the inheritance (so that method resolution order finds | |
985 startRunning first). | |
986 """ | |
987 | |
988 def _handleSignals(self): | |
989 """ | |
990 Install the signal handlers for the Twisted event loop. | |
991 """ | |
992 try: | |
993 import signal | |
994 except ImportError: | |
995 log.msg("Warning: signal module unavailable -- " | |
996 "not installing signal handlers.") | |
997 return | |
998 | |
999 if signal.getsignal(signal.SIGINT) == signal.default_int_handler: | |
1000 # only handle if there isn't already a handler, e.g. for Pdb. | |
1001 signal.signal(signal.SIGINT, self.sigInt) | |
1002 signal.signal(signal.SIGTERM, self.sigTerm) | |
1003 | |
1004 # Catch Ctrl-Break in windows | |
1005 if hasattr(signal, "SIGBREAK"): | |
1006 signal.signal(signal.SIGBREAK, self.sigBreak) | |
1007 | |
1008 if platformType == 'posix': | |
1009 signal.signal(signal.SIGCHLD, self._handleSigchld) | |
1010 | |
1011 | |
1012 def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThre
ads()): | |
1013 """ | |
1014 Reap all processes on SIGCHLD. | |
1015 | |
1016 This gets called on SIGCHLD. We do no processing inside a signal | |
1017 handler, as the calls we make here could occur between any two | |
1018 python bytecode instructions. Deferring processing to the next | |
1019 eventloop round prevents us from violating the state constraints | |
1020 of arbitrary classes. | |
1021 """ | |
1022 from twisted.internet.process import reapAllProcesses | |
1023 if _threadSupport: | |
1024 self.callFromThread(reapAllProcesses) | |
1025 else: | |
1026 self.callLater(0, reapAllProcesses) | |
1027 | |
1028 | |
1029 def startRunning(self, installSignalHandlers=True): | |
1030 """ | |
1031 Forward call to ReactorBase, arrange for signal handlers to be | |
1032 installed if asked. | |
1033 """ | |
1034 if installSignalHandlers: | |
1035 # Make sure this happens before after-startup events, since the | |
1036 # expectation of after-startup is that the reactor is fully | |
1037 # initialized. Don't do it right away for historical reasons | |
1038 # (perhaps some before-startup triggers don't want there to be a | |
1039 # custom SIGCHLD handler so that they can run child processes with | |
1040 # some blocking api). | |
1041 self.addSystemEventTrigger( | |
1042 'during', 'startup', self._handleSignals) | |
1043 ReactorBase.startRunning(self) | |
1044 | |
1045 | |
1046 def run(self, installSignalHandlers=True): | |
1047 self.startRunning(installSignalHandlers=installSignalHandlers) | |
1048 self.mainLoop() | |
1049 | |
1050 | |
1051 def mainLoop(self): | |
1052 while self.running: | |
1053 try: | |
1054 while self.running: | |
1055 # Advance simulation time in delayed event | |
1056 # processors. | |
1057 self.runUntilCurrent() | |
1058 t2 = self.timeout() | |
1059 t = self.running and t2 | |
1060 self.doIteration(t) | |
1061 except: | |
1062 log.msg("Unexpected error in main loop.") | |
1063 log.err() | |
1064 else: | |
1065 log.msg('Main loop terminated.') | |
1066 | |
1067 | |
1068 | |
1069 __all__ = [] | |
OLD | NEW |