Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(476)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/base.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_internet -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 Very basic functionality for a Reactor implementation.
8
9 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
10 """
11
12 import socket # needed only for sync-dns
13 from zope.interface import implements, classImplements
14
15 import sys
16 import warnings
17 import operator
18 from heapq import heappush, heappop, heapify
19
20 try:
21 import fcntl
22 except ImportError:
23 fcntl = None
24 import traceback
25
26 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThre ads
27 from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolv er
28 from twisted.internet.interfaces import IConnector, IDelayedCall
29 from twisted.internet import main, error, abstract, defer, threads
30 from twisted.python import log, failure, reflect
31 from twisted.python.runtime import seconds as runtimeSeconds, platform, platform Type
32 from twisted.internet.defer import Deferred, DeferredList
33 from twisted.persisted import styles
34
35 # This import is for side-effects! Even if you don't see any code using it
36 # in this module, don't delete it.
37 from twisted.python import threadable
38
39
40 class DelayedCall(styles.Ephemeral):
41
42 implements(IDelayedCall)
43 # enable .debug to record creator call stack, and it will be logged if
44 # an exception occurs while the function is being run
45 debug = False
46 _str = None
47
48 def __init__(self, time, func, args, kw, cancel, reset,
49 seconds=runtimeSeconds):
50 """
51 @param time: Seconds from the epoch at which to call C{func}.
52 @param func: The callable to call.
53 @param args: The positional arguments to pass to the callable.
54 @param kw: The keyword arguments to pass to the callable.
55 @param cancel: A callable which will be called with this
56 DelayedCall before cancellation.
57 @param reset: A callable which will be called with this
58 DelayedCall after changing this DelayedCall's scheduled
59 execution time. The callable should adjust any necessary
60 scheduling details to ensure this DelayedCall is invoked
61 at the new appropriate time.
62 @param seconds: If provided, a no-argument callable which will be
63 used to determine the current time any time that information is
64 needed.
65 """
66 self.time, self.func, self.args, self.kw = time, func, args, kw
67 self.resetter = reset
68 self.canceller = cancel
69 self.seconds = seconds
70 self.cancelled = self.called = 0
71 self.delayed_time = 0
72 if self.debug:
73 self.creator = traceback.format_stack()[:-2]
74
75 def getTime(self):
76 """Return the time at which this call will fire
77
78 @rtype: C{float}
79 @return: The number of seconds after the epoch at which this call is
80 scheduled to be made.
81 """
82 return self.time + self.delayed_time
83
84 def cancel(self):
85 """Unschedule this call
86
87 @raise AlreadyCancelled: Raised if this call has already been
88 unscheduled.
89
90 @raise AlreadyCalled: Raised if this call has already been made.
91 """
92 if self.cancelled:
93 raise error.AlreadyCancelled
94 elif self.called:
95 raise error.AlreadyCalled
96 else:
97 self.canceller(self)
98 self.cancelled = 1
99 if self.debug:
100 self._str = str(self)
101 del self.func, self.args, self.kw
102
103 def reset(self, secondsFromNow):
104 """Reschedule this call for a different time
105
106 @type secondsFromNow: C{float}
107 @param secondsFromNow: The number of seconds from the time of the
108 C{reset} call at which this call will be scheduled.
109
110 @raise AlreadyCancelled: Raised if this call has been cancelled.
111 @raise AlreadyCalled: Raised if this call has already been made.
112 """
113 if self.cancelled:
114 raise error.AlreadyCancelled
115 elif self.called:
116 raise error.AlreadyCalled
117 else:
118 newTime = self.seconds() + secondsFromNow
119 if newTime < self.time:
120 self.delayed_time = 0
121 self.time = newTime
122 self.resetter(self)
123 else:
124 self.delayed_time = newTime - self.time
125
126 def delay(self, secondsLater):
127 """Reschedule this call for a later time
128
129 @type secondsLater: C{float}
130 @param secondsLater: The number of seconds after the originally
131 scheduled time for which to reschedule this call.
132
133 @raise AlreadyCancelled: Raised if this call has been cancelled.
134 @raise AlreadyCalled: Raised if this call has already been made.
135 """
136 if self.cancelled:
137 raise error.AlreadyCancelled
138 elif self.called:
139 raise error.AlreadyCalled
140 else:
141 self.delayed_time += secondsLater
142 if self.delayed_time < 0:
143 self.activate_delay()
144 self.resetter(self)
145
146 def activate_delay(self):
147 self.time += self.delayed_time
148 self.delayed_time = 0
149
150 def active(self):
151 """Determine whether this call is still pending
152
153 @rtype: C{bool}
154 @return: True if this call has not yet been made or cancelled,
155 False otherwise.
156 """
157 return not (self.cancelled or self.called)
158
159 def __le__(self, other):
160 return self.time <= other.time
161
162 def __str__(self):
163 if self._str is not None:
164 return self._str
165 if hasattr(self, 'func'):
166 if hasattr(self.func, 'func_name'):
167 func = self.func.func_name
168 if hasattr(self.func, 'im_class'):
169 func = self.func.im_class.__name__ + '.' + func
170 else:
171 func = reflect.safe_repr(self.func)
172 else:
173 func = None
174
175 now = self.seconds()
176 L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % (
177 id(self), self.time - now, self.called, self.cancelled)]
178 if func is not None:
179 L.extend((" ", func, "("))
180 if self.args:
181 L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
182 if self.kw:
183 L.append(", ")
184 if self.kw:
185 L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
186 L.append(")")
187
188 if self.debug:
189 L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.cre ator)))
190 L.append('>')
191
192 return "".join(L)
193
194
195 class ThreadedResolver:
196 implements(IResolverSimple)
197
198 def __init__(self, reactor):
199 self.reactor = reactor
200 self._runningQueries = {}
201
202 def _fail(self, name, err):
203 err = error.DNSLookupError("address %r not found: %s" % (name, err))
204 return failure.Failure(err)
205
206 def _cleanup(self, name, lookupDeferred):
207 userDeferred, cancelCall = self._runningQueries[lookupDeferred]
208 del self._runningQueries[lookupDeferred]
209 userDeferred.errback(self._fail(name, "timeout error"))
210
211 def _checkTimeout(self, result, name, lookupDeferred):
212 try:
213 userDeferred, cancelCall = self._runningQueries[lookupDeferred]
214 except KeyError:
215 pass
216 else:
217 del self._runningQueries[lookupDeferred]
218 cancelCall.cancel()
219
220 if isinstance(result, failure.Failure):
221 userDeferred.errback(self._fail(name, result.getErrorMessage()))
222 else:
223 userDeferred.callback(result)
224
225 def getHostByName(self, name, timeout = (1, 3, 11, 45)):
226 if timeout:
227 timeoutDelay = reduce(operator.add, timeout)
228 else:
229 timeoutDelay = 60
230 userDeferred = defer.Deferred()
231 lookupDeferred = threads.deferToThread(socket.gethostbyname, name)
232 cancelCall = self.reactor.callLater(
233 timeoutDelay, self._cleanup, name, lookupDeferred)
234 self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
235 lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
236 return userDeferred
237
238 class BlockingResolver:
239 implements(IResolverSimple)
240
241 def getHostByName(self, name, timeout = (1, 3, 11, 45)):
242 try:
243 address = socket.gethostbyname(name)
244 except socket.error:
245 msg = "address %r not found" % (name,)
246 err = error.DNSLookupError(msg)
247 return defer.fail(err)
248 else:
249 return defer.succeed(address)
250
251
252 class _ThreePhaseEvent(object):
253 """
254 Collection of callables (with arguments) which can be invoked as a group in
255 a particular order.
256
257 This provides the underlying implementation for the reactor's system event
258 triggers. An instance of this class tracks triggers for all phases of a
259 single type of event.
260
261 @ivar before: A list of the before-phase triggers containing three-tuples
262 of a callable, a tuple of positional arguments, and a dict of keyword
263 arguments
264
265 @ivar finishedBefore: A list of the before-phase triggers which have
266 already been executed. This is only populated in the C{'BEFORE'} state.
267
268 @ivar during: A list of the during-phase triggers containing three-tuples
269 of a callable, a tuple of positional arguments, and a dict of keyword
270 arguments
271
272 @ivar after: A list of the after-phase triggers containing three-tuples
273 of a callable, a tuple of positional arguments, and a dict of keyword
274 arguments
275
276 @ivar state: A string indicating what is currently going on with this
277 object. One of C{'BASE'} (for when nothing in particular is happening;
278 this is the initial value), C{'BEFORE'} (when the before-phase triggers
279 are in the process of being executed).
280 """
281 def __init__(self):
282 self.before = []
283 self.during = []
284 self.after = []
285 self.state = 'BASE'
286
287
288 def addTrigger(self, phase, callable, *args, **kwargs):
289 """
290 Add a trigger to the indicate phase.
291
292 @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
293
294 @param callable: An object to be called when this event is triggered.
295 @param *args: Positional arguments to pass to C{callable}.
296 @param **kwargs: Keyword arguments to pass to C{callable}.
297
298 @return: An opaque handle which may be passed to L{removeTrigger} to
299 reverse the effects of calling this method.
300 """
301 if phase not in ('before', 'during', 'after'):
302 raise KeyError("invalid phase")
303 getattr(self, phase).append((callable, args, kwargs))
304 return phase, callable, args, kwargs
305
306
307 def removeTrigger(self, handle):
308 """
309 Remove a previously added trigger callable.
310
311 @param handle: An object previously returned by L{addTrigger}. The
312 trigger added by that call will be removed.
313
314 @raise ValueError: If the trigger associated with C{handle} has already
315 been removed or if C{handle} is not a valid handle.
316 """
317 return getattr(self, 'removeTrigger_' + self.state)(handle)
318
319
320 def removeTrigger_BASE(self, handle):
321 """
322 Just try to remove the trigger.
323
324 @see removeTrigger
325 """
326 try:
327 phase, callable, args, kwargs = handle
328 except (TypeError, ValueError), e:
329 raise ValueError("invalid trigger handle")
330 else:
331 if phase not in ('before', 'during', 'after'):
332 raise KeyError("invalid phase")
333 getattr(self, phase).remove((callable, args, kwargs))
334
335
336 def removeTrigger_BEFORE(self, handle):
337 """
338 Remove the trigger if it has yet to be executed, otherwise emit a
339 warning that in the future an exception will be raised when removing an
340 already-executed trigger.
341
342 @see removeTrigger
343 """
344 phase, callable, args, kwargs = handle
345 if phase != 'before':
346 return self.removeTrigger_BASE(handle)
347 if (callable, args, kwargs) in self.finishedBefore:
348 warnings.warn(
349 "Removing already-fired system event triggers will raise an "
350 "exception in a future version of Twisted.",
351 category=DeprecationWarning,
352 stacklevel=3)
353 else:
354 self.removeTrigger_BASE(handle)
355
356
357 def fireEvent(self):
358 """
359 Call the triggers added to this event.
360 """
361 self.state = 'BEFORE'
362 self.finishedBefore = []
363 beforeResults = []
364 while self.before:
365 callable, args, kwargs = self.before.pop(0)
366 self.finishedBefore.append((callable, args, kwargs))
367 try:
368 result = callable(*args, **kwargs)
369 except:
370 log.err()
371 else:
372 if isinstance(result, Deferred):
373 beforeResults.append(result)
374 DeferredList(beforeResults).addCallback(self._continueFiring)
375
376
377 def _continueFiring(self, ignored):
378 """
379 Call the during and after phase triggers for this event.
380 """
381 self.state = 'BASE'
382 self.finishedBefore = []
383 for phase in self.during, self.after:
384 while phase:
385 callable, args, kwargs = phase.pop(0)
386 try:
387 callable(*args, **kwargs)
388 except:
389 log.err()
390
391
392
393 class ReactorBase(object):
394 """
395 Default base class for Reactors.
396
397 @type _stopped: C{bool}
398 @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
399 and C{reactor.stop}.
400 """
401
402 implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
403
404 _stopped = True
405 installed = False
406 usingThreads = False
407 resolver = BlockingResolver()
408
409 __name__ = "twisted.internet.reactor"
410
411 def __init__(self):
412 self.threadCallQueue = []
413 self._eventTriggers = {}
414 self._pendingTimedCalls = []
415 self._newTimedCalls = []
416 self._cancellations = 0
417 self.running = False
418 self.waker = None
419
420 self.addSystemEventTrigger('during', 'shutdown', self.crash)
421 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
422
423 if platform.supportsThreads():
424 self._initThreads()
425
426 # override in subclasses
427
428 _lock = None
429
430 def installWaker(self):
431 raise NotImplementedError()
432
433 def installResolver(self, resolver):
434 assert IResolverSimple.providedBy(resolver)
435 oldResolver = self.resolver
436 self.resolver = resolver
437 return oldResolver
438
439 def wakeUp(self):
440 """Wake up the event loop."""
441 if not threadable.isInIOThread():
442 if self.waker:
443 self.waker.wakeUp()
444 # if the waker isn't installed, the reactor isn't running, and
445 # therefore doesn't need to be woken up
446
447 def doIteration(self, delay):
448 """Do one iteration over the readers and writers we know about."""
449 raise NotImplementedError
450
451 def addReader(self, reader):
452 raise NotImplementedError
453
454 def addWriter(self, writer):
455 raise NotImplementedError
456
457 def removeReader(self, reader):
458 raise NotImplementedError
459
460 def removeWriter(self, writer):
461 raise NotImplementedError
462
463 def removeAll(self):
464 raise NotImplementedError
465
466
467 def getReaders(self):
468 raise NotImplementedError()
469
470
471 def getWriters(self):
472 raise NotImplementedError()
473
474
475 def resolve(self, name, timeout = (1, 3, 11, 45)):
476 """Return a Deferred that will resolve a hostname.
477 """
478 if not name:
479 # XXX - This is *less than* '::', and will screw up IPv6 servers
480 return defer.succeed('0.0.0.0')
481 if abstract.isIPAddress(name):
482 return defer.succeed(name)
483 return self.resolver.getHostByName(name, timeout)
484
485 # Installation.
486
487 # IReactorCore
488
489 def stop(self):
490 """
491 See twisted.internet.interfaces.IReactorCore.stop.
492 """
493 if self._stopped:
494 raise error.ReactorNotRunning(
495 "Can't stop reactor that isn't running.")
496 self._stopped = True
497 self.callLater(0, self.fireSystemEvent, "shutdown")
498
499 def crash(self):
500 """
501 See twisted.internet.interfaces.IReactorCore.crash.
502 """
503 self.running = False
504
505 def sigInt(self, *args):
506 """Handle a SIGINT interrupt.
507 """
508 log.msg("Received SIGINT, shutting down.")
509 self.callFromThread(self.stop)
510
511 def sigBreak(self, *args):
512 """Handle a SIGBREAK interrupt.
513 """
514 log.msg("Received SIGBREAK, shutting down.")
515 self.callFromThread(self.stop)
516
517 def sigTerm(self, *args):
518 """Handle a SIGTERM interrupt.
519 """
520 log.msg("Received SIGTERM, shutting down.")
521 self.callFromThread(self.stop)
522
523 def disconnectAll(self):
524 """Disconnect every reader, and writer in the system.
525 """
526 selectables = self.removeAll()
527 for reader in selectables:
528 log.callWithLogger(reader,
529 reader.connectionLost,
530 failure.Failure(main.CONNECTION_LOST))
531
532
533 def iterate(self, delay=0):
534 """See twisted.internet.interfaces.IReactorCore.iterate.
535 """
536 self.runUntilCurrent()
537 self.doIteration(delay)
538
539
540 def fireSystemEvent(self, eventType):
541 """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
542 """
543 event = self._eventTriggers.get(eventType)
544 if event is not None:
545 event.fireEvent()
546
547
548 def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
549 """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
550 """
551 assert callable(_f), "%s is not callable" % _f
552 if _eventType not in self._eventTriggers:
553 self._eventTriggers[_eventType] = _ThreePhaseEvent()
554 return (_eventType, self._eventTriggers[_eventType].addTrigger(
555 _phase, _f, *args, **kw))
556
557
558 def removeSystemEventTrigger(self, triggerID):
559 """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger .
560 """
561 eventType, handle = triggerID
562 self._eventTriggers[eventType].removeTrigger(handle)
563
564
565 def callWhenRunning(self, _callable, *args, **kw):
566 """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
567 """
568 if self.running:
569 _callable(*args, **kw)
570 else:
571 return self.addSystemEventTrigger('after', 'startup',
572 _callable, *args, **kw)
573
574 def startRunning(self):
575 """
576 Method called when reactor starts: do some initialization and fire
577 startup events.
578
579 Don't call this directly, call reactor.run() instead: it should take
580 care of calling this.
581 """
582 if self.running:
583 warnings.warn(
584 "Reactor already running! This behavior is deprecated "
585 "since Twisted 8.0",
586 category=DeprecationWarning, stacklevel=3)
587 self.running = True
588 self._stopped = False
589 threadable.registerAsIOThread()
590 self.fireSystemEvent('startup')
591
592 # IReactorTime
593
594 seconds = staticmethod(runtimeSeconds)
595
596 def callLater(self, _seconds, _f, *args, **kw):
597 """See twisted.internet.interfaces.IReactorTime.callLater.
598 """
599 assert callable(_f), "%s is not callable" % _f
600 assert sys.maxint >= _seconds >= 0, \
601 "%s is not greater than or equal to 0 seconds" % (_seconds,)
602 tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
603 self._cancelCallLater,
604 self._moveCallLaterSooner,
605 seconds=self.seconds)
606 self._newTimedCalls.append(tple)
607 return tple
608
609 def _moveCallLaterSooner(self, tple):
610 # Linear time find: slow.
611 heap = self._pendingTimedCalls
612 try:
613 pos = heap.index(tple)
614
615 # Move elt up the heap until it rests at the right place.
616 elt = heap[pos]
617 while pos != 0:
618 parent = (pos-1) // 2
619 if heap[parent] <= elt:
620 break
621 # move parent down
622 heap[pos] = heap[parent]
623 pos = parent
624 heap[pos] = elt
625 except ValueError:
626 # element was not found in heap - oh well...
627 pass
628
629 def _cancelCallLater(self, tple):
630 self._cancellations+=1
631
632 def cancelCallLater(self, callID):
633 """See twisted.internet.interfaces.IReactorTime.cancelCallLater.
634 """
635 # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we
636 # we can't get rid of it for a long time.
637 warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callI D.cancel() instead")
638 callID.cancel()
639
640 def getDelayedCalls(self):
641 """Return all the outstanding delayed calls in the system.
642 They are returned in no particular order.
643 This method is not efficient -- it is really only meant for
644 test cases."""
645 return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if no t x.cancelled]
646
647 def _insertNewDelayedCalls(self):
648 for call in self._newTimedCalls:
649 if call.cancelled:
650 self._cancellations-=1
651 else:
652 call.activate_delay()
653 heappush(self._pendingTimedCalls, call)
654 self._newTimedCalls = []
655
656 def timeout(self):
657 # insert new delayed calls to make sure to include them in timeout value
658 self._insertNewDelayedCalls()
659
660 if not self._pendingTimedCalls:
661 return None
662
663 return max(0, self._pendingTimedCalls[0].time - self.seconds())
664
665
666 def runUntilCurrent(self):
667 """Run all pending timed calls.
668 """
669 if self.threadCallQueue:
670 # Keep track of how many calls we actually make, as we're
671 # making them, in case another call is added to the queue
672 # while we're in this loop.
673 count = 0
674 total = len(self.threadCallQueue)
675 for (f, a, kw) in self.threadCallQueue:
676 try:
677 f(*a, **kw)
678 except:
679 log.err()
680 count += 1
681 if count == total:
682 break
683 del self.threadCallQueue[:count]
684 if self.threadCallQueue:
685 if self.waker:
686 self.waker.wakeUp()
687
688 # insert new delayed calls now
689 self._insertNewDelayedCalls()
690
691 now = self.seconds()
692 while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= no w):
693 call = heappop(self._pendingTimedCalls)
694 if call.cancelled:
695 self._cancellations-=1
696 continue
697
698 if call.delayed_time > 0:
699 call.activate_delay()
700 heappush(self._pendingTimedCalls, call)
701 continue
702
703 try:
704 call.called = 1
705 call.func(*call.args, **call.kw)
706 except:
707 log.deferr()
708 if hasattr(call, "creator"):
709 e = "\n"
710 e += " C: previous exception occurred in " + \
711 "a DelayedCall created here:\n"
712 e += " C:"
713 e += "".join(call.creator).rstrip().replace("\n","\n C:")
714 e += "\n"
715 log.msg(e)
716
717
718 if (self._cancellations > 50 and
719 self._cancellations > len(self._pendingTimedCalls) >> 1):
720 self._cancellations = 0
721 self._pendingTimedCalls = [x for x in self._pendingTimedCalls
722 if not x.cancelled]
723 heapify(self._pendingTimedCalls)
724
725 # IReactorProcess
726
727 def _checkProcessArgs(self, args, env):
728 """
729 Check for valid arguments and environment to spawnProcess.
730
731 @return: A two element tuple giving values to use when creating the
732 process. The first element of the tuple is a C{list} of C{str}
733 giving the values for argv of the child process. The second element
734 of the tuple is either C{None} if C{env} was C{None} or a C{dict}
735 mapping C{str} environment keys to C{str} environment values.
736 """
737 # Any unicode string which Python would successfully implicitly
738 # encode to a byte string would have worked before these explicit
739 # checks were added. Anything which would have failed with a
740 # UnicodeEncodeError during that implicit encoding step would have
741 # raised an exception in the child process and that would have been
742 # a pain in the butt to debug.
743 #
744 # So, we will explicitly attempt the same encoding which Python
745 # would implicitly do later. If it fails, we will report an error
746 # without ever spawning a child process. If it succeeds, we'll save
747 # the result so that Python doesn't need to do it implicitly later.
748 #
749 # For any unicode which we can actually encode, we'll also issue a
750 # deprecation warning, because no one should be passing unicode here
751 # anyway.
752 #
753 # -exarkun
754 defaultEncoding = sys.getdefaultencoding()
755
756 # Common check function
757 def argChecker(arg):
758 """
759 Return either a str or None. If the given value is not
760 allowable for some reason, None is returned. Otherwise, a
761 possibly different object which should be used in place of arg
762 is returned. This forces unicode encoding to happen now, rather
763 than implicitly later.
764 """
765 if isinstance(arg, unicode):
766 try:
767 arg = arg.encode(defaultEncoding)
768 except UnicodeEncodeError:
769 return None
770 warnings.warn(
771 "Argument strings and environment keys/values passed to "
772 "reactor.spawnProcess should be str, not unicode.",
773 category=DeprecationWarning,
774 stacklevel=4)
775 if isinstance(arg, str) and '\0' not in arg:
776 return arg
777 return None
778
779 # Make a few tests to check input validity
780 if not isinstance(args, (tuple, list)):
781 raise TypeError("Arguments must be a tuple or list")
782
783 outputArgs = []
784 for arg in args:
785 arg = argChecker(arg)
786 if arg is None:
787 raise TypeError("Arguments contain a non-string value")
788 else:
789 outputArgs.append(arg)
790
791 outputEnv = None
792 if env is not None:
793 outputEnv = {}
794 for key, val in env.iteritems():
795 key = argChecker(key)
796 if key is None:
797 raise TypeError("Environment contains a non-string key")
798 val = argChecker(val)
799 if val is None:
800 raise TypeError("Environment contains a non-string value")
801 outputEnv[key] = val
802 return outputArgs, outputEnv
803
804 # IReactorThreads
805 if platform.supportsThreads():
806 threadpool = None
807 # ID of the trigger stopping the threadpool
808 threadpoolShutdownID = None
809
810 def _initThreads(self):
811 self.usingThreads = True
812 self.resolver = ThreadedResolver(self)
813 self.installWaker()
814
815 def callFromThread(self, f, *args, **kw):
816 """
817 See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
818 """
819 assert callable(f), "%s is not callable" % (f,)
820 # lists are thread-safe in CPython, but not in Jython
821 # this is probably a bug in Jython, but until fixed this code
822 # won't work in Jython.
823 self.threadCallQueue.append((f, args, kw))
824 self.wakeUp()
825
826 def _initThreadPool(self):
827 """
828 Create the threadpool accessible with callFromThread.
829 """
830 from twisted.python import threadpool
831 self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.rea ctor')
832 self.callWhenRunning(self.threadpool.start)
833 self.threadpoolShutdownID = self.addSystemEventTrigger(
834 'during', 'shutdown', self._stopThreadPool)
835
836 def _stopThreadPool(self):
837 """
838 Stop the reactor threadpool.
839 """
840 self.threadpoolShutdownID = None
841 self.threadpool.stop()
842 self.threadpool = None
843
844 def callInThread(self, _callable, *args, **kwargs):
845 """
846 See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
847 """
848 if self.threadpool is None:
849 self._initThreadPool()
850 self.threadpool.callInThread(_callable, *args, **kwargs)
851
852 def suggestThreadPoolSize(self, size):
853 """
854 See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolS ize}.
855 """
856 if size == 0 and self.threadpool is None:
857 return
858 if self.threadpool is None:
859 self._initThreadPool()
860 self.threadpool.adjustPoolsize(maxthreads=size)
861 else:
862 # This is for signal handlers.
863 def callFromThread(self, f, *args, **kw):
864 assert callable(f), "%s is not callable" % (f,)
865 # See comment in the other callFromThread implementation.
866 self.threadCallQueue.append((f, args, kw))
867
868 if platform.supportsThreads():
869 classImplements(ReactorBase, IReactorThreads)
870
871
872 class BaseConnector(styles.Ephemeral):
873 """Basic implementation of connector.
874
875 State can be: "connecting", "connected", "disconnected"
876 """
877
878 implements(IConnector)
879
880 timeoutID = None
881 factoryStarted = 0
882
883 def __init__(self, factory, timeout, reactor):
884 self.state = "disconnected"
885 self.reactor = reactor
886 self.factory = factory
887 self.timeout = timeout
888
889 def disconnect(self):
890 """Disconnect whatever our state is."""
891 if self.state == 'connecting':
892 self.stopConnecting()
893 elif self.state == 'connected':
894 self.transport.loseConnection()
895
896 def connect(self):
897 """Start connection to remote server."""
898 if self.state != "disconnected":
899 raise RuntimeError, "can't connect in this state"
900
901 self.state = "connecting"
902 if not self.factoryStarted:
903 self.factory.doStart()
904 self.factoryStarted = 1
905 self.transport = transport = self._makeTransport()
906 if self.timeout is not None:
907 self.timeoutID = self.reactor.callLater(self.timeout, transport.fail IfNotConnected, error.TimeoutError())
908 self.factory.startedConnecting(self)
909
910 def stopConnecting(self):
911 """Stop attempting to connect."""
912 if self.state != "connecting":
913 raise error.NotConnectingError, "we're not trying to connect"
914
915 self.state = "disconnected"
916 self.transport.failIfNotConnected(error.UserError())
917 del self.transport
918
919 def cancelTimeout(self):
920 if self.timeoutID is not None:
921 try:
922 self.timeoutID.cancel()
923 except ValueError:
924 pass
925 del self.timeoutID
926
927 def buildProtocol(self, addr):
928 self.state = "connected"
929 self.cancelTimeout()
930 return self.factory.buildProtocol(addr)
931
932 def connectionFailed(self, reason):
933 self.cancelTimeout()
934 self.transport = None
935 self.state = "disconnected"
936 self.factory.clientConnectionFailed(self, reason)
937 if self.state == "disconnected":
938 # factory hasn't called our connect() method
939 self.factory.doStop()
940 self.factoryStarted = 0
941
942 def connectionLost(self, reason):
943 self.state = "disconnected"
944 self.factory.clientConnectionLost(self, reason)
945 if self.state == "disconnected":
946 # factory hasn't called our connect() method
947 self.factory.doStop()
948 self.factoryStarted = 0
949
950 def getDestination(self):
951 raise NotImplementedError, "implement in subclasses"
952
953
954 class BasePort(abstract.FileDescriptor):
955 """Basic implementation of a ListeningPort.
956
957 Note: This does not actually implement IListeningPort.
958 """
959
960 addressFamily = None
961 socketType = None
962
963 def createInternetSocket(self):
964 s = socket.socket(self.addressFamily, self.socketType)
965 s.setblocking(0)
966 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
967 old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
968 fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
969 return s
970
971
972 def doWrite(self):
973 """Raises a RuntimeError"""
974 raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class __)
975
976
977
978 class _SignalReactorMixin:
979 """
980 Private mixin to manage signals: it installs signal handlers at start time,
981 and define run method.
982
983 It can only be used mixed in with L{ReactorBase}, and has to be defined
984 first in the inheritance (so that method resolution order finds
985 startRunning first).
986 """
987
988 def _handleSignals(self):
989 """
990 Install the signal handlers for the Twisted event loop.
991 """
992 try:
993 import signal
994 except ImportError:
995 log.msg("Warning: signal module unavailable -- "
996 "not installing signal handlers.")
997 return
998
999 if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
1000 # only handle if there isn't already a handler, e.g. for Pdb.
1001 signal.signal(signal.SIGINT, self.sigInt)
1002 signal.signal(signal.SIGTERM, self.sigTerm)
1003
1004 # Catch Ctrl-Break in windows
1005 if hasattr(signal, "SIGBREAK"):
1006 signal.signal(signal.SIGBREAK, self.sigBreak)
1007
1008 if platformType == 'posix':
1009 signal.signal(signal.SIGCHLD, self._handleSigchld)
1010
1011
1012 def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThre ads()):
1013 """
1014 Reap all processes on SIGCHLD.
1015
1016 This gets called on SIGCHLD. We do no processing inside a signal
1017 handler, as the calls we make here could occur between any two
1018 python bytecode instructions. Deferring processing to the next
1019 eventloop round prevents us from violating the state constraints
1020 of arbitrary classes.
1021 """
1022 from twisted.internet.process import reapAllProcesses
1023 if _threadSupport:
1024 self.callFromThread(reapAllProcesses)
1025 else:
1026 self.callLater(0, reapAllProcesses)
1027
1028
1029 def startRunning(self, installSignalHandlers=True):
1030 """
1031 Forward call to ReactorBase, arrange for signal handlers to be
1032 installed if asked.
1033 """
1034 if installSignalHandlers:
1035 # Make sure this happens before after-startup events, since the
1036 # expectation of after-startup is that the reactor is fully
1037 # initialized. Don't do it right away for historical reasons
1038 # (perhaps some before-startup triggers don't want there to be a
1039 # custom SIGCHLD handler so that they can run child processes with
1040 # some blocking api).
1041 self.addSystemEventTrigger(
1042 'during', 'startup', self._handleSignals)
1043 ReactorBase.startRunning(self)
1044
1045
1046 def run(self, installSignalHandlers=True):
1047 self.startRunning(installSignalHandlers=installSignalHandlers)
1048 self.mainLoop()
1049
1050
1051 def mainLoop(self):
1052 while self.running:
1053 try:
1054 while self.running:
1055 # Advance simulation time in delayed event
1056 # processors.
1057 self.runUntilCurrent()
1058 t2 = self.timeout()
1059 t = self.running and t2
1060 self.doIteration(t)
1061 except:
1062 log.msg("Unexpected error in main loop.")
1063 log.err()
1064 else:
1065 log.msg('Main loop terminated.')
1066
1067
1068
1069 __all__ = []
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/address.py ('k') | third_party/twisted_8_1/twisted/internet/cfreactor.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698