| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 """ | |
| 6 A win32event based implementation of the Twisted main loop. | |
| 7 | |
| 8 This requires win32all or ActivePython to be installed. | |
| 9 | |
| 10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 11 | |
| 12 | |
| 13 LIMITATIONS: | |
| 14 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects. | |
| 15 2. Process running has some problems (see Process docstring). | |
| 16 | |
| 17 | |
| 18 TODO: | |
| 19 1. Event loop handling of writes is *very* problematic (this is causing failed
tests). | |
| 20 Switch to doing it the correct way, whatever that means (see below). | |
| 21 2. Replace icky socket loopback waker with event based waker (use dummyEvent ob
ject) | |
| 22 3. Switch everyone to using Free Software so we don't have to deal with proprie
tary APIs. | |
| 23 | |
| 24 | |
| 25 ALTERNATIVE SOLUTIONS: | |
| 26 - IIRC, sockets can only be registered once. So we switch to a structure | |
| 27 like the poll() reactor, thus allowing us to deal with write events in | |
| 28 a decent fashion. This should allow us to pass tests, but we're still | |
| 29 limited to 64 events. | |
| 30 | |
| 31 Or: | |
| 32 | |
| 33 - Instead of doing a reactor, we make this an addon to the select reactor. | |
| 34 The WFMO event loop runs in a separate thread. This means no need to maintain | |
| 35 separate code for networking, 64 event limit doesn't apply to sockets, | |
| 36 we can run processes and other win32 stuff in default event loop. The | |
| 37 only problem is that we're stuck with the icky socket based waker. | |
| 38 Another benefit is that this could be extended to support >64 events | |
| 39 in a simpler manner than the previous solution. | |
| 40 | |
| 41 The 2nd solution is probably what will get implemented. | |
| 42 """ | |
| 43 | |
| 44 # System imports | |
| 45 import time | |
| 46 import sys | |
| 47 | |
| 48 from zope.interface import implements | |
| 49 | |
| 50 # Win32 imports | |
| 51 from win32file import WSAEventSelect, FD_READ, FD_CLOSE, FD_ACCEPT, FD_CONNECT | |
| 52 from win32event import CreateEvent, MsgWaitForMultipleObjects | |
| 53 from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT, QS_ALLEVENTS | |
| 54 | |
| 55 import win32gui | |
| 56 | |
| 57 # Twisted imports | |
| 58 from twisted.internet import posixbase | |
| 59 from twisted.python import log, threadable, failure | |
| 60 from twisted.internet.interfaces import IReactorFDSet, IReactorProcess | |
| 61 | |
| 62 from twisted.internet._dumbwin32proc import Process | |
| 63 | |
| 64 | |
| 65 class Win32Reactor(posixbase.PosixReactorBase): | |
| 66 """ | |
| 67 Reactor that uses Win32 event APIs. | |
| 68 | |
| 69 @ivar _reads: A dictionary mapping L{FileDescriptor} instances to a | |
| 70 win32 event object used to check for read events for that descriptor. | |
| 71 | |
| 72 @ivar _writes: A dictionary mapping L{FileDescriptor} instances to a | |
| 73 arbitrary value. Keys in this dictionary will be given a chance to | |
| 74 write out their data. | |
| 75 | |
| 76 @ivar _events: A dictionary mapping win32 event object to tuples of | |
| 77 L{FileDescriptor} instances and event masks. | |
| 78 """ | |
| 79 implements(IReactorFDSet, IReactorProcess) | |
| 80 | |
| 81 dummyEvent = CreateEvent(None, 0, 0, None) | |
| 82 | |
| 83 def __init__(self): | |
| 84 self._reads = {} | |
| 85 self._writes = {} | |
| 86 self._events = {} | |
| 87 posixbase.PosixReactorBase.__init__(self) | |
| 88 | |
| 89 | |
| 90 def _makeSocketEvent(self, fd, action, why): | |
| 91 """ | |
| 92 Make a win32 event object for a socket. | |
| 93 """ | |
| 94 event = CreateEvent(None, 0, 0, None) | |
| 95 WSAEventSelect(fd, event, why) | |
| 96 self._events[event] = (fd, action) | |
| 97 return event | |
| 98 | |
| 99 | |
| 100 def addEvent(self, event, fd, action): | |
| 101 """ | |
| 102 Add a new win32 event to the event loop. | |
| 103 """ | |
| 104 self._events[event] = (fd, action) | |
| 105 | |
| 106 | |
| 107 def removeEvent(self, event): | |
| 108 """ | |
| 109 Remove an event. | |
| 110 """ | |
| 111 del self._events[event] | |
| 112 | |
| 113 | |
| 114 def addReader(self, reader): | |
| 115 """ | |
| 116 Add a socket FileDescriptor for notification of data available to read. | |
| 117 """ | |
| 118 if reader not in self._reads: | |
| 119 self._reads[reader] = self._makeSocketEvent( | |
| 120 reader, 'doRead', FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE) | |
| 121 | |
| 122 def addWriter(self, writer): | |
| 123 """ | |
| 124 Add a socket FileDescriptor for notification of data available to write. | |
| 125 """ | |
| 126 if writer not in self._writes: | |
| 127 self._writes[writer] = 1 | |
| 128 | |
| 129 def removeReader(self, reader): | |
| 130 """Remove a Selectable for notification of data available to read. | |
| 131 """ | |
| 132 if reader in self._reads: | |
| 133 del self._events[self._reads[reader]] | |
| 134 del self._reads[reader] | |
| 135 | |
| 136 def removeWriter(self, writer): | |
| 137 """Remove a Selectable for notification of data available to write. | |
| 138 """ | |
| 139 if writer in self._writes: | |
| 140 del self._writes[writer] | |
| 141 | |
| 142 def removeAll(self): | |
| 143 """ | |
| 144 Remove all selectables, and return a list of them. | |
| 145 """ | |
| 146 return self._removeAll(self._reads, self._writes) | |
| 147 | |
| 148 | |
| 149 def getReaders(self): | |
| 150 return self._reads.keys() | |
| 151 | |
| 152 | |
| 153 def getWriters(self): | |
| 154 return self._writes.keys() | |
| 155 | |
| 156 | |
| 157 def doWaitForMultipleEvents(self, timeout): | |
| 158 log.msg(channel='system', event='iteration', reactor=self) | |
| 159 if timeout is None: | |
| 160 #timeout = INFINITE | |
| 161 timeout = 100 | |
| 162 else: | |
| 163 timeout = int(timeout * 1000) | |
| 164 | |
| 165 if not (self._events or self._writes): | |
| 166 # sleep so we don't suck up CPU time | |
| 167 time.sleep(timeout / 1000.0) | |
| 168 return | |
| 169 | |
| 170 canDoMoreWrites = 0 | |
| 171 for fd in self._writes.keys(): | |
| 172 if log.callWithLogger(fd, self._runWrite, fd): | |
| 173 canDoMoreWrites = 1 | |
| 174 | |
| 175 if canDoMoreWrites: | |
| 176 timeout = 0 | |
| 177 | |
| 178 handles = self._events.keys() or [self.dummyEvent] | |
| 179 val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT | QS_AL
LEVENTS) | |
| 180 if val == WAIT_TIMEOUT: | |
| 181 return | |
| 182 elif val == WAIT_OBJECT_0 + len(handles): | |
| 183 exit = win32gui.PumpWaitingMessages() | |
| 184 if exit: | |
| 185 self.callLater(0, self.stop) | |
| 186 return | |
| 187 elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles): | |
| 188 fd, action = self._events[handles[val - WAIT_OBJECT_0]] | |
| 189 log.callWithLogger(fd, self._runAction, action, fd) | |
| 190 | |
| 191 def _runWrite(self, fd): | |
| 192 closed = 0 | |
| 193 try: | |
| 194 closed = fd.doWrite() | |
| 195 except: | |
| 196 closed = sys.exc_info()[1] | |
| 197 log.deferr() | |
| 198 | |
| 199 if closed: | |
| 200 self.removeReader(fd) | |
| 201 self.removeWriter(fd) | |
| 202 try: | |
| 203 fd.connectionLost(failure.Failure(closed)) | |
| 204 except: | |
| 205 log.deferr() | |
| 206 elif closed is None: | |
| 207 return 1 | |
| 208 | |
| 209 def _runAction(self, action, fd): | |
| 210 try: | |
| 211 closed = getattr(fd, action)() | |
| 212 except: | |
| 213 closed = sys.exc_info()[1] | |
| 214 log.deferr() | |
| 215 | |
| 216 if closed: | |
| 217 self._disconnectSelectable(fd, closed, action == 'doRead') | |
| 218 | |
| 219 doIteration = doWaitForMultipleEvents | |
| 220 | |
| 221 def spawnProcess(self, processProtocol, executable, args=(), env={}, path=No
ne, uid=None, gid=None, usePTY=0, childFDs=None): | |
| 222 """Spawn a process.""" | |
| 223 if uid is not None: | |
| 224 raise ValueError("Setting UID is unsupported on this platform.") | |
| 225 if gid is not None: | |
| 226 raise ValueError("Setting GID is unsupported on this platform.") | |
| 227 if usePTY: | |
| 228 raise ValueError("PTYs are unsupported on this platform.") | |
| 229 if childFDs is not None: | |
| 230 raise ValueError( | |
| 231 "Custom child file descriptor mappings are unsupported on " | |
| 232 "this platform.") | |
| 233 args, env = self._checkProcessArgs(args, env) | |
| 234 return Process(self, processProtocol, executable, args, env, path) | |
| 235 | |
| 236 | |
| 237 def install(): | |
| 238 threadable.init(1) | |
| 239 r = Win32Reactor() | |
| 240 import main | |
| 241 main.installReactor(r) | |
| 242 | |
| 243 | |
| 244 __all__ = ["Win32Reactor", "install"] | |
| OLD | NEW |